This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch catalog-feature
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/catalog-feature by this push:
new d94db48177 IGNITE-19798 Add functionality to the catalog to switch the
IndexManager to catalog events (#2247)
d94db48177 is described below
commit d94db481775e7e2d4bfcc9d596850c2475b2b294
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Mon Jun 26 12:22:14 2023 +0300
IGNITE-19798 Add functionality to the catalog to switch the IndexManager to
catalog events (#2247)
---
.../ignite/internal/catalog/CatalogService.java | 2 +
.../internal/catalog/CatalogServiceImpl.java | 46 ++++--
.../catalog/events/AddColumnEventParameters.java | 10 +-
.../catalog/events/AlterColumnEventParameters.java | 5 +-
.../catalog/events/AlterZoneEventParameters.java | 5 +-
.../catalog/events/CatalogEventParameters.java | 14 +-
.../catalog/events/CreateIndexEventParameters.java | 5 +-
.../catalog/events/CreateTableEventParameters.java | 5 +-
.../catalog/events/CreateZoneEventParameters.java | 5 +-
.../catalog/events/DropColumnEventParameters.java | 5 +-
.../catalog/events/DropIndexEventParameters.java | 14 +-
.../catalog/events/DropTableEventParameters.java | 5 +-
.../catalog/events/DropZoneEventParameters.java | 5 +-
.../internal/catalog/storage/AlterColumnEntry.java | 4 +-
.../internal/catalog/storage/AlterZoneEntry.java | 4 +-
.../internal/catalog/storage/DropColumnsEntry.java | 4 +-
.../internal/catalog/storage/DropIndexEntry.java | 15 +-
.../internal/catalog/storage/DropTableEntry.java | 4 +-
.../internal/catalog/storage/DropZoneEntry.java | 4 +-
.../ignite/internal/catalog/storage/Fireable.java | 3 +-
.../internal/catalog/storage/NewColumnsEntry.java | 4 +-
.../internal/catalog/storage/NewIndexEntry.java | 4 +-
.../internal/catalog/storage/NewTableEntry.java | 4 +-
.../internal/catalog/storage/NewZoneEntry.java | 4 +-
.../ignite/internal/catalog/storage/UpdateLog.java | 3 +-
.../internal/catalog/storage/UpdateLogImpl.java | 6 +-
.../internal/catalog/CatalogServiceSelfTest.java | 171 +++++++++++++++++----
.../catalog/storage/UpdateLogImplTest.java | 49 +++---
28 files changed, 310 insertions(+), 99 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
index b078bbd3c5..c7364ebea3 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogService.java
@@ -44,6 +44,8 @@ public interface CatalogService {
CatalogTableDescriptor table(int tableId, long timestamp);
+ CatalogTableDescriptor table(int tableId, int catalogVersion);
+
CatalogIndexDescriptor index(String indexName, long timestamp);
CatalogIndexDescriptor index(int indexId, long timestamp);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index ace40aa938..22d3563dc5 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -191,6 +191,11 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
return catalogAt(timestamp).table(tableId);
}
+ @Override
+ public CatalogTableDescriptor table(int tableId, int catalogVersion) {
+ return catalog(catalogVersion).table(tableId);
+ }
+
@Override
public CatalogIndexDescriptor index(String indexName, long timestamp) {
return
catalogAt(timestamp).schema(DEFAULT_SCHEMA_NAME).index(indexName);
@@ -275,11 +280,16 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
CatalogZoneDescriptor zone = getZone(catalog,
Objects.requireNonNullElse(params.zone(), DEFAULT_ZONE_NAME));
- CatalogTableDescriptor table =
CatalogUtils.fromParams(catalog.objectIdGenState(), zone.id(), params);
+ int id = catalog.objectIdGenState();
+
+ CatalogTableDescriptor table = CatalogUtils.fromParams(id++,
zone.id(), params);
+
+ CatalogHashIndexDescriptor pkIndex =
createHashIndexDescriptor(table, id++, createPkIndexParams(params));
return List.of(
new NewTableEntry(table),
- new ObjectIdGenUpdateEntry(1)
+ new NewIndexEntry(pkIndex),
+ new ObjectIdGenUpdateEntry(id - catalog.objectIdGenState())
);
});
}
@@ -295,7 +305,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
Arrays.stream(schema.indexes())
.filter(index -> index.tableId() == table.id())
- .forEach(index -> updateEntries.add(new
DropIndexEntry(index.id())));
+ .forEach(index -> updateEntries.add(new
DropIndexEntry(index.id(), index.tableId())));
updateEntries.add(new DropTableEntry(table.id()));
@@ -386,9 +396,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
CatalogTableDescriptor table = getTable(schema,
params.tableName());
- validateCreateHashIndexParams(params, table);
-
- CatalogHashIndexDescriptor index =
CatalogUtils.fromParams(catalog.objectIdGenState(), table.id(), params);
+ CatalogHashIndexDescriptor index =
createHashIndexDescriptor(table, catalog.objectIdGenState(), params);
return List.of(
new NewIndexEntry(index),
@@ -431,7 +439,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
}
return List.of(
- new DropIndexEntry(index.id())
+ new DropIndexEntry(index.id(), index.tableId())
);
});
}
@@ -615,7 +623,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
class OnUpdateHandlerImpl implements OnUpdateHandler {
@Override
- public void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp) {
+ public void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp, long causalityToken) {
int version = update.version();
Catalog catalog = catalogByVer.get(version - 1);
@@ -637,7 +645,7 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
eventFutures.add(fireEvent(
fireEvent.eventType(),
- fireEvent.createEventParameters(version)
+ fireEvent.createEventParameters(causalityToken,
version)
));
}
}
@@ -877,4 +885,24 @@ public class CatalogServiceImpl extends
Producer<CatalogEvent, CatalogEventParam
throwUnsupportedDdl("Cannot decrease precision to {} for column
'{}'.", target.precision(), origin.name());
}
}
+
+ private static CreateHashIndexParams createPkIndexParams(CreateTableParams
params) {
+ return CreateHashIndexParams.builder()
+ .schemaName(params.schemaName())
+ .tableName(params.tableName())
+ .indexName(params.tableName() + "_PK")
+ .columns(params.primaryKeyColumns())
+ .unique()
+ .build();
+ }
+
+ private static CatalogHashIndexDescriptor createHashIndexDescriptor(
+ CatalogTableDescriptor table,
+ int indexId,
+ CreateHashIndexParams params
+ ) {
+ validateCreateHashIndexParams(params, table);
+
+ return CatalogUtils.fromParams(indexId, table.id(), params);
+ }
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
index ef43a835f9..6f0226d80f 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AddColumnEventParameters.java
@@ -32,11 +32,17 @@ public class AddColumnEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param tableId An id of table, which columns are added to.
* @param columnDescriptors New columns descriptors.
*/
- public AddColumnEventParameters(long causalityToken, int tableId,
List<CatalogTableColumnDescriptor> columnDescriptors) {
- super(causalityToken);
+ public AddColumnEventParameters(
+ long causalityToken,
+ int catalogVersion,
+ int tableId,
+ List<CatalogTableColumnDescriptor> columnDescriptors
+ ) {
+ super(causalityToken, catalogVersion);
this.tableId = tableId;
this.columnDescriptors = columnDescriptors;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
index 525b8ae06c..fdd124198d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterColumnEventParameters.java
@@ -32,11 +32,12 @@ public class AlterColumnEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param tableId Returns an id the table to be modified.
* @param columnDescriptor Descriptor for the column to be replaced.
*/
- public AlterColumnEventParameters(long causalityToken, int tableId,
CatalogTableColumnDescriptor columnDescriptor) {
- super(causalityToken);
+ public AlterColumnEventParameters(long causalityToken, int catalogVersion,
int tableId, CatalogTableColumnDescriptor columnDescriptor) {
+ super(causalityToken, catalogVersion);
this.tableId = tableId;
this.columnDescriptor = columnDescriptor;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
index d4895e8a58..7fe598fa19 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/AlterZoneEventParameters.java
@@ -30,10 +30,11 @@ public class AlterZoneEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param zoneDescriptor Newly created distribution zone descriptor.
*/
- public AlterZoneEventParameters(long causalityToken, CatalogZoneDescriptor
zoneDescriptor) {
- super(causalityToken);
+ public AlterZoneEventParameters(long causalityToken, int catalogVersion,
CatalogZoneDescriptor zoneDescriptor) {
+ super(causalityToken, catalogVersion);
this.zoneDescriptor = zoneDescriptor;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
index 11c03c885e..a6ec91e7de 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CatalogEventParameters.java
@@ -23,12 +23,24 @@ import org.apache.ignite.internal.manager.EventParameters;
* Base class for Catalog event parameters.
*/
public abstract class CatalogEventParameters extends EventParameters {
+ private final int catalogVersion;
+
/**
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
*/
- public CatalogEventParameters(long causalityToken) {
+ public CatalogEventParameters(long causalityToken, int catalogVersion) {
super(causalityToken);
+
+ this.catalogVersion = catalogVersion;
+ }
+
+ /**
+ * Returns catalog version.
+ */
+ public int catalogVersion() {
+ return catalogVersion;
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
index 5efa219f66..64ee3e619f 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateIndexEventParameters.java
@@ -30,10 +30,11 @@ public class CreateIndexEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param indexDescriptor Newly created index descriptor.
*/
- public CreateIndexEventParameters(long causalityToken,
CatalogIndexDescriptor indexDescriptor) {
- super(causalityToken);
+ public CreateIndexEventParameters(long causalityToken, int catalogVersion,
CatalogIndexDescriptor indexDescriptor) {
+ super(causalityToken, catalogVersion);
this.indexDescriptor = indexDescriptor;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
index f0a2329a45..a8cd6f6df3 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateTableEventParameters.java
@@ -30,10 +30,11 @@ public class CreateTableEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param tableDescriptor Newly created table descriptor.
*/
- public CreateTableEventParameters(long causalityToken,
CatalogTableDescriptor tableDescriptor) {
- super(causalityToken);
+ public CreateTableEventParameters(long causalityToken, int catalogVersion,
CatalogTableDescriptor tableDescriptor) {
+ super(causalityToken, catalogVersion);
this.tableDescriptor = tableDescriptor;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
index ff7101d1c8..9e26910200 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/CreateZoneEventParameters.java
@@ -30,10 +30,11 @@ public class CreateZoneEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param zoneDescriptor Newly created distribution zone descriptor.
*/
- public CreateZoneEventParameters(long causalityToken,
CatalogZoneDescriptor zoneDescriptor) {
- super(causalityToken);
+ public CreateZoneEventParameters(long causalityToken, int catalogVersion,
CatalogZoneDescriptor zoneDescriptor) {
+ super(causalityToken, catalogVersion);
this.zoneDescriptor = zoneDescriptor;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
index ddf491d064..46a17becb5 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropColumnEventParameters.java
@@ -31,11 +31,12 @@ public class DropColumnEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param tableId An id of table, which columns are dropped from.
* @param columns Names of columns to drop.
*/
- public DropColumnEventParameters(long causalityToken, int tableId,
Collection<String> columns) {
- super(causalityToken);
+ public DropColumnEventParameters(long causalityToken, int catalogVersion,
int tableId, Collection<String> columns) {
+ super(causalityToken, catalogVersion);
this.tableId = tableId;
this.columns = columns;
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
index 135d46a144..96b0d16942 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropIndexEventParameters.java
@@ -24,20 +24,30 @@ public class DropIndexEventParameters extends
CatalogEventParameters {
private final int indexId;
+ private final int tableId;
+
/**
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param indexId An id of dropped index.
+ * @param tableId Table ID for which the index was removed.
*/
- public DropIndexEventParameters(long causalityToken, int indexId) {
- super(causalityToken);
+ public DropIndexEventParameters(long causalityToken, int catalogVersion,
int indexId, int tableId) {
+ super(causalityToken, catalogVersion);
this.indexId = indexId;
+ this.tableId = tableId;
}
/** Returns an id of dropped index. */
public int indexId() {
return indexId;
}
+
+ /** Returns table ID for which the index was removed. */
+ public int tableId() {
+ return tableId;
+ }
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
index 71c2e126fe..241e849a2d 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropTableEventParameters.java
@@ -28,10 +28,11 @@ public class DropTableEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param tableId An id of dropped table.
*/
- public DropTableEventParameters(long causalityToken, int tableId) {
- super(causalityToken);
+ public DropTableEventParameters(long causalityToken, int catalogVersion,
int tableId) {
+ super(causalityToken, catalogVersion);
this.tableId = tableId;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
index 446257190b..c08faba607 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/events/DropZoneEventParameters.java
@@ -28,10 +28,11 @@ public class DropZoneEventParameters extends
CatalogEventParameters {
* Constructor.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
* @param zoneId An id of dropped distribution zone.
*/
- public DropZoneEventParameters(long causalityToken, int zoneId) {
- super(causalityToken);
+ public DropZoneEventParameters(long causalityToken, int catalogVersion,
int zoneId) {
+ super(causalityToken, catalogVersion);
this.zoneId = zoneId;
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index b29310e45e..910a363d81 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -69,8 +69,8 @@ public class AlterColumnEntry implements UpdateEntry,
Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new AlterColumnEventParameters(causalityToken, tableId, column);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new AlterColumnEventParameters(causalityToken, catalogVersion,
tableId, column);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index cf11a3400f..c79f366333 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -54,8 +54,8 @@ public class AlterZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new AlterZoneEventParameters(causalityToken, descriptor);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new AlterZoneEventParameters(causalityToken, catalogVersion,
descriptor);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index 3fc593667c..e29c250283 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -68,8 +68,8 @@ public class DropColumnsEntry implements UpdateEntry,
Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new DropColumnEventParameters(causalityToken, tableId, columns);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new DropColumnEventParameters(causalityToken, catalogVersion,
tableId, columns);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 4ba8749c2a..0479d4afdd 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -38,13 +38,17 @@ public class DropIndexEntry implements UpdateEntry,
Fireable {
private final int indexId;
+ private final int tableId;
+
/**
* Constructs the object.
*
* @param indexId An id of an index to drop.
+ * @param tableId Table ID for which the index was removed.
*/
- public DropIndexEntry(int indexId) {
+ public DropIndexEntry(int indexId, int tableId) {
this.indexId = indexId;
+ this.tableId = tableId;
}
/** Returns an id of an index to drop. */
@@ -52,14 +56,19 @@ public class DropIndexEntry implements UpdateEntry,
Fireable {
return indexId;
}
+ /** Returns table ID for which the index was removed. */
+ public int tableId() {
+ return tableId;
+ }
+
@Override
public CatalogEvent eventType() {
return CatalogEvent.INDEX_DROP;
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new DropIndexEventParameters(causalityToken, indexId);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new DropIndexEventParameters(causalityToken, catalogVersion,
indexId, tableId);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index 99f14204a8..6d85b55af8 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -58,8 +58,8 @@ public class DropTableEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new DropTableEventParameters(causalityToken, tableId);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new DropTableEventParameters(causalityToken, catalogVersion,
tableId);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index be3f2fdba4..2579a94f51 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -53,8 +53,8 @@ public class DropZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new DropZoneEventParameters(causalityToken, zoneId);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new DropZoneEventParameters(causalityToken, catalogVersion,
zoneId);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
index c2a7954976..30447e0029 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
@@ -33,6 +33,7 @@ public interface Fireable {
* Creates parameters of the fired event.
*
* @param causalityToken Causality token.
+ * @param catalogVersion Catalog version.
*/
- CatalogEventParameters createEventParameters(long causalityToken);
+ CatalogEventParameters createEventParameters(long causalityToken, int
catalogVersion);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index b0fb639e35..1e1aff017e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -68,8 +68,8 @@ public class NewColumnsEntry implements UpdateEntry, Fireable
{
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new AddColumnEventParameters(causalityToken, tableId,
descriptors);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new AddColumnEventParameters(causalityToken, catalogVersion,
tableId, descriptors);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 4e0419ab05..c31b335223 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -58,8 +58,8 @@ public class NewIndexEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new CreateIndexEventParameters(causalityToken, descriptor);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new CreateIndexEventParameters(causalityToken, catalogVersion,
descriptor);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index 7a5c85e10f..0d6ef3465e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -58,8 +58,8 @@ public class NewTableEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new CreateTableEventParameters(causalityToken, descriptor);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new CreateTableEventParameters(causalityToken, catalogVersion,
descriptor);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index b584951bdd..0cceba1ad2 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -54,8 +54,8 @@ public class NewZoneEntry implements UpdateEntry, Fireable {
}
@Override
- public CatalogEventParameters createEventParameters(long causalityToken) {
- return new CreateZoneEventParameters(causalityToken, descriptor);
+ public CatalogEventParameters createEventParameters(long causalityToken,
int catalogVersion) {
+ return new CreateZoneEventParameters(causalityToken, catalogVersion,
descriptor);
}
@Override
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
index 94124dae55..eb17046103 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLog.java
@@ -69,7 +69,8 @@ public interface UpdateLog extends IgniteComponent {
*
* @param update A new update.
* @param metaStorageUpdateTimestamp Timestamp assigned to the update
by the Metastorage.
+ * @param causalityToken Causality token.
*/
- void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp);
+ void handle(VersionedUpdate update, HybridTimestamp
metaStorageUpdateTimestamp, long causalityToken);
}
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 5427173ef8..f627c51118 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -165,7 +165,9 @@ public class UpdateLogImpl implements UpdateLog {
VersionedUpdate update =
fromBytes(Objects.requireNonNull(entry.value()));
- handler.handle(update,
metastore.timestampByRevision(entry.revision()));
+ long revision = entry.revision();
+
+ handler.handle(update, metastore.timestampByRevision(revision),
revision);
}
}
@@ -206,7 +208,7 @@ public class UpdateLogImpl implements UpdateLog {
VersionedUpdate update = fromBytes(payload);
- onUpdateHandler.handle(update, event.timestamp());
+ onUpdateHandler.handle(update, event.timestamp(),
event.revision());
}
return CompletableFuture.completedFuture(null);
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index cb4b7ad325..b2326f0142 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -38,7 +38,10 @@ import static org.junit.jupiter.api.Assertions.assertSame;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.isNull;
+import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doNothing;
+import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
import static org.mockito.Mockito.timeout;
@@ -123,7 +126,6 @@ import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.EnumSource.Mode;
import org.mockito.ArgumentCaptor;
-import org.mockito.ArgumentMatchers;
/**
* Catalog service self test.
@@ -237,6 +239,8 @@ public class CatalogServiceSelfTest {
assertNull(schema.table(TABLE_NAME));
assertNull(service.table(TABLE_NAME, 123L));
assertNull(service.table(1, 123L));
+ assertNull(service.index(createPkIndexName(TABLE_NAME), 123L));
+ assertNull(service.index(2, 123L));
// Validate actual catalog
schema = service.schema(1);
@@ -249,6 +253,9 @@ public class CatalogServiceSelfTest {
assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
clock.nowLong()));
assertSame(schema.table(TABLE_NAME), service.table(2,
clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(createPkIndexName(TABLE_NAME), clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(3, clock.nowLong()));
+
// Validate newly created table
CatalogTableDescriptor table = schema.table(TABLE_NAME);
@@ -256,6 +263,15 @@ public class CatalogServiceSelfTest {
assertEquals(TABLE_NAME, table.name());
assertEquals(1L, table.zoneId());
+ // Validate newly created pk index
+ CatalogHashIndexDescriptor pkIndex = (CatalogHashIndexDescriptor)
schema.index(createPkIndexName(TABLE_NAME));
+
+ assertEquals(3L, pkIndex.id());
+ assertEquals(createPkIndexName(TABLE_NAME), pkIndex.name());
+ assertEquals(2L, pkIndex.tableId());
+ assertEquals(table.primaryKeyColumns(), pkIndex.columns());
+ assertTrue(pkIndex.unique());
+
// Validate another table creation.
assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willBe(nullValue()));
@@ -270,10 +286,17 @@ public class CatalogServiceSelfTest {
assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
clock.nowLong()));
assertSame(schema.table(TABLE_NAME), service.table(2,
clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(createPkIndexName(TABLE_NAME), clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(3, clock.nowLong()));
+
assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
clock.nowLong()));
- assertSame(schema.table(TABLE_NAME_2), service.table(3,
clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(4,
clock.nowLong()));
+
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(createPkIndexName(TABLE_NAME_2), clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(5, clock.nowLong()));
assertNotSame(schema.table(TABLE_NAME), schema.table(TABLE_NAME_2));
+ assertNotSame(schema.index(createPkIndexName(TABLE_NAME)),
schema.index(createPkIndexName(TABLE_NAME_2)));
// Try to create another table with same name.
assertThat(service.createTable(simpleTable(TABLE_NAME_2)),
willThrowFast(TableAlreadyExistsException.class));
@@ -322,8 +345,14 @@ public class CatalogServiceSelfTest {
assertSame(schema.table(TABLE_NAME), service.table(TABLE_NAME,
beforeDropTimestamp));
assertSame(schema.table(TABLE_NAME), service.table(2,
beforeDropTimestamp));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(createPkIndexName(TABLE_NAME), beforeDropTimestamp));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME)),
service.index(3, beforeDropTimestamp));
+
assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
beforeDropTimestamp));
- assertSame(schema.table(TABLE_NAME_2), service.table(3,
beforeDropTimestamp));
+ assertSame(schema.table(TABLE_NAME_2), service.table(4,
beforeDropTimestamp));
+
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(createPkIndexName(TABLE_NAME_2), beforeDropTimestamp));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(5, beforeDropTimestamp));
// Validate actual catalog
schema = service.schema(3);
@@ -337,8 +366,15 @@ public class CatalogServiceSelfTest {
assertNull(service.table(TABLE_NAME, clock.nowLong()));
assertNull(service.table(2, clock.nowLong()));
+ assertNull(schema.index(createPkIndexName(TABLE_NAME)));
+ assertNull(service.index(createPkIndexName(TABLE_NAME),
clock.nowLong()));
+ assertNull(service.index(3, clock.nowLong()));
+
assertSame(schema.table(TABLE_NAME_2), service.table(TABLE_NAME_2,
clock.nowLong()));
- assertSame(schema.table(TABLE_NAME_2), service.table(3,
clock.nowLong()));
+ assertSame(schema.table(TABLE_NAME_2), service.table(4,
clock.nowLong()));
+
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(createPkIndexName(TABLE_NAME_2), clock.nowLong()));
+ assertSame(schema.index(createPkIndexName(TABLE_NAME_2)),
service.index(5, clock.nowLong()));
// Try to drop table once again.
assertThat(service.dropTable(dropTableParams),
willThrowFast(TableNotFoundException.class));
@@ -962,7 +998,7 @@ public class CatalogServiceSelfTest {
assertSame(schema.table(TABLE_NAME), service.table(2,
beforeDropTimestamp));
assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME,
beforeDropTimestamp));
- assertSame(schema.index(INDEX_NAME), service.index(3,
beforeDropTimestamp));
+ assertSame(schema.index(INDEX_NAME), service.index(4,
beforeDropTimestamp));
// Validate actual catalog
schema = service.schema(3);
@@ -999,7 +1035,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertNull(schema.index(INDEX_NAME));
assertNull(service.index(INDEX_NAME, 123L));
- assertNull(service.index(3, 123L));
+ assertNull(service.index(4, 123L));
// Validate actual catalog
schema = service.schema(2);
@@ -1007,12 +1043,12 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertNull(service.index(1, clock.nowLong()));
assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME,
clock.nowLong()));
- assertSame(schema.index(INDEX_NAME), service.index(3,
clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(4,
clock.nowLong()));
// Validate newly created hash index
CatalogHashIndexDescriptor index = (CatalogHashIndexDescriptor)
schema.index(INDEX_NAME);
- assertEquals(3L, index.id());
+ assertEquals(4L, index.id());
assertEquals(INDEX_NAME, index.name());
assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
assertEquals(List.of("VAL", "ID"), index.columns());
@@ -1040,7 +1076,7 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertNull(schema.index(INDEX_NAME));
assertNull(service.index(INDEX_NAME, 123L));
- assertNull(service.index(3, 123L));
+ assertNull(service.index(4, 123L));
// Validate actual catalog
schema = service.schema(2);
@@ -1048,12 +1084,12 @@ public class CatalogServiceSelfTest {
assertNotNull(schema);
assertNull(service.index(1, clock.nowLong()));
assertSame(schema.index(INDEX_NAME), service.index(INDEX_NAME,
clock.nowLong()));
- assertSame(schema.index(INDEX_NAME), service.index(3,
clock.nowLong()));
+ assertSame(schema.index(INDEX_NAME), service.index(4,
clock.nowLong()));
// Validate newly created sorted index
CatalogSortedIndexDescriptor index = (CatalogSortedIndexDescriptor)
schema.index(INDEX_NAME);
- assertEquals(3L, index.id());
+ assertEquals(4L, index.id());
assertEquals(INDEX_NAME, index.name());
assertEquals(schema.table(TABLE_NAME).id(), index.tableId());
assertEquals("VAL", index.columns().get(0).name());
@@ -1115,7 +1151,7 @@ public class CatalogServiceSelfTest {
List.of(new ObjectIdGenUpdateEntry(1))
);
- updateHandlerCapture.getValue().handle(update, clock.now());
+ updateHandlerCapture.getValue().handle(update, clock.now(), 0);
return completedFuture(false);
});
@@ -1206,10 +1242,10 @@ public class CatalogServiceSelfTest {
service.listen(CatalogEvent.TABLE_DROP, eventListener);
assertThat(service.createTable(createTableParams),
willBe(nullValue()));
- verify(eventListener).notify(any(CreateTableEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(CreateTableEventParameters.class),
isNull());
assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
- verify(eventListener).notify(any(DropTableEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(DropTableEventParameters.class),
isNull());
verifyNoMoreInteractions(eventListener);
}
@@ -1250,24 +1286,31 @@ public class CatalogServiceSelfTest {
assertThat(service.createIndex(createIndexParams),
willThrow(TableNotFoundException.class));
verifyNoInteractions(eventListener);
- // Create table.
+ // Create table with PK index.
assertThat(service.createTable(createTableParams),
willCompleteSuccessfully());
+ verify(eventListener).notify(any(CreateIndexEventParameters.class),
isNull());
+
+ clearInvocations(eventListener);
// Create index.
assertThat(service.createIndex(createIndexParams),
willCompleteSuccessfully());
- verify(eventListener).notify(any(CreateIndexEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(CreateIndexEventParameters.class),
isNull());
+
+ clearInvocations(eventListener);
// Drop index.
- assertThat(service.dropIndex(dropIndexParams),
willCompleteSuccessfully());
- verify(eventListener).notify(any(DropIndexEventParameters.class),
ArgumentMatchers.isNull());
+ assertThat(service.dropIndex(dropIndexParams), willBe(nullValue()));
+ verify(eventListener).notify(any(DropIndexEventParameters.class),
isNull());
- // Drop table.
- assertThat(service.dropTable(dropTableparams),
willCompleteSuccessfully());
+ clearInvocations(eventListener);
+
+ // Drop table with pk index.
+ assertThat(service.dropTable(dropTableparams), willBe(nullValue()));
// Try drop index once again.
assertThat(service.dropIndex(dropIndexParams),
willThrow(IndexNotFoundException.class));
- verifyNoMoreInteractions(eventListener);
+ verify(eventListener).notify(any(DropIndexEventParameters.class),
isNull());
}
@Test
@@ -1522,13 +1565,13 @@ public class CatalogServiceSelfTest {
assertThat(fut, willCompleteSuccessfully());
- verify(eventListener).notify(any(CreateZoneEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(CreateZoneEventParameters.class),
isNull());
fut = service.dropDistributionZone(dropZoneParams);
assertThat(fut, willCompleteSuccessfully());
- verify(eventListener).notify(any(DropZoneEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(DropZoneEventParameters.class),
isNull());
verifyNoMoreInteractions(eventListener);
}
@@ -1566,11 +1609,11 @@ public class CatalogServiceSelfTest {
// Add column.
assertThat(service.addColumn(addColumnParams), willBe(nullValue()));
- verify(eventListener).notify(any(AddColumnEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(AddColumnEventParameters.class),
isNull());
// Drop column.
assertThat(service.dropColumn(dropColumnParams), willBe(nullValue()));
- verify(eventListener).notify(any(DropColumnEventParameters.class),
ArgumentMatchers.isNull());
+ verify(eventListener).notify(any(DropColumnEventParameters.class),
isNull());
// Try drop column once again.
assertThat(service.dropColumn(dropColumnParams),
willThrow(ColumnNotFoundException.class));
@@ -1622,7 +1665,7 @@ public class CatalogServiceSelfTest {
service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
try {
- assertNotNull(service.schema((int)
parameters.causalityToken()));
+ assertNotNull(service.schema(parameters.catalogVersion()));
result.complete(null);
@@ -1638,6 +1681,78 @@ public class CatalogServiceSelfTest {
assertThat(result, willCompleteSuccessfully());
}
+ @Test
+ void testGetTableByIdAndCatalogVersion() {
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ assertNull(service.table(2, 0));
+ assertNotNull(service.table(2, 1));
+ }
+
+ @Test
+ void testGetTableIdOnDropIndexEvent() {
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe(nullValue()));
+
+ assertThat(
+ service.createIndex(
+ CreateHashIndexParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .tableName(TABLE_NAME)
+ .indexName(INDEX_NAME)
+ .columns(List.of("VAL"))
+ .build()
+ ),
+ willBe(nullValue())
+ );
+
+ EventListener<CatalogEventParameters> eventListener =
mock(EventListener.class);
+
+ ArgumentCaptor<DropIndexEventParameters> captor =
ArgumentCaptor.forClass(DropIndexEventParameters.class);
+
+
doReturn(completedFuture(false)).when(eventListener).notify(captor.capture(),
any());
+
+ service.listen(CatalogEvent.INDEX_DROP, eventListener);
+
+ // Let's remove the index.
+ assertThat(
+
service.dropIndex(DropIndexParams.builder().schemaName(SCHEMA_NAME).indexName(INDEX_NAME).build()),
+ willBe(nullValue())
+ );
+
+ DropIndexEventParameters eventParameters = captor.getValue();
+
+ assertEquals(4L, eventParameters.indexId());
+ assertEquals(2L, eventParameters.tableId());
+
+ // Let's delete the table.
+ assertThat(
+
service.dropTable(DropTableParams.builder().schemaName(SCHEMA_NAME).tableName(TABLE_NAME).build()),
+ willBe(nullValue())
+ );
+
+ // Let's make sure that the PK index has been deleted.
+ eventParameters = captor.getValue();
+
+ assertEquals(3L, eventParameters.indexId());
+ assertEquals(2L, eventParameters.tableId());
+ }
+
+ @Test
+ void testCreateTableWithoutPkColumns() {
+ assertThat(
+ service.createTable(
+ CreateTableParams.builder()
+ .schemaName(SCHEMA_NAME)
+ .zone(ZONE_NAME)
+ .tableName(TABLE_NAME)
+ .columns(List.of())
+ .primaryKeyColumns(List.of())
+ .build()
+ ),
+ willThrowFast(IgniteInternalException.class, "Missing primary
key columns")
+ );
+ }
+
private CompletableFuture<Void> changeColumn(
String tab,
String col,
@@ -1723,4 +1838,8 @@ public class CatalogServiceSelfTest {
this.scale = scale;
}
}
+
+ private static String createPkIndexName(String tableName) {
+ return tableName + "_PK";
+ }
}
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index b7e6af23c2..7fbfb55391 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -17,14 +17,13 @@
package org.apache.ignite.internal.catalog.storage;
-import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
+import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -76,24 +75,24 @@ class UpdateLogImplTest {
long revisionBefore = metastore.appliedRevision();
- updateLog.registerUpdateHandler((update, ts) -> {/* no-op */});
+ updateLog.registerUpdateHandler((update, ts, causalityToken) -> {/*
no-op */});
updateLog.start();
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
- List<VersionedUpdate> expectedLog = List.of(
+ List<VersionedUpdate> expectedVersions = List.of(
new VersionedUpdate(1, 1L, List.of(new
TestUpdateEntry("foo"))),
new VersionedUpdate(2, 2L, List.of(new TestUpdateEntry("bar")))
);
- for (VersionedUpdate update : expectedLog) {
- assertTrue(await(updateLog.append(update)));
+ for (VersionedUpdate update : expectedVersions) {
+ assertThat(updateLog.append(update), willBe(true));
}
// and wait till metastore apply necessary revision
assertTrue(
waitForCondition(
- () -> metastore.appliedRevision() - expectedLog.size()
== revisionBefore,
+ () -> metastore.appliedRevision() -
expectedVersions.size() == revisionBefore,
TimeUnit.SECONDS.toMillis(5)
)
);
@@ -104,11 +103,18 @@ class UpdateLogImplTest {
// and check if log is replayed on start
updateLog = createUpdateLogImpl();
- List<VersionedUpdate> actualLog = new ArrayList<>();
- updateLog.registerUpdateHandler((update, ts) -> actualLog.add(update));
+ List<VersionedUpdate> actualVersions = new ArrayList<>();
+ List<Long> actualCausalityTokens = new ArrayList<>();
+
+ updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
+ actualVersions.add(update);
+ actualCausalityTokens.add(causalityToken);
+ });
+
updateLog.start();
- assertEquals(expectedLog, actualLog);
+ assertEquals(expectedVersions, actualVersions);
+ assertEquals(List.of(revisionBefore + 1, revisionBefore + 2),
actualCausalityTokens);
}
private UpdateLogImpl createUpdateLogImpl() {
@@ -136,33 +142,39 @@ class UpdateLogImplTest {
UpdateLogImpl updateLog = createUpdateLogImpl();
List<Integer> appliedVersions = new ArrayList<>();
- updateLog.registerUpdateHandler((update, ts) ->
appliedVersions.add(update.version()));
+ List<Long> causalityTokens = new ArrayList<>();
- updateLog.start();
+ updateLog.registerUpdateHandler((update, ts, causalityToken) -> {
+ appliedVersions.add(update.version());
+ causalityTokens.add(causalityToken);
+ });
long revisionBefore = metastore.appliedRevision();
+ updateLog.start();
+
assertThat("Watches were not deployed", metastore.deployWatches(),
willCompleteSuccessfully());
// first update should always be successful
-
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion)),
willBe(true));
// update of the same version should not be accepted
-
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion)),
willBe(false));
// update of the version lower than the last applied should not be
accepted
-
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion -
1))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion -
1)), willBe(false));
// update of the version creating gap should not be accepted
-
assertFalse(await(updateLog.append(singleEntryUpdateOfVersion(startVersion +
2))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion +
2)), willBe(false));
// regular update of next version should be accepted
-
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion +
1))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion +
1)), willBe(true));
// now the gap is filled, thus update should be accepted as well
-
assertTrue(await(updateLog.append(singleEntryUpdateOfVersion(startVersion +
2))));
+ assertThat(updateLog.append(singleEntryUpdateOfVersion(startVersion +
2)), willBe(true));
List<Integer> expectedVersions = List.of(startVersion, startVersion +
1, startVersion + 2);
+ List<Long> expectedTokens = List.of(revisionBefore + 1, revisionBefore
+ 2, revisionBefore + 3);
// wait till necessary revision is applied
assertTrue(
@@ -173,6 +185,7 @@ class UpdateLogImplTest {
);
assertThat(appliedVersions, equalTo(expectedVersions));
+ assertThat(causalityTokens, equalTo(expectedTokens));
}
private static VersionedUpdate singleEntryUpdateOfVersion(int version) {