This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 209698938f IGNITE-19641 Catalog events are triggered too early. (#2231)
209698938f is described below

commit 209698938ff1d5206fc3a9403bb019c423a1f65b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Jun 23 10:22:32 2023 +0300

    IGNITE-19641 Catalog events are triggered too early. (#2231)
---
 .../internal/catalog/CatalogServiceImpl.java       | 270 ++-------------------
 .../internal/catalog/storage/AlterColumnEntry.java |  56 ++++-
 .../internal/catalog/storage/AlterZoneEntry.java   |  32 ++-
 .../internal/catalog/storage/DropColumnsEntry.java |  54 ++++-
 .../internal/catalog/storage/DropIndexEntry.java   |  42 +++-
 .../internal/catalog/storage/DropTableEntry.java   |  42 +++-
 .../internal/catalog/storage/DropZoneEntry.java    |  30 ++-
 .../storage/{UpdateEntry.java => Fireable.java}    |  18 +-
 .../internal/catalog/storage/NewColumnsEntry.java  |  51 +++-
 .../internal/catalog/storage/NewIndexEntry.java    |  41 +++-
 .../internal/catalog/storage/NewTableEntry.java    |  41 +++-
 .../internal/catalog/storage/NewZoneEntry.java     |  30 ++-
 .../catalog/storage/ObjectIdGenUpdateEntry.java    |  13 +-
 .../internal/catalog/storage/UpdateEntry.java      |  10 +-
 .../internal/catalog/storage/UpdateLogImpl.java    |   1 +
 .../internal/catalog/CatalogServiceSelfTest.java   |  23 ++
 .../catalog/storage/UpdateLogImplTest.java         |   6 +
 17 files changed, 487 insertions(+), 273 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 3ae0c88abf..010fa208ee 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.commands.CreateZoneParams.INFINITE_TIMER_VALUE;
 import static 
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
 
@@ -31,7 +30,6 @@ import java.util.List;
 import java.util.Map.Entry;
 import java.util.NavigableMap;
 import java.util.Objects;
-import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentSkipListMap;
 import java.util.function.Predicate;
@@ -55,24 +53,15 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
-import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
 import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
-import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
-import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
 import org.apache.ignite.internal.catalog.storage.AlterColumnEntry;
 import org.apache.ignite.internal.catalog.storage.AlterZoneEntry;
 import org.apache.ignite.internal.catalog.storage.DropColumnsEntry;
 import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
 import org.apache.ignite.internal.catalog.storage.DropTableEntry;
 import org.apache.ignite.internal.catalog.storage.DropZoneEntry;
+import org.apache.ignite.internal.catalog.storage.Fireable;
 import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
 import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
 import org.apache.ignite.internal.catalog.storage.NewTableEntry;
@@ -86,8 +75,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.Producer;
-import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.CollectionUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.lang.ColumnAlreadyExistsException;
 import org.apache.ignite.lang.ColumnNotFoundException;
@@ -814,254 +801,31 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
         @Override
         public void handle(VersionedUpdate update) {
             int version = update.version();
-            long activationTimestamp = update.activationTimestamp();
             Catalog catalog = catalogByVer.get(version - 1);
 
-            assert catalog != null;
-
-            List<CompletableFuture<?>> eventFutures = new 
ArrayList<>(update.entries().size());
+            assert catalog != null : version - 1;
 
             for (UpdateEntry entry : update.entries()) {
-                String schemaName = CatalogService.PUBLIC;
-                CatalogSchemaDescriptor schema = 
Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " + 
schemaName);
-
-                if (entry instanceof NewTableEntry) {
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    ArrayUtils.concat(schema.tables(), 
((NewTableEntry) entry).descriptor()),
-                                    schema.indexes()
-                            ))
-                    );
-
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.TABLE_CREATE,
-                            new CreateTableEventParameters(version, 
((NewTableEntry) entry).descriptor())
-                    ));
-
-                } else if (entry instanceof DropTableEntry) {
-                    int tableId = ((DropTableEntry) entry).tableId();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    Arrays.stream(schema.tables()).filter(t -> 
t.id() != tableId).toArray(CatalogTableDescriptor[]::new),
-                                    schema.indexes()
-                            ))
-                    );
-
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.TABLE_DROP,
-                            new DropTableEventParameters(version, tableId)
-                    ));
-                } else if (entry instanceof NewColumnsEntry) {
-                    int tableId = ((NewColumnsEntry) entry).tableId();
-                    List<CatalogTableColumnDescriptor> columnDescriptors = 
((NewColumnsEntry) entry).descriptors();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    Arrays.stream(schema.tables())
-                                            .map(table -> table.id() != tableId
-                                                    ? table
-                                                    : new 
CatalogTableDescriptor(
-                                                            table.id(),
-                                                            table.name(),
-                                                            table.zoneId(),
-                                                            
CollectionUtils.concat(table.columns(), columnDescriptors),
-                                                            
table.primaryKeyColumns(),
-                                                            
table.colocationColumns())
-                                            )
-                                            
.toArray(CatalogTableDescriptor[]::new),
-                                    schema.indexes()
-                            ))
-                    );
-
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.TABLE_ALTER,
-                            new AddColumnEventParameters(version, tableId, 
columnDescriptors)
-                    ));
-                } else if (entry instanceof DropColumnsEntry) {
-                    int tableId = ((DropColumnsEntry) entry).tableId();
-                    Set<String> columns = ((DropColumnsEntry) entry).columns();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    Arrays.stream(schema.tables())
-                                            .map(table -> table.id() != tableId
-                                                    ? table
-                                                    : new 
CatalogTableDescriptor(
-                                                            table.id(),
-                                                            table.name(),
-                                                            table.zoneId(),
-                                                            
table.columns().stream()
-                                                                    
.filter(col -> !columns.contains(col.name()))
-                                                                    
.collect(toList()),
-                                                            
table.primaryKeyColumns(),
-                                                            
table.colocationColumns())
-                                            )
-                                            
.toArray(CatalogTableDescriptor[]::new),
-                                    schema.indexes()
-                            ))
-                    );
-
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.TABLE_ALTER,
-                            new DropColumnEventParameters(version, tableId, 
columns)
-                    ));
-                } else if (entry instanceof NewIndexEntry) {
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    schema.tables(),
-                                    ArrayUtils.concat(schema.indexes(), 
((NewIndexEntry) entry).descriptor())
-                            ))
-                    );
-
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.INDEX_CREATE,
-                            new CreateIndexEventParameters(version, 
((NewIndexEntry) entry).descriptor())
-                    ));
-                } else if (entry instanceof DropIndexEntry) {
-                    int indexId = ((DropIndexEntry) entry).indexId();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    schema.tables(),
-                                    Arrays.stream(schema.indexes()).filter(t 
-> t.id() != indexId).toArray(CatalogIndexDescriptor[]::new)
-                            ))
-                    );
+                catalog = entry.applyUpdate(catalog);
+            }
 
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.INDEX_DROP,
-                            new DropIndexEventParameters(version, indexId)
-                    ));
-                } else if (entry instanceof NewZoneEntry) {
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            CollectionUtils.concat(catalog.zones(), 
List.of(((NewZoneEntry) entry).descriptor())),
-                            catalog.schemas()
-                    );
+            catalog = applyUpdateFinal(catalog, update);
 
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.ZONE_CREATE,
-                            new CreateZoneEventParameters(version, 
((NewZoneEntry) entry).descriptor())
-                    ));
-                } else if (entry instanceof DropZoneEntry) {
-                    int zoneId = ((DropZoneEntry) entry).zoneId();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones().stream().filter(z -> z.id() != 
zoneId).collect(toList()),
-                            catalog.schemas()
-                    );
+            registerCatalog(catalog);
 
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.ZONE_DROP,
-                            new DropZoneEventParameters(version, zoneId)
-                    ));
-                } else if (entry instanceof AlterZoneEntry) {
-                    CatalogZoneDescriptor descriptor = ((AlterZoneEntry) 
entry).descriptor();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones().stream()
-                                    .map(z -> z.id() == descriptor.id() ? 
descriptor : z)
-                                    .collect(toList()),
-                            catalog.schemas()
-                    );
+            List<CompletableFuture<?>> eventFutures = new 
ArrayList<>(update.entries().size());
 
