This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c9e5a1fbf20 IGNITE-24665 Extract BuildIndexEventListener from 
PartitionReplicaListener (#5408)
c9e5a1fbf20 is described below

commit c9e5a1fbf20f4a3245765c177d28266c5b0a069f
Author: Slava Koptilin <[email protected]>
AuthorDate: Fri Mar 14 15:32:50 2025 +0200

    IGNITE-24665 Extract BuildIndexEventListener from PartitionReplicaListener 
(#5408)
---
 .../PartitionReplicaBuildIndexProcessor.java       | 165 +++++++++++++++++++++
 .../replicator/PartitionReplicaListener.java       | 103 ++-----------
 2 files changed, 177 insertions(+), 91 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
new file mode 100644
index 00000000000..471169edd87
--- /dev/null
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaBuildIndexProcessor.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.table.distributed.replicator;
+
+import static java.util.concurrent.CompletableFuture.failedFuture;
+import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
+import static 
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
+import static 
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
+
+import java.util.concurrent.CompletableFuture;
+import org.apache.ignite.internal.catalog.CatalogService;
+import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
+import org.apache.ignite.internal.catalog.events.CatalogEvent;
+import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
+import 
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
+import org.apache.ignite.internal.event.EventListener;
+import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteReplicaRequest;
+import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.table.distributed.index.IndexMeta;
+import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
+import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
+import org.apache.ignite.internal.util.IgniteSpinBusyLock;
+
+/**
+ * Processor that handles catalog events {@link CatalogEvent#INDEX_BUILDING} 
and
+ * tracks read-write transaction operations for building indexes.
+ */
+public class PartitionReplicaBuildIndexProcessor {
+    /** Busy lock to stop synchronously; entered by the catalog event listener and asserted as blocked in {@code onShutdown()}. */
+    private final IgniteSpinBusyLock busyLock;
+
+    /** ID of the table this processor serves; index events whose index meta has another table ID do not update the tracker. */
+    private final int tableId;
+
+    /** Index meta storage used to look up {@link IndexMeta} by index ID. */
+    private final IndexMetaStorage indexMetaStorage;
+
+    /** Read-write transaction operation tracker for building indexes. */
+    private final IndexBuilderTxRwOperationTracker txRwOperationTracker;
+
+    /** Catalog service on which the listener is registered/removed and which is passed to the {@code ReplicatorUtils} helpers. */
+    private final CatalogService catalogService;
+
+    /** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
+    private final EventListener<CatalogEventParameters> listener = 
this::onIndexBuilding;
+
+    /**
+     * Creates a new instance of {@code PartitionReplicaBuildIndexProcessor}
+     * and registers a new listener for the {@link CatalogEvent#INDEX_BUILDING} event.
+     *
+     * <p>A new {@code IndexBuilderTxRwOperationTracker} is created by this constructor. Tracker initialization
+     * and listener registration are delegated to {@code prepareIndexBuilderTxRwOperationTracker()}, which is
+     * invoked as the last step, after all fields have been assigned.
+     *
+     * @param busyLock Busy lock to stop synchronously.
+     * @param tableId Table ID.
+     * @param indexMetaStorage Index meta storage.
+     * @param catalogService Catalog service.
+     */
+    PartitionReplicaBuildIndexProcessor(
+            IgniteSpinBusyLock busyLock,
+            int tableId,
+            IndexMetaStorage indexMetaStorage,
+            CatalogService catalogService
+    ) {
+        this.busyLock = busyLock;
+        this.tableId = tableId;
+        this.indexMetaStorage = indexMetaStorage;
+        this.txRwOperationTracker = new IndexBuilderTxRwOperationTracker();
+        this.catalogService = catalogService;
+
+        prepareIndexBuilderTxRwOperationTracker();
+    }
+
+    /**
+     * Returns the read-write transaction operation tracker that was created in the constructor of this processor.
+     *
+     * @return Read-write transaction operation tracker.
+     */
+    IndexBuilderTxRwOperationTracker tracker() {
+        return txRwOperationTracker;
+    }
+
+    /**
+     * Removes the listener for the {@link CatalogEvent#INDEX_BUILDING} event from the catalog service
+     * and then closes the read-write transaction operation tracker.
+     *
+     * <p>Must be called while the busy lock is blocked by the current thread; this is checked by an assertion.
+     */
+    void onShutdown() {
+        assert busyLock.blockedByCurrentThread() : "Busy lock must be locked 
by the current thread.";
+
+        catalogService.removeListener(CatalogEvent.INDEX_BUILDING, listener);
+
+        txRwOperationTracker.close();
+    }
+
+    /**
+     * Increments the tracker's operation count for the given request if it is a {@link ReadWriteReplicaRequest};
+     * requests of any other type are ignored.
+     *
+     * <p>The count is incremented for the catalog version returned by {@code ReplicatorUtils.rwTxActiveCatalogVersion}.
+     *
+     * @param request Replica request.
+     * @throws StaleTransactionOperationException If the tracker rejects the increment
+     *      (that is, {@code incrementOperationCount} returns {@code false}).
+     */
+    void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
+        if (request instanceof ReadWriteReplicaRequest) {
+            int rwTxActiveCatalogVersion = 
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
+
+            // It is very important that the counter is increased only after 
the schema sync at the begin timestamp of RW transaction,
+            // otherwise there may be races/errors and the index will not be 
able to start building.
+            if 
(!txRwOperationTracker.incrementOperationCount(rwTxActiveCatalogVersion)) {
+                throw new 
StaleTransactionOperationException(((ReadWriteReplicaRequest) 
request).transactionId());
+            }
+        }
+    }
+
+    /**
+     * Decrements the tracker's operation count for the given request if it is a {@link ReadWriteReplicaRequest};
+     * requests of any other type are ignored.
+     *
+     * <p>The catalog version is obtained with the same {@code ReplicatorUtils.rwTxActiveCatalogVersion} call
+     * that {@code incrementRwOperationCountIfNeeded} uses.
+     *
+     * @param request Replica request.
+     */
+    void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
+        if (request instanceof ReadWriteReplicaRequest) {
+            txRwOperationTracker.decrementOperationCount(
+                    rwTxActiveCatalogVersion(catalogService, 
(ReadWriteReplicaRequest) request)
+            );
+        }
+    }
+
+    /**
+     * Initializes the tracker from the current catalog state and subscribes to {@link CatalogEvent#INDEX_BUILDING}.
+     *
+     * <p>If {@code latestIndexDescriptorInBuildingStatus} returns a descriptor for this table, the tracker's minimum
+     * allowed catalog version for starting operations is set to the catalog version of that index's
+     * {@code REGISTERED} status change. The listener is registered afterwards in either case.
+     */
+    private void prepareIndexBuilderTxRwOperationTracker() {
+        // Expected to be executed on the metastore thread.
+        CatalogIndexDescriptor indexDescriptor = 
latestIndexDescriptorInBuildingStatus(catalogService, tableId);
+
+        if (indexDescriptor != null) {
+            IndexMeta indexMeta = 
indexMetaStorage.indexMeta(indexDescriptor.id());
+
+            assert indexMeta != null : indexDescriptor.id();
+
+            
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexMeta.statusChange(REGISTERED).catalogVersion());
+        }
+
+        catalogService.listen(CatalogEvent.INDEX_BUILDING, listener);
+    }
+
+    /**
+     * Handles a {@link CatalogEvent#INDEX_BUILDING} event.
+     *
+     * <p>If the index from the event belongs to this table, the tracker's minimum allowed catalog version for starting
+     * operations is updated to the catalog version of the index's {@code REGISTERED} status change. Events for
+     * indexes of other tables leave the tracker untouched.
+     *
+     * <p>NOTE(review): the {@code true} result presumably asks the event producer to drop this listener and
+     * {@code false} to keep it; confirm against the {@link EventListener} contract.
+     *
+     * @param parameters Event parameters; cast to {@link StartBuildingIndexEventParameters}.
+     * @return Future completed with {@code true} if the busy lock could not be entered, with {@code false} after the
+     *      event has been handled, or a failed future if any {@link Throwable} was thrown during handling.
+     */
+    private CompletableFuture<Boolean> onIndexBuilding(CatalogEventParameters 
parameters) {
+        if (!busyLock.enterBusy()) {
+            return trueCompletedFuture();
+        }
+
+        try {
+            int indexId = ((StartBuildingIndexEventParameters) 
parameters).indexId();
+
+            IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
+
+            assert indexMeta != null : "indexId=" + indexId + ", 
catalogVersion=" + parameters.catalogVersion();
+
+            MetaIndexStatusChange registeredStatusChange = 
indexMeta.statusChange(REGISTERED);
+
+            if (indexMeta.tableId() == tableId) {
+                
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(registeredStatusChange.catalogVersion());
+            }
+
+            return falseCompletedFuture();
+        } catch (Throwable t) {
+            return failedFuture(t);
+        } finally {
+            busyLock.leaveBusy();
+        }
+    }
+}
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 91a219805b9..848b2e7bc43 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -35,11 +35,8 @@ import static 
org.apache.ignite.internal.partitiondistribution.Assignments.fromB
 import static org.apache.ignite.internal.raft.PeersAndLearners.fromAssignments;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toReplicationGroupIdMessage;
 import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
