This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 564c4ada56 IGNITE-20519 Introduce update token for CatalogObjectDescriptor (#2656)
564c4ada56 is described below
commit 564c4ada56bef2c6749e6113b3ed85d278d94c2d
Author: Mirza Aliev <[email protected]>
AuthorDate: Thu Oct 5 15:37:33 2023 +0400
IGNITE-20519 Introduce update token for CatalogObjectDescriptor (#2656)
---
.../internal/catalog/CatalogManagerImpl.java | 18 +-
.../catalog/commands/CreateTableCommand.java | 4 +-
.../descriptors/CatalogIndexDescriptor.java | 4 +-
.../descriptors/CatalogObjectDescriptor.java | 34 +-
.../descriptors/CatalogSchemaDescriptor.java | 5 +-
.../descriptors/CatalogSystemViewDescriptor.java | 4 +-
.../descriptors/CatalogTableDescriptor.java | 5 +-
.../catalog/descriptors/CatalogZoneDescriptor.java | 4 +-
.../internal/catalog/storage/AlterColumnEntry.java | 8 +-
.../internal/catalog/storage/AlterZoneEntry.java | 4 +-
.../internal/catalog/storage/DropColumnsEntry.java | 8 +-
.../internal/catalog/storage/DropIndexEntry.java | 5 +-
.../internal/catalog/storage/DropTableEntry.java | 5 +-
.../internal/catalog/storage/DropZoneEntry.java | 2 +-
.../internal/catalog/storage/NewColumnsEntry.java | 8 +-
.../internal/catalog/storage/NewIndexEntry.java | 7 +-
.../catalog/storage/NewSystemViewEntry.java | 7 +-
.../internal/catalog/storage/NewTableEntry.java | 7 +-
.../internal/catalog/storage/NewZoneEntry.java | 4 +-
.../catalog/storage/ObjectIdGenUpdateEntry.java | 2 +-
.../internal/catalog/storage/UpdateEntry.java | 3 +-
...CatalogManagerDescriptorCausalityTokenTest.java | 444 +++++++++++++++++++++
.../internal/catalog/CatalogManagerSelfTest.java | 17 -
.../internal/catalog/CatalogSystemViewTest.java | 34 ++
.../commands/AbstractCommandValidationTest.java | 6 +-
.../catalog/storage/UpdateLogImplTest.java | 31 +-
.../internal/catalog/BaseCatalogManagerTest.java | 23 ++
.../RebalanceUtilUpdateAssignmentsTest.java | 4 +-
.../internal/schema/CatalogSchemaManagerTest.java | 9 +-
.../CatalogToSchemaDescriptorConverterTest.java | 4 +-
.../engine/schema/CatalogSqlSchemaManagerTest.java | 13 +-
.../storage/AbstractMvTableStorageTest.java | 4 +-
.../storage/index/AbstractIndexStorageTest.java | 4 +-
.../replication/PartitionReplicaListenerTest.java | 4 +-
34 files changed, 677 insertions(+), 68 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
index 1600b76787..faf1316d9b 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogManagerImpl.java
@@ -42,6 +42,7 @@ import
org.apache.ignite.internal.catalog.commands.CreateZoneParams;
import org.apache.ignite.internal.catalog.commands.DropZoneParams;
import org.apache.ignite.internal.catalog.commands.RenameZoneParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogObjectDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogSystemViewDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
@@ -79,6 +80,13 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
/** Safe time to wait before new Catalog version activation. */
private static final int DEFAULT_DELAY_DURATION = 0;
+ /** Initial update token for a catalog descriptor, this token is valid only before the first call of
+ * {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)}.
+ * After that {@link CatalogObjectDescriptor#updateToken()} will be initialised with a causality token from
+ * {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)}
+ */
+ public static final long INITIAL_CAUSALITY_TOKEN = 0L;
+
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(CatalogManagerImpl.class);
@@ -129,7 +137,8 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
DEFAULT_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0]
+ new CatalogSystemViewDescriptor[0],
+ INITIAL_CAUSALITY_TOKEN
);
// TODO: IGNITE-19082 Move system schema objects initialization to cluster init procedure.
@@ -138,7 +147,8 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
SYSTEM_SCHEMA_NAME,
new CatalogTableDescriptor[0],
new CatalogIndexDescriptor[0],
- new CatalogSystemViewDescriptor[0]
+ new CatalogSystemViewDescriptor[0],
+ INITIAL_CAUSALITY_TOKEN
);
CatalogZoneDescriptor defaultZone = fromParams(
@@ -447,7 +457,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
assert catalog != null : version - 1;
for (UpdateEntry entry : update.entries()) {
- catalog = entry.applyUpdate(catalog);
+ catalog = entry.applyUpdate(catalog, causalityToken);
}
catalog = applyUpdateFinal(catalog, update,
metaStorageUpdateTimestamp);
@@ -518,7 +528,7 @@ public class CatalogManagerImpl extends
AbstractEventProducer<CatalogEvent, Cata
List<UpdateEntry> entries = producer.get(catalog);
for (UpdateEntry entry : entries) {
- catalog = entry.applyUpdate(catalog);
+ catalog = entry.applyUpdate(catalog,
INITIAL_CAUSALITY_TOKEN);
}
bulkUpdateEntries.addAll(entries);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
index bc78766dd2..3ef142c6b4 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/commands/CreateTableCommand.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.catalog.commands;
import static java.util.Objects.requireNonNullElse;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.CatalogParamsValidationUtils.ensureNoTableIndexOrSysViewExistsWithGivenName;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.schemaOrThrow;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.zoneOrThrow;
@@ -110,7 +111,8 @@ public class CreateTableCommand extends
AbstractTableCommand {
CatalogTableDescriptor.INITIAL_TABLE_VERSION,
columns.stream().map(CatalogUtils::fromParams).collect(toList()),
primaryKeyColumns,
- colocationColumns
+ colocationColumns,
+ INITIAL_CAUSALITY_TOKEN
);
String indexName = tableName + "_PK";
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
index de82cb9164..2a6760efd6 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogIndexDescriptor.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.descriptors;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+
import org.apache.ignite.internal.tostring.S;
/**
@@ -35,7 +37,7 @@ public abstract class CatalogIndexDescriptor extends
CatalogObjectDescriptor {
private boolean writeOnly;
CatalogIndexDescriptor(int id, String name, int tableId, boolean unique) {
- super(id, Type.INDEX, name);
+ super(id, Type.INDEX, name, INITIAL_CAUSALITY_TOKEN);
this.tableId = tableId;
this.unique = unique;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
index 79fba735e1..b22cd6d55a 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogObjectDescriptor.java
@@ -17,8 +17,11 @@
package org.apache.ignite.internal.catalog.descriptors;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+
import java.io.Serializable;
import java.util.Objects;
+import org.apache.ignite.internal.catalog.storage.UpdateEntry;
import org.apache.ignite.internal.tostring.S;
/**
@@ -30,11 +33,13 @@ public abstract class CatalogObjectDescriptor implements
Serializable {
private final int id;
private final String name;
private final Type type;
+ private long updateToken;
- CatalogObjectDescriptor(int id, Type type, String name) {
+ CatalogObjectDescriptor(int id, Type type, String name, long
causalityToken) {
this.id = id;
this.type = Objects.requireNonNull(type, "type");
this.name = Objects.requireNonNull(name, "name");
+ this.updateToken = causalityToken;
}
/** Returns id of the described object. */
@@ -52,6 +57,33 @@ public abstract class CatalogObjectDescriptor implements
Serializable {
return type;
}
+
+ /**
+ * Token of the update of the descriptor.
+ * Updated when {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)} is called for the
+ * corresponding catalog descriptor. This token is the token that is associated with the corresponding update being applied to
+ * the Catalog. Any new catalog descriptor associated with an {@link UpdateEntry}, meaning that this token is set only once.
+ *
+ * @return Token of the update of the descriptor.
+ */
+ public long updateToken() {
+ return updateToken;
+ }
+
+ /**
+ * Set token of the update of the descriptor. Must be called only once when
+ * {@link UpdateEntry#applyUpdate(org.apache.ignite.internal.catalog.Catalog, long)} is called for the corresponding catalog descriptor.
+ * This token is the token that is associated with the corresponding update being applied to
+ * the Catalog. Any new catalog descriptor associated with an {@link UpdateEntry}, meaning that this token is set only once.
+ *
+ * @param updateToken Update token of the descriptor.
+ */
+ public void updateToken(long updateToken) {
+ assert this.updateToken == INITIAL_CAUSALITY_TOKEN : "Update token for
the descriptor must be updated only once";
+
+ this.updateToken = updateToken;
+ }
+
/** {@inheritDoc} */
@Override
public String toString() {
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
index 3dba7a76f3..f2ad73f610 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSchemaDescriptor.java
@@ -56,8 +56,9 @@ public class CatalogSchemaDescriptor extends
CatalogObjectDescriptor {
public CatalogSchemaDescriptor(int id, String name,
CatalogTableDescriptor[] tables,
CatalogIndexDescriptor[] indexes,
- CatalogSystemViewDescriptor[] systemViews) {
- super(id, Type.SCHEMA, name);
+ CatalogSystemViewDescriptor[] systemViews,
+ long causalityToken) {
+ super(id, Type.SCHEMA, name, causalityToken);
this.tables = Objects.requireNonNull(tables, "tables");
this.indexes = Objects.requireNonNull(indexes, "indexes");
this.systemViews = Objects.requireNonNull(systemViews, "systemViews");
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
index 7946ca087c..9a5af26a55 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogSystemViewDescriptor.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.descriptors;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+
import java.util.List;
import java.util.Objects;
import org.apache.ignite.internal.tostring.S;
@@ -40,7 +42,7 @@ public class CatalogSystemViewDescriptor extends
CatalogObjectDescriptor {
* @param columns View columns.
*/
public CatalogSystemViewDescriptor(int id, String name,
List<CatalogTableColumnDescriptor> columns, SystemViewType systemViewType) {
- super(id, Type.SYSTEM_VIEW, name);
+ super(id, Type.SYSTEM_VIEW, name, INITIAL_CAUSALITY_TOKEN);
this.columns = Objects.requireNonNull(columns, "columns");
this.systemViewType = Objects.requireNonNull(systemViewType,
"viewType");
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
index 4e88d325a2..65134ae8db 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogTableDescriptor.java
@@ -70,9 +70,10 @@ public class CatalogTableDescriptor extends
CatalogObjectDescriptor {
int tableVersion,
List<CatalogTableColumnDescriptor> columns,
List<String> pkCols,
- @Nullable List<String> colocationCols
+ @Nullable List<String> colocationCols,
+ long causalityToken
) {
- super(id, Type.TABLE, name);
+ super(id, Type.TABLE, name, causalityToken);
this.pkIndexId = pkIndexId;
this.zoneId = zoneId;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
index 21c0b11d7b..3cddc79156 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/descriptors/CatalogZoneDescriptor.java
@@ -17,6 +17,8 @@
package org.apache.ignite.internal.catalog.descriptors;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+
import org.apache.ignite.internal.tostring.S;
/**
@@ -70,7 +72,7 @@ public class CatalogZoneDescriptor extends
CatalogObjectDescriptor {
String filter,
CatalogDataStorageDescriptor dataStorage
) {
- super(id, Type.ZONE, name);
+ super(id, Type.ZONE, name, INITIAL_CAUSALITY_TOKEN);
this.partitions = partitions;
this.replicas = replicas;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index 1edd004c1b..553c0e4bdf 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -77,7 +77,7 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
return new Catalog(
@@ -101,11 +101,13 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
.map(source ->
source.name().equals(column.name()) ? column : source)
.collect(toList()),
table.primaryKeyColumns(),
- table.colocationColumns())
+ table.colocationColumns(),
+ causalityToken)
)
.toArray(CatalogTableDescriptor[]::new),
schema.indexes(),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index c79f366333..a45787b433 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -59,7 +59,9 @@ public class AlterZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+ descriptor.updateToken(causalityToken);
+
return new Catalog(
catalog.version(),
catalog.time(),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index 7853de8f87..0d193453f9 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -75,7 +75,7 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
return new Catalog(
@@ -97,11 +97,13 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
.filter(col ->
!columns.contains(col.name()))
.collect(toList()),
table.primaryKeyColumns(),
- table.colocationColumns()) : table
+ table.colocationColumns(),
+ causalityToken) : table
)
.toArray(CatalogTableDescriptor[]::new),
schema.indexes(),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 40d6aca8c0..564327f9e8 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -74,7 +74,7 @@ public class DropIndexEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
return new Catalog(
@@ -87,7 +87,8 @@ public class DropIndexEntry implements UpdateEntry, Fireable {
schema.name(),
schema.tables(),
Arrays.stream(schema.indexes()).filter(t -> t.id() !=
indexId).toArray(CatalogIndexDescriptor[]::new),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 7e0362fae7..07a66e00c3 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -65,7 +65,7 @@ public class DropTableEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
return new Catalog(
@@ -78,7 +78,8 @@ public class DropTableEntry implements UpdateEntry, Fireable {
schema.name(),
Arrays.stream(schema.tables()).filter(t -> t.id() !=
tableId).toArray(CatalogTableDescriptor[]::new),
schema.indexes(),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index 2579a94f51..3bed25da96 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -58,7 +58,7 @@ public class DropZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
return new Catalog(
catalog.version(),
catalog.time(),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 96f410c63f..84acafa38b 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -74,7 +74,7 @@ public class NewColumnsEntry implements UpdateEntry, Fireable
{
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
return new Catalog(
@@ -94,11 +94,13 @@ public class NewColumnsEntry implements UpdateEntry,
Fireable {
table.tableVersion() + 1,
CollectionUtils.concat(table.columns(), descriptors),
table.primaryKeyColumns(),
- table.colocationColumns()) : table
+ table.colocationColumns(),
+ causalityToken) : table
)
.toArray(CatalogTableDescriptor[]::new),
schema.indexes(),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 491080826d..b54fe14e61 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -65,9 +65,11 @@ public class NewIndexEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
+ descriptor.updateToken(causalityToken);
+
return new Catalog(
catalog.version(),
catalog.time(),
@@ -78,7 +80,8 @@ public class NewIndexEntry implements UpdateEntry, Fireable {
schema.name(),
schema.tables(),
ArrayUtils.concat(schema.indexes(), descriptor),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas())
);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
index 0016cbf197..0ff8bd830c 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewSystemViewEntry.java
@@ -66,9 +66,11 @@ public class NewSystemViewEntry implements UpdateEntry,
Fireable {
/** {@inheritDoc} */
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor systemSchema = catalog.schema(schemaName);
+ descriptor.updateToken(causalityToken);
+
Map<String, CatalogSystemViewDescriptor> systemViews =
Arrays.stream(systemSchema.systemViews())
.collect(Collectors.toMap(CatalogSystemViewDescriptor::name,
Function.identity()));
systemViews.put(descriptor.name(), descriptor);
@@ -80,7 +82,8 @@ public class NewSystemViewEntry implements UpdateEntry,
Fireable {
systemSchema.name(),
systemSchema.tables(),
systemSchema.indexes(),
- sysViewArray);
+ sysViewArray,
+ causalityToken);
return new Catalog(
catalog.version(),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index fd479f00ef..2d269bedc8 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -66,15 +66,18 @@ public class NewTableEntry implements UpdateEntry, Fireable
{
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName));
+ descriptor.updateToken(causalityToken);
+
List<CatalogSchemaDescriptor> schemas = CatalogUtils.replaceSchema(new
CatalogSchemaDescriptor(
schema.id(),
schema.name(),
ArrayUtils.concat(schema.tables(), descriptor),
schema.indexes(),
- schema.systemViews()
+ schema.systemViews(),
+ causalityToken
), catalog.schemas());
return new Catalog(
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index 0cceba1ad2..02a09f1076 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -59,7 +59,9 @@ public class NewZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
+ descriptor.updateToken(causalityToken);
+
return new Catalog(
catalog.version(),
catalog.time(),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
index 45f8d51fa9..aa3cfa8b09 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
@@ -43,7 +43,7 @@ public class ObjectIdGenUpdateEntry implements UpdateEntry {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
return new Catalog(
catalog.version(),
catalog.time(),
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
index dc01e77f40..1df40adc59 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
@@ -28,7 +28,8 @@ public interface UpdateEntry extends Serializable {
* Applies own change to the catalog.
*
* @param catalog Current catalog.
+ * @param causalityToken Token that is associated with the corresponding update being applied.
* @return New catalog.
*/
- Catalog applyUpdate(Catalog catalog);
+ Catalog applyUpdate(Catalog catalog, long causalityToken);
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
new file mode 100644
index 0000000000..faf02ad412
--- /dev/null
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerDescriptorCausalityTokenTest.java
@@ -0,0 +1,444 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.catalog;
+
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.addColumnParams;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.columnParamsBuilder;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.dropColumnParams;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.dropTableCommand;
+import static
org.apache.ignite.internal.catalog.commands.DefaultValue.constant;
+import static
org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
+import static
org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.DESC_NULLS_FIRST;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.STRING;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.nullValue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
+import static org.junit.jupiter.api.Assertions.assertNotSame;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.List;
+import org.apache.ignite.internal.catalog.commands.AlterZoneParams;
+import org.apache.ignite.internal.catalog.commands.CreateZoneParams;
+import org.apache.ignite.internal.catalog.commands.RenameZoneParams;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogHashIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogSortedIndexDescriptor;
+import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests that the "update token" of catalog descriptor entities is
updated after general catalog operations.
+ */
+public class CatalogManagerDescriptorCausalityTokenTest extends
BaseCatalogManagerTest {
+ private static final String SCHEMA_NAME = DEFAULT_SCHEMA_NAME;
+ private static final String ZONE_NAME = DEFAULT_ZONE_NAME;
+ private static final String TABLE_NAME_2 = "myTable2";
+ private static final String NEW_COLUMN_NAME = "NEWCOL";
+
+ @Test
+ public void testEmptyCatalog() {
+ CatalogSchemaDescriptor defaultSchema =
manager.schema(DEFAULT_SCHEMA_NAME, 0);
+
+ assertNotNull(defaultSchema);
+ assertSame(defaultSchema, manager.activeSchema(DEFAULT_SCHEMA_NAME,
clock.nowLong()));
+ assertSame(defaultSchema, manager.schema(0));
+ assertSame(defaultSchema, manager.activeSchema(clock.nowLong()));
+
+ assertNull(manager.schema(1));
+ assertThrows(IllegalStateException.class, () ->
manager.activeSchema(-1L));
+
+ // Validate default schema.
+ assertEquals(INITIAL_CAUSALITY_TOKEN, defaultSchema.updateToken());
+
+ // Default distribution zone must exist.
+ CatalogZoneDescriptor zone = manager.zone(DEFAULT_ZONE_NAME,
clock.nowLong());
+
+ assertNotNull(zone);
+
+ assertEquals(INITIAL_CAUSALITY_TOKEN, zone.updateToken());
+ }
+
+ @Test
+ public void testCreateTable() {
+ assertThat(
+ manager.execute(createTableCommand(
+ TABLE_NAME,
+ List.of(columnParams("key1", INT32),
columnParams("key2", INT32), columnParams("val", INT32, true)),
+ List.of("key1", "key2"),
+ List.of("key2")
+ )),
+ willBe(nullValue())
+ );
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema = manager.schema(0);
+
+ assertNotNull(schema);
+ assertEquals(SCHEMA_NAME, schema.name());
+ assertSame(schema, manager.activeSchema(123L));
+ assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+
+ assertNull(schema.table(TABLE_NAME));
+ assertNull(manager.table(TABLE_NAME, 123L));
+
+ // Validate actual catalog.
+ schema = manager.schema(SCHEMA_NAME, 1);
+ CatalogTableDescriptor table = schema.table(TABLE_NAME);
+
+ assertNotNull(schema);
+ assertEquals(SCHEMA_NAME, schema.name());
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
+
+ assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+ assertSame(table, manager.table(table.id(), clock.nowLong()));
+
+ // Validate newly created table.
+ assertEquals(TABLE_NAME, table.name());
+ assertTrue(table.updateToken() > INITIAL_CAUSALITY_TOKEN);
+ assertEquals(table.updateToken(), schema.updateToken());
+
+ // Validate another table creation.
+ assertThat(manager.execute(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
+
+ // Validate that the actual catalog has both tables.
+ schema = manager.schema(2);
+ table = schema.table(TABLE_NAME);
+ CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
+
+ assertNotNull(schema);
+ assertEquals(SCHEMA_NAME, schema.name());
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
+
+ assertSame(table, manager.table(TABLE_NAME, clock.nowLong()));
+ assertSame(table, manager.table(table.id(), clock.nowLong()));
+
+ assertSame(table2, manager.table(TABLE_NAME_2, clock.nowLong()));
+ assertSame(table2, manager.table(table2.id(), clock.nowLong()));
+
+ assertNotSame(table, table2);
+
+ // Assert that the causality token of the last update of table2 is greater
than that of the table created earlier.
+ assertTrue(table2.updateToken() > table.updateToken());
+
+ assertEquals(table2.updateToken(), schema.updateToken());
+ }
+
+ @Test
+ public void testDropTable() {
+ assertThat(manager.execute(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+ assertThat(manager.execute(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
+
+ long beforeDropTimestamp = clock.nowLong();
+
+ assertThat(manager.execute(dropTableCommand(TABLE_NAME)),
willBe(nullValue()));
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema = manager.schema(2);
+ CatalogTableDescriptor table1 = schema.table(TABLE_NAME);
+ CatalogTableDescriptor table2 = schema.table(TABLE_NAME_2);
+
+ assertNotEquals(table1.id(), table2.id());
+
+ assertNotNull(schema);
+ assertEquals(SCHEMA_NAME, schema.name());
+ assertSame(schema, manager.activeSchema(beforeDropTimestamp));
+
+ long causalityToken = schema.updateToken();
+ assertTrue(causalityToken > INITIAL_CAUSALITY_TOKEN);
+
+ assertSame(table1, manager.table(TABLE_NAME, beforeDropTimestamp));
+ assertSame(table1, manager.table(table1.id(), beforeDropTimestamp));
+
+ assertSame(table2, manager.table(TABLE_NAME_2, beforeDropTimestamp));
+ assertSame(table2, manager.table(table2.id(), beforeDropTimestamp));
+
+ // Validate actual catalog.
+ schema = manager.schema(3);
+
+ assertNotNull(schema);
+ assertEquals(SCHEMA_NAME, schema.name());
+ assertSame(schema, manager.activeSchema(clock.nowLong()));
+
+ assertNull(schema.table(TABLE_NAME));
+ assertNull(manager.table(TABLE_NAME, clock.nowLong()));
+ assertNull(manager.table(table1.id(), clock.nowLong()));
+
+ // Assert that drop table changes schema's last update token.
+ assertTrue(schema.updateToken() > causalityToken);
+ }
+
+ @Test
+ public void testAddColumn() {
+ assertThat(manager.execute(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ long beforeAddedTimestamp = clock.nowLong();
+
+ assertThat(
+ manager.execute(addColumnParams(TABLE_NAME,
+ columnParamsBuilder(NEW_COLUMN_NAME, STRING, 11,
true).defaultValue(constant("Ignite!")).build()
+ )),
+ willBe(nullValue())
+ );
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema =
manager.activeSchema(beforeAddedTimestamp);
+ assertNotNull(schema);
+ CatalogTableDescriptor table = schema.table(TABLE_NAME);
+ assertNotNull(table);
+
+ long schemaCausalityToken = schema.updateToken();
+ assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
+ assertEquals(schemaCausalityToken, table.updateToken());
+
+ assertNull(schema.table(TABLE_NAME).column(NEW_COLUMN_NAME));
+
+ // Validate actual catalog.
+ schema = manager.activeSchema(clock.nowLong());
+ assertNotNull(schema);
+ table = schema.table(TABLE_NAME);
+ assertNotNull(table);
+
+ // Validate column descriptor.
+ CatalogTableColumnDescriptor column =
schema.table(TABLE_NAME).column(NEW_COLUMN_NAME);
+ assertEquals(NEW_COLUMN_NAME, column.name());
+
+ // Assert that the schema's and table's update tokens were updated after
adding a column.
+ assertTrue(schema.updateToken() > schemaCausalityToken);
+ assertEquals(schema.updateToken(), table.updateToken());
+ }
+
+ @Test
+ public void testDropColumn() {
+ assertThat(manager.execute(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ long beforeAddedTimestamp = clock.nowLong();
+
+ assertThat(manager.execute(dropColumnParams(TABLE_NAME, "VAL")),
willBe(nullValue()));
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema =
manager.activeSchema(beforeAddedTimestamp);
+ assertNotNull(schema);
+ CatalogTableDescriptor table = schema.table(TABLE_NAME);
+ assertNotNull(table);
+
+ long schemaCausalityToken = schema.updateToken();
+ assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
+ assertEquals(schemaCausalityToken, table.updateToken());
+
+ assertNotNull(schema.table(TABLE_NAME).column("VAL"));
+
+ // Validate actual catalog.
+ schema = manager.activeSchema(clock.nowLong());
+ assertNotNull(schema);
+ table = schema.table(TABLE_NAME);
+ assertNotNull(table);
+
+ assertNull(schema.table(TABLE_NAME).column("VAL"));
+
+ // Assert that the schema's and table's update tokens were updated after
dropping a column.
+ assertTrue(schema.updateToken() > schemaCausalityToken);
+ assertEquals(schema.updateToken(), table.updateToken());
+ }
+
+ @Test
+ public void testCreateHashIndex() {
+ assertThat(manager.execute(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ assertThat(manager.execute(createHashIndexCommand(INDEX_NAME,
List.of("VAL", "ID"))), willBe(nullValue()));
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema = manager.schema(1);
+
+ assertNotNull(schema);
+ assertNull(schema.index(INDEX_NAME));
+ assertNull(manager.index(INDEX_NAME, 123L));
+
+ long schemaCausalityToken = schema.updateToken();
+
+ assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
+
+ // Validate actual catalog.
+ schema = manager.schema(2);
+
+ CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor)
schema.index(INDEX_NAME);
+
+ assertNotNull(schema);
+ assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+ assertSame(index, manager.index(index.id(), clock.nowLong()));
+ assertTrue(schema.updateToken() > schemaCausalityToken);
+
+ // Validate newly created hash index.
+ assertEquals(INDEX_NAME, index.name());
+ assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
+ assertEquals(schema.updateToken(), index.updateToken());
+ }
+
+ @Test
+ public void testCreateSortedIndex() {
+ assertThat(manager.execute(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ CatalogCommand command = createSortedIndexCommand(
+ INDEX_NAME,
+ true,
+ List.of("VAL", "ID"),
+ List.of(DESC_NULLS_FIRST, ASC_NULLS_LAST)
+ );
+
+ assertThat(manager.execute(command), willBe(nullValue()));
+
+ // Validate catalog version from the past.
+ CatalogSchemaDescriptor schema = manager.schema(1);
+
+ assertNotNull(schema);
+ assertNull(schema.index(INDEX_NAME));
+ assertNull(manager.index(INDEX_NAME, 123L));
+
+ long schemaCausalityToken = schema.updateToken();
+ assertTrue(schemaCausalityToken > INITIAL_CAUSALITY_TOKEN);
+
+ // Validate actual catalog.
+ schema = manager.schema(2);
+
+ CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor)
schema.index(INDEX_NAME);
+
+ assertNotNull(schema);
+ assertSame(index, manager.index(INDEX_NAME, clock.nowLong()));
+ assertSame(index, manager.index(index.id(), clock.nowLong()));
+ assertTrue(schema.updateToken() > schemaCausalityToken);
+
+ // Validate newly created sorted index.
+ assertEquals(INDEX_NAME, index.name());
+ assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
+ assertEquals(schema.updateToken(), index.updateToken());
+ }
+
+ @Test
+ public void testCreateZone() {
+ String zoneName = ZONE_NAME + 1;
+
+ CreateZoneParams params =
CreateZoneParams.builder().zoneName(zoneName).build();
+
+ assertThat(manager.createZone(params), willCompleteSuccessfully());
+
+ // Validate catalog version from the past.
+ assertNull(manager.zone(zoneName, 0));
+ assertNull(manager.zone(zoneName, 123L));
+
+ // Validate actual catalog.
+ CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
+
+ assertNotNull(zone);
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
+
+ // Validate newly created zone.
+ assertEquals(zoneName, zone.name());
+ assertTrue(zone.updateToken() > INITIAL_CAUSALITY_TOKEN);
+ }
+
+ @Test
+ public void testRenameZone() {
+ String zoneName = ZONE_NAME + 1;
+
+ CreateZoneParams createParams =
CreateZoneParams.builder().zoneName(zoneName).build();
+
+ assertThat(manager.createZone(createParams),
willCompleteSuccessfully());
+
+ long beforeDropTimestamp = clock.nowLong();
+
+ String newZoneName = "RenamedZone";
+
+ RenameZoneParams renameZoneParams = RenameZoneParams.builder()
+ .zoneName(zoneName)
+ .newZoneName(newZoneName)
+ .build();
+
+ assertThat(manager.renameZone(renameZoneParams),
willCompleteSuccessfully());
+
+ // Validate catalog version from the past.
+ CatalogZoneDescriptor zone = manager.zone(zoneName,
beforeDropTimestamp);
+
+ assertNotNull(zone);
+ assertEquals(zoneName, zone.name());
+
+ assertSame(zone, manager.zone(zone.id(), beforeDropTimestamp));
+ long causalityToken = zone.updateToken();
+ assertTrue(causalityToken > INITIAL_CAUSALITY_TOKEN);
+
+ // Validate actual catalog.
+ zone = manager.zone(newZoneName, clock.nowLong());
+
+ assertNotNull(zone);
+ assertNull(manager.zone(zoneName, clock.nowLong()));
+ assertEquals(newZoneName, zone.name());
+
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
+ // Assert that renaming of a zone updates token.
+ assertTrue(zone.updateToken() > causalityToken);
+ }
+
+ @Test
+ public void testAlterZone() {
+ String zoneName = ZONE_NAME + 1;
+
+ CreateZoneParams createParams = CreateZoneParams.builder()
+ .zoneName(zoneName)
+ .filter("expression")
+ .build();
+
+ AlterZoneParams alterZoneParams = AlterZoneParams.builder()
+ .zoneName(zoneName)
+ .dataNodesAutoAdjustScaleUp(3)
+ .dataNodesAutoAdjustScaleDown(4)
+ .filter("newExpression")
+ .build();
+
+ assertThat(manager.createZone(createParams),
willCompleteSuccessfully());
+ CatalogZoneDescriptor zone = manager.zone(zoneName, clock.nowLong());
+ assertNotNull(zone);
+ long causalityToken = zone.updateToken();
+ assertTrue(causalityToken > INITIAL_CAUSALITY_TOKEN);
+
+ assertThat(manager.alterZone(alterZoneParams),
willCompleteSuccessfully());
+
+ // Validate actual catalog.
+ zone = manager.zone(zoneName, clock.nowLong());
+ assertNotNull(zone);
+ assertSame(zone, manager.zone(zone.id(), clock.nowLong()));
+
+ assertEquals(zoneName, zone.name());
+ assertEquals(3, zone.dataNodesAutoAdjustScaleUp());
+ assertEquals(4, zone.dataNodesAutoAdjustScaleDown());
+ assertEquals("newExpression", zone.filter());
+ // Assert that altering of a zone updates token.
+ assertTrue(zone.updateToken() > causalityToken);
+ }
+}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
index c76af57621..67de39a298 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogManagerSelfTest.java
@@ -1894,23 +1894,6 @@ public class CatalogManagerSelfTest extends
BaseCatalogManagerTest {
return manager.execute(builder.build());
}
- private CatalogCommand simpleTable(String name) {
- List<ColumnParams> cols = List.of(
- columnParams("ID", INT32),
- columnParamsBuilder("VAL", INT32,
true).defaultValue(constant(null)).build(),
- columnParamsBuilder("VAL_NOT_NULL",
INT32).defaultValue(constant(1)).build(),
- columnParams("DEC", DECIMAL, true, 11, 2),
- columnParams("STR", STRING, 101, true),
- columnParamsBuilder("DEC_SCALE",
DECIMAL).precision(12).scale(3).build()
- );
-
- return simpleTable(name, cols);
- }
-
- private CatalogCommand simpleTable(String tableName, List<ColumnParams>
cols) {
- return createTableCommand(tableName, cols,
List.of(cols.get(0).name()), List.of(cols.get(0).name()));
- }
-
private static CatalogCommand simpleIndex() {
return createSortedIndexCommand(INDEX_NAME, List.of("VAL"),
List.of(ASC_NULLS_LAST));
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
index 856e0d5e0b..48ee7f9dbb 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogSystemViewTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.catalog;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.CatalogService.SYSTEM_SCHEMA_NAME;
import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
@@ -32,6 +33,7 @@ import static
org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -116,6 +118,38 @@ public class CatalogSystemViewTest extends
BaseCatalogManagerTest {
assertEquals(STRING, col2.type());
}
+ @ParameterizedTest
+ @EnumSource(SystemViewType.class)
+ public void testCreateSystemViewUpdatesDescriptorToken(SystemViewType
type) {
+ CreateSystemViewCommand command = CreateSystemViewCommand.builder()
+ .name(SYS_VIEW_NAME)
+ .columns(List.of(
+
ColumnParams.builder().name("col1").type(INT32).build(),
+
ColumnParams.builder().name("col2").type(STRING).length(1 << 5).build()
+ ))
+ .type(type)
+ .build();
+
+ CatalogSchemaDescriptor schema = manager.activeSchema(clock.nowLong());
+ assertNotNull(schema);
+ assertEquals(INITIAL_CAUSALITY_TOKEN, schema.updateToken());
+
+ assertThat(manager.execute(command), willCompleteSuccessfully());
+
+ int catalogVersion = manager.latestCatalogVersion();
+
+ CatalogSchemaDescriptor systemSchema =
manager.schema(SYSTEM_SCHEMA_NAME, catalogVersion);
+ assertNotNull(systemSchema, "systemSchema");
+
+ schema = manager.activeSchema(clock.nowLong());
+ assertNotNull(schema);
+ long schemaCausalityToken = schema.updateToken();
+ assertEquals(INITIAL_CAUSALITY_TOKEN, schemaCausalityToken);
+
+ // Assert that creation of the system view updates token for the
descriptor.
+ assertTrue(systemSchema.updateToken() > schemaCausalityToken);
+ }
+
@ParameterizedTest
@EnumSource(SystemViewModification.class)
public void
testCreateSystemViewReplacesExistingViewWhenViewChanges(SystemViewModification
systemViewModification) {
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
index f6dbf3b134..5f8181bec1 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/commands/AbstractCommandValidationTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.commands;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.sql.ColumnType.INT32;
import java.util.List;
@@ -115,7 +116,7 @@ abstract class AbstractCommandValidationTest extends
BaseIgniteAbstractTest {
for (CatalogCommand command : commandsToApply) {
for (UpdateEntry updates : command.get(catalog)) {
- catalog = updates.applyUpdate(catalog);
+ catalog = updates.applyUpdate(catalog,
INITIAL_CAUSALITY_TOKEN);
}
}
@@ -135,7 +136,8 @@ abstract class AbstractCommandValidationTest extends
BaseIgniteAbstractTest {
SCHEMA_NAME,
tables,
indexes,
- systemViews
+ systemViews,
+ INITIAL_CAUSALITY_TOKEN
))
);
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index 5284c3ef18..f08c469409 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -33,6 +33,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicLong;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.storage.UpdateLog.OnUpdateHandler;
import org.apache.ignite.internal.lang.IgniteInternalException;
@@ -234,6 +235,34 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
assertTrue(waitForCondition(() -> metastore.appliedRevision() >
metastoreRevision, 200));
}
+ @Test
+ void testOnUpdateHandlerUsesCausalityTokenFromMetastore() throws Exception
{
+ CompletableFuture<Void> onUpdateHandlerFuture = new
CompletableFuture<>();
+
+ AtomicLong causalityTokenFromHandler = new AtomicLong(-1L);
+
+ UpdateLog updateLog = createAndStartUpdateLogImpl((update,
metaStorageUpdateTimestamp, causalityToken) -> {
+ causalityTokenFromHandler.set(causalityToken);
+
+ return onUpdateHandlerFuture;
+ });
+
+ assertThat(metastore.deployWatches(), willCompleteSuccessfully());
+
+ long metastoreRevision = metastore.appliedRevision();
+
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(1)),
willCompleteSuccessfully());
+
+ // Let's make sure that the metastore revision will not increase until
onUpdateHandlerFuture is completed.
+ assertFalse(waitForCondition(() -> metastore.appliedRevision() >
metastoreRevision, 200));
+
+ // Let's make sure that the metastore revision increases after
completing onUpdateHandlerFuture.
+ onUpdateHandlerFuture.complete(null);
+
+ // Assert that causality token from OnUpdateHandler is the same as the
revision of this update in metastorage.
+ assertTrue(waitForCondition(() -> metastore.appliedRevision() ==
causalityTokenFromHandler.get(), 200));
+ }
+
private static VersionedUpdate singleEntryUpdateOfVersion(int version) {
return new VersionedUpdate(version, 1, List.of(new
TestUpdateEntry("foo_" + version)));
}
@@ -248,7 +277,7 @@ class UpdateLogImplTest extends BaseIgniteAbstractTest {
}
@Override
- public Catalog applyUpdate(Catalog catalog) {
+ public Catalog applyUpdate(Catalog catalog, long causalityToken) {
return catalog;
}
diff --git
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
index f3d4e7bbab..fc335ff292 100644
---
a/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
+++
b/modules/catalog/src/testFixtures/java/org/apache/ignite/internal/catalog/BaseCatalogManagerTest.java
@@ -19,7 +19,13 @@ package org.apache.ignite.internal.catalog;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_ZONE_NAME;
+import static org.apache.ignite.internal.catalog.CatalogTestUtils.columnParams;
+import static
org.apache.ignite.internal.catalog.CatalogTestUtils.columnParamsBuilder;
+import static
org.apache.ignite.internal.catalog.commands.DefaultValue.constant;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.sql.ColumnType.DECIMAL;
+import static org.apache.ignite.sql.ColumnType.INT32;
+import static org.apache.ignite.sql.ColumnType.STRING;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.mockito.Mockito.spy;
@@ -163,4 +169,21 @@ public abstract class BaseCatalogManagerTest extends
BaseIgniteAbstractTest {
.primaryKeyColumns(primaryKeys)
.colocationColumns(colocationColumns);
}
+
+ protected CatalogCommand simpleTable(String name) {
+ List<ColumnParams> cols = List.of(
+ columnParams("ID", INT32),
+ columnParamsBuilder("VAL", INT32,
true).defaultValue(constant(null)).build(),
+ columnParamsBuilder("VAL_NOT_NULL",
INT32).defaultValue(constant(1)).build(),
+ columnParams("DEC", DECIMAL, true, 11, 2),
+ columnParams("STR", STRING, 101, true),
+ columnParamsBuilder("DEC_SCALE",
DECIMAL).precision(12).scale(3).build()
+ );
+
+ return simpleTable(name, cols);
+ }
+
+ protected CatalogCommand simpleTable(String tableName, List<ColumnParams>
cols) {
+ return createTableCommand(tableName, cols,
List.of(cols.get(0).name()), List.of(cols.get(0).name()));
+ }
}
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
index faef22168c..5dea3ed336 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtilUpdateAssignmentsTest.java
@@ -19,6 +19,7 @@ package
org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.stream.Collectors.toSet;
import static
org.apache.ignite.internal.affinity.AffinityUtils.calculateAssignmentForPartition;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.util.ByteUtils.toBytes;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -91,7 +92,8 @@ public class RebalanceUtilUpdateAssignmentsTest extends
IgniteAbstractTest {
1,
List.of(new CatalogTableColumnDescriptor("k1", ColumnType.INT32,
false, 0, 0, 0, null)),
List.of("k1"),
- null
+ null,
+ INITIAL_CAUSALITY_TOKEN
);
private static final int partNum = 2;
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
index 3abf1c584c..2896e9d2fa 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/CatalogSchemaManagerTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.schema;
import static java.nio.charset.StandardCharsets.UTF_8;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureCompletedMatcher.completedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willTimeoutFast;
@@ -172,7 +173,7 @@ class CatalogSchemaManagerTest extends
BaseIgniteAbstractTest {
new CatalogTableColumnDescriptor("v1", ColumnType.INT32,
false, 0, 0, 0, null)
);
CatalogTableDescriptor tableDescriptor = new CatalogTableDescriptor(
- TABLE_ID, -1, TABLE_NAME, 0, 1, columns, List.of("k1", "k2"),
null
+ TABLE_ID, -1, TABLE_NAME, 0, 1, columns, List.of("k1", "k2"),
null, INITIAL_CAUSALITY_TOKEN
);
CompletableFuture<Boolean> future = tableCreatedListener()
@@ -242,7 +243,7 @@ class CatalogSchemaManagerTest extends
BaseIgniteAbstractTest {
new CatalogTableColumnDescriptor("v2", ColumnType.STRING,
false, 0, 0, 0, null)
);
- return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null);
+ return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null, INITIAL_CAUSALITY_TOKEN);
}
private void completeCausalityToken(long causalityToken) {
@@ -278,7 +279,7 @@ class CatalogSchemaManagerTest extends
BaseIgniteAbstractTest {
new CatalogTableColumnDescriptor("k2", ColumnType.STRING,
false, 0, 0, 0, null)
);
- return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null);
+ return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null, INITIAL_CAUSALITY_TOKEN);
}
@Test
@@ -316,7 +317,7 @@ class CatalogSchemaManagerTest extends
BaseIgniteAbstractTest {
new CatalogTableColumnDescriptor("v1", ColumnType.INT64,
false, 0, 0, 0, null)
);
- return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null);
+ return new CatalogTableDescriptor(TABLE_ID, -1, TABLE_NAME, 0, 2,
columns, List.of("k1", "k2"), null, INITIAL_CAUSALITY_TOKEN);
}
@Test
diff --git
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
index 55ca4ffbec..9a86b052da 100644
---
a/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
+++
b/modules/schema/src/test/java/org/apache/ignite/internal/schema/catalog/CatalogToSchemaDescriptorConverterTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.schema.catalog;
+import static
org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static org.apache.ignite.internal.schema.SchemaTestUtils.specToType;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.equalTo;
@@ -140,7 +141,8 @@ public class CatalogToSchemaDescriptorConverterTest extends
AbstractSchemaConver
new CatalogTableColumnDescriptor("K1",
ColumnType.INT32, false, 0, 0, 0, null)
),
List.of("K1", "K2"),
- List.of("K2")
+ List.of("K2"),
+ INITIAL_CAUSALITY_TOKEN
);
SchemaDescriptor schema =
CatalogToSchemaDescriptorConverter.convert(tableDescriptor);
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
index 2cf970e427..ad6163ecc0 100644
---
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/schema/CatalogSqlSchemaManagerTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.sql.engine.schema;
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_SCHEMA_NAME;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
@@ -601,7 +602,14 @@ public class CatalogSqlSchemaManagerTest extends
BaseIgniteAbstractTest {
CatalogIndexDescriptor[] indexesArray =
indexDescriptorMap.values().toArray(new CatalogIndexDescriptor[0]);
CatalogSystemViewDescriptor[] systemViewsArray =
systemViewDescriptorMap.values().toArray(new CatalogSystemViewDescriptor[0]);
- return new CatalogSchemaDescriptor(ID.incrementAndGet(), name, tablesArray, indexesArray, systemViewsArray);
+ return new CatalogSchemaDescriptor(
+ ID.incrementAndGet(),
+ name,
+ tablesArray,
+ indexesArray,
+ systemViewsArray,
+ INITIAL_CAUSALITY_TOKEN
+ );
}
}
@@ -692,7 +700,8 @@ public class CatalogSqlSchemaManagerTest extends
BaseIgniteAbstractTest {
CatalogTableDescriptor.INITIAL_TABLE_VERSION,
columnDescriptors,
primaryKey,
- colocationKey
+ colocationKey,
+ INITIAL_CAUSALITY_TOKEN
);
}
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
index 356e20db5b..24e1be2078 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvTableStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.descriptors.CatalogColumnCollation.ASC_NULLS_LAST;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
import static
org.apache.ignite.internal.storage.MvPartitionStorage.REBALANCE_IN_PROGRESS;
@@ -780,7 +781,8 @@ public abstract class AbstractMvTableStorageTest extends
BaseMvStoragesTest {
CatalogUtils.fromParams(ColumnParams.builder().name("STRVAL").length(100).type(STRING).build())
),
List.of("INTKEY"),
- null
+ null,
+ INITIAL_CAUSALITY_TOKEN
);
CatalogSortedIndexDescriptor sortedIndex = new
CatalogSortedIndexDescriptor(
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
index 37b27a7870..042b110214 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/index/AbstractIndexStorageTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.index;
import static java.util.stream.Collectors.toList;
import static java.util.stream.Collectors.toUnmodifiableList;
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.storage.BaseMvStoragesTest.getOrCreateMvPartition;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.is;
@@ -158,7 +159,8 @@ public abstract class AbstractIndexStorageTest<S extends
IndexStorage, D extends
1,
Stream.concat(Stream.of(pkColumn),
ALL_TYPES_COLUMN_PARAMS.stream()).map(CatalogUtils::fromParams).collect(toList()),
List.of("pk"),
- null
+ null,
+ INITIAL_CAUSALITY_TOKEN
);
when(catalogService.table(eq(TABLE_NAME),
anyLong())).thenReturn(tableDescriptor);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
index abc281d7a4..15060ef49e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerTest.java
@@ -20,6 +20,7 @@ package
org.apache.ignite.internal.table.distributed.replication;
import static java.util.Collections.singletonList;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogManagerImpl.INITIAL_CAUSALITY_TOKEN;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_PARTITION_COUNT;
import static
org.apache.ignite.internal.hlc.HybridTimestamp.hybridTimestampToLong;
import static org.apache.ignite.internal.schema.BinaryRowMatcher.equalToRow;
@@ -323,7 +324,8 @@ public class PartitionReplicaListenerTest extends
IgniteAbstractTest {
new CatalogTableColumnDescriptor("strVal",
ColumnType.STRING, false, 0, 0, 0, null)
),
List.of("intKey", "strKey"),
- null
+ null,
+ INITIAL_CAUSALITY_TOKEN
);
/** Partition replication listener to test. */