-                    eventFutures.add(fireEvent(
-                            CatalogEvent.ZONE_ALTER,
-                            new AlterZoneEventParameters(version, descriptor)
-                    ));
-                } else if (entry instanceof ObjectIdGenUpdateEntry) {
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState() + 
((ObjectIdGenUpdateEntry) entry).delta(),
-                            catalog.zones(),
-                            catalog.schemas()
-                    );
-                } else if (entry instanceof AlterColumnEntry) {
-                    int tableId = ((AlterColumnEntry) entry).tableId();
-                    CatalogTableColumnDescriptor target = ((AlterColumnEntry) 
entry).descriptor();
-
-                    catalog = new Catalog(
-                            version,
-                            activationTimestamp,
-                            catalog.objectIdGenState(),
-                            catalog.zones(),
-                            List.of(new CatalogSchemaDescriptor(
-                                    schema.id(),
-                                    schema.name(),
-                                    Arrays.stream(schema.tables())
-                                            .map(table -> table.id() != tableId
-                                                    ? table
-                                                    : new 
CatalogTableDescriptor(
-                                                            table.id(),
-                                                            table.name(),
-                                                            table.zoneId(),
-                                                            
table.columns().stream()
-                                                                    
.map(source -> source.name().equals(target.name()) ? target : source)
-                                                                    
.collect(toList()),
-                                                            
table.primaryKeyColumns(),
-                                                            
table.colocationColumns())
-                                            )
-                                            
.toArray(CatalogTableDescriptor[]::new),
-                                    schema.indexes()
-                            ))
-                    );
+            for (UpdateEntry entry : update.entries()) {
+                if (entry instanceof Fireable) {
+                    Fireable fireEvent = (Fireable) entry;
 
                     eventFutures.add(fireEvent(
-                            CatalogEvent.TABLE_ALTER,
-                            new AlterColumnEventParameters(version, tableId, 
target)
+                            fireEvent.eventType(),
+                            fireEvent.createEventParameters(version)
                     ));
-                } else {
-                    assert false : entry;
                 }
             }
 
-            registerCatalog(catalog);
-
             
CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
                     .whenComplete((ignore, err) -> {
                         if (err != null) {
@@ -1089,4 +853,14 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
     interface UpdateProducer {
         List<UpdateEntry> get(Catalog catalog);
     }
+
+    private Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update) {
+        return new Catalog(
+                update.version(),
+                update.activationTimestamp(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                catalog.schemas()
+        );
+    }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index b5dd20008b..1067e574c0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -17,13 +17,25 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import 
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes a column replacement.
  */
-public class AlterColumnEntry implements UpdateEntry {
+public class AlterColumnEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = -4552940987881338656L;
 
     private final int tableId;
@@ -51,7 +63,47 @@ public class AlterColumnEntry implements UpdateEntry {
         return column;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_ALTER;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new AlterColumnEventParameters(causalityToken, tableId, column);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = 
Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        Arrays.stream(schema.tables())
+                                .map(table -> table.id() != tableId
+                                        ? table
+                                        : new CatalogTableDescriptor(
+                                                table.id(),
+                                                table.name(),
+                                                table.zoneId(),
+                                                table.columns().stream()
+                                                        .map(source -> 
source.name().equals(column.name()) ? column : source)
+                                                        .collect(toList()),
+                                                table.primaryKeyColumns(),
+                                                table.colocationColumns())
+                                )
+                                .toArray(CatalogTableDescriptor[]::new),
+                        schema.indexes()
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index 3e2f5c56c0..cf11a3400f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -17,13 +17,19 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.stream.Collectors.toList;
+
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes altering zone.
  */
-public class AlterZoneEntry implements UpdateEntry {
+public class AlterZoneEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 7727583734058987315L;
 
     private final CatalogZoneDescriptor descriptor;
@@ -42,7 +48,29 @@ public class AlterZoneEntry implements UpdateEntry {
         return descriptor;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.ZONE_ALTER;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new AlterZoneEventParameters(causalityToken, descriptor);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones().stream()
+                        .map(z -> z.id() == descriptor.id() ? descriptor : z)
+                        .collect(toList()),
+                catalog.schemas()
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index bf1d009fdf..1d50e5e196 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -17,13 +17,25 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
 import java.util.Set;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes dropping of columns.
  */
-public class DropColumnsEntry implements UpdateEntry {
+public class DropColumnsEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 2970125889493580121L;
 
     private final int tableId;
@@ -50,7 +62,45 @@ public class DropColumnsEntry implements UpdateEntry {
         return columns;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_ALTER;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new DropColumnEventParameters(causalityToken, tableId, columns);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = 
Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        Arrays.stream(schema.tables())
+                                .map(table -> table.id() == tableId ? new 
CatalogTableDescriptor(
+                                        table.id(),
+                                        table.name(),
+                                        table.zoneId(),
+                                        table.columns().stream()
+                                                .filter(col -> 
!columns.contains(col.name()))
+                                                .collect(toList()),
+                                        table.primaryKeyColumns(),
+                                        table.colocationColumns()) : table
+                                )
+                                .toArray(CatalogTableDescriptor[]::new),
+                        schema.indexes()
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 2efef88bea..fc5b019c81 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -17,12 +17,23 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes deletion of an index.
  */
-public class DropIndexEntry implements UpdateEntry {
+public class DropIndexEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = -604729846502020728L;
 
     private final int indexId;
@@ -41,7 +52,34 @@ public class DropIndexEntry implements UpdateEntry {
         return indexId;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.INDEX_DROP;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new DropIndexEventParameters(causalityToken, indexId);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = 
Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        schema.tables(),
+                        Arrays.stream(schema.indexes()).filter(t -> t.id() != 
indexId).toArray(CatalogIndexDescriptor[]::new)
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index ee53b390d2..54ffadfe58 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -17,12 +17,23 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes deletion of a table.
  */
-public class DropTableEntry implements UpdateEntry {
+public class DropTableEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 7727583734058987315L;
 
     private final int tableId;
@@ -41,7 +52,34 @@ public class DropTableEntry implements UpdateEntry {
         return tableId;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_DROP;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new DropTableEventParameters(causalityToken, tableId);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        Arrays.stream(schema.tables()).filter(t -> t.id() != tableId).toArray(CatalogTableDescriptor[]::new),
+                        schema.indexes()
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index 2dc1816eb8..be3f2fdba4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -17,12 +17,18 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static java.util.stream.Collectors.toList;
+
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
 import org.apache.ignite.internal.tostring.S;
 
 /**
  * Describes deletion of a zone.
  */
-public class DropZoneEntry implements UpdateEntry {
+public class DropZoneEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 7727583734058987315L;
 
     private final int zoneId;
@@ -41,7 +47,27 @@ public class DropZoneEntry implements UpdateEntry {
         return zoneId;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.ZONE_DROP;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new DropZoneEventParameters(causalityToken, zoneId);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones().stream().filter(z -> z.id() != zoneId).collect(toList()),
+                catalog.schemas()
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
similarity index 63%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
index 39c487447f..c2a7954976 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
@@ -17,10 +17,22 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import java.io.Serializable;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 
 /**
- * A marker interface describing a particular change within the {@link VersionedUpdate group}.
+ * Interface for updates that require firing events.
  */
-public interface UpdateEntry extends Serializable {
+public interface Fireable {
+    /**
+     * Returns the type of the fired event.
+     */
+    CatalogEvent eventType();
+
+    /**
+     * Creates parameters of the fired event.
+     *
+     * @param causalityToken Causality token.
+     */
+    CatalogEventParameters createEventParameters(long causalityToken);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 17f86675cc..c15fb4773f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -17,14 +17,25 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
 import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CollectionUtils;
 
 /**
  * Describes addition of new columns.
  */
-public class NewColumnsEntry implements UpdateEntry {
+public class NewColumnsEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 2970125889493580121L;
 
     private final int tableId;
@@ -51,7 +62,43 @@ public class NewColumnsEntry implements UpdateEntry {
         return descriptors;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_ALTER;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new AddColumnEventParameters(causalityToken, tableId, descriptors);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        Arrays.stream(schema.tables())
+                                .map(table -> table.id() == tableId ? new CatalogTableDescriptor(
+                                        table.id(),
+                                        table.name(),
+                                        table.zoneId(),
+                                        CollectionUtils.concat(table.columns(), descriptors),
+                                        table.primaryKeyColumns(),
+                                        table.colocationColumns()) : table
+                                )
+                                .toArray(CatalogTableDescriptor[]::new),
+                        schema.indexes()
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 8ec1b1f1eb..0c57d7eef2 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -17,13 +17,23 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ArrayUtils;
 
 /**
  * Describes addition of a new index.
  */
-public class NewIndexEntry implements UpdateEntry {
+public class NewIndexEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 6717363577013237711L;
 
     private final CatalogIndexDescriptor descriptor;
@@ -42,7 +52,34 @@ public class NewIndexEntry implements UpdateEntry {
         return descriptor;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.INDEX_CREATE;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new CreateIndexEventParameters(causalityToken, descriptor);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        schema.tables(),
+                        ArrayUtils.concat(schema.indexes(), descriptor)
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index aabfeaa418..589ef47a8a 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -17,13 +17,23 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ArrayUtils;
 
 /**
  * Describes addition of a new table.
  */
-public class NewTableEntry implements UpdateEntry {
+public class NewTableEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 2970125889493580121L;
 
     private final CatalogTableDescriptor descriptor;
@@ -42,7 +52,34 @@ public class NewTableEntry implements UpdateEntry {
         return descriptor;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.TABLE_CREATE;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new CreateTableEventParameters(causalityToken, descriptor);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                catalog.zones(),
+                List.of(new CatalogSchemaDescriptor(
+                        schema.id(),
+                        schema.name(),
+                        ArrayUtils.concat(schema.tables(), descriptor),
+                        schema.indexes()
+                ))
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index dc671e2278..b584951bdd 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -17,13 +17,19 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
 import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CollectionUtils;
 
 /**
  * Describes addition of a new zone.
  */
-public class NewZoneEntry implements UpdateEntry {
+public class NewZoneEntry implements UpdateEntry, Fireable {
     private static final long serialVersionUID = 2970125889493580121L;
 
     private final CatalogZoneDescriptor descriptor;
@@ -42,7 +48,27 @@ public class NewZoneEntry implements UpdateEntry {
         return descriptor;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public CatalogEvent eventType() {
+        return CatalogEvent.ZONE_CREATE;
+    }
+
+    @Override
+    public CatalogEventParameters createEventParameters(long causalityToken) {
+        return new CreateZoneEventParameters(causalityToken, descriptor);
+    }
+
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState(),
+                CollectionUtils.concat(catalog.zones(), List.of(descriptor)),
+                catalog.schemas()
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
index 26c97af8ec..45f8d51fa9 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.tostring.S;
 
 /**
@@ -41,7 +42,17 @@ public class ObjectIdGenUpdateEntry implements UpdateEntry {
         return delta;
     }
 
-    /** {@inheritDoc} */
+    @Override
+    public Catalog applyUpdate(Catalog catalog) {
+        return new Catalog(
+                catalog.version(),
+                catalog.time(),
+                catalog.objectIdGenState() + delta,
+                catalog.zones(),
+                catalog.schemas()
+        );
+    }
+
     @Override
     public String toString() {
         return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
index 39c487447f..dc01e77f40 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
@@ -18,9 +18,17 @@
 package org.apache.ignite.internal.catalog.storage;
 
 import java.io.Serializable;
+import org.apache.ignite.internal.catalog.Catalog;
 
 /**
- * A marker interface describing a particular change within the {@link VersionedUpdate group}.
+ * Interface describing a particular change within the {@link VersionedUpdate group}.
  */
 public interface UpdateEntry extends Serializable {
+    /**
+     * Applies own change to the catalog.
+     *
+     * @param catalog Current catalog.
+     * @return New catalog.
+     */
+    Catalog applyUpdate(Catalog catalog);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 12c5bb04a1..3652a89fa6 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -158,6 +158,7 @@ public class UpdateLogImpl implements UpdateLog {
     private void restoreStateFromVault(OnUpdateHandler handler) {
         int ver = 1;
 
+        // TODO: IGNITE-19790 Read range from metastore
         while (true) {
             VaultEntry entry = vault.get(CatalogKey.update(ver++)).join();
 
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index d4f21d0222..4afcb4b2ee 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.catalog;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
 import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -1571,6 +1572,28 @@ public class CatalogServiceSelfTest {
         verifyNoMoreInteractions(eventListener);
     }
 
+    @Test
+    void testGetCatalogEntityInCatalogEvent() {
+        CompletableFuture<Void> result = new CompletableFuture<>();
+
+        service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+            try {
+                assertNotNull(service.schema((int) parameters.causalityToken()));
+
+                result.complete(null);
+
+                return completedFuture(true);
+            } catch (Throwable t) {
+                result.completeExceptionally(t);
+
+                return failedFuture(t);
+            }
+        });
+
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), willBe((Object) null));
+        assertThat(result, willCompleteSuccessfully());
+    }
+
     private CompletableFuture<Void> changeColumn(
             String tab,
             String col,
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index dd9bf36ee5..706ca47bb3 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
 import org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -185,6 +186,11 @@ class UpdateLogImplTest {
             this.payload = payload;
         }
 
+        @Override
+        public Catalog applyUpdate(Catalog catalog) {
+            return catalog;
+        }
+
         @Override
         public boolean equals(Object o) {
             if (this == o) {

Reply via email to