This is an automated email from the ASF dual-hosted git repository.
amashenkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 95a1291dda IGNITE-20339 Get rid of legacy IndexEvent, TableEvent and SchemaEvent (#2619)
95a1291dda is described below
commit 95a1291ddaedf40da06b6ff97160f03dda35718a
Author: Andrew V. Mashenkov <[email protected]>
AuthorDate: Thu Sep 28 18:29:03 2023 +0300
IGNITE-20339 Get rid of legacy IndexEvent, TableEvent and SchemaEvent (#2619)
---
.../apache/ignite/internal/index/IndexManager.java | 58 +++++++-----------
.../ignite/internal/index/event/IndexEvent.java | 31 ----------
.../internal/index/event/IndexEventParameters.java | 60 -------------------
.../runner/app/ItIgniteNodeRestartTest.java | 1 -
.../org/apache/ignite/internal/app/IgniteImpl.java | 1 -
.../internal/schema/CatalogSchemaManager.java | 19 +-----
.../ignite/internal/schema/event/SchemaEvent.java | 28 ---------
.../schema/event/SchemaEventParameters.java | 64 --------------------
.../internal/sql/engine/SqlQueryProcessor.java | 28 +--------
.../internal/table/distributed/TableManager.java | 70 ++++++----------------
.../ignite/internal/table/event/TableEvent.java | 34 -----------
.../internal/table/event/TableEventParameters.java | 48 ---------------
.../table/distributed/TableManagerTest.java | 13 ----
13 files changed, 41 insertions(+), 414 deletions(-)
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
index e0c929d7ca..885357fced 100644
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
+++
b/modules/index/src/main/java/org/apache/ignite/internal/index/IndexManager.java
@@ -42,9 +42,6 @@ import org.apache.ignite.internal.catalog.events.CatalogEvent;
import org.apache.ignite.internal.catalog.events.CreateIndexEventParameters;
import org.apache.ignite.internal.catalog.events.DropIndexEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
-import org.apache.ignite.internal.event.AbstractEventProducer;
-import org.apache.ignite.internal.index.event.IndexEvent;
-import org.apache.ignite.internal.index.event.IndexEventParameters;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
@@ -74,7 +71,7 @@ import org.apache.ignite.internal.util.IgniteSpinBusyLock;
* as well as managing indexes' lifecycle.
*/
// TODO: IGNITE-19082 Delete this class
-public class IndexManager extends AbstractEventProducer<IndexEvent,
IndexEventParameters> implements IgniteComponent {
+public class IndexManager implements IgniteComponent {
private static final IgniteLogger LOG =
Loggers.forClass(IndexManager.class);
/** Schema manager. */
@@ -203,39 +200,28 @@ public class IndexManager extends
AbstractEventProducer<IndexEvent, IndexEventPa
}
private CompletableFuture<Boolean>
onIndexCreate(CreateIndexEventParameters parameters) {
- CatalogIndexDescriptor index = parameters.indexDescriptor();
+ return inBusyLockAsync(busyLock, () -> {
+ CatalogIndexDescriptor index = parameters.indexDescriptor();
- int indexId = index.id();
- int tableId = index.tableId();
+ int indexId = index.id();
+ int tableId = index.tableId();
- long causalityToken = parameters.causalityToken();
- int catalogVersion = parameters.catalogVersion();
+ long causalityToken = parameters.causalityToken();
+ int catalogVersion = parameters.catalogVersion();
- return inBusyLockAsync(busyLock, () -> {
CatalogTableDescriptor table = catalogManager.table(tableId,
catalogVersion);
assert table != null : "tableId=" + tableId + ", indexId=" +
indexId;
- return createIndexLocallyBusy(causalityToken, table,
index).thenApply(unused -> false);
- });
- }
-
- private CompletableFuture<?> createIndexLocallyBusy(
- long causalityToken,
- CatalogTableDescriptor table,
- CatalogIndexDescriptor index
- ) {
- int tableId = table.id();
- int indexId = index.id();
-
- if (LOG.isInfoEnabled()) {
- LOG.info(
- "Creating local index: name={}, id={}, tableId={},
token={}",
- index.name(), indexId, tableId, causalityToken
- );
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info(
+ "Creating local index: name={}, id={}, tableId={},
token={}",
+ index.name(), indexId, tableId, causalityToken
+ );
+ }
- return registerIndexBusyAsync(table, index, causalityToken);
+ return startIndexAsync(table, index,
causalityToken).thenApply(unused -> false);
+ });
}
/**
@@ -348,7 +334,7 @@ public class IndexManager extends
AbstractEventProducer<IndexEvent, IndexEventPa
assert table != null : "tableId=" + tableId + ", indexId=" +
index.id();
- startIndexFutures.add(registerIndexBusyAsync(table, index,
causalityToken));
+ startIndexFutures.add(startIndexAsync(table, index,
causalityToken));
}
// Forces to wait until recovery is complete before the metastore
watches are deployed to avoid races with other components.
@@ -362,7 +348,7 @@ public class IndexManager extends
AbstractEventProducer<IndexEvent, IndexEventPa
});
}
- private CompletableFuture<?> registerIndexBusyAsync(
+ private CompletableFuture<?> startIndexAsync(
CatalogTableDescriptor table,
CatalogIndexDescriptor index,
long causalityToken
@@ -378,20 +364,20 @@ public class IndexManager extends
AbstractEventProducer<IndexEvent, IndexEventPa
causalityToken,
updater(mvTableStorageById ->
tablePartitionFuture.thenCombine(schemaRegistryFuture,
(partitionSet, schemaRegistry) -> inBusyLock(busyLock,
() -> {
- registerIndexBusy(table, index, partitionSet,
schemaRegistry);
+ registerIndex(table, index, partitionSet,
schemaRegistry);
- return
addMvTableStorageIfAbsent(mvTableStorageById,
getTableImplStrictBusy(tableId).internalTable().storage());
+ return
addMvTableStorageIfAbsent(mvTableStorageById,
getTableImplStrict(tableId).internalTable().storage());
})))
);
}
- private void registerIndexBusy(
+ private void registerIndex(
CatalogTableDescriptor table,
CatalogIndexDescriptor index,
PartitionSet partitionSet,
SchemaRegistry schemaRegistry
) {
- TableImpl tableImpl = getTableImplStrictBusy(table.id());
+ TableImpl tableImpl = getTableImplStrict(table.id());
var storageIndexDescriptor = StorageIndexDescriptor.create(table,
index);
@@ -454,7 +440,7 @@ public class IndexManager extends
AbstractEventProducer<IndexEvent, IndexEventPa
return newMap;
}
- private TableImpl getTableImplStrictBusy(int tableId) {
+ private TableImpl getTableImplStrict(int tableId) {
TableImpl table = tableManager.getTable(tableId);
assert table != null : tableId;
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java
deleted file mode 100644
index 1105c39205..0000000000
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEvent.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index.event;
-
-import org.apache.ignite.internal.event.Event;
-
-/**
- * Index management events.
- */
-public enum IndexEvent implements Event {
- /** This event is fired when an index was created. */
- CREATE,
-
- /** This event is fired when an index was dropped. */
- DROP
-}
diff --git
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
b/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
deleted file mode 100644
index 14e9e8d272..0000000000
---
a/modules/index/src/main/java/org/apache/ignite/internal/index/event/IndexEventParameters.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.index.event;
-
-import org.apache.ignite.internal.event.EventParameters;
-
-/** Index event parameters. There are properties which associate with a
particular index. */
-public class IndexEventParameters extends EventParameters {
- private final int tableId;
-
- private final int indexId;
-
- private final int catalogVersion;
-
- /**
- * Constructor.
- *
- * @param revision Causality token.
- * @param catalogVersion Catalog version.
- * @param tableId Table ID.
- * @param indexId Index ID.
- */
- public IndexEventParameters(long revision, int catalogVersion, int
tableId, int indexId) {
- super(revision);
-
- this.catalogVersion = catalogVersion;
- this.tableId = tableId;
- this.indexId = indexId;
- }
-
- /** Returns table ID this event relates to. */
- public int tableId() {
- return tableId;
- }
-
- /** Returns index ID this event relates to. */
- public int indexId() {
- return indexId;
- }
-
- /** Returns catalog version this event relates to. */
- public int catalogVersion() {
- return catalogVersion;
- }
-}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 8bee8c427b..bc6e9dda43 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -399,7 +399,6 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
registry,
clusterSvc,
tableManager,
- indexManager,
schemaManager,
dataStorageManager,
() ->
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index b27921f3d1..94dd53e894 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -582,7 +582,6 @@ public class IgniteImpl implements Ignite {
registry,
clusterSvc,
distributedTblMgr,
- indexManager,
schemaManager,
dataStorageMgr,
() ->
dataStorageModules.collectSchemasFields(modules.distributed().polymorphicSchemaExtensions()),
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
index 6f2d49c2d4..f50587b64e 100644
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
+++
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/CatalogSchemaManager.java
@@ -45,17 +45,12 @@ import
org.apache.ignite.internal.catalog.events.CatalogEventParameters;
import org.apache.ignite.internal.catalog.events.CreateTableEventParameters;
import org.apache.ignite.internal.catalog.events.TableEventParameters;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
-import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
-import org.apache.ignite.internal.logger.IgniteLogger;
-import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.metastorage.MetaStorageManager;
-import org.apache.ignite.internal.schema.event.SchemaEvent;
-import org.apache.ignite.internal.schema.event.SchemaEventParameters;
import
org.apache.ignite.internal.schema.marshaller.schema.SchemaSerializerImpl;
import org.apache.ignite.internal.schema.registry.SchemaRegistryImpl;
import org.apache.ignite.internal.util.ByteUtils;
@@ -66,9 +61,7 @@ import org.jetbrains.annotations.Nullable;
/**
* This class services management of table schemas.
*/
-public class CatalogSchemaManager extends AbstractEventProducer<SchemaEvent,
SchemaEventParameters> implements IgniteComponent {
- private static final IgniteLogger LOGGER =
Loggers.forClass(CatalogSchemaManager.class);
-
+public class CatalogSchemaManager implements IgniteComponent {
/** Schema history key predicate part. */
private static final String SCHEMA_STORE_PREFIX = ".sch-hist.";
private static final String LATEST_SCHEMA_VERSION_STORE_SUFFIX =
".sch-hist-latest";
@@ -174,16 +167,6 @@ public class CatalogSchemaManager extends
AbstractEventProducer<SchemaEvent, Sch
return failedFuture(e);
}
- // Fire event early, because dependent listeners have to register
VersionedValues' update futures
- var eventParams = new SchemaEventParameters(causalityToken,
tableId, newSchema);
-
- fireEvent(SchemaEvent.CREATE, eventParams)
- .whenComplete((v, e) -> {
- if (e != null) {
- LOGGER.warn("Error when processing CREATE event",
e);
- }
- });
-
return registriesVv.update(causalityToken, (registries, e) ->
inBusyLock(busyLock, () -> {
if (e != null) {
return failedFuture(new
IgniteInternalException(IgniteStringFormatter.format(
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
deleted file mode 100644
index e3cbe007d1..0000000000
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEvent.java
+++ /dev/null
@@ -1,28 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema.event;
-
-import org.apache.ignite.internal.event.Event;
-
-/**
- * Table management events.
- */
-public enum SchemaEvent implements Event {
- /** This event is fired when a schema was created. */
- CREATE
-}
diff --git
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
b/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
deleted file mode 100644
index 2b95d4abc2..0000000000
---
a/modules/schema/src/main/java/org/apache/ignite/internal/schema/event/SchemaEventParameters.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.schema.event;
-
-import org.apache.ignite.internal.event.EventParameters;
-import org.apache.ignite.internal.schema.SchemaDescriptor;
-
-/**
- * Table event parameter.
- */
-public class SchemaEventParameters extends EventParameters {
- /** Table identifier. */
- private final int tableId;
-
- /** Schema descriptor. */
- private final SchemaDescriptor schemaDescriptor;
-
- /**
- * Constructor.
- *
- * @param causalityToken Causality token.
- * @param tableId Table id.
- * @param schemaDescriptor Schema descriptor.
- */
- public SchemaEventParameters(long causalityToken, int tableId,
SchemaDescriptor schemaDescriptor) {
- super(causalityToken);
-
- this.tableId = tableId;
- this.schemaDescriptor = schemaDescriptor;
- }
-
- /**
- * Get a table id.
- *
- * @return Table id.
- */
- public int tableId() {
- return tableId;
- }
-
- /**
- * Gets schema descriptor.
- *
- * @return Schema descriptor.
- */
- public SchemaDescriptor schemaDescriptor() {
- return schemaDescriptor;
- }
-}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
index 68413b652f..5877552761 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/SqlQueryProcessor.java
@@ -35,18 +35,11 @@ import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import java.util.function.Supplier;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.tools.Frameworks;
-import org.apache.calcite.util.Pair;
import org.apache.ignite.internal.catalog.CatalogManager;
-import org.apache.ignite.internal.event.Event;
-import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
-import org.apache.ignite.internal.index.IndexManager;
-import org.apache.ignite.internal.index.event.IndexEvent;
import org.apache.ignite.internal.lang.IgniteInternalException;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.logger.IgniteLogger;
@@ -88,7 +81,6 @@ import
org.apache.ignite.internal.sql.engine.util.cache.CaffeineCacheFactory;
import org.apache.ignite.internal.sql.metrics.SqlClientMetricSource;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.table.distributed.TableManager;
-import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.tx.InternalTransaction;
import org.apache.ignite.internal.util.AsyncCursor;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
@@ -146,8 +138,6 @@ public class SqlQueryProcessor implements QueryProcessor {
private final TableManager tableManager;
- private final IndexManager indexManager;
-
private final CatalogSchemaManager schemaManager;
private final DataStorageManager dataStorageManager;
@@ -157,9 +147,6 @@ public class SqlQueryProcessor implements QueryProcessor {
/** Busy lock for stop synchronisation. */
private final IgniteSpinBusyLock busyLock = new IgniteSpinBusyLock();
- /** Event listeners to close. */
- private final List<Pair<Event, EventListener>> evtLsnrs = new
ArrayList<>();
-
private final ReplicaService replicaService;
private volatile SessionManager sessionManager;
@@ -189,7 +176,6 @@ public class SqlQueryProcessor implements QueryProcessor {
Consumer<LongFunction<CompletableFuture<?>>> registry,
ClusterService clusterSrvc,
TableManager tableManager,
- IndexManager indexManager,
CatalogSchemaManager schemaManager,
DataStorageManager dataStorageManager,
Supplier<Map<String, Map<String, Class<?>>>>
dataStorageFieldsSupplier,
@@ -200,7 +186,6 @@ public class SqlQueryProcessor implements QueryProcessor {
) {
this.clusterSrvc = clusterSrvc;
this.tableManager = tableManager;
- this.indexManager = indexManager;
this.schemaManager = schemaManager;
this.dataStorageManager = dataStorageManager;
this.dataStorageFieldsSupplier = dataStorageFieldsSupplier;
@@ -318,18 +303,7 @@ public class SqlQueryProcessor implements QueryProcessor {
Collections.reverse(services);
- Stream<AutoCloseable> closableComponents = services.stream().map(s ->
s::stop);
-
- Stream<AutoCloseable> closableListeners = evtLsnrs.stream()
- .map((p) -> () -> {
- if (p.left instanceof TableEvent) {
- tableManager.removeListener((TableEvent) p.left,
p.right);
- } else {
- indexManager.removeListener((IndexEvent) p.left,
p.right);
- }
- });
-
- IgniteUtils.closeAll(Stream.concat(closableComponents,
closableListeners).collect(Collectors.toList()));
+ IgniteUtils.closeAll(services.stream().map(s -> s::stop));
}
/** {@inheritDoc} */
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 6830a03fab..63e6682d4d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -75,6 +75,7 @@ import java.util.function.IntSupplier;
import java.util.function.LongFunction;
import java.util.function.Supplier;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
import java.util.stream.Stream;
import org.apache.ignite.internal.affinity.AffinityUtils;
import org.apache.ignite.internal.affinity.Assignment;
@@ -90,7 +91,6 @@ import
org.apache.ignite.internal.catalog.events.DropTableEventParameters;
import org.apache.ignite.internal.causality.CompletionListener;
import org.apache.ignite.internal.causality.IncrementalVersionedValue;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
-import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
@@ -125,7 +125,6 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.schema.CatalogSchemaManager;
import org.apache.ignite.internal.schema.configuration.GcConfiguration;
-import org.apache.ignite.internal.schema.event.SchemaEvent;
import org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
@@ -159,8 +158,6 @@ import
org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
import
org.apache.ignite.internal.table.distributed.schema.ThreadLocalPartitionCommandsMarshaller;
import org.apache.ignite.internal.table.distributed.storage.InternalTableImpl;
import org.apache.ignite.internal.table.distributed.storage.PartitionStorages;
-import org.apache.ignite.internal.table.event.TableEvent;
-import org.apache.ignite.internal.table.event.TableEventParameters;
import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.LockManager;
@@ -191,7 +188,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Table manager.
*/
-public class TableManager extends AbstractEventProducer<TableEvent,
TableEventParameters> implements IgniteTablesInternal, IgniteComponent {
+public class TableManager implements IgniteTablesInternal, IgniteComponent {
private static final long QUERY_DATA_NODES_COUNT_TIMEOUT =
TimeUnit.SECONDS.toMillis(3);
/** The logger. */
@@ -530,12 +527,6 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
return onTableDelete(((DropTableEventParameters)
parameters)).thenApply(unused -> false);
});
- schemaManager.listen(SchemaEvent.CREATE, (parameters, exception)
-> inBusyLockAsync(busyLock, () -> {
- var eventParameters = new
TableEventParameters(parameters.causalityToken(), parameters.tableId());
-
- return fireEvent(TableEvent.ALTER,
eventParameters).thenApply(v -> false);
- }));
-
addMessageHandler(clusterService.messagingService());
});
}
@@ -645,27 +636,19 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
}
private CompletableFuture<Void> onTableDelete(DropTableEventParameters
parameters) {
- long causalityToken = parameters.causalityToken();
- int catalogVersion = parameters.catalogVersion();
-
- int tableId = parameters.tableId();
-
- if (!busyLock.enterBusy()) {
- fireEvent(TableEvent.DROP, new
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
+ return inBusyLockAsync(busyLock, () -> {
+ long causalityToken = parameters.causalityToken();
+ int catalogVersion = parameters.catalogVersion();
- return failedFuture(new NodeStoppingException());
- }
+ int tableId = parameters.tableId();
- try {
CatalogTableDescriptor tableDescriptor =
getTableDescriptor(tableId, catalogVersion - 1);
CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, catalogVersion - 1);
dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor);
- } finally {
- busyLock.leaveBusy();
- }
- return completedFuture(null);
+ return completedFuture(null);
+ });
}
/**
@@ -1214,15 +1197,8 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
* @return Future that will be completed when local changes related to the
table creation are applied.
*/
private CompletableFuture<?> createTableLocally(long causalityToken, int
catalogVersion, CatalogTableDescriptor tableDescriptor) {
- int tableId = tableDescriptor.id();
-
- if (!busyLock.enterBusy()) {
- fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, tableId), new NodeStoppingException());
-
- return failedFuture(new NodeStoppingException());
- }
-
- try {
+ return inBusyLockAsync(busyLock, () -> {
+ int tableId = tableDescriptor.id();
int zoneId = tableDescriptor.zoneId();
CatalogZoneDescriptor zoneDescriptor =
getZoneDescriptor(tableDescriptor, catalogVersion);
@@ -1256,9 +1232,7 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
}
}
}).thenCompose(ignored ->
writeTableAssignmentsToMetastore(tableId, assignmentsFuture));
- } finally {
- busyLock.leaveBusy();
- }
+ });
}
/**
@@ -1329,10 +1303,9 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
tablesById(causalityToken).thenRun(() -> inBusyLock(busyLock, () ->
completeApiCreateFuture(table)));
// TODO should be reworked in IGNITE-16763
- // We use the event notification future as the result so that
dependent components can complete the schema updates.
// TODO: https://issues.apache.org/jira/browse/IGNITE-19913 Possible
performance degradation.
- return allOf(createPartsFut, fireEvent(TableEvent.CREATE, new
TableEventParameters(causalityToken, tableId)));
+ return createPartsFut.thenApply(ignore -> null);
}
/**
@@ -1474,22 +1447,13 @@ public class TableManager extends
AbstractEventProducer<TableEvent, TableEventPa
startedTables.remove(tableId);
- fireEvent(TableEvent.DROP, new
TableEventParameters(causalityToken, tableId))
- .whenComplete((v, e) -> {
- Set<ByteArray> assignmentKeys = new HashSet<>();
-
- for (int p = 0; p < partitions; p++) {
- assignmentKeys.add(stablePartAssignmentsKey(new
TablePartitionId(tableId, p)));
- }
+ Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
+ .mapToObj(p -> stablePartAssignmentsKey(new
TablePartitionId(tableId, p)))
+ .collect(Collectors.toSet());
- metaStorageMgr.removeAll(assignmentKeys);
-
- if (e != null) {
- LOG.error("Error on " + TableEvent.DROP + "
notification", e);
- }
- });
+ metaStorageMgr.removeAll(assignmentKeys);
} catch (NodeStoppingException e) {
- fireEvent(TableEvent.DROP, new
TableEventParameters(causalityToken, tableId), e);
+ // No op.
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
deleted file mode 100644
index ece0849fab..0000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEvent.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.event;
-
-import org.apache.ignite.internal.event.Event;
-
-/**
- * Table management events.
- */
-public enum TableEvent implements Event {
- /** This event is fired when a table was created. */
- CREATE,
-
- /** This event is fired when a table config was changed. */
- ALTER,
-
- /** This event is fired when a table was dropped. */
- DROP
-}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
deleted file mode 100644
index 7478938d25..0000000000
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/event/TableEventParameters.java
+++ /dev/null
@@ -1,48 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.ignite.internal.table.event;
-
-import org.apache.ignite.internal.event.EventParameters;
-
-/**
- * Table event parameters. There are properties which associate with a
concrete table.
- */
-public class TableEventParameters extends EventParameters {
- /** Table identifier. */
- private final int tableId;
-
- /**
- * Constructor.
- *
- * @param causalityToken Causality token.
- * @param tableId Table identifier.
- */
- public TableEventParameters(long causalityToken, int tableId) {
- super(causalityToken);
- this.tableId = tableId;
- }
-
- /**
- * Get the table identifier.
- *
- * @return Table id.
- */
- public int tableId() {
- return tableId;
- }
-}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
index 1fd032726f..83912c6855 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerTest.java
@@ -55,10 +55,8 @@ import java.util.List;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Flow.Subscription;
import java.util.concurrent.Phaser;
-import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.LongFunction;
import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -106,7 +104,6 @@ import org.apache.ignite.internal.table.TableImpl;
import org.apache.ignite.internal.table.TableTestUtils;
import
org.apache.ignite.internal.table.distributed.raft.snapshot.outgoing.OutgoingSnapshotsManager;
import org.apache.ignite.internal.table.distributed.schema.SchemaSyncService;
-import org.apache.ignite.internal.table.event.TableEvent;
import org.apache.ignite.internal.testframework.IgniteAbstractTest;
import org.apache.ignite.internal.tx.HybridTimestampTracker;
import org.apache.ignite.internal.tx.TxManager;
@@ -666,20 +663,10 @@ public class TableManagerTest extends IgniteAbstractTest {
});
}
- CountDownLatch createTblLatch = new CountDownLatch(1);
-
- tableManager.listen(TableEvent.CREATE, (parameters, exception) -> {
- createTblLatch.countDown();
-
- return completedFuture(true);
- });
-
createZone(PARTITIONS, REPLICAS);
createTable(tableName);
- assertTrue(createTblLatch.await(10, TimeUnit.SECONDS));
-
TableImpl tbl2 = tableManager.tableImpl(tableName);
assertNotNull(tbl2);