This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch catalog-feature
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/catalog-feature by this push:
     new d94db48177 IGNITE-19798 Add functionality to the catalog to switch the IndexManager to catalog events (#2247)
d94db48177 is described below

commit d94db481775e7e2d4bfcc9d596850c2475b2b294
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jun 26 12:22:14 2023 +0300

    IGNITE-19798 Add functionality to the catalog to switch the IndexManager to catalog events (#2247)
---
 .../ignite/internal/catalog/CatalogService.java    |   2 +
 .../internal/catalog/CatalogServiceImpl.java       |  46 ++++--
 .../catalog/events/AddColumnEventParameters.java   |  10 +-
 .../catalog/events/AlterColumnEventParameters.java |   5 +-
 .../catalog/events/AlterZoneEventParameters.java   |   5 +-
 .../catalog/events/CatalogEventParameters.java     |  14 +-
 .../catalog/events/CreateIndexEventParameters.java |   5 +-
 .../catalog/events/CreateTableEventParameters.java |   5 +-
 .../catalog/events/CreateZoneEventParameters.java  |   5 +-
 .../catalog/events/DropColumnEventParameters.java  |   5 +-
 .../catalog/events/DropIndexEventParameters.java   |  14 +-
 .../catalog/events/DropTableEventParameters.java   |   5 +-
 .../catalog/events/DropZoneEventParameters.java    |   5 +-
 .../internal/catalog/storage/AlterColumnEntry.java |   4 +-
 .../internal/catalog/storage/AlterZoneEntry.java   |   4 +-
 .../internal/catalog/storage/DropColumnsEntry.java |   4 +-
 .../internal/catalog/storage/DropIndexEntry.java   |  15 +-
 .../internal/catalog/storage/DropTableEntry.java   |   4 +-
 .../internal/catalog/storage/DropZoneEntry.java    |   4 +-
 .../ignite/internal/catalog/storage/Fireable.java  |   3 +-
 .../internal/catalog/storage/NewColumnsEntry.java  |   4 +-
 .../internal/catalog/storage/NewIndexEntry.java    |   4 +-
 .../internal/catalog/storage/NewTableEntry.java    |   4 +-
 .../internal/catalog/storage/NewZoneEntry.java     |   4 +-
 .../ignite/internal/catalog/storage/UpdateLog.java |   3 +-
 .../internal/catalog/storage/UpdateLogImpl.java    |   6 +-
 .../internal/catalog/CatalogServiceSelfTest.java   | 171 +++++++++++++++++----
 .../catalog/storage/UpdateLogImplTest.java         |  49 +++---
 28 files changed, 310 insertions(+), 99 deletions(-)

diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index b078bbd3c5..c7364ebea3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -44,6 +44,8 @@ public interface CatalogService {
 
     CatalogTableDescriptor table(int tableId, long timestamp);
 
+    CatalogTableDescriptor table(int tableId, int catalogVersion);
+
     CatalogIndexDescriptor index(String indexName, long timestamp);
 
     CatalogIndexDescriptor index(int indexId, long timestamp);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index ace40aa938..22d3563dc5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -191,6 +191,11 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
         return catalogAt(timestamp).table(tableId);
     }
 
+    @Override
+    public CatalogTableDescriptor table(int tableId, int catalogVersion) {
+        return catalog(catalogVersion).table(tableId);
+    }
+
     @Override
     public CatalogIndexDescriptor index(String indexName, long timestamp) {
         return 
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).index(indexName);
@@ -275,11 +280,16 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
             CatalogZoneDescriptor zone = getZone(catalog, 
Objects.requireNonNullElse(params.zone(), DEFAULT_ZONE_NAME));
 
-            CatalogTableDescriptor table = 
CatalogUtils.fromParams(catalog.objectIdGenState(), zone.id(), params);
+            int id = catalog.objectIdGenState();
+
+            CatalogTableDescriptor table = CatalogUtils.fromParams(id++, 
zone.id(), params);
+
+            CatalogHashIndexDescriptor pkIndex = 
createHashIndexDescriptor(table, id++, createPkIndexParams(params));
 
             return List.of(
                     new NewTableEntry(table),
-                    new ObjectIdGenUpdateEntry(1)
+                    new NewIndexEntry(pkIndex),
+                    new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
             );
         });
     }
@@ -295,7 +305,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
             Arrays.stream(schema.indexes())
                     .filter(index -> index.tableId() == table.id())
-                    .forEach(index -> updateEntries.add(new 
DropIndexEntry(index.id())));
+                    .forEach(index -> updateEntries.add(new 
DropIndexEntry(index.id(), index.tableId())));
 
             updateEntries.add(new DropTableEntry(table.id()));
 
