This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 47c9e2776e IGNITE-21196 Optimize primary replica events processing 
(#3007)
47c9e2776e is described below

commit 47c9e2776ef76c7bfd7ffe013370a58fe434605b
Author: Ivan Bessonov <bessonov...@gmail.com>
AuthorDate: Thu Jan 4 12:29:47 2024 +0300

    IGNITE-21196 Optimize primary replica events processing (#3007)
---
 .../apache/ignite/internal/replicator/Replica.java |  7 +++
 .../ignite/internal/replicator/ReplicaManager.java | 51 ++++++++++++++++++++++
 .../replicator/listener/ReplicaListener.java       | 18 ++++++++
 .../replicator/PartitionReplicaListener.java       | 26 +++++++----
 .../PartitionReplicaListenerDurableUnlockTest.java |  5 +--
 5 files changed, 96 insertions(+), 11 deletions(-)

diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index 3a4c53b2b8..5945cd8672 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -123,6 +123,13 @@ public class Replica {
         raftClient.subscribeLeader(this::onLeaderElected);
     }
 
+    /**
+     * Returns an instance of replica listener, associated with current 
replica.
+     */
+    ReplicaListener replicaListener() {
+        return listener;
+    }
+
     /**
      * Processes a replication request on the replica.
      *
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 84bcd9da3d..2b045b162f 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -21,6 +21,7 @@ import static 
java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.stream.Collectors.toSet;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
+import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
@@ -50,6 +51,8 @@ import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessagesFactory;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverReplicaMessage;
@@ -227,6 +230,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                 new LinkedBlockingQueue<>(),
                 NamedThreadFactory.create(nodeName, "replica", LOG)
         );
+
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryReplicaElected);
+        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryReplicaExpired);
     }
 
     private void onReplicaMessageReceived(NetworkMessage message, String 
senderConsistentId, @Nullable Long correlationId) {
@@ -744,6 +750,51 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
         }
     }
 
+
+    /**
+     * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_ELECTED}. 
Propagates execution to the
+     *      {@link 
ReplicaListener#onPrimaryElected(PrimaryReplicaEventParameters, Throwable)} of 
the replica, that corresponds
+     *      to a given {@link PrimaryReplicaEventParameters#groupId()}.
+     */
+    private CompletableFuture<Boolean> onPrimaryReplicaElected(
+            PrimaryReplicaEventParameters primaryReplicaEventParameters,
+            Throwable throwable
+    ) {
+        CompletableFuture<Replica> replica = 
replicas.get(primaryReplicaEventParameters.groupId());
+
+        if (replica == null) {
+            return falseCompletedFuture();
+        }
+
+        if (replica.isDone() && !replica.isCompletedExceptionally()) {
+            return 
replica.join().replicaListener().onPrimaryElected(primaryReplicaEventParameters,
 throwable);
+        } else {
+            return replica.thenCompose(r -> 
r.replicaListener().onPrimaryElected(primaryReplicaEventParameters, throwable));
+        }
+    }
+
+    /**
+     * Event handler for {@link PrimaryReplicaEvent#PRIMARY_REPLICA_EXPIRED}. 
Propagates execution to the
+     *      {@link 
ReplicaListener#onPrimaryExpired(PrimaryReplicaEventParameters, Throwable)} of 
the replica, that corresponds
+     *      to a given {@link PrimaryReplicaEventParameters#groupId()}.
+     */
+    private CompletableFuture<Boolean> onPrimaryReplicaExpired(
+            PrimaryReplicaEventParameters primaryReplicaEventParameters,
+            Throwable throwable
+    ) {
+        CompletableFuture<Replica> replica = 
replicas.get(primaryReplicaEventParameters.groupId());
+
+        if (replica == null) {
+            return falseCompletedFuture();
+        }
+
+        if (replica.isDone() && !replica.isCompletedExceptionally()) {
+            return 
replica.join().replicaListener().onPrimaryExpired(primaryReplicaEventParameters,
 throwable);
+        } else {
+            return replica.thenCompose(r -> 
r.replicaListener().onPrimaryExpired(primaryReplicaEventParameters, throwable));
+        }
+    }
+
     /**
      * Idle safe time sync for replicas.
      */
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
index 88a1937e97..eeee59b449 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/listener/ReplicaListener.java
@@ -18,8 +18,12 @@
 package org.apache.ignite.internal.replicator.listener;
 
 import java.util.concurrent.CompletableFuture;
+import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
+import org.apache.ignite.internal.replicator.ReplicaManager;
 import org.apache.ignite.internal.replicator.ReplicaResult;
 import org.apache.ignite.internal.replicator.message.ReplicaRequest;
+import org.apache.ignite.internal.util.CompletableFutures;
+import org.jetbrains.annotations.Nullable;
 
 /** Replica listener. */
 @FunctionalInterface
@@ -33,6 +37,20 @@ public interface ReplicaListener {
      */
     CompletableFuture<ReplicaResult> invoke(ReplicaRequest request, String 
senderId);
 
+    /**
+     * Invoked by {@link ReplicaManager} when current replica is elected as 
primary.
+     */
+    default CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        return CompletableFutures.falseCompletedFuture();
+    }
+
+    /**
+     * Invoked by {@link ReplicaManager} then current replica stops being a 
primary replica.
+     */
+    default CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        return CompletableFutures.falseCompletedFuture();
+    }
+
     /** Callback on replica shutdown. */
     default void onShutdown() {
         // No-op.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 85547ca54e..fb765d00c9 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -79,7 +79,6 @@ import 
org.apache.ignite.internal.lang.SafeTimeReorderException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.raft.Command;
 import org.apache.ignite.internal.raft.service.RaftCommandRunner;
@@ -344,13 +343,17 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         cursors = new 
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
 
         schemaCompatValidator = new 
SchemaCompatibilityValidator(validationSchemasSource, catalogService, 
schemaSyncService);
-
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED, 
this::onPrimaryElected);
-        placementDriver.listen(PrimaryReplicaEvent.PRIMARY_REPLICA_EXPIRED, 
this::onPrimaryExpired);
     }
 