-import static 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatus.REGISTERED;
 import static 
org.apache.ignite.internal.table.distributed.replicator.RemoteResourceIds.cursorId;
 import static 
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.beginRwTxTs;
-import static 
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.latestIndexDescriptorInBuildingStatus;
-import static 
org.apache.ignite.internal.table.distributed.replicator.ReplicatorUtils.rwTxActiveCatalogVersion;
 import static org.apache.ignite.internal.tx.TransactionIds.beginTimestamp;
 import static org.apache.ignite.internal.tx.TxState.ABORTED;
 import static org.apache.ignite.internal.tx.TxState.COMMITTED;
@@ -50,10 +47,8 @@ import static 
org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 import static org.apache.ignite.internal.util.CompletableFutures.allOfToList;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyCollectionCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptyListCompletedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
-import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.findAny;
 import static org.apache.ignite.internal.util.IgniteUtils.findFirst;
@@ -87,12 +82,8 @@ import java.util.function.Supplier;
 import org.apache.ignite.internal.binarytuple.BinaryTupleCommon;
 import org.apache.ignite.internal.catalog.Catalog;
 import org.apache.ignite.internal.catalog.CatalogService;
-import org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.catalog.events.CatalogEvent;
-import org.apache.ignite.internal.catalog.events.CatalogEventParameters;
-import 
org.apache.ignite.internal.catalog.events.StartBuildingIndexEventParameters;
-import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.ClockService;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
@@ -197,9 +188,7 @@ import 
org.apache.ignite.internal.table.distributed.SortedIndexLocker;
 import org.apache.ignite.internal.table.distributed.StorageUpdateHandler;
 import 