@@ -386,9 +396,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
             CatalogTableDescriptor table = getTable(schema, 
params.tableName());
 
-            validateCreateHashIndexParams(params, table);
-
-            CatalogHashIndexDescriptor index = 
CatalogUtils.fromParams(catalog.objectIdGenState(), table.id(), params);
+            CatalogHashIndexDescriptor index = 
createHashIndexDescriptor(table, catalog.objectIdGenState(), params);
 
             return List.of(
                     new NewIndexEntry(index),
@@ -431,7 +439,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
             }
 
             return List.of(
-                    new DropIndexEntry(index.id())
+                    new DropIndexEntry(index.id(), index.tableId())
             );
         });
     }
@@ -615,7 +623,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
     class OnUpdateHandlerImpl implements OnUpdateHandler {
         @Override
-        public void handle(VersionedUpdate update, HybridTimestamp 
metaStorageUpdateTimestamp) {
+        public void handle(VersionedUpdate update, HybridTimestamp 
metaStorageUpdateTimestamp, long causalityToken) {
             int version = update.version();
             Catalog catalog = catalogByVer.get(version - 1);
 
@@ -637,7 +645,7 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
 
                     eventFutures.add(fireEvent(
                             fireEvent.eventType(),
-                            fireEvent.createEventParameters(version)
+                            fireEvent.createEventParameters(causalityToken, 
version)
                     ));
                 }
             }
@@ -877,4 +885,24 @@ public class CatalogServiceImpl extends 
Producer<CatalogEvent, CatalogEventParam
             throwUnsupportedDdl("Cannot decrease precision to {} for column 
'{}'.", target.precision(), origin.name());
         }
     }
+
+    private static CreateHashIndexParams createPkIndexParams(CreateTableParams 
params) {
+        return CreateHashIndexParams.builder()
+                .schemaName(params.schemaName())
+                .tableName(params.tableName())
+                .indexName(params.tableName() + "_PK")
+                .columns(params.primaryKeyColumns())
+                .unique()
+                .build();
+    }
+
+    private static CatalogHashIndexDescriptor createHashIndexDescriptor(
+            CatalogTableDescriptor table,
+            int indexId,
+            CreateHashIndexParams params
+    ) {
+        validateCreateHashIndexParams(params, table);
+
+        return CatalogUtils.fromParams(indexId, table.id(), params);
+    }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
index ef43a835f9..6f0226d80f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
@@ -32,11 +32,17 @@ public class AddColumnEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param tableId An id of table, which columns are added to.
      * @param columnDescriptors New columns descriptors.
      */
