This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 209698938f IGNITE-19641 Catalog events are triggered too early. (#2231)
209698938f is described below
commit 209698938ff1d5206fc3a9403bb019c423a1f65b
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Jun 23 10:22:32 2023 +0300
IGNITE-19641 Catalog events are triggered too early. (#2231)
---
.../internal/catalog/CatalogServiceImpl.java | 270 ++-------------------
.../internal/catalog/storage/AlterColumnEntry.java | 56 ++++-
.../internal/catalog/storage/AlterZoneEntry.java | 32 ++-
.../internal/catalog/storage/DropColumnsEntry.java | 54 ++++-
.../internal/catalog/storage/DropIndexEntry.java | 42 +++-
.../internal/catalog/storage/DropTableEntry.java | 42 +++-
.../internal/catalog/storage/DropZoneEntry.java | 30 ++-
.../storage/{UpdateEntry.java => Fireable.java} | 18 +-
.../internal/catalog/storage/NewColumnsEntry.java | 51 +++-
.../internal/catalog/storage/NewIndexEntry.java | 41 +++-
.../internal/catalog/storage/NewTableEntry.java | 41 +++-
.../internal/catalog/storage/NewZoneEntry.java | 30 ++-
.../catalog/storage/ObjectIdGenUpdateEntry.java | 13 +-
.../internal/catalog/storage/UpdateEntry.java | 10 +-
.../internal/catalog/storage/UpdateLogImpl.java | 1 +
.../internal/catalog/CatalogServiceSelfTest.java | 23 ++
.../catalog/storage/UpdateLogImplTest.java | 6 +
17 files changed, 487 insertions(+), 273 deletions(-)
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 3ae0c88abf..010fa208ee 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.catalog;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.commands.CreateZoneParams.INFINITE_TIMER_VALUE;
import static
org.apache.ignite.lang.ErrorGroups.Sql.UNSUPPORTED_DDL_OPERATION_ERR;
@@ -31,7 +30,6 @@ import java.util.List;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Objects;
-import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.function.Predicate;
@@ -55,24 +53,15 @@ import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
-import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
-import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
-import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
-import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
-import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
-import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite.internal.catalog.storage.AlterColumnEntry;
import org.apache.ignite.internal.catalog.storage.AlterZoneEntry;
import org.apache.ignite.internal.catalog.storage.DropColumnsEntry;
import org.apache.ignite.internal.catalog.storage.DropIndexEntry;
import org.apache.ignite.internal.catalog.storage.DropTableEntry;
import org.apache.ignite.internal.catalog.storage.DropZoneEntry;
+import org.apache.ignite.internal.catalog.storage.Fireable;
import org.apache.ignite.internal.catalog.storage.NewColumnsEntry;
import org.apache.ignite.internal.catalog.storage.NewIndexEntry;
import org.apache.ignite.internal.catalog.storage.NewTableEntry;
@@ -86,8 +75,6 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.Producer;
-import org.apache.ignite.internal.util.ArrayUtils;
-import org.apache.ignite.internal.util.CollectionUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.lang.ColumnAlreadyExistsException;
import org.apache.ignite.lang.ColumnNotFoundException;
@@ -814,254 +801,31 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
@Override
public void handle(VersionedUpdate update) {
int version = update.version();
- long activationTimestamp = update.activationTimestamp();
Catalog catalog = catalogByVer.get(version - 1);
- assert catalog != null;
-
- List<CompletableFuture<?>> eventFutures = new
ArrayList<>(update.entries().size());
+ assert catalog != null : version - 1;
for (UpdateEntry entry : update.entries()) {
- String schemaName = CatalogService.PUBLIC;
- CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(schemaName), "No schema found: " +
schemaName);
-
- if (entry instanceof NewTableEntry) {
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- ArrayUtils.concat(schema.tables(),
((NewTableEntry) entry).descriptor()),
- schema.indexes()
- ))
- );
-
- eventFutures.add(fireEvent(
- CatalogEvent.TABLE_CREATE,
- new CreateTableEventParameters(version,
((NewTableEntry) entry).descriptor())
- ));
-
- } else if (entry instanceof DropTableEntry) {
- int tableId = ((DropTableEntry) entry).tableId();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- Arrays.stream(schema.tables()).filter(t ->
t.id() != tableId).toArray(CatalogTableDescriptor[]::new),
- schema.indexes()
- ))
- );
-
- eventFutures.add(fireEvent(
- CatalogEvent.TABLE_DROP,
- new DropTableEventParameters(version, tableId)
- ));
- } else if (entry instanceof NewColumnsEntry) {
- int tableId = ((NewColumnsEntry) entry).tableId();
- List<CatalogTableColumnDescriptor> columnDescriptors =
((NewColumnsEntry) entry).descriptors();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- Arrays.stream(schema.tables())
- .map(table -> table.id() != tableId
- ? table
- : new
CatalogTableDescriptor(
- table.id(),
- table.name(),
- table.zoneId(),
-
CollectionUtils.concat(table.columns(), columnDescriptors),
-
table.primaryKeyColumns(),
-
table.colocationColumns())
- )
-
.toArray(CatalogTableDescriptor[]::new),
- schema.indexes()
- ))
- );
-
- eventFutures.add(fireEvent(
- CatalogEvent.TABLE_ALTER,
- new AddColumnEventParameters(version, tableId,
columnDescriptors)
- ));
- } else if (entry instanceof DropColumnsEntry) {
- int tableId = ((DropColumnsEntry) entry).tableId();
- Set<String> columns = ((DropColumnsEntry) entry).columns();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- Arrays.stream(schema.tables())
- .map(table -> table.id() != tableId
- ? table
- : new
CatalogTableDescriptor(
- table.id(),
- table.name(),
- table.zoneId(),
-
table.columns().stream()
-
.filter(col -> !columns.contains(col.name()))
-
.collect(toList()),
-
table.primaryKeyColumns(),
-
table.colocationColumns())
- )
-
.toArray(CatalogTableDescriptor[]::new),
- schema.indexes()
- ))
- );
-
- eventFutures.add(fireEvent(
- CatalogEvent.TABLE_ALTER,
- new DropColumnEventParameters(version, tableId,
columns)
- ));
- } else if (entry instanceof NewIndexEntry) {
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- schema.tables(),
- ArrayUtils.concat(schema.indexes(),
((NewIndexEntry) entry).descriptor())
- ))
- );
-
- eventFutures.add(fireEvent(
- CatalogEvent.INDEX_CREATE,
- new CreateIndexEventParameters(version,
((NewIndexEntry) entry).descriptor())
- ));
- } else if (entry instanceof DropIndexEntry) {
- int indexId = ((DropIndexEntry) entry).indexId();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- schema.tables(),
- Arrays.stream(schema.indexes()).filter(t
-> t.id() != indexId).toArray(CatalogIndexDescriptor[]::new)
- ))
- );
+ catalog = entry.applyUpdate(catalog);
+ }
- eventFutures.add(fireEvent(
- CatalogEvent.INDEX_DROP,
- new DropIndexEventParameters(version, indexId)
- ));
- } else if (entry instanceof NewZoneEntry) {
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- CollectionUtils.concat(catalog.zones(),
List.of(((NewZoneEntry) entry).descriptor())),
- catalog.schemas()
- );
+ catalog = applyUpdateFinal(catalog, update);
- eventFutures.add(fireEvent(
- CatalogEvent.ZONE_CREATE,
- new CreateZoneEventParameters(version,
((NewZoneEntry) entry).descriptor())
- ));
- } else if (entry instanceof DropZoneEntry) {
- int zoneId = ((DropZoneEntry) entry).zoneId();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones().stream().filter(z -> z.id() !=
zoneId).collect(toList()),
- catalog.schemas()
- );
+ registerCatalog(catalog);
- eventFutures.add(fireEvent(
- CatalogEvent.ZONE_DROP,
- new DropZoneEventParameters(version, zoneId)
- ));
- } else if (entry instanceof AlterZoneEntry) {
- CatalogZoneDescriptor descriptor = ((AlterZoneEntry)
entry).descriptor();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones().stream()
- .map(z -> z.id() == descriptor.id() ?
descriptor : z)
- .collect(toList()),
- catalog.schemas()
- );
+ List<CompletableFuture<?>> eventFutures = new ArrayList<>(update.entries().size());
- eventFutures.add(fireEvent(
- CatalogEvent.ZONE_ALTER,
- new AlterZoneEventParameters(version, descriptor)
- ));
- } else if (entry instanceof ObjectIdGenUpdateEntry) {
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState() +
((ObjectIdGenUpdateEntry) entry).delta(),
- catalog.zones(),
- catalog.schemas()
- );
- } else if (entry instanceof AlterColumnEntry) {
- int tableId = ((AlterColumnEntry) entry).tableId();
- CatalogTableColumnDescriptor target = ((AlterColumnEntry)
entry).descriptor();
-
- catalog = new Catalog(
- version,
- activationTimestamp,
- catalog.objectIdGenState(),
- catalog.zones(),
- List.of(new CatalogSchemaDescriptor(
- schema.id(),
- schema.name(),
- Arrays.stream(schema.tables())
- .map(table -> table.id() != tableId
- ? table
- : new
CatalogTableDescriptor(
- table.id(),
- table.name(),
- table.zoneId(),
-
table.columns().stream()
-
.map(source -> source.name().equals(target.name()) ? target : source)
-
.collect(toList()),
-
table.primaryKeyColumns(),
-
table.colocationColumns())
- )
-
.toArray(CatalogTableDescriptor[]::new),
- schema.indexes()
- ))
- );
+ for (UpdateEntry entry : update.entries()) {
+ if (entry instanceof Fireable) {
+ Fireable fireEvent = (Fireable) entry;
eventFutures.add(fireEvent(
- CatalogEvent.TABLE_ALTER,
- new AlterColumnEventParameters(version, tableId,
target)
+ fireEvent.eventType(),
+ fireEvent.createEventParameters(version)
));
- } else {
- assert false : entry;
}
}
- registerCatalog(catalog);
-
CompletableFuture.allOf(eventFutures.toArray(CompletableFuture[]::new))
.whenComplete((ignore, err) -> {
if (err != null) {
@@ -1089,4 +853,14 @@ public class CatalogServiceImpl extends Producer<CatalogEvent, CatalogEventParam
interface UpdateProducer {
List<UpdateEntry> get(Catalog catalog);
}
+
+ private Catalog applyUpdateFinal(Catalog catalog, VersionedUpdate update) {
+ return new Catalog(
+ update.version(),
+ update.activationTimestamp(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ catalog.schemas()
+ );
+ }
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
index b5dd20008b..1067e574c0 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterColumnEntry.java
@@ -17,13 +17,25 @@
package org.apache.ignite.internal.catalog.storage;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes a column replacement.
*/
-public class AlterColumnEntry implements UpdateEntry {
+public class AlterColumnEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = -4552940987881338656L;
private final int tableId;
@@ -51,7 +63,47 @@ public class AlterColumnEntry implements UpdateEntry {
return column;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.TABLE_ALTER;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new AlterColumnEventParameters(causalityToken, tableId, column);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ Arrays.stream(schema.tables())
+ .map(table -> table.id() != tableId
+ ? table
+ : new CatalogTableDescriptor(
+ table.id(),
+ table.name(),
+ table.zoneId(),
+ table.columns().stream()
+ .map(source -> source.name().equals(column.name()) ? column : source)
+ .collect(toList()),
+ table.primaryKeyColumns(),
+ table.colocationColumns())
+ )
+ .toArray(CatalogTableDescriptor[]::new),
+ schema.indexes()
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
index 3e2f5c56c0..cf11a3400f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/AlterZoneEntry.java
@@ -17,13 +17,19 @@
package org.apache.ignite.internal.catalog.storage;
+import static java.util.stream.Collectors.toList;
+
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.AlterZoneEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes altering zone.
*/
-public class AlterZoneEntry implements UpdateEntry {
+public class AlterZoneEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 7727583734058987315L;
private final CatalogZoneDescriptor descriptor;
@@ -42,7 +48,29 @@ public class AlterZoneEntry implements UpdateEntry {
return descriptor;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.ZONE_ALTER;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new AlterZoneEventParameters(causalityToken, descriptor);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones().stream()
+ .map(z -> z.id() == descriptor.id() ? descriptor : z)
+ .collect(toList()),
+ catalog.schemas()
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
index bf1d009fdf..1d50e5e196 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropColumnsEntry.java
@@ -17,13 +17,25 @@
package org.apache.ignite.internal.catalog.storage;
+import static java.util.stream.Collectors.toList;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
import java.util.Set;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropColumnEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes dropping of columns.
*/
-public class DropColumnsEntry implements UpdateEntry {
+public class DropColumnsEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 2970125889493580121L;
private final int tableId;
@@ -50,7 +62,45 @@ public class DropColumnsEntry implements UpdateEntry {
return columns;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.TABLE_ALTER;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new DropColumnEventParameters(causalityToken, tableId, columns);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ Arrays.stream(schema.tables())
+ .map(table -> table.id() == tableId ? new CatalogTableDescriptor(
+ table.id(),
+ table.name(),
+ table.zoneId(),
+ table.columns().stream()
+ .filter(col -> !columns.contains(col.name()))
+ .collect(toList()),
+ table.primaryKeyColumns(),
+ table.colocationColumns()) : table
+ )
+ .toArray(CatalogTableDescriptor[]::new),
+ schema.indexes()
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
index 2efef88bea..fc5b019c81 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropIndexEntry.java
@@ -17,12 +17,23 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes deletion of an index.
*/
-public class DropIndexEntry implements UpdateEntry {
+public class DropIndexEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = -604729846502020728L;
private final int indexId;
@@ -41,7 +52,34 @@ public class DropIndexEntry implements UpdateEntry {
return indexId;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.INDEX_DROP;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new DropIndexEventParameters(causalityToken, indexId);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ schema.tables(),
+ Arrays.stream(schema.indexes()).filter(t -> t.id() != indexId).toArray(CatalogIndexDescriptor[]::new)
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
index ee53b390d2..54ffadfe58 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropTableEntry.java
@@ -17,12 +17,23 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes deletion of a table.
*/
-public class DropTableEntry implements UpdateEntry {
+public class DropTableEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 7727583734058987315L;
private final int tableId;
@@ -41,7 +52,34 @@ public class DropTableEntry implements UpdateEntry {
return tableId;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.TABLE_DROP;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new DropTableEventParameters(causalityToken, tableId);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ Arrays.stream(schema.tables()).filter(t -> t.id() != tableId).toArray(CatalogTableDescriptor[]::new),
+ schema.indexes()
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
index 2dc1816eb8..be3f2fdba4 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/DropZoneEntry.java
@@ -17,12 +17,18 @@
package org.apache.ignite.internal.catalog.storage;
+import static java.util.stream.Collectors.toList;
+
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.DropZoneEventParameters;
import org.apache.ignite.internal.tostring.S;
/**
* Describes deletion of a zone.
*/
-public class DropZoneEntry implements UpdateEntry {
+public class DropZoneEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 7727583734058987315L;
private final int zoneId;
@@ -41,7 +47,27 @@ public class DropZoneEntry implements UpdateEntry {
return zoneId;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.ZONE_DROP;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new DropZoneEventParameters(causalityToken, zoneId);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones().stream().filter(z -> z.id() != zoneId).collect(toList()),
+ catalog.schemas()
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
similarity index 63%
copy from modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
copy to modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
index 39c487447f..c2a7954976 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/Fireable.java
@@ -17,10 +17,22 @@
package org.apache.ignite.internal.catalog.storage;
-import java.io.Serializable;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
/**
- * A marker interface describing a particular change within the {@link
VersionedUpdate group}.
+ * Interface for updates that require firing events.
*/
-public interface UpdateEntry extends Serializable {
+public interface Fireable {
+ /**
+ * Returns the type of the fired event.
+ */
+ CatalogEvent eventType();
+
+ /**
+ * Creates parameters of the fired event.
+ *
+ * @param causalityToken Causality token.
+ */
+ CatalogEventParameters createEventParameters(long causalityToken);
}
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
index 17f86675cc..c15fb4773f 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewColumnsEntry.java
@@ -17,14 +17,25 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.Arrays;
import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import
org.apache.ignite.internal.catalog.descriptors.CatalogTableColumnDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.AddColumnEventParameters;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CollectionUtils;
/**
* Describes addition of new columns.
*/
-public class NewColumnsEntry implements UpdateEntry {
+public class NewColumnsEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 2970125889493580121L;
private final int tableId;
@@ -51,7 +62,43 @@ public class NewColumnsEntry implements UpdateEntry {
return descriptors;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() {
+ return CatalogEvent.TABLE_ALTER;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) {
+ return new AddColumnEventParameters(causalityToken, tableId, descriptors);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) {
+ CatalogSchemaDescriptor schema = Objects.requireNonNull(catalog.schema(PUBLIC));
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor(
+ schema.id(),
+ schema.name(),
+ Arrays.stream(schema.tables())
+ .map(table -> table.id() == tableId ? new CatalogTableDescriptor(
+ table.id(),
+ table.name(),
+ table.zoneId(),
+ CollectionUtils.concat(table.columns(), descriptors),
+ table.primaryKeyColumns(),
+ table.colocationColumns()) : table
+ )
+ .toArray(CatalogTableDescriptor[]::new),
+ schema.indexes()
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
index 8ec1b1f1eb..0c57d7eef2 100644
--- a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
+++ b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewIndexEntry.java
@@ -17,13 +17,23 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ArrayUtils;
/**
* Describes addition of a new index.
*/
-public class NewIndexEntry implements UpdateEntry {
+public class NewIndexEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 6717363577013237711L;
private final CatalogIndexDescriptor descriptor;
@@ -42,7 +52,34 @@ public class NewIndexEntry implements UpdateEntry {
return descriptor;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() { // Reports the catalog event type for this entry: index creation.
+ return CatalogEvent.INDEX_CREATE;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) { // Builds event parameters from the token and this entry's index descriptor.
+ return new CreateIndexEventParameters(causalityToken, descriptor);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) { // Returns a catalog whose PUBLIC schema additionally contains this entry's index descriptor.
+ CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(PUBLIC)); // throws NullPointerException if catalog.schema(PUBLIC) returns null
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor( // NOTE(review): only this rebuilt schema is passed on; confirm no other schemas need to be carried over
+ schema.id(),
+ schema.name(),
+ schema.tables(),
+ ArrayUtils.concat(schema.indexes(), descriptor) // existing indexes combined with the new descriptor
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
index aabfeaa418..589ef47a8a 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewTableEntry.java
@@ -17,13 +17,23 @@
package org.apache.ignite.internal.catalog.storage;
+import static org.apache.ignite.internal.catalog.CatalogService.PUBLIC;
+
+import java.util.List;
+import java.util.Objects;
+import org.apache.ignite.internal.catalog.Catalog;
+import org.apache.ignite.internal.catalog.descriptors.CatalogSchemaDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.ArrayUtils;
/**
* Describes addition of a new table.
*/
-public class NewTableEntry implements UpdateEntry {
+public class NewTableEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 2970125889493580121L;
private final CatalogTableDescriptor descriptor;
@@ -42,7 +52,34 @@ public class NewTableEntry implements UpdateEntry {
return descriptor;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() { // Reports the catalog event type for this entry: table creation.
+ return CatalogEvent.TABLE_CREATE;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) { // Builds event parameters from the token and this entry's table descriptor.
+ return new CreateTableEventParameters(causalityToken, descriptor);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) { // Returns a catalog whose PUBLIC schema additionally contains this entry's table descriptor.
+ CatalogSchemaDescriptor schema =
Objects.requireNonNull(catalog.schema(PUBLIC)); // throws NullPointerException if catalog.schema(PUBLIC) returns null
+
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ catalog.zones(),
+ List.of(new CatalogSchemaDescriptor( // NOTE(review): only this rebuilt schema is passed on; confirm no other schemas need to be carried over
+ schema.id(),
+ schema.name(),
+ ArrayUtils.concat(schema.tables(), descriptor), // existing tables combined with the new descriptor
+ schema.indexes()
+ ))
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
index dc671e2278..b584951bdd 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/NewZoneEntry.java
@@ -17,13 +17,19 @@
package org.apache.ignite.internal.catalog.storage;
+import java.util.List;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import org.apache.ignite.internal.catalog.events.CreateZoneEventParameters;
import org.apache.ignite.internal.tostring.S;
+import org.apache.ignite.internal.util.CollectionUtils;
/**
* Describes addition of a new zone.
*/
-public class NewZoneEntry implements UpdateEntry {
+public class NewZoneEntry implements UpdateEntry, Fireable {
private static final long serialVersionUID = 2970125889493580121L;
private final CatalogZoneDescriptor descriptor;
@@ -42,7 +48,27 @@ public class NewZoneEntry implements UpdateEntry {
return descriptor;
}
- /** {@inheritDoc} */
+ @Override
+ public CatalogEvent eventType() { // Reports the catalog event type for this entry: zone creation.
+ return CatalogEvent.ZONE_CREATE;
+ }
+
+ @Override
+ public CatalogEventParameters createEventParameters(long causalityToken) { // Builds event parameters from the token and this entry's zone descriptor.
+ return new CreateZoneEventParameters(causalityToken, descriptor);
+ }
+
+ @Override
+ public Catalog applyUpdate(Catalog catalog) { // Returns a catalog that additionally contains this entry's zone descriptor.
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState(),
+ CollectionUtils.concat(catalog.zones(), List.of(descriptor)), // existing zones combined with the new zone descriptor
+ catalog.schemas() // schemas are passed through unchanged
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
index 26c97af8ec..45f8d51fa9 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/ObjectIdGenUpdateEntry.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.catalog.storage;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.tostring.S;
/**
@@ -41,7 +42,17 @@ public class ObjectIdGenUpdateEntry implements UpdateEntry {
return delta;
}
- /** {@inheritDoc} */
+ @Override
+ public Catalog applyUpdate(Catalog catalog) { // Returns a catalog that differs from the input only in the object id generator state.
+ return new Catalog(
+ catalog.version(),
+ catalog.time(),
+ catalog.objectIdGenState() + delta, // advance the id generator state by this entry's delta
+ catalog.zones(),
+ catalog.schemas()
+ );
+ }
+
@Override
public String toString() {
return S.toString(this);
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
index 39c487447f..dc01e77f40 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateEntry.java
@@ -18,9 +18,17 @@
package org.apache.ignite.internal.catalog.storage;
import java.io.Serializable;
+import org.apache.ignite.internal.catalog.Catalog;
/**
- * A marker interface describing a particular change within the {@link
VersionedUpdate group}.
+ * Interface describing a particular change within the {@link VersionedUpdate
group}.
*/
public interface UpdateEntry extends Serializable {
+ /**
+ * Applies the change described by this entry to the given catalog.
+ *
+ * @param catalog Catalog to which the change is applied.
+ * @return Catalog with the change applied.
+ */
+ Catalog applyUpdate(Catalog catalog);
}
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
index 12c5bb04a1..3652a89fa6 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/storage/UpdateLogImpl.java
@@ -158,6 +158,7 @@ public class UpdateLogImpl implements UpdateLog {
private void restoreStateFromVault(OnUpdateHandler handler) {
int ver = 1;
+ // TODO: IGNITE-19790 Read range from metastore
while (true) {
VaultEntry entry = vault.get(CatalogKey.update(ver++)).join();
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
index d4f21d0222..4afcb4b2ee 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/CatalogServiceSelfTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.catalog;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.CompletableFuture.failedFuture;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrow;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowFast;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
@@ -1571,6 +1572,28 @@ public class CatalogServiceSelfTest {
verifyNoMoreInteractions(eventListener);
}
+ @Test
+ void testGetCatalogEntityInCatalogEvent() { // Checks that a schema lookup made from inside a TABLE_CREATE listener returns non-null.
+ CompletableFuture<Void> result = new CompletableFuture<>(); // carries the listener's outcome out to the assertions below
+
+ service.listen(CatalogEvent.TABLE_CREATE, (parameters, exception) -> {
+ try {
+ assertNotNull(service.schema((int)
parameters.causalityToken())); // NOTE(review): the causality token is narrowed to int and used as the schema() argument — presumably a catalog version; confirm
+
+ result.complete(null);
+
+ return completedFuture(true);
+ } catch (Throwable t) { // also catches an AssertionError raised by assertNotNull
+ result.completeExceptionally(t);
+
+ return failedFuture(t);
+ }
+ });
+
+ assertThat(service.createTable(simpleTable(TABLE_NAME)),
willBe((Object) null));
+ assertThat(result, willCompleteSuccessfully()); // passes only if the listener completed result normally
+ }
+
private CompletableFuture<Void> changeColumn(
String tab,
String col,
diff --git
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
index dd9bf36ee5..706ca47bb3 100644
---
a/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
+++
b/modules/catalog/src/test/java/org/apache/ignite/internal/catalog/storage/UpdateLogImplTest.java
@@ -31,6 +31,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;
+import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.SimpleInMemoryKeyValueStorage;
@@ -185,6 +186,11 @@ class UpdateLogImplTest {
this.payload = payload;
}
+ @Override
+ public Catalog applyUpdate(Catalog catalog) { // Test entry: returns the given catalog unchanged.
+ return catalog;
+ }
+
@Override
public boolean equals(Object o) {
if (this == o) {