org.apache.ignite.internal.table.distributed.TableSchemaAwareIndexStorage;
 import org.apache.ignite.internal.table.distributed.TableUtils;
-import org.apache.ignite.internal.table.distributed.index.IndexMeta;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
-import 
org.apache.ignite.internal.table.distributed.index.MetaIndexStatusChange;
 import 
org.apache.ignite.internal.table.distributed.replicator.handlers.BuildIndexReplicaRequestHandler;
 import org.apache.ignite.internal.tx.Lock;
 import org.apache.ignite.internal.tx.LockKey;
@@ -338,16 +327,14 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
     /** Prevents double stopping. */
     private final AtomicBoolean stopGuard = new AtomicBoolean();
 
-    /** Read-write transaction operation tracker for building indexes. */
-    private final IndexBuilderTxRwOperationTracker txRwOperationTracker = new 
IndexBuilderTxRwOperationTracker();
-
-    /** Listener for {@link CatalogEvent#INDEX_BUILDING}. */
-    private final EventListener<CatalogEventParameters> 
indexBuildingCatalogEventListener = this::onIndexBuilding;
+    /**
+     * Processor that handles catalog events {@link 
CatalogEvent#INDEX_BUILDING} and
+     * tracks read-write transaction operations for building indexes.
+     */
+    private final PartitionReplicaBuildIndexProcessor indexBuildingProcessor;
 
     private final SchemaRegistry schemaRegistry;
 
-    private final IndexMetaStorage indexMetaStorage;
-
     private final LowWatermark lowWatermark;
 
     private static final boolean SKIP_UPDATES = 
getBoolean(IgniteSystemProperties.IGNITE_SKIP_STORAGE_UPDATE_IN_BENCHMARK);
@@ -433,7 +420,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         this.catalogService = catalogService;
         this.remotelyTriggeredResourceRegistry = 
remotelyTriggeredResourceRegistry;
         this.schemaRegistry = schemaRegistry;
-        this.indexMetaStorage = indexMetaStorage;
         this.lowWatermark = lowWatermark;
         this.replicationGroupId = replicationGroupId;
         this.tableId = tableId;
@@ -441,6 +427,8 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         this.schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
 
+        indexBuildingProcessor = new 
PartitionReplicaBuildIndexProcessor(busyLock, tableId, indexMetaStorage, 
catalogService);
+
         replicaPrimacyEngine = new ReplicaPrimacyEngine(placementDriver, 
clockService, replicationGroupId, localNode);
         reliableCatalogVersions = new 
ReliableCatalogVersions(schemaSyncService, catalogService);
         raftCommandApplicator = new 
ReplicationRaftCommandApplicator(raftCommandRunner, replicationGroupId);
@@ -483,13 +471,9 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         buildIndexReplicaRequestHandler = new BuildIndexReplicaRequestHandler(
                 indexMetaStorage,
-                txRwOperationTracker,
+                indexBuildingProcessor.tracker(),
                 safeTime,
                 raftCommandApplicator);
-
-        // TODO https://issues.apache.org/jira/browse/IGNITE-24665
-        // Consider extracting the following code to a separate class 
BuildIndexEventListener.
-        prepareIndexBuilderTxRwOperationTracker();
     }
 
     // TODO https://issues.apache.org/jira/browse/IGNITE-22522 Remove.
@@ -3549,9 +3533,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
 
         busyLock.block();
 
