This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new c9e5a1fbf20 IGNITE-24665 Extract BuildIndexEventListener from
PartitionReplicaListener (#5408)
c9e5a1fbf20 is described below
commit c9e5a1fbf20f4a3245765c177d28266c5b0a069f
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Mar 14 15:32:50 2025 +0200
IGNITE-24665 Extract BuildIndexEventListener from PartitionReplicaListener
(#5408)
---
.../PartitionReplicaBuildIndexProcessor.java | 165 +++++++++++++++++++++
.../replicator/PartitionReplicaListener.java | 103 ++-----------
2 files changed, 177 insertions(+), 91 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
new file mode 100644
index 00000000000..471169edd87
--- /dev/null
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
+import static
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
+import static
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
+import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Processor that handles catalog events {@link CatalogEvent#INDEX_BUILDING}
and
+ * tracks read-write transaction operations for building indexes.
+ */
+public class PartitionReplicaBuildIndexProcessor {
+ /** Busy lock to stop synchronously. */
+ private final IgniteSpinBusyLock busyLock;
+
+ private final int tableId;
+
+ private final IndexMetaStorage indexMetaStorage;
+
+ /** Read-write transaction operation tracker for building indexes. */
+ private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
+
+ private final CatalogService catalogService;
+
+ /** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
+ private final EventListener<CatalogEventParameters> listener =
this::onIndexBuilding;
+
+ /**
+ * Creates a new instance of {@code PartitionReplicaBuildIndexProcessor}
+ * and registers a new listener for the {@link
CatalogEvent#INDEX_BUILDING} event.
+ *
+ * @param busyLock Busy lock to stop synchronously.
+ * @param tableId Table ID.
+ * @param indexMetaStorage Index meta storage.
+ * @param catalogService Catalog service.
+ */
+ PartitionReplicaBuildIndexProcessor(
+ IgniteSpinBusyLock busyLock,
+ int tableId,
+ IndexMetaStorage indexMetaStorage,
+ CatalogService catalogService
+ ) {
+ this.busyLock = busyLock;
+ this.tableId = tableId;
+ this.indexMetaStorage = indexMetaStorage;
+ this.txRwOperationTracker = new IndexBuilderTxRwOperationTracker();
+ this.catalogService = catalogService;
+
+ prepareIndexBuilderTxRwOperationTracker();
+ }
+
+ /**
+ * Returns read-write transaction operation tracker.
+ *
+ * @return Read-write transaction operation tracker.
+ */
+ IndexBuilderTxRwOperationTracker tracker() {
+ return txRwOperationTracker;
+ }
+
+ /**
+ * Removes the listener for the {@link CatalogEvent#INDEX_BUILDING} event and closes the operation tracker.
+ */
+ void onShutdown() {
+ assert busyLock.blockedByCurrentThread() : "Busy lock must be locked
by the current thread.";
+
+ catalogService.removeListener(CatalogEvent.INDEX_BUILDING, listener);
+
+ txRwOperationTracker.close();
+ }
+
+ void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
+ if (request instanceof ReadWriteReplicaRequest) {
+ int rwTxActiveCatalogVersion =
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
+
+ // It is very important that the counter is increased only after
the schema sync at the begin timestamp of RW transaction,
+ // otherwise there may be races/errors and the index will not be
able to start building.
+ if
(!txRwOperationTracker.incrementOperationCount(rwTxActiveCatalogVersion)) {
+ throw new
StaleTransactionOperationException(((ReadWriteReplicaRequest)
request).transactionId());
+ }
+ }
+ }
+
+ void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
+ if (request instanceof ReadWriteReplicaRequest) {
+ txRwOperationTracker.decrementOperationCount(
+ rwTxActiveCatalogVersion(catalogService,
(ReadWriteReplicaRequest) request)
+ );
+ }
+ }
+
+ private void prepareIndexBuilderTxRwOperationTracker() {
+ // Expected to be executed on the metastore thread.
+ CatalogIndexDescriptor indexDescriptor =
latestIndexDescriptorInBuildingStatus(catalogService, tableId);
+
+ if (indexDescriptor != null) {
+ IndexMeta indexMeta =
indexMetaStorage.indexMeta(indexDescriptor.id());
+
+ assert indexMeta != null : indexDescriptor.id();
+
+
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexMeta.statusChange(REGISTERED).catalogVersion());
+ }
+
+ catalogService.listen(CatalogEvent.INDEX_BUILDING, listener);
+ }
+
+ private CompletableFuture<Boolean> onIndexBuilding(CatalogEventParameters
parameters) {
+ if (!busyLock.enterBusy()) {
+ return trueCompletedFuture();
+ }
+
+ try {
+ int indexId = ((StartBuildingIndexEventParameters)
parameters).indexId();
+
+ IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+
+ assert indexMeta != null : "indexId=" + indexId + ",
catalogVersion=" + parameters.catalogVersion();
+
+ MetaIndexStatusChange registeredStatusChange =
indexMeta.statusChange(REGISTERED);
+
+ if (indexMeta.tableId() == tableId) {
+
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(registeredStatusChange.catalogVersion());
+ }
+
+ return falseCompletedFuture();
+ } catch (Throwable t) {
+ return failedFuture(t);
+ } finally {
+ busyLock.leaveBusy();
+ }
+ }
+}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 91a219805b9..848b2e7bc43 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -35,11 +35,8 @@ import static
org.apache.ignite.internal.partitiondistribution.Assignments.fromB
import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
-import static
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
import static
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
import static
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
-import static
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
-import static
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
import static org.apache.ignite.internal.tx.TxState.ABORTED;
import static org.apache.ignite.internal.tx.TxState.COMMITTED;
@@ -50,10 +47,8 @@ import static
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
-import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.IgniteUtils.findAny;
import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
@@ -87,12 +82,8 @@ import java.util.function.Supplier;
import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
import org.apache.ignite.internal.catalog.Catalog;
import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
-import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.ClockService;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -197,9 +188,7 @@ import
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
import
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
import org.apache.ignite.internal.table.distributed.TableUtils;
-import org.apache.ignite.internal.table.distributed.index.IndexMeta;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
import
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
import org.apache.ignite.internal.tx.Lock;
import org.apache.ignite.internal.tx.LockKey;
@@ -338,16 +327,14 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
/** Prevents double stopping. */
private final AtomicBoolean stopGuard = new AtomicBoolean();
- /** Read-write transaction operation tracker for building indexes. */
- private final IndexBuilderTxRwOperationTracker txRwOperationTracker = new
IndexBuilderTxRwOperationTracker();
-
- /** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
- private final EventListener<CatalogEventParameters>
indexBuildingCatalogEventListener = this::onIndexBuilding;
+ /**
+ * Processor that handles catalog events {@link
CatalogEvent#INDEX_BUILDING} and
+ * tracks read-write transaction operations for building indexes.
+ */
+ private final PartitionReplicaBuildIndexProcessor indexBuildingProcessor;
private final SchemaRegistry schemaRegistry;
- private final IndexMetaStorage indexMetaStorage;
-
private final LowWatermark lowWatermark;
private static final boolean SKIP_UPDATES =
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
@@ -433,7 +420,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
this.catalogService = catalogService;
this.remotelyTriggeredResourceRegistry =
remotelyTriggeredResourceRegistry;
this.schemaRegistry = schemaRegistry;
- this.indexMetaStorage = indexMetaStorage;
this.lowWatermark = lowWatermark;
this.replicationGroupId = replicationGroupId;
this.tableId = tableId;
@@ -441,6 +427,8 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
this.schemaCompatValidator = new
SchemaCompatibilityValidator(validationSchemasSource, catalogService,
schemaSyncService);
+ indexBuildingProcessor = new
PartitionReplicaBuildIndexProcessor(busyLock, tableId, indexMetaStorage,
catalogService);
+
replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver,
clockService, replicationGroupId, localNode);
reliableCatalogVersions = new
ReliableCatalogVersions(schemaSyncService, catalogService);
raftCommandApplicator = new
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
@@ -483,13 +471,9 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
indexMetaStorage,
- txRwOperationTracker,
+ indexBuildingProcessor.tracker(),
safeTime,
raftCommandApplicator);
-
- // TODO https://issues.apache.org/jira/browse/IGNITE-24665
- // Consider extracting the following code to a separate class
BuildIndexEventListener.
- prepareIndexBuilderTxRwOperationTracker();
}
// TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove.
@@ -3549,9 +3533,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
busyLock.block();
- catalogService.removeListener(CatalogEvent.INDEX_BUILDING,
indexBuildingCatalogEventListener);
-
- txRwOperationTracker.close();
+ indexBuildingProcessor.onShutdown();
}
private int partId() {
@@ -3568,7 +3550,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
ReplicaPrimacy replicaPrimacy,
@Nullable HybridTimestamp opStartTsIfDirectRo
) {
- incrementRwOperationCountIfNeeded(request);
+ indexBuildingProcessor.incrementRwOperationCountIfNeeded(request);
UUID txIdLockingLwm = tryToLockLwmIfNeeded(request,
opStartTsIfDirectRo);
@@ -3576,7 +3558,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
return processOperationRequest(senderId, request, replicaPrimacy,
opStartTsIfDirectRo)
.whenComplete((unused, throwable) -> {
unlockLwmIfNeeded(txIdLockingLwm, request);
- decrementRwOperationCountIfNeeded(request);
+
indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
});
} catch (Throwable e) {
try {
@@ -3586,7 +3568,7 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
}
try {
- decrementRwOperationCountIfNeeded(request);
+
indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
} catch (Throwable decrementProblem) {
e.addSuppressed(decrementProblem);
}
@@ -3594,26 +3576,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
}
}
- private void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
- if (request instanceof ReadWriteReplicaRequest) {
- int rwTxActiveCatalogVersion =
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
-
- // It is very important that the counter is increased only after
the schema sync at the begin timestamp of RW transaction,
- // otherwise there may be races/errors and the index will not be
able to start building.
- if
(!txRwOperationTracker.incrementOperationCount(rwTxActiveCatalogVersion)) {
- throw new
StaleTransactionOperationException(((ReadWriteReplicaRequest)
request).transactionId());
- }
- }
- }
-
- private void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
- if (request instanceof ReadWriteReplicaRequest) {
- txRwOperationTracker.decrementOperationCount(
- rwTxActiveCatalogVersion(catalogService,
(ReadWriteReplicaRequest) request)
- );
- }
- }
-
/**
* Generates a fake transaction ID that will only be used to identify one
direct RO operation for purposes of locking and unlocking LWM.
* It should not be used as a replacement for a real transaction ID in
other contexts.
@@ -3688,47 +3650,6 @@ public class PartitionReplicaListener implements
ReplicaListener, ReplicaTablePr
}
}
- private void prepareIndexBuilderTxRwOperationTracker() {
- // Expected to be executed on the metastore thread.
- CatalogIndexDescriptor indexDescriptor =
latestIndexDescriptorInBuildingStatus(catalogService, tableId());
-
- if (indexDescriptor != null) {
- IndexMeta indexMeta =
indexMetaStorage.indexMeta(indexDescriptor.id());
-
- assert indexMeta != null : indexDescriptor.id();
-
-
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexMeta.statusChange(REGISTERED).catalogVersion());
- }
-
- catalogService.listen(CatalogEvent.INDEX_BUILDING,
indexBuildingCatalogEventListener);
- }
-
- private CompletableFuture<Boolean> onIndexBuilding(CatalogEventParameters
parameters) {
- if (!busyLock.enterBusy()) {
- return trueCompletedFuture();
- }
-
- try {
- int indexId = ((StartBuildingIndexEventParameters)
parameters).indexId();
-
- IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
-
- assert indexMeta != null : "indexId=" + indexId + ",
catalogVersion=" + parameters.catalogVersion();
-
- MetaIndexStatusChange registeredStatusChange =
indexMeta.statusChange(REGISTERED);
-
- if (indexMeta.tableId() == tableId()) {
-
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(registeredStatusChange.catalogVersion());
- }
-
- return falseCompletedFuture();
- } catch (Throwable t) {
- return failedFuture(t);
- } finally {
- busyLock.leaveBusy();
- }
- }
-
private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId,
tableId());
}