This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push: new 95a1291dda IGNITE-20339 Get rid of legacy IndexEvent, TableEvent and SchemaEvent (#2619) 95a1291dda is described below commit 95a1291ddaedf40da06b6ff97160f03dda35718a Author: Andrew V. Mashenkov <amashen...@users.noreply.github.com> AuthorDate: Thu Sep 28 18:29:03 2023 +0300 IGNITE-20339 Get rid of legacy IndexEvent, TableEvent and SchemaEvent (#2619) --- .../apache/ignite/internal/index/IndexManager.java | 58 +++++++----------- .../ignite/internal/index/event/IndexEvent.java | 31 ---------- .../internal/index/event/IndexEventParameters.java | 60 ------------------- .../runner/app/ItIgniteNodeRestartTest.java | 1 - .../org/apache/ignite/internal/app/IgniteImpl.java | 1 - .../internal/schema/CatalogSchemaManager.java | 19 +----- .../ignite/internal/schema/event/SchemaEvent.java | 28 --------- .../schema/event/SchemaEventParameters.java | 64 -------------------- .../internal/sql/engine/SqlQueryProcessor.java | 28 +-------- .../internal/table/distributed/TableManager.java | 70 ++++++---------------- .../ignite/internal/table/event/TableEvent.java | 34 ----------- .../internal/table/event/TableEventParameters.java | 48 --------------- .../table/distributed/TableManagerTest.java | 13 ---- 13 files changed, 41 insertions(+), 414 deletions(-) diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java index e0c929d7ca..885357fced 100644 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java +++ b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java @@ -42,9 +42,6 @@ import org.apache.ignite.internal.catalog.events.CatalogEvent; import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters; import org.apache.ignite.internal.catalog.events.DropIndexEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; -import org.apache.ignite.internal.event.AbstractEventProducer; -import org.apache.ignite.internal.index.event.IndexEvent; -import org.apache.ignite.internal.index.event.IndexEventParameters; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; @@ -74,7 +71,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock; * as well as managing indexes' lifecycle. */ // TODO: IGNITE-19082 Delete this class -public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventParameters> implements IgniteComponent { +public class IndexManager implements IgniteComponent { private static final IgniteLogger LOG = Loggers.forClass(IndexManager.class); /** Schema manager. */ @@ -203,39 +200,28 @@ public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventPa } private CompletableFuture<Boolean> onIndexCreate(CreateIndexEventParameters parameters) { - CatalogIndexDescriptor index = parameters.indexDescriptor(); + return inBusyLockAsync(busyLock, () -> { + CatalogIndexDescriptor index = parameters.indexDescriptor(); - int indexId = index.id(); - int tableId = index.tableId(); + int indexId = index.id(); + int tableId = index.tableId(); - long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); + long causalityToken = parameters.causalityToken(); + int catalogVersion = parameters.catalogVersion(); - return inBusyLockAsync(busyLock, () -> { CatalogTableDescriptor table = catalogManager.table(tableId, catalogVersion); assert table != null : "tableId=" + tableId + ", indexId=" + indexId; - return createIndexLocallyBusy(causalityToken, table, index).thenApply(unused -> false); - }); - } - - private CompletableFuture<?> createIndexLocallyBusy( - long causalityToken, - CatalogTableDescriptor table, - CatalogIndexDescriptor index - ) { - int tableId = table.id(); - int indexId = index.id(); - - if (LOG.isInfoEnabled()) { - LOG.info( - "Creating local index: name={}, id={}, tableId={}, token={}", - index.name(), indexId, tableId, causalityToken - ); - } + if (LOG.isInfoEnabled()) { + LOG.info( + "Creating local index: name={}, id={}, tableId={}, token={}", + index.name(), indexId, tableId, causalityToken + ); + } - return registerIndexBusyAsync(table, index, causalityToken); + return startIndexAsync(table, index, causalityToken).thenApply(unused -> false); + }); } /** @@ -348,7 +334,7 @@ public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventPa assert table != null : "tableId=" + tableId + ", indexId=" + index.id(); - startIndexFutures.add(registerIndexBusyAsync(table, index, causalityToken)); + startIndexFutures.add(startIndexAsync(table, index, causalityToken)); } // Forces to wait until recovery is complete before the metastore watches are deployed to avoid races with other components. @@ -362,7 +348,7 @@ public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventPa }); } - private CompletableFuture<?> registerIndexBusyAsync( + private CompletableFuture<?> startIndexAsync( CatalogTableDescriptor table, CatalogIndexDescriptor index, long causalityToken @@ -378,20 +364,20 @@ public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventPa causalityToken, updater(mvTableStorageById -> tablePartitionFuture.thenCombine(schemaRegistryFuture, (partitionSet, schemaRegistry) -> inBusyLock(busyLock, () -> { - registerIndexBusy(table, index, partitionSet, schemaRegistry); + registerIndex(table, index, partitionSet, schemaRegistry); - return addMvTableStorageIfAbsent(mvTableStorageById, getTableImplStrictBusy(tableId).internalTable().storage()); + return addMvTableStorageIfAbsent(mvTableStorageById, getTableImplStrict(tableId).internalTable().storage()); }))) ); } - private void registerIndexBusy( + private void registerIndex( CatalogTableDescriptor table, CatalogIndexDescriptor index, PartitionSet partitionSet, SchemaRegistry schemaRegistry ) { - TableImpl tableImpl = getTableImplStrictBusy(table.id()); + TableImpl tableImpl = getTableImplStrict(table.id()); var storageIndexDescriptor = StorageIndexDescriptor.create(table, index); @@ -454,7 +440,7 @@ public class IndexManager extends AbstractEventProducer<IndexEvent, IndexEventPa return newMap; } - private TableImpl getTableImplStrictBusy(int tableId) { + private TableImpl getTableImplStrict(int tableId) { TableImpl table = tableManager.getTable(tableId); assert table != null : tableId; diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java deleted file mode 100644 index 1105c39205..0000000000 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.index.event; - -import org.apache.ignite.internal.event.Event; - -/** - * Index management events. - */ -public enum IndexEvent implements Event { - /** This event is fired when an index was created. */ - CREATE, - - /** This event is fired when an index was dropped. */ - DROP -} diff --git a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java deleted file mode 100644 index 14e9e8d272..0000000000 --- a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.index.event; - -import org.apache.ignite.internal.event.EventParameters; - -/** Index event parameters. There are properties which associate with a particular index. */ -public class IndexEventParameters extends EventParameters { - private final int tableId; - - private final int indexId; - - private final int catalogVersion; - - /** - * Constructor. - * - * @param revision Causality token. - * @param catalogVersion Catalog version. - * @param tableId Table ID. - * @param indexId Index ID. - */ - public IndexEventParameters(long revision, int catalogVersion, int tableId, int indexId) { - super(revision); - - this.catalogVersion = catalogVersion; - this.tableId = tableId; - this.indexId = indexId; - } - - /** Returns table ID this event relates to. */ - public int tableId() { - return tableId; - } - - /** Returns index ID this event relates to. */ - public int indexId() { - return indexId; - } - - /** Returns catalog version this event relates to. */ - public int catalogVersion() { - return catalogVersion; - } -} diff --git a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java index 8bee8c427b..bc6e9dda43 100644 --- a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java +++ b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java @@ -399,7 +399,6 @@ public class ItIgniteNodeRestartTest extends BaseIgniteRestartTest { registry, clusterSvc, tableManager, - indexManager, schemaManager, dataStorageManager, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java index b27921f3d1..94dd53e894 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java @@ -582,7 +582,6 @@ public class IgniteImpl implements Ignite { registry, clusterSvc, distributedTblMgr, - indexManager, schemaManager, dataStorageMgr, () -> dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()), diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java index 6f2d49c2d4..f50587b64e 100644 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java +++ b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java @@ -45,17 +45,12 @@ import org.apache.ignite.internal.catalog.events.CatalogEventParameters; import org.apache.ignite.internal.catalog.events.CreateTableEventParameters; import org.apache.ignite.internal.catalog.events.TableEventParameters; import org.apache.ignite.internal.causality.IncrementalVersionedValue; -import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.lang.ByteArray; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.IgniteStringFormatter; import org.apache.ignite.internal.lang.NodeStoppingException; -import org.apache.ignite.internal.logger.IgniteLogger; -import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.manager.IgniteComponent; import org.apache.ignite.internal.metastorage.MetaStorageManager; -import org.apache.ignite.internal.schema.event.SchemaEvent; -import org.apache.ignite.internal.schema.event.SchemaEventParameters; import org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl; import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl; import org.apache.ignite.internal.util.ByteUtils; @@ -66,9 +61,7 @@ import org.jetbrains.annotations.Nullable; /** * This class services management of table schemas. */ -public class CatalogSchemaManager extends AbstractEventProducer<SchemaEvent, SchemaEventParameters> implements IgniteComponent { - private static final IgniteLogger LOGGER = Loggers.forClass(CatalogSchemaManager.class); - +public class CatalogSchemaManager implements IgniteComponent { /** Schema history key predicate part. */ private static final String SCHEMA_STORE_PREFIX = ".sch-hist."; private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX = ".sch-hist-latest"; @@ -174,16 +167,6 @@ public class CatalogSchemaManager extends AbstractEventProducer<SchemaEvent, Sch return failedFuture(e); } - // Fire event early, because dependent listeners have to register VersionedValues' update futures - var eventParams = new SchemaEventParameters(causalityToken, tableId, newSchema); - - fireEvent(SchemaEvent.CREATE, eventParams) - .whenComplete((v, e) -> { - if (e != null) { - LOGGER.warn("Error when processing CREATE event", e); - } - }); - return registriesVv.update(causalityToken, (registries, e) -> inBusyLock(busyLock, () -> { if (e != null) { return failedFuture(new IgniteInternalException(IgniteStringFormatter.format( diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java deleted file mode 100644 index e3cbe007d1..0000000000 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.schema.event; - -import org.apache.ignite.internal.event.Event; - -/** - * Table management events. - */ -public enum SchemaEvent implements Event { - /** This event is fired when a schema was created. */ - CREATE -} diff --git a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java deleted file mode 100644 index 2b95d4abc2..0000000000 --- a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java +++ /dev/null @@ -1,64 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.schema.event; - -import org.apache.ignite.internal.event.EventParameters; -import org.apache.ignite.internal.schema.SchemaDescriptor; - -/** - * Table event parameter. - */ -public class SchemaEventParameters extends EventParameters { - /** Table identifier. */ - private final int tableId; - - /** Schema descriptor. */ - private final SchemaDescriptor schemaDescriptor; - - /** - * Constructor. - * - * @param causalityToken Causality token. - * @param tableId Table id. - * @param schemaDescriptor Schema descriptor. - */ - public SchemaEventParameters(long causalityToken, int tableId, SchemaDescriptor schemaDescriptor) { - super(causalityToken); - - this.tableId = tableId; - this.schemaDescriptor = schemaDescriptor; - } - - /** - * Get a table id. - * - * @return Table id. - */ - public int tableId() { - return tableId; - } - - /** - * Gets schema descriptor. - * - * @return Schema descriptor. - */ - public SchemaDescriptor schemaDescriptor() { - return schemaDescriptor; - } -} diff --git a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java index 68413b652f..5877552761 100644 --- a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java +++ b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java @@ -35,18 +35,11 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Consumer; import java.util.function.LongFunction; import java.util.function.Supplier; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.calcite.schema.SchemaPlus; import org.apache.calcite.tools.Frameworks; -import org.apache.calcite.util.Pair; import org.apache.ignite.internal.catalog.CatalogManager; -import org.apache.ignite.internal.event.Event; -import org.apache.ignite.internal.event.EventListener; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; -import org.apache.ignite.internal.index.IndexManager; -import org.apache.ignite.internal.index.event.IndexEvent; import org.apache.ignite.internal.lang.IgniteInternalException; import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; @@ -88,7 +81,6 @@ import org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory; import org.apache.ignite.internal.sql.metrics.SqlClientMetricSource; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.table.distributed.TableManager; -import org.apache.ignite.internal.table.event.TableEvent; import org.apache.ignite.internal.tx.InternalTransaction; import org.apache.ignite.internal.util.AsyncCursor; import org.apache.ignite.internal.util.IgniteSpinBusyLock; @@ -146,8 +138,6 @@ public class SqlQueryProcessor implements QueryProcessor { private final TableManager tableManager; - private final IndexManager indexManager; - private final CatalogSchemaManager schemaManager; private final DataStorageManager dataStorageManager; @@ -157,9 +147,6 @@ public class SqlQueryProcessor implements QueryProcessor { /** Busy lock for stop synchronisation. */ private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock(); - /** Event listeners to close. */ - private final List<Pair<Event, EventListener>> evtLsnrs = new ArrayList<>(); - private final ReplicaService replicaService; private volatile SessionManager sessionManager; @@ -189,7 +176,6 @@ public class SqlQueryProcessor implements QueryProcessor { Consumer<LongFunction<CompletableFuture<?>>> registry, ClusterService clusterSrvc, TableManager tableManager, - IndexManager indexManager, CatalogSchemaManager schemaManager, DataStorageManager dataStorageManager, Supplier<Map<String, Map<String, Class<?>>>> dataStorageFieldsSupplier, @@ -200,7 +186,6 @@ public class SqlQueryProcessor implements QueryProcessor { ) { this.clusterSrvc = clusterSrvc; this.tableManager = tableManager; - this.indexManager = indexManager; this.schemaManager = schemaManager; this.dataStorageManager = dataStorageManager; this.dataStorageFieldsSupplier = dataStorageFieldsSupplier; @@ -318,18 +303,7 @@ public class SqlQueryProcessor implements QueryProcessor { Collections.reverse(services); - Stream<AutoCloseable> closableComponents = services.stream().map(s -> s::stop); - - Stream<AutoCloseable> closableListeners = evtLsnrs.stream() - .map((p) -> () -> { - if (p.left instanceof TableEvent) { - tableManager.removeListener((TableEvent) p.left, p.right); - } else { - indexManager.removeListener((IndexEvent) p.left, p.right); - } - }); - - IgniteUtils.closeAll(Stream.concat(closableComponents, closableListeners).collect(Collectors.toList())); + IgniteUtils.closeAll(services.stream().map(s -> s::stop)); } /** {@inheritDoc} */ diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 6830a03fab..63e6682d4d 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -75,6 +75,7 @@ import java.util.function.IntSupplier; import java.util.function.LongFunction; import java.util.function.Supplier; import java.util.stream.Collectors; +import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.internal.affinity.AffinityUtils; import org.apache.ignite.internal.affinity.Assignment; @@ -90,7 +91,6 @@ import org.apache.ignite.internal.catalog.events.DropTableEventParameters; import org.apache.ignite.internal.causality.CompletionListener; import org.apache.ignite.internal.causality.IncrementalVersionedValue; import org.apache.ignite.internal.distributionzones.DistributionZoneManager; -import org.apache.ignite.internal.event.AbstractEventProducer; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.ByteArray; @@ -125,7 +125,6 @@ import org.apache.ignite.internal.replicator.ReplicaService; import org.apache.ignite.internal.replicator.TablePartitionId; import org.apache.ignite.internal.schema.CatalogSchemaManager; import org.apache.ignite.internal.schema.configuration.GcConfiguration; -import org.apache.ignite.internal.schema.event.SchemaEvent; import org.apache.ignite.internal.storage.DataStorageManager; import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RowId; @@ -159,8 +158,6 @@ import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; import org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller; import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl; import org.apache.ignite.internal.table.distributed.storage.PartitionStorages; -import org.apache.ignite.internal.table.event.TableEvent; -import org.apache.ignite.internal.table.event.TableEventParameters; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.LockManager; @@ -191,7 +188,7 @@ import org.jetbrains.annotations.TestOnly; /** * Table manager. */ -public class TableManager extends AbstractEventProducer<TableEvent, TableEventParameters> implements IgniteTablesInternal, IgniteComponent { +public class TableManager implements IgniteTablesInternal, IgniteComponent { private static final long QUERY_DATA_NODES_COUNT_TIMEOUT = TimeUnit.SECONDS.toMillis(3); /** The logger. */ @@ -530,12 +527,6 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa return onTableDelete(((DropTableEventParameters) parameters)).thenApply(unused -> false); }); - schemaManager.listen(SchemaEvent.CREATE, (parameters, exception) -> inBusyLockAsync(busyLock, () -> { - var eventParameters = new TableEventParameters(parameters.causalityToken(), parameters.tableId()); - - return fireEvent(TableEvent.ALTER, eventParameters).thenApply(v -> false); - })); - addMessageHandler(clusterService.messagingService()); }); } @@ -645,27 +636,19 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa } private CompletableFuture<Void> onTableDelete(DropTableEventParameters parameters) { - long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); - - int tableId = parameters.tableId(); - - if (!busyLock.enterBusy()) { - fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId), new NodeStoppingException()); + return inBusyLockAsync(busyLock, () -> { + long causalityToken = parameters.causalityToken(); + int catalogVersion = parameters.catalogVersion(); - return failedFuture(new NodeStoppingException()); - } + int tableId = parameters.tableId(); - try { CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, catalogVersion - 1); CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion - 1); dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor); - } finally { - busyLock.leaveBusy(); - } - return completedFuture(null); + return completedFuture(null); + }); } /** @@ -1214,15 +1197,8 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa * @return Future that will be completed when local changes related to the table creation are applied. */ private CompletableFuture<?> createTableLocally(long causalityToken, int catalogVersion, CatalogTableDescriptor tableDescriptor) { - int tableId = tableDescriptor.id(); - - if (!busyLock.enterBusy()) { - fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId), new NodeStoppingException()); - - return failedFuture(new NodeStoppingException()); - } - - try { + return inBusyLockAsync(busyLock, () -> { + int tableId = tableDescriptor.id(); int zoneId = tableDescriptor.zoneId(); CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion); @@ -1256,9 +1232,7 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa } } }).thenCompose(ignored -> writeTableAssignmentsToMetastore(tableId, assignmentsFuture)); - } finally { - busyLock.leaveBusy(); - } + }); } /** @@ -1329,10 +1303,9 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa tablesById(causalityToken).thenRun(() -> inBusyLock(busyLock, () -> completeApiCreateFuture(table))); // TODO should be reworked in IGNITE-16763 - // We use the event notification future as the result so that dependent components can complete the schema updates. // TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible performance degradation. - return allOf(createPartsFut, fireEvent(TableEvent.CREATE, new TableEventParameters(causalityToken, tableId))); + return createPartsFut.thenApply(ignore -> null); } /** @@ -1474,22 +1447,13 @@ public class TableManager extends AbstractEventProducer<TableEvent, TableEventPa startedTables.remove(tableId); - fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId)) - .whenComplete((v, e) -> { - Set<ByteArray> assignmentKeys = new HashSet<>(); - - for (int p = 0; p < partitions; p++) { - assignmentKeys.add(stablePartAssignmentsKey(new TablePartitionId(tableId, p))); - } + Set<ByteArray> assignmentKeys = IntStream.range(0, partitions) + .mapToObj(p -> stablePartAssignmentsKey(new TablePartitionId(tableId, p))) + .collect(Collectors.toSet()); - metaStorageMgr.removeAll(assignmentKeys); - - if (e != null) { - LOG.error("Error on " + TableEvent.DROP + " notification", e); - } - }); + metaStorageMgr.removeAll(assignmentKeys); } catch (NodeStoppingException e) { - fireEvent(TableEvent.DROP, new TableEventParameters(causalityToken, tableId), e); + // No op. } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java deleted file mode 100644 index ece0849fab..0000000000 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java +++ /dev/null @@ -1,34 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.table.event; - -import org.apache.ignite.internal.event.Event; - -/** - * Table management events. - */ -public enum TableEvent implements Event { - /** This event is fired when a table was created. */ - CREATE, - - /** This event is fired when a table config was changed. */ - ALTER, - - /** This event is fired when a table was dropped. */ - DROP -} diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java deleted file mode 100644 index 7478938d25..0000000000 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.internal.table.event; - -import org.apache.ignite.internal.event.EventParameters; - -/** - * Table event parameters. There are properties which associate with a concrete table. - */ -public class TableEventParameters extends EventParameters { - /** Table identifier. */ - private final int tableId; - - /** - * Constructor. - * - * @param causalityToken Causality token. - * @param tableId Table identifier. - */ - public TableEventParameters(long causalityToken, int tableId) { - super(causalityToken); - this.tableId = tableId; - } - - /** - * Get the table identifier. - * - * @return Table id. - */ - public int tableId() { - return tableId; - } -} diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java index 1fd032726f..83912c6855 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java @@ -55,10 +55,8 @@ import java.util.List; import java.util.Set; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.Flow.Subscription; import java.util.concurrent.Phaser; -import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.LongFunction; import org.apache.ignite.internal.affinity.AffinityUtils; @@ -106,7 +104,6 @@ import org.apache.ignite.internal.table.TableImpl; import org.apache.ignite.internal.table.TableTestUtils; import org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager; import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService; -import org.apache.ignite.internal.table.event.TableEvent; import org.apache.ignite.internal.testframework.IgniteAbstractTest; import org.apache.ignite.internal.tx.HybridTimestampTracker; import org.apache.ignite.internal.tx.TxManager; @@ -666,20 +663,10 @@ public class TableManagerTest extends IgniteAbstractTest { }); } - CountDownLatch createTblLatch = new CountDownLatch(1); - - tableManager.listen(TableEvent.CREATE, (parameters, exception) -> { - createTblLatch.countDown(); - - return completedFuture(true); - }); - createZone(PARTITIONS, REPLICAS); createTable(tableName); - assertTrue(createTblLatch.await(10, TimeUnit.SECONDS)); - TableImpl tbl2 = tableManager.tableImpl(tableName); assertNotNull(tbl2);