-        catalogService.removeListener(CatalogEvent.INDEX_BUILDING, 
indexBuildingCatalogEventListener);
-
-        txRwOperationTracker.close();
+        indexBuildingProcessor.onShutdown();
     }
 
     private int partId() {
@@ -3568,7 +3550,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             ReplicaPrimacy replicaPrimacy,
             @Nullable HybridTimestamp opStartTsIfDirectRo
     ) {
-        incrementRwOperationCountIfNeeded(request);
+        indexBuildingProcessor.incrementRwOperationCountIfNeeded(request);
 
         UUID txIdLockingLwm = tryToLockLwmIfNeeded(request, 
opStartTsIfDirectRo);
 
@@ -3576,7 +3558,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             return processOperationRequest(senderId, request, replicaPrimacy, 
opStartTsIfDirectRo)
                     .whenComplete((unused, throwable) -> {
                         unlockLwmIfNeeded(txIdLockingLwm, request);
-                        decrementRwOperationCountIfNeeded(request);
+                        
indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
                     });
         } catch (Throwable e) {
             try {
@@ -3586,7 +3568,7 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
             }
 
             try {
-                decrementRwOperationCountIfNeeded(request);
+                
indexBuildingProcessor.decrementRwOperationCountIfNeeded(request);
             } catch (Throwable decrementProblem) {
                 e.addSuppressed(decrementProblem);
             }
@@ -3594,26 +3576,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         }
     }
 
-    private void incrementRwOperationCountIfNeeded(ReplicaRequest request) {
-        if (request instanceof ReadWriteReplicaRequest) {
-            int rwTxActiveCatalogVersion = 
rwTxActiveCatalogVersion(catalogService, (ReadWriteReplicaRequest) request);
-
-            // It is very important that the counter is increased only after 
the schema sync at the begin timestamp of RW transaction,
-            // otherwise there may be races/errors and the index will not be 
able to start building.
-            if 
(!txRwOperationTracker.incrementOperationCount(rwTxActiveCatalogVersion)) {
-                throw new 
StaleTransactionOperationException(((ReadWriteReplicaRequest) 
request).transactionId());
-            }
-        }
-    }
-
-    private void decrementRwOperationCountIfNeeded(ReplicaRequest request) {
-        if (request instanceof ReadWriteReplicaRequest) {
-            txRwOperationTracker.decrementOperationCount(
-                    rwTxActiveCatalogVersion(catalogService, 
(ReadWriteReplicaRequest) request)
-            );
-        }
-    }
-
     /**
      * Generates a fake transaction ID that will only be used to identify one 
direct RO operation for purposes of locking and unlocking LWM.
      * It should not be used as a replacement for a real transaction ID in 
other contexts.
@@ -3688,47 +3650,6 @@ public class PartitionReplicaListener implements 
ReplicaListener, ReplicaTablePr
         }
     }
 
-    private void prepareIndexBuilderTxRwOperationTracker() {
-        // Expected to be executed on the metastore thread.
-        CatalogIndexDescriptor indexDescriptor = 
latestIndexDescriptorInBuildingStatus(catalogService, tableId());
-
-        if (indexDescriptor != null) {
-            IndexMeta indexMeta = 
indexMetaStorage.indexMeta(indexDescriptor.id());
-
-            assert indexMeta != null : indexDescriptor.id();
-
-            
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(indexMeta.statusChange(REGISTERED).catalogVersion());
-        }
-
-        catalogService.listen(CatalogEvent.INDEX_BUILDING, 
indexBuildingCatalogEventListener);
-    }
-
-    private CompletableFuture<Boolean> onIndexBuilding(CatalogEventParameters 
parameters) {
-        if (!busyLock.enterBusy()) {
-            return trueCompletedFuture();
-        }
-
-        try {
-            int indexId = ((StartBuildingIndexEventParameters) 
parameters).indexId();
-
-            IndexMeta indexMeta = indexMetaStorage.indexMeta(indexId);
-
-            assert indexMeta != null : "indexId=" + indexId + ", 
catalogVersion=" + parameters.catalogVersion();
-
-            MetaIndexStatusChange registeredStatusChange = 
indexMeta.statusChange(REGISTERED);
-
-            if (indexMeta.tableId() == tableId()) {
-                
txRwOperationTracker.updateMinAllowedCatalogVersionForStartOperation(registeredStatusChange.catalogVersion());
-            }
-
-            return falseCompletedFuture();
-        } catch (Throwable t) {
-            return failedFuture(t);
-        } finally {
-            busyLock.leaveBusy();
-        }
-    }
-
     private List<Integer> indexIdsAtRwTxBeginTs(UUID txId) {
         return TableUtils.indexIdsAtRwTxBeginTs(catalogService, txId, 
tableId());
     }

Reply via email to