-    public AddColumnEventParameters(long causalityToken, int tableId, 
List<CatalogTableColumnDescriptor> columnDescriptors) {
-        super(causalityToken);
+    public AddColumnEventParameters(
+            long causalityToken,
+            int catalogVersion,
+            int tableId,
+            List<CatalogTableColumnDescriptor> columnDescriptors
+    ) {
+        super(causalityToken, catalogVersion);
 
         this.tableId = tableId;
         this.columnDescriptors = columnDescriptors;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
index 525b8ae06c..fdd124198d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
@@ -32,11 +32,12 @@ public class AlterColumnEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param tableId Returns an id the table to be modified.
      * @param columnDescriptor Descriptor for the column to be replaced.
      */
-    public AlterColumnEventParameters(long causalityToken, int tableId, 
CatalogTableColumnDescriptor columnDescriptor) {
-        super(causalityToken);
+    public AlterColumnEventParameters(long causalityToken, int catalogVersion, 
int tableId, CatalogTableColumnDescriptor columnDescriptor) {
+        super(causalityToken, catalogVersion);
 
         this.tableId = tableId;
         this.columnDescriptor = columnDescriptor;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
index d4895e8a58..7fe598fa19 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
@@ -30,10 +30,11 @@ public class AlterZoneEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param zoneDescriptor Newly created distribution zone descriptor.
      */
-    public AlterZoneEventParameters(long causalityToken, CatalogZoneDescriptor 
zoneDescriptor) {
-        super(causalityToken);
+    public AlterZoneEventParameters(long causalityToken, int catalogVersion, 
CatalogZoneDescriptor zoneDescriptor) {
+        super(causalityToken, catalogVersion);
 
         this.zoneDescriptor = zoneDescriptor;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
index 11c03c885e..a6ec91e7de 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
@@ -23,12 +23,24 @@ import org.apache.ignite.internal.manager.EventParameters;
  * Base class for Catalog event parameters.
  */
 public abstract class CatalogEventParameters extends EventParameters {
+    private final int catalogVersion;
+
     /**
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      */
-    public CatalogEventParameters(long causalityToken) {
+    public CatalogEventParameters(long causalityToken, int catalogVersion) {
         super(causalityToken);
+
+        this.catalogVersion = catalogVersion;
+    }
+
+    /**
+     * Returns catalog version.
+     */
+    public int catalogVersion() {
+        return catalogVersion;
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
index 5efa219f66..64ee3e619f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
@@ -30,10 +30,11 @@ public class CreateIndexEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param indexDescriptor Newly created index descriptor.
      */
-    public CreateIndexEventParameters(long causalityToken, 
CatalogIndexDescriptor indexDescriptor) {
-        super(causalityToken);
+    public CreateIndexEventParameters(long causalityToken, int catalogVersion, 
CatalogIndexDescriptor indexDescriptor) {
+        super(causalityToken, catalogVersion);
 
         this.indexDescriptor = indexDescriptor;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
index f0a2329a45..a8cd6f6df3 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
@@ -30,10 +30,11 @@ public class CreateTableEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param tableDescriptor Newly created table descriptor.
      */
-    public CreateTableEventParameters(long causalityToken, 
CatalogTableDescriptor tableDescriptor) {
-        super(causalityToken);
+    public CreateTableEventParameters(long causalityToken, int catalogVersion, 
CatalogTableDescriptor tableDescriptor) {
+        super(causalityToken, catalogVersion);
 
         this.tableDescriptor = tableDescriptor;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
index ff7101d1c8..9e26910200 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
@@ -30,10 +30,11 @@ public class CreateZoneEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param zoneDescriptor Newly created distribution zone descriptor.
      */
-    public CreateZoneEventParameters(long causalityToken, 
CatalogZoneDescriptor zoneDescriptor) {
-        super(causalityToken);
+    public CreateZoneEventParameters(long causalityToken, int catalogVersion, 
CatalogZoneDescriptor zoneDescriptor) {
+        super(causalityToken, catalogVersion);
 
         this.zoneDescriptor = zoneDescriptor;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
index ddf491d064..46a17becb5 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
@@ -31,11 +31,12 @@ public class DropColumnEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param tableId An id of table, which columns are dropped from.
      * @param columns Names of columns to drop.
      */
-    public DropColumnEventParameters(long causalityToken, int tableId, 
Collection<String> columns) {
-        super(causalityToken);
+    public DropColumnEventParameters(long causalityToken, int catalogVersion, 
int tableId, Collection<String> columns) {
+        super(causalityToken, catalogVersion);
 
         this.tableId = tableId;
         this.columns = columns;
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
index 135d46a144..96b0d16942 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
@@ -24,20 +24,30 @@ public class DropIndexEventParameters extends 
CatalogEventParameters {
 
     private final int indexId;
 
+    private final int tableId;
+
     /**
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param indexId An id of dropped index.
+     * @param tableId Table ID for which the index was removed.
      */
-    public DropIndexEventParameters(long causalityToken, int indexId) {
-        super(causalityToken);
+    public DropIndexEventParameters(long causalityToken, int catalogVersion, 
int indexId, int tableId) {
+        super(causalityToken, catalogVersion);
 
         this.indexId = indexId;
+        this.tableId = tableId;
     }
 
     /** Returns an id of dropped index. */
     public int indexId() {
         return indexId;
     }
+
+    /** Returns table ID for which the index was removed. */
+    public int tableId() {
+        return tableId;
+    }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
index 71c2e126fe..241e849a2d 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
@@ -28,10 +28,11 @@ public class DropTableEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param tableId An id of dropped table.
      */
-    public DropTableEventParameters(long causalityToken, int tableId) {
-        super(causalityToken);
+    public DropTableEventParameters(long causalityToken, int catalogVersion, 
int tableId) {
+        super(causalityToken, catalogVersion);
 
         this.tableId = tableId;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
index 446257190b..c08faba607 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
@@ -28,10 +28,11 @@ public class DropZoneEventParameters extends 
CatalogEventParameters {
      * Constructor.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      * @param zoneId An id of dropped distribution zone.
      */
-    public DropZoneEventParameters(long causalityToken, int zoneId) {
-        super(causalityToken);
+    public DropZoneEventParameters(long causalityToken, int catalogVersion, 
int zoneId) {
+        super(causalityToken, catalogVersion);
 
         this.zoneId = zoneId;
     }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index b29310e45e..910a363d81 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -69,8 +69,8 @@ public class AlterColumnEntry implements UpdateEntry, 
Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new AlterColumnEventParameters(causalityToken, tableId, column);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new AlterColumnEventParameters(causalityToken, catalogVersion, 
tableId, column);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index cf11a3400f..c79f366333 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -54,8 +54,8 @@ public class AlterZoneEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new AlterZoneEventParameters(causalityToken, descriptor);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new AlterZoneEventParameters(causalityToken, catalogVersion, 
descriptor);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index 3fc593667c..e29c250283 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -68,8 +68,8 @@ public class DropColumnsEntry implements UpdateEntry, 
Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new DropColumnEventParameters(causalityToken, tableId, columns);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DropColumnEventParameters(causalityToken, catalogVersion, 
tableId, columns);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 4ba8749c2a..0479d4afdd 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -38,13 +38,17 @@ public class DropIndexEntry implements UpdateEntry, 
Fireable {
 
     private final int indexId;
 
+    private final int tableId;
+
     /**
      * Constructs the object.
      *
      * @param indexId An id of an index to drop.
+     * @param tableId Table ID for which the index was removed.
      */
-    public DropIndexEntry(int indexId) {
+    public DropIndexEntry(int indexId, int tableId) {
         this.indexId = indexId;
+        this.tableId = tableId;
     }
 
     /** Returns an id of an index to drop. */
@@ -52,14 +56,19 @@ public class DropIndexEntry implements UpdateEntry, 
Fireable {
         return indexId;
     }
 
+    /** Returns table ID for which the index was removed. */
+    public int tableId() {
+        return tableId;
+    }
+
     @Override
     public CatalogEvent eventType() {
         return CatalogEvent.INDEX_DROP;
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new DropIndexEventParameters(causalityToken, indexId);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DropIndexEventParameters(causalityToken, catalogVersion, 
indexId, tableId);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 99f14204a8..6d85b55af8 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -58,8 +58,8 @@ public class DropTableEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new DropTableEventParameters(causalityToken, tableId);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DropTableEventParameters(causalityToken, catalogVersion, 
tableId);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index be3f2fdba4..2579a94f51 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -53,8 +53,8 @@ public class DropZoneEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new DropZoneEventParameters(causalityToken, zoneId);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new DropZoneEventParameters(causalityToken, catalogVersion, 
zoneId);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
index c2a7954976..30447e0029 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
@@ -33,6 +33,7 @@ public interface Fireable {
      * Creates parameters of the fired event.
      *
      * @param causalityToken Causality token.
+     * @param catalogVersion Catalog version.
      */
-    CatalogEventParameters createEventParameters(long causalityToken);
+    CatalogEventParameters createEventParameters(long causalityToken, int 
catalogVersion);
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index b0fb639e35..1e1aff017e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -68,8 +68,8 @@ public class NewColumnsEntry implements UpdateEntry, Fireable 
{
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new AddColumnEventParameters(causalityToken, tableId, 
descriptors);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new AddColumnEventParameters(causalityToken, catalogVersion, 
tableId, descriptors);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 4e0419ab05..c31b335223 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -58,8 +58,8 @@ public class NewIndexEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new CreateIndexEventParameters(causalityToken, descriptor);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new CreateIndexEventParameters(causalityToken, catalogVersion, 
descriptor);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 7a5c85e10f..0d6ef3465e 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -58,8 +58,8 @@ public class NewTableEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new CreateTableEventParameters(causalityToken, descriptor);
+    public CatalogEventParameters createEventParameters(long causalityToken, 
int catalogVersion) {
+        return new CreateTableEventParameters(causalityToken, catalogVersion, 
descriptor);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index b584951bdd..0cceba1ad2 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -54,8 +54,8 @@ public class NewZoneEntry implements UpdateEntry, Fireable {
     }
 
     @Override
-    public CatalogEventParameters createEventParameters(long causalityToken) {
-        return new CreateZoneEventParameters(causalityToken, descriptor);
+    public CatalogEventParameters createEventParameters(long causalityToken, int catalogVersion) {
+        return new CreateZoneEventParameters(causalityToken, catalogVersion, descriptor);
     }
 
     @Override
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index 94124dae55..eb17046103 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -69,7 +69,8 @@ public interface UpdateLog extends IgniteComponent {
          *
          * @param update A new update.
          * @param metaStorageUpdateTimestamp Timestamp assigned to the update by the Metastorage.
+         * @param causalityToken Causality token.
          */
-        void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp);
+        void handle(VersionedUpdate update, HybridTimestamp metaStorageUpdateTimestamp, long causalityToken);
     }
 }
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 5427173ef8..f627c51118 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -165,7 +165,9 @@ public class UpdateLogImpl implements UpdateLog {
 
             VersionedUpdate update = fromBytes(Objects.requireNonNull(entry.value()));
 
-            handler.handle(update, metastore.timestampByRevision(entry.revision()));
+            long revision = entry.revision();
+
+            handler.handle(update, metastore.timestampByRevision(revision), revision);
         }
     }
 
@@ -206,7 +208,7 @@ public class UpdateLogImpl implements UpdateLog {
 
                 VersionedUpdate update = fromBytes(payload);
 
-                onUpdateHandler.handle(update, event.timestamp());
+                onUpdateHandler.handle(update, event.timestamp(), event.revision());
             }
 
             return CompletableFuture.completedFuture(null);
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index cb4b7ad325..b2326f0142 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -38,7 +38,10 @@ import static org.junit.jupiter.api.Assertions.assertSame;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
 import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.spy;
 import static org.mockito.Mockito.timeout;
@@ -123,7 +126,6 @@ import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
 import org.junit.jupiter.params.provider.EnumSource.Mode;
 import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatchers;
 
 /**
  * Catalog service self test.
@@ -237,6 +239,8 @@ public class CatalogServiceSelfTest {
         assertNull(schema.table(TABLE_NAME));
         assertNull(service.table(TABLE_NAME, 123L));
         assertNull(service.table(1, 123L));
+        assertNull(service.index(createPkIndexName(TABLE_NAME), 123L));
+        assertNull(service.index(2, 123L));
 
         // Validate actual catalog
         schema = service.schema(1);
@@ -249,6 +253,9 @@ public class CatalogServiceSelfTest {
         assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, 
clock.nowLong()));
         assertSame(schema.table(TABLE_NAME), service.table(2, 
clock.nowLong()));
 
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(createPkIndexName(TABLE_NAME), clock.nowLong()));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(3, clock.nowLong()));
+
         // Validate newly created table
         CatalogTableDescriptor table = schema.table(TABLE_NAME);
 
@@ -256,6 +263,15 @@ public class CatalogServiceSelfTest {
         assertEquals(TABLE_NAME, table.name());
         assertEquals(1L, table.zoneId());
 
+        // Validate newly created pk index
+        CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor) 
schema.index(createPkIndexName(TABLE_NAME));
+
+        assertEquals(3L, pkIndex.id());
+        assertEquals(createPkIndexName(TABLE_NAME), pkIndex.name());
+        assertEquals(2L, pkIndex.tableId());
+        assertEquals(table.primaryKeyColumns(), pkIndex.columns());
+        assertTrue(pkIndex.unique());
+
         // Validate another table creation.
         assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willBe(nullValue()));
 
@@ -270,10 +286,17 @@ public class CatalogServiceSelfTest {
         assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, 
clock.nowLong()));
         assertSame(schema.table(TABLE_NAME), service.table(2, 
clock.nowLong()));
 
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(createPkIndexName(TABLE_NAME), clock.nowLong()));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(3, clock.nowLong()));
+
         assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, 
clock.nowLong()));
-        assertSame(schema.table(TABLE_NAME_2), service.table(3, 
clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(4, 
clock.nowLong()));
+
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(createPkIndexName(TABLE_NAME_2), clock.nowLong()));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(5, clock.nowLong()));
 
         assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2));
+        assertNotSame(schema.index(createPkIndexName(TABLE_NAME)), 
schema.index(createPkIndexName(TABLE_NAME_2)));
 
         // Try to create another table with same name.
         assertThat(service.createTable(simpleTable(TABLE_NAME_2)), 
willThrowFast(TableAlreadyExistsException.class));
@@ -322,8 +345,14 @@ public class CatalogServiceSelfTest {
         assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME, 
beforeDropTimestamp));
         assertSame(schema.table(TABLE_NAME), service.table(2, 
beforeDropTimestamp));
 
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(createPkIndexName(TABLE_NAME), beforeDropTimestamp));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME)), 
service.index(3, beforeDropTimestamp));
+
         assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, 
beforeDropTimestamp));
-        assertSame(schema.table(TABLE_NAME_2), service.table(3, 
beforeDropTimestamp));
+        assertSame(schema.table(TABLE_NAME_2), service.table(4, 
beforeDropTimestamp));
+
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(createPkIndexName(TABLE_NAME_2), beforeDropTimestamp));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(5, beforeDropTimestamp));
 
         // Validate actual catalog
         schema = service.schema(3);
@@ -337,8 +366,15 @@ public class CatalogServiceSelfTest {
         assertNull(service.table(TABLE_NAME, clock.nowLong()));
         assertNull(service.table(2, clock.nowLong()));
 
+        assertNull(schema.index(createPkIndexName(TABLE_NAME)));
+        assertNull(service.index(createPkIndexName(TABLE_NAME), 
clock.nowLong()));
+        assertNull(service.index(3, clock.nowLong()));
+
         assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2, 
clock.nowLong()));
-        assertSame(schema.table(TABLE_NAME_2), service.table(3, 
clock.nowLong()));
+        assertSame(schema.table(TABLE_NAME_2), service.table(4, 
clock.nowLong()));
+
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(createPkIndexName(TABLE_NAME_2), clock.nowLong()));
+        assertSame(schema.index(createPkIndexName(TABLE_NAME_2)), 
service.index(5, clock.nowLong()));
 
         // Try to drop table once again.
         assertThat(service.dropTable(dropTableParams), 
willThrowFast(TableNotFoundException.class));
@@ -962,7 +998,7 @@ public class CatalogServiceSelfTest {
         assertSame(schema.table(TABLE_NAME), service.table(2, 
beforeDropTimestamp));
 
         assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, 
beforeDropTimestamp));
-        assertSame(schema.index(INDEX_NAME), service.index(3, 
beforeDropTimestamp));
+        assertSame(schema.index(INDEX_NAME), service.index(4, 
beforeDropTimestamp));
 
         // Validate actual catalog
         schema = service.schema(3);
@@ -999,7 +1035,7 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertNull(schema.index(INDEX_NAME));
         assertNull(service.index(INDEX_NAME, 123L));
-        assertNull(service.index(3, 123L));
+        assertNull(service.index(4, 123L));
 
         // Validate actual catalog
         schema = service.schema(2);
@@ -1007,12 +1043,12 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertNull(service.index(1, clock.nowLong()));
         assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, 
clock.nowLong()));
-        assertSame(schema.index(INDEX_NAME), service.index(3, 
clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(4, 
clock.nowLong()));
 
         // Validate newly created hash index
         CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor) 
schema.index(INDEX_NAME);
 
-        assertEquals(3L, index.id());
+        assertEquals(4L, index.id());
         assertEquals(INDEX_NAME, index.name());
         assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
         assertEquals(List.of("VAL", "ID"), index.columns());
@@ -1040,7 +1076,7 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertNull(schema.index(INDEX_NAME));
         assertNull(service.index(INDEX_NAME, 123L));
-        assertNull(service.index(3, 123L));
+        assertNull(service.index(4, 123L));
 
         // Validate actual catalog
         schema = service.schema(2);
@@ -1048,12 +1084,12 @@ public class CatalogServiceSelfTest {
         assertNotNull(schema);
         assertNull(service.index(1, clock.nowLong()));
         assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME, 
clock.nowLong()));
-        assertSame(schema.index(INDEX_NAME), service.index(3, 
clock.nowLong()));
+        assertSame(schema.index(INDEX_NAME), service.index(4, 
clock.nowLong()));
 
         // Validate newly created sorted index
         CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor) 
schema.index(INDEX_NAME);
 
-        assertEquals(3L, index.id());
+        assertEquals(4L, index.id());
         assertEquals(INDEX_NAME, index.name());
         assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
         assertEquals("VAL", index.columns().get(0).name());
@@ -1115,7 +1151,7 @@ public class CatalogServiceSelfTest {
                     List.of(new ObjectIdGenUpdateEntry(1))
             );
 
-            updateHandlerCapture.getValue().handle(update, clock.now());
+            updateHandlerCapture.getValue().handle(update, clock.now(), 0);
 
             return completedFuture(false);
         });
@@ -1206,10 +1242,10 @@ public class CatalogServiceSelfTest {
         service.listen(CatalogEvent.TABLE_DROP, eventListener);
 
         assertThat(service.createTable(createTableParams), 
willBe(nullValue()));
-        verify(eventListener).notify(any(CreateTableEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(CreateTableEventParameters.class), 
isNull());
 
         assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
-        verify(eventListener).notify(any(DropTableEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(DropTableEventParameters.class), 
isNull());
 
         verifyNoMoreInteractions(eventListener);
     }
@@ -1250,24 +1286,31 @@ public class CatalogServiceSelfTest {
         assertThat(service.createIndex(createIndexParams), 
willThrow(TableNotFoundException.class));
         verifyNoInteractions(eventListener);
 
-        // Create table.
+        // Create table with PK index.
         assertThat(service.createTable(createTableParams), 
willCompleteSuccessfully());
+        verify(eventListener).notify(any(CreateIndexEventParameters.class), 
isNull());
+
+        clearInvocations(eventListener);
 
         // Create index.
         assertThat(service.createIndex(createIndexParams), 
willCompleteSuccessfully());
-        verify(eventListener).notify(any(CreateIndexEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(CreateIndexEventParameters.class), 
isNull());
+
+        clearInvocations(eventListener);
 
         // Drop index.
-        assertThat(service.dropIndex(dropIndexParams), 
willCompleteSuccessfully());
-        verify(eventListener).notify(any(DropIndexEventParameters.class), 
ArgumentMatchers.isNull());
+        assertThat(service.dropIndex(dropIndexParams), willBe(nullValue()));
+        verify(eventListener).notify(any(DropIndexEventParameters.class), 
isNull());
 
-        // Drop table.
-        assertThat(service.dropTable(dropTableparams), 
willCompleteSuccessfully());
+        clearInvocations(eventListener);
+
+        // Drop table with pk index.
+        assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
 
         // Try drop index once again.
         assertThat(service.dropIndex(dropIndexParams), 
willThrow(IndexNotFoundException.class));
 
-        verifyNoMoreInteractions(eventListener);
+        verify(eventListener).notify(any(DropIndexEventParameters.class), 
isNull());
     }
 
     @Test
@@ -1522,13 +1565,13 @@ public class CatalogServiceSelfTest {
 
         assertThat(fut, willCompleteSuccessfully());
 
-        verify(eventListener).notify(any(CreateZoneEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(CreateZoneEventParameters.class), 
isNull());
 
         fut = service.dropDistributionZone(dropZoneParams);
 
         assertThat(fut, willCompleteSuccessfully());
 
-        verify(eventListener).notify(any(DropZoneEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(DropZoneEventParameters.class), 
isNull());
         verifyNoMoreInteractions(eventListener);
     }
 
@@ -1566,11 +1609,11 @@ public class CatalogServiceSelfTest {
 
         // Add column.
         assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
-        verify(eventListener).notify(any(AddColumnEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(AddColumnEventParameters.class), 
isNull());
 
         // Drop column.
         assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
-        verify(eventListener).notify(any(DropColumnEventParameters.class), 
ArgumentMatchers.isNull());
+        verify(eventListener).notify(any(DropColumnEventParameters.class), 
isNull());
 
         // Try drop column once again.
         assertThat(service.dropColumn(dropColumnParams), 
willThrow(ColumnNotFoundException.class));
@@ -1622,7 +1665,7 @@ public class CatalogServiceSelfTest {
 
         service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
             try {
-                assertNotNull(service.schema((int) 
parameters.causalityToken()));
+                assertNotNull(service.schema(parameters.catalogVersion()));
 
                 result.complete(null);
 
@@ -1638,6 +1681,78 @@ public class CatalogServiceSelfTest {
         assertThat(result, willCompleteSuccessfully());
     }
 
+    @Test
+    void testGetTableByIdAndCatalogVersion() {
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+
+        assertNull(service.table(2, 0));
+        assertNotNull(service.table(2, 1));
+    }
+
+    @Test
+    void testGetTableIdOnDropIndexEvent() {
+        assertThat(service.createTable(simpleTable(TABLE_NAME)), 
willBe(nullValue()));
+
+        assertThat(
+                service.createIndex(
+                        CreateHashIndexParams.builder()
+                                .schemaName(SCHEMA_NAME)
+                                .tableName(TABLE_NAME)
+                                .indexName(INDEX_NAME)
+                                .columns(List.of("VAL"))
+                                .build()
+                ),
+                willBe(nullValue())
+        );
+
+        EventListener<CatalogEventParameters> eventListener = 
mock(EventListener.class);
+
+        ArgumentCaptor<DropIndexEventParameters> captor = 
ArgumentCaptor.forClass(DropIndexEventParameters.class);
+
+        
doReturn(completedFuture(false)).when(eventListener).notify(captor.capture(), 
any());
+
+        service.listen(CatalogEvent.INDEX_DROP, eventListener);
+
+        // Let's remove the index.
+        assertThat(
+                
service.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
+                willBe(nullValue())
+        );
+
+        DropIndexEventParameters eventParameters = captor.getValue();
+
+        assertEquals(4L, eventParameters.indexId());
+        assertEquals(2L, eventParameters.tableId());
+
+        // Let's delete the table.
+        assertThat(
+                
service.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
+                willBe(nullValue())
+        );
+
+        // Let's make sure that the PK index has been deleted.
+        eventParameters = captor.getValue();
+
+        assertEquals(3L, eventParameters.indexId());
+        assertEquals(2L, eventParameters.tableId());
+    }
+
+    @Test
+    void testCreateTableWithoutPkColumns() {
+        assertThat(
+                service.createTable(
+                        CreateTableParams.builder()
+                                .schemaName(SCHEMA_NAME)
+                                .zone(ZONE_NAME)
+                                .tableName(TABLE_NAME)
+                                .columns(List.of())
+                                .primaryKeyColumns(List.of())
+                                .build()
+                ),
+                willThrowFast(IgniteInternalException.class, "Missing primary 
key columns")
+        );
+    }
+
     private CompletableFuture<Void> changeColumn(
             String tab,
             String col,
@@ -1723,4 +1838,8 @@ public class CatalogServiceSelfTest {
             this.scale = scale;
         }
     }
+
+    private static String createPkIndexName(String tableName) {
+        return tableName + "_PK";
+    }
 }
diff --git a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index b7e6af23c2..7fbfb55391 100644
--- a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++ b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -17,14 +17,13 @@
 
 package org.apache.ignite.internal.catalog.storage;
 
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.containsString;
 import static org.hamcrest.Matchers.equalTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -76,24 +75,24 @@ class UpdateLogImplTest {
 
         long revisionBefore = metastore.appliedRevision();
 
-        updateLog.registerUpdateHandler((update, ts) -> {/* no-op */});
+        updateLog.registerUpdateHandler((update, ts, causalityToken) -> {/* 
no-op */});
         updateLog.start();
 
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
 
-        List<VersionedUpdate> expectedLog = List.of(
+        List<VersionedUpdate> expectedVersions = List.of(
                 new VersionedUpdate(1, 1L, List.of(new 
TestUpdateEntry("foo"))),
                 new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
         );
 
-        for (VersionedUpdate update : expectedLog) {
-            assertTrue(await(updateLog.append(update)));
+        for (VersionedUpdate update : expectedVersions) {
+            assertThat(updateLog.append(update), willBe(true));
         }
 
         // and wait till metastore apply necessary revision
         assertTrue(
                 waitForCondition(
-                        () -> metastore.appliedRevision() - expectedLog.size() 
== revisionBefore,
+                        () -> metastore.appliedRevision() - 
expectedVersions.size() == revisionBefore,
                         TimeUnit.SECONDS.toMillis(5)
                 )
         );
@@ -104,11 +103,18 @@ class UpdateLogImplTest {
         // and check if log is replayed on start
         updateLog = createUpdateLogImpl();
 
-        List<VersionedUpdate> actualLog = new ArrayList<>();
-        updateLog.registerUpdateHandler((update, ts) -> actualLog.add(update));
+        List<VersionedUpdate> actualVersions = new ArrayList<>();
+        List<Long> actualCausalityTokens = new ArrayList<>();
+
+        updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
+            actualVersions.add(update);
+            actualCausalityTokens.add(causalityToken);
+        });
+
         updateLog.start();
 
-        assertEquals(expectedLog, actualLog);
+        assertEquals(expectedVersions, actualVersions);
+        assertEquals(List.of(revisionBefore + 1, revisionBefore + 2), 
actualCausalityTokens);
     }
 
     private UpdateLogImpl createUpdateLogImpl() {
@@ -136,33 +142,39 @@ class UpdateLogImplTest {
         UpdateLogImpl updateLog = createUpdateLogImpl();
 
         List<Integer> appliedVersions = new ArrayList<>();
-        updateLog.registerUpdateHandler((update, ts) -> 
appliedVersions.add(update.version()));
+        List<Long> causalityTokens = new ArrayList<>();
 
-        updateLog.start();
+        updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
+            appliedVersions.add(update.version());
+            causalityTokens.add(causalityToken);
+        });
 
         long revisionBefore = metastore.appliedRevision();
 
+        updateLog.start();
+
         assertThat("Watches were not deployed", metastore.deployWatches(), 
willCompleteSuccessfully());
 
         // first update should always be successful
-        
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion)), 
willBe(true));
 
         // update of the same version should not be accepted
-        
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion)), 
willBe(false));
 
         // update of the version lower than the last applied should not be 
accepted
-        
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion - 
1))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion - 
1)), willBe(false));
 
         // update of the version creating gap should not be accepted
-        
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
2))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
2)), willBe(false));
 
         // regular update of next version should be accepted
-        
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
1))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
1)), willBe(true));
 
         // now the gap is filled, thus update should be accepted as well
-        
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
2))));
+        assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion + 
2)), willBe(true));
 
         List<Integer> expectedVersions = List.of(startVersion, startVersion + 
1, startVersion + 2);
+        List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore 
+ 2, revisionBefore + 3);
 
         // wait till necessary revision is applied
         assertTrue(
@@ -173,6 +185,7 @@ class UpdateLogImplTest {
         );
 
         assertThat(appliedVersions, equalTo(expectedVersions));
+        assertThat(causalityTokens, equalTo(expectedTokens));
     }
 
     private static VersionedUpdate singleEntryUpdateOfVersion(int version) {

Reply via email to