-    private CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
-        if (!localNode.name().equals(evt.leaseholder()) || 
!replicationGroupId.equals(evt.groupId())) {
+    @Override
+    public CompletableFuture<Boolean> 
onPrimaryElected(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        assert replicationGroupId.equals(evt.groupId()) : format(
+                "The replication group listener does not match the event 
[grp={}, eventGrp={}]",
+                replicationGroupId,
+                evt.groupId()
+        );
+
+        if (!localNode.name().equals(evt.leaseholder())) {
             return falseCompletedFuture();
         }
 
@@ -428,8 +431,15 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         });
     }
 
-    private CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
-        if (!localNode.name().equals(evt.leaseholder()) || 
!replicationGroupId.equals(evt.groupId())) {
+    @Override
+    public CompletableFuture<Boolean> 
onPrimaryExpired(PrimaryReplicaEventParameters evt, @Nullable Throwable 
exception) {
+        assert replicationGroupId.equals(evt.groupId()) : format(
+                "The replication group listener does not match the event 
[grp={}, eventGrp={}]",
+                replicationGroupId,
+                evt.groupId()
+        );
+
+        if (!localNode.name().equals(evt.leaseholder())) {
             return falseCompletedFuture();
         }
 
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
index d0b6afe400..5ea1c8a6d0 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/replication/PartitionReplicaListenerDurableUnlockTest.java
@@ -48,7 +48,6 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
-import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
 import org.apache.ignite.internal.raft.service.RaftGroupService;
 import org.apache.ignite.internal.replicator.TablePartitionId;
@@ -198,7 +197,7 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
 
         PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now());
 
-        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
+        assertThat(partitionReplicaListener.onPrimaryElected(parameters, 
null), willSucceedIn(1, SECONDS));
 
         for (IgniteBiTuple<UUID, TxMeta> tx : txStateStorage.scan()) {
             if (isFinalState(tx.getValue().txState())) {
@@ -228,7 +227,7 @@ public class PartitionReplicaListenerDurableUnlockTest 
extends IgniteAbstractTes
 
         PrimaryReplicaEventParameters parameters = new 
PrimaryReplicaEventParameters(0, part0, LOCAL_NODE.name(), clock.now());
 
-        
assertThat(placementDriver.fireEvent(PrimaryReplicaEvent.PRIMARY_REPLICA_ELECTED,
 parameters), willSucceedIn(1, SECONDS));
+        assertThat(partitionReplicaListener.onPrimaryElected(parameters, 
null), willSucceedIn(1, SECONDS));
 
         assertTrue(txStateStorage.get(tx0).locksReleased());
 

Reply via email to