This is an automated email from the ASF dual-hosted git repository.

vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new f189d00bde IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due 
to replica is timed out (#1815)
f189d00bde is described below

commit f189d00bde83dd7186d956767a3adf1aeb19cd58
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Mar 27 20:07:06 2023 +0400

    IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due to replica is 
timed out (#1815)
---
 .../raft/server/ItJraftCounterServerTest.java      |  2 +
 .../java/org/apache/ignite/internal/raft/Loza.java | 19 ++++-
 .../internal/raft/server/RaftGroupOptions.java     | 22 ------
 .../ignite/internal/raft/server/RaftServer.java    |  9 +++
 .../internal/raft/server/impl/JraftServerImpl.java | 12 ++-
 .../apache/ignite/raft/jraft/RaftGroupService.java | 11 ++-
 .../apache/ignite/raft/jraft/core/NodeImpl.java    | 89 ++++++++++++----------
 .../ignite/raft/jraft/option/NodeOptions.java      | 13 ----
 .../apache/ignite/internal/replicator/Replica.java | 15 ++++
 .../ignite/internal/replicator/ReplicaManager.java | 38 +++++----
 .../ignite/internal/replicator/ReplicaService.java | 81 ++++++++++----------
 .../replicator/PlacementDriverReplicaSideTest.java |  1 +
 .../internal/table/ItReadOnlyTransactionTest.java  |  2 -
 .../distributed/ItTxDistributedTestSingleNode.java |  1 +
 .../ignite/distributed/ReplicaUnavailableTest.java |  1 +
 .../apache/ignite/internal/table/TableImpl.java    | 13 ++++
 .../internal/table/distributed/TableManager.java   | 31 ++++----
 .../replicator/PartitionReplicaListener.java       | 80 -------------------
 18 files changed, 201 insertions(+), 239 deletions(-)

diff --git 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 2c66e52bb3..5882d2c303 100644
--- 
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++ 
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -659,6 +659,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
                 RaftGroupOptions opts = defaults().snapshotStorageFactory(new 
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
 
                 raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer), 
initialMembersConf, listener, opts);
+
+                raftServer.raftNodeReadyFuture(grpId).join();
             }, opts -> {});
         }
 
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index b452883af3..cde94c987a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -174,7 +174,12 @@ public class Loza implements RaftManager {
             RaftGroupListener lsnr,
             RaftGroupEventsListener eventsLsnr
     ) throws NodeStoppingException {
-        return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr, 
RaftGroupOptions.defaults());
+        CompletableFuture<RaftGroupService> fut = startRaftGroupNode(nodeId, 
configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
+
+        // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta 
storage and cmg raft log re-application in async manner
+        raftServer.raftNodeReadyFuture(nodeId.groupId()).join();
+
+        return fut;
     }
 
     @Override
@@ -311,6 +316,18 @@ public class Loza implements RaftManager {
         );
     }
 
+
+    /**
+     * Gets a future that completes when all committed updates are applied to 
the state machine after the RAFT node starts.
+     * TODO: IGNITE-18273 The method should be defined in RaftManager and 
takes RaftNodeId instead of its argument.
+     *
+     * @param groupId Raft group id.
+     * @return Future to last applied revision.
+     */
+    public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId 
groupId) {
+        return raftServer.raftNodeReadyFuture(groupId);
+    }
+
     @Override
     public boolean stopRaftNode(RaftNodeId nodeId) throws 
NodeStoppingException {
         if (!busyLock.enterBusy()) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index dd6d218fde..d074ae35e8 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,11 +17,9 @@
 
 package org.apache.ignite.internal.raft.server;
 
-import java.util.concurrent.CountDownLatch;
 import org.apache.ignite.internal.raft.storage.LogStorageFactory;
 import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
 import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
-import org.jetbrains.annotations.Nullable;
 
 /**
  * Options specific to a Raft group that is being started.
@@ -39,10 +37,6 @@ public class RaftGroupOptions {
     /** Raft meta storage factory. */
     private RaftMetaStorageFactory raftMetaStorageFactory;
 
-    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
-    @Nullable
-    private CountDownLatch storageReadyLatch;
-
     /**
      * Returns default options as defined by classic Raft (so stores are 
persistent).
      *
@@ -132,20 +126,4 @@ public class RaftGroupOptions {
 
         return this;
     }
-
-    /**
-     * Returns the latch that will be completed when storage is ready to 
process user requests.
-     */
-    public CountDownLatch getStorageReadyLatch() {
-        return storageReadyLatch;
-    }
-
-    /**
-     * Adds storage ready latch to options.
-     */
-    public RaftGroupOptions setStorageReadyLatch(CountDownLatch 
storageReadyLatch) {
-        this.storageReadyLatch = storageReadyLatch;
-
-        return this;
-    }
 }
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index ea1aea77ff..c6e2bcb4c1 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.server;
 
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.manager.IgniteComponent;
 import org.apache.ignite.internal.raft.Peer;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -75,6 +76,14 @@ public interface RaftServer extends IgniteComponent {
             RaftGroupOptions groupOptions
     );
 
+    /**
+     * Returns a future which completes when the raft node is ready and 
committed updates are applied.
+     *
+     * @param groupId Raft group ID.
+     * @return A future to last applied revision on start.
+     */
+    CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId groupId);
+
     /**
      * Stops a given local Raft node if it exists.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 7e38be811f..58fe7ec69b 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -30,7 +30,9 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
+import java.util.Map.Entry;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ExecutorService;
@@ -414,8 +416,6 @@ public class JraftServerImpl implements RaftServer {
 
             nodeOptions.setRaftGrpEvtsLsnr(new 
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor, 
evLsnr));
 
-            
nodeOptions.setStorageReadyLatch(groupOptions.getStorageReadyLatch());
-
             LogStorageFactory logStorageFactory = 
groupOptions.getLogStorageFactory() == null
                     ? this.logStorageFactory : 
groupOptions.getLogStorageFactory();
 
@@ -457,6 +457,14 @@ public class JraftServerImpl implements RaftServer {
         }
     }
 
+    @Override
+    public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId 
groupId) {
+        RaftGroupService jraftNode = nodes.entrySet().stream().filter(entry -> 
entry.getKey().groupId().equals(groupId))
+                .map(Entry::getValue).findAny().get();
+
+        return jraftNode.getApplyCommittedFuture();
+    }
+
     @Override
     public boolean stopRaftNode(RaftNodeId nodeId) {
         RaftGroupService svc = nodes.remove(nodeId);
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 2fbb14523a..42c60d7134 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -16,6 +16,7 @@
  */
 package org.apache.ignite.raft.jraft;
 
+import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.lang.IgniteInternalException;
@@ -57,7 +58,7 @@ public class RaftGroupService {
     /**
      * The raft node.
      */
-    private Node node;
+    private NodeImpl node;
 
     /**
      * The node manager.
@@ -123,6 +124,14 @@ public class RaftGroupService {
         return this.node;
     }
 
+    /**
+     * Gets a future which completes when all committed updates are applied to 
the node's state machine on start.
+     * @return Future that completes when this node's committed revision equals 
the applied one.
+     */
+    public CompletableFuture<Long> getApplyCommittedFuture() {
+        return node.getApplyCommittedFuture();
+    }
+
     public synchronized void shutdown() {
         // TODO asch remove handlers before shutting down raft node 
https://issues.apache.org/jira/browse/IGNITE-14519
         if (!this.started) {
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 3a4e9aebfe..f7f1326460 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ScheduledFuture;
@@ -146,6 +147,9 @@ public class NodeImpl implements Node, RaftServerService {
         .writeLock();
     protected final Lock readLock = this.readWriteLock
         .readLock();
+
+    /** The future completes when all committed actions are applied to the RAFT 
state machine. */
+    private final CompletableFuture<Long> applyCommittedFuture;
     private volatile State state;
     private volatile CountDownLatch shutdownLatch;
     private long currTerm;
@@ -557,6 +561,7 @@ public class NodeImpl implements Node, RaftServerService {
         updateLastLeaderTimestamp(Utils.monotonicMs());
         this.confCtx = new ConfigurationCtx(this);
         this.wakingCandidate = null;
+        this.applyCommittedFuture = new CompletableFuture<>();
     }
 
     public HybridClock clock() {
@@ -1063,17 +1068,14 @@ public class NodeImpl implements Node, 
RaftServerService {
         // Wait committed.
         long commitIdx = logManager.getLastLogIndex();
 
-        boolean externalAwaitStorageLatch = opts.getStorageReadyLatch() != 
null;
+        CompletableFuture<Long> logApplyComplition = new CompletableFuture<>();
 
         if (commitIdx > fsmCaller.getLastAppliedIndex()) {
-            // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta 
storage and cmg raft log re-application in async manner
-            CountDownLatch applyCommitLatch = externalAwaitStorageLatch ? 
opts.getStorageReadyLatch() : new CountDownLatch(1);
-
             LastAppliedLogIndexListener lnsr = new 
LastAppliedLogIndexListener() {
                 @Override
                 public void onApplied( long lastAppliedLogIndex) {
                     if (lastAppliedLogIndex >= commitIdx) {
-                        applyCommitLatch.countDown();
+                        logApplyComplition.complete(lastAppliedLogIndex);
                         fsmCaller.removeLastAppliedLogIndexListener(this);
                     }
                 }
@@ -1082,20 +1084,8 @@ public class NodeImpl implements Node, RaftServerService 
{
             fsmCaller.addLastAppliedLogIndexListener(lnsr);
 
             fsmCaller.onCommitted(commitIdx);
-
-            try {
-                if (!externalAwaitStorageLatch) {
-                    applyCommitLatch.await();
-                }
-            } catch (InterruptedException e) {
-                LOG.error("Fail to apply committed updates.", e);
-
-                return false;
-            }
         } else {
-            if (externalAwaitStorageLatch) {
-                opts.getStorageReadyLatch().countDown();
-            }
+            logApplyComplition.complete(fsmCaller.getLastAppliedIndex());
         }
 
         if (!this.rpcClientService.init(this.options)) {
@@ -1116,38 +1106,53 @@ public class NodeImpl implements Node, 
RaftServerService {
             return false;
         }
 
-        // set state to follower
-        this.state = State.STATE_FOLLOWER;
+        logApplyComplition.whenComplete((committedIdx, err) -> {
+            if (err != null) {
+                LOG.error("Fail to apply committed updates.", err);
+            }
 
-        if (LOG.isInfoEnabled()) {
-            LOG.info("Node {} init, term={}, lastLogId={}, conf={}, 
oldConf={}.", getNodeId(), this.currTerm,
-                this.logManager.getLastLogId(false), this.conf.getConf(), 
this.conf.getOldConf());
-        }
+            // set state to follower
+            this.state = State.STATE_FOLLOWER;
 
-        if (this.snapshotExecutor != null && 
this.options.getSnapshotIntervalSecs() > 0) {
-            LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(), 
this.currTerm);
-            this.snapshotTimer.start();
-        }
+            if (LOG.isInfoEnabled()) {
+                LOG.info("Node {} init, term={}, lastLogId={}, conf={}, 
oldConf={}.", getNodeId(), this.currTerm,
+                    this.logManager.getLastLogId(false), this.conf.getConf(), 
this.conf.getOldConf());
+            }
 
-        if (!this.conf.isEmpty()) {
-            stepDown(this.currTerm, false, new Status());
-        }
+            if (this.snapshotExecutor != null && 
this.options.getSnapshotIntervalSecs() > 0) {
+                LOG.debug("Node {} start snapshot timer, term={}.", 
getNodeId(), this.currTerm);
+                this.snapshotTimer.start();
+            }
 
-        // Now the raft node is started , have to acquire the writeLock to 
avoid race
-        // conditions
-        this.writeLock.lock();
-        if (this.conf.isStable() && this.conf.getConf().size() == 1 && 
this.conf.getConf().contains(this.serverId)) {
-            // The group contains only this server which must be the LEADER, 
trigger
-            // the timer immediately.
-            electSelf();
-        }
-        else {
-            this.writeLock.unlock();
-        }
+            if (!this.conf.isEmpty()) {
+                stepDown(this.currTerm, false, new Status());
+            }
+
+            // Now the raft node is started , have to acquire the writeLock to 
avoid race
+            // conditions
+            this.writeLock.lock();
+            if (this.conf.isStable() && this.conf.getConf().size() == 1 && 
this.conf.getConf().contains(this.serverId)) {
+                // The group contains only this server which must be the 
LEADER, trigger
+                // the timer immediately.
+                electSelf();
+            }
+            else {
+                this.writeLock.unlock();
+            }
+
+            applyCommittedFuture.complete(commitIdx);
+        });
 
         return true;
     }
 
+    /**
+     * Gets a future which completes when all committed updates are applied to 
the node's state machine on start.
+     * @return Future that completes when this node's committed revision equals 
the applied one.
+     */
+    public CompletableFuture<Long> getApplyCommittedFuture() {
+        return applyCommittedFuture;
+    }
     /**
      * Validates a required option if shared pools are enabled.
      *
diff --git 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index fe7394cec6..e9a051512a 100644
--- 
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++ 
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -239,10 +239,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     /** A hybrid clock */
     private HybridClock clock = new HybridClockImpl();
 
-    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
-    @Nullable
-    private CountDownLatch storageReadyLatch;
-
     /**
      * Amount of Disruptors that will handle the RAFT server.
      */
@@ -641,7 +637,6 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
         
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
         
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
         nodeOptions.setClock(this.getClock());
-        nodeOptions.setStorageReadyLatch(this.getStorageReadyLatch());
 
         return nodeOptions;
     }
@@ -681,12 +676,4 @@ public class NodeOptions extends RpcOptions implements 
Copiable<NodeOptions> {
     public void setElectionTimeoutStrategy(TimeoutStrategy 
electionTimeoutStrategy) {
         this.electionTimeoutStrategy = electionTimeoutStrategy;
     }
-
-    public CountDownLatch getStorageReadyLatch() {
-        return storageReadyLatch;
-    }
-
-    public void setStorageReadyLatch(CountDownLatch storageReadyLatch) {
-        this.storageReadyLatch = storageReadyLatch;
-    }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index df11a77ba3..4c5b4925e7 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -57,6 +57,9 @@ public class Replica {
     /** Replica listener. */
     private final ReplicaListener listener;
 
+    /** When the future completes, the replica becomes ready to process 
requests. */
+    private final CompletableFuture<Void> whenReplicaReady;
+
     /** Storage index tracker. */
     private final PendingComparableValuesTracker<Long> storageIndexTracker;
 
@@ -79,6 +82,7 @@ public class Replica {
      * The constructor of a replica server.
      *
      * @param replicaGrpId Replication group id.
+     * @param replicaReady Future that completes when the replica becomes ready.
      * @param listener Replica listener.
      * @param storageIndexTracker Storage index tracker.
      * @param raftClient Topology aware Raft client.
@@ -86,12 +90,14 @@ public class Replica {
      */
     public Replica(
             ReplicationGroupId replicaGrpId,
+            CompletableFuture<Void> replicaReady,
             ReplicaListener listener,
             PendingComparableValuesTracker<Long> storageIndexTracker,
             TopologyAwareRaftGroupService raftClient,
             ClusterNode localNode
     ) {
         this.replicaGrpId = replicaGrpId;
+        this.whenReplicaReady = replicaReady;
         this.listener = listener;
         this.storageIndexTracker = storageIndexTracker;
         this.raftClient = raftClient;
@@ -124,6 +130,15 @@ public class Replica {
         return replicaGrpId;
     }
 
+    /**
+     * Gets a future that completes when the replica becomes ready.
+     *
+     * @return A future that completes when the replica is ready.
+     */
+    public CompletableFuture<Void> ready() {
+        return whenReplicaReady;
+    }
+
     private void onLeaderElected(ClusterNode clusterNode, Long term) {
         leaderRef.set(clusterNode);
 
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 9a72bcea3a..8a569c1c99 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -139,20 +139,18 @@ public class ReplicaManager implements IgniteComponent {
                     }
 
                     if (!replicaFut.isDone()) {
-                        replicaFut.thenCompose(
-                                ignore -> {
-                                    IgniteUtils.inBusyLock(
-                                            busyLock,
-                                            () -> 
sendAwaitReplicaResponse(senderConsistentId, correlationId)
-                                    );
-
-                                    return null;
-                                }
+                        replicaFut.thenAccept(createdReplica ->
+                                createdReplica.ready().thenAccept(unused ->
+                                        IgniteUtils.inBusyLock(
+                                                busyLock,
+                                                () -> 
sendAwaitReplicaResponse(senderConsistentId, correlationId)
+                                        )
+                                )
                         );
 
                         return replicaFut;
                     } else {
-                        IgniteUtils.inBusyLock(busyLock, () -> 
sendAwaitReplicaResponse(senderConsistentId, correlationId));
+                        sendAwaitReplicaResponse(senderConsistentId, 
correlationId);
 
                         return replicaFut;
                     }
@@ -165,7 +163,7 @@ public class ReplicaManager implements IgniteComponent {
 
             HybridTimestamp requestTimestamp = extractTimestamp(request);
 
-            if (replicaFut == null || !replicaFut.isDone()) {
+            if (replicaFut == null || !replicaFut.isDone() || 
!replicaFut.join().ready().isDone()) {
                 sendReplicaUnavailableErrorResponse(senderConsistentId, 
correlationId, request.groupId(), requestTimestamp);
 
                 return;
@@ -226,16 +224,16 @@ public class ReplicaManager implements IgniteComponent {
      * Starts a replica. If a replica with the same partition id already 
exists, the method throws an exception.
      *
      * @param replicaGrpId Replication group id.
+     * @param whenReplicaReady Future that completes when the replica becomes 
ready.
      * @param listener Replica listener.
      * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
-     *
-     * @return New replica.
      * @throws NodeStoppingException If node is stopping.
      * @throws ReplicaIsAlreadyStartedException Is thrown when a replica with 
the same replication group id has already been started.
      */
-    public Replica startReplica(
+    public void startReplica(
             ReplicationGroupId replicaGrpId,
+            CompletableFuture<Void> whenReplicaReady,
             ReplicaListener listener,
             TopologyAwareRaftGroupService raftClient,
             PendingComparableValuesTracker<Long> storageIndexTracker
@@ -245,7 +243,7 @@ public class ReplicaManager implements IgniteComponent {
         }
 
         try {
-            return startReplicaInternal(replicaGrpId, listener, raftClient, 
storageIndexTracker);
+            startReplicaInternal(replicaGrpId, whenReplicaReady, listener, 
raftClient, storageIndexTracker);
         } finally {
             busyLock.leaveBusy();
         }
@@ -255,19 +253,21 @@ public class ReplicaManager implements IgniteComponent {
      * Internal method for starting a replica.
      *
      * @param replicaGrpId   Replication group id.
+     * @param whenReplicaReady Future that completes when the replica becomes 
ready.
      * @param listener Replica listener.
      * @param raftClient Topology aware Raft client.
      * @param storageIndexTracker Storage index tracker.
-     * @return New replica.
      */
-    private Replica startReplicaInternal(
+    private void startReplicaInternal(
             ReplicationGroupId replicaGrpId,
+            CompletableFuture<Void> whenReplicaReady,
             ReplicaListener listener,
             TopologyAwareRaftGroupService raftClient,
             PendingComparableValuesTracker<Long> storageIndexTracker
     ) {
         ClusterNode localNode = clusterNetSvc.topologyService().localMember();
-        Replica newReplica = new Replica(replicaGrpId, listener, 
storageIndexTracker, raftClient, localNode);
+
+        Replica newReplica = new Replica(replicaGrpId, whenReplicaReady, 
listener, storageIndexTracker, raftClient, localNode);
 
         replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
             if (replicaFut == null) {
@@ -282,8 +282,6 @@ public class ReplicaManager implements IgniteComponent {
                 return replicaFut;
             }
         });
-
-        return newReplica;
     }
 
     /**
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 1cd3bf67e4..d18587a381 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
 import org.apache.ignite.internal.hlc.HybridClock;
 import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
 import org.apache.ignite.internal.replicator.exception.ReplicationException;
@@ -84,8 +83,7 @@ public class ReplicaService {
      * @see ReplicaUnavailableException If replica with given replication 
group id doesn't exist or not started yet.
      */
     private <R> CompletableFuture<R> sendToReplica(String 
targetNodeConsistentId, ReplicaRequest req) {
-
-        AtomicReference<CompletableFuture<R>> res = new AtomicReference<>(new 
CompletableFuture<>());
+        CompletableFuture<R> res = new CompletableFuture<>();
 
         // TODO: IGNITE-17824 Use named executor instead of default one in 
order to process replica Response.
         messagingService.invoke(targetNodeConsistentId, req, 
RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
@@ -95,9 +93,9 @@ public class ReplicaService {
                 }
 
                 if (throwable instanceof TimeoutException) {
-                    res.get().completeExceptionally(new 
ReplicationTimeoutException(req.groupId()));
+                    res.completeExceptionally(new 
ReplicationTimeoutException(req.groupId()));
                 } else {
-                    res.get().completeExceptionally(withCause(
+                    res.completeExceptionally(withCause(
                             ReplicationException::new,
                             REPLICA_COMMON_ERR,
                             "Failed to process replica request 
[replicaGroupId=" + req.groupId() + ']',
@@ -114,55 +112,56 @@ public class ReplicaService {
                     var errResp = (ErrorReplicaResponse) response;
 
                     if (errResp.throwable() instanceof 
ReplicaUnavailableException) {
-                        pendingInvokes.compute(targetNodeConsistentId, 
(clusterNode, fut) -> {
-                            if (fut == null) {
-                                AwaitReplicaRequest awaitReplicaReq = 
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
-                                        .groupId(req.groupId())
-                                        .build();
-
-                                fut = 
messagingService.invoke(targetNodeConsistentId, awaitReplicaReq, RPC_TIMEOUT)
-                                        .whenComplete((response0, throwable0) 
-> {
-                                            
pendingInvokes.remove(targetNodeConsistentId);
-                                        });
-                            }
-
-                            fut.handle((response0, throwable0) -> {
-                                if (throwable0 != null) {
-                                    if (throwable0 instanceof 
CompletionException) {
-                                        throwable0 = throwable0.getCause();
-                                    }
+                        CompletableFuture<NetworkMessage> awaitReplicaFut = 
pendingInvokes.computeIfAbsent(
+                                targetNodeConsistentId,
+                                consistentId -> {
+                                    AwaitReplicaRequest awaitReplicaReq = 
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
+                                            .groupId(req.groupId())
+                                            .build();
+
+                                    return 
messagingService.invoke(targetNodeConsistentId, awaitReplicaReq, RPC_TIMEOUT);
+                                }
+                        );
 
-                                    if (throwable0 instanceof 
TimeoutException) {
-                                        
res.get().completeExceptionally(errResp.throwable());
-                                    } else {
-                                        
res.get().completeExceptionally(withCause(
-                                                ReplicationException::new,
-                                                REPLICA_COMMON_ERR,
-                                                "Failed to process replica 
request [replicaGroupId=" + req.groupId() + ']',
-                                                throwable0));
-                                    }
-                                } else {
-                                    res.get().thenCompose(ignore -> 
sendToReplica(targetNodeConsistentId, req));
+                        awaitReplicaFut.handle((response0, throwable0) -> {
+                            pendingInvokes.remove(targetNodeConsistentId, 
awaitReplicaFut);
 
-                                    res.get().complete(null);
+                            if (throwable0 != null) {
+                                if (throwable0 instanceof CompletionException) 
{
+                                    throwable0 = throwable0.getCause();
                                 }
 
-                                return null;
-                            });
+                                if (throwable0 instanceof TimeoutException) {
+                                    
res.completeExceptionally(errResp.throwable());
+                                } else {
+                                    res.completeExceptionally(withCause(
+                                            ReplicationException::new,
+                                            REPLICA_COMMON_ERR,
+                                            "Failed to process replica request 
[replicaGroupId=" + req.groupId() + ']',
+                                            throwable0));
+                                }
+                            } else {
+                                sendToReplica(targetNodeConsistentId, 
req).whenComplete((r, e) -> {
+                                    if (e != null) {
+                                        res.completeExceptionally(e);
+                                    } else {
+                                        res.complete((R) r);
+                                    }
+                                });
+                            }
 
-                            return fut;
+                            return null;
                         });
-
                     } else {
-                        res.get().completeExceptionally(errResp.throwable());
+                        res.completeExceptionally(errResp.throwable());
                     }
                 } else {
-                    res.get().complete((R) ((ReplicaResponse) 
response).result());
+                    res.complete((R) ((ReplicaResponse) response).result());
                 }
             }
         });
 
-        return res.get();
+        return res;
     }
 
     /**
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index da4e7a5b11..02b78bb59e 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -101,6 +101,7 @@ public class PlacementDriverReplicaSideTest {
 
         Replica replica = new Replica(
                 GRP_ID,
+                completedFuture(null),
                 mock(ReplicaListener.class),
                 storageIndexTracker,
                 raftClient,
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
index f2974d7212..31c3dce14c 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -41,13 +41,11 @@ import 
org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
 import org.apache.ignite.network.ClusterNode;
 import org.apache.ignite.tx.Transaction;
 import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 
 /**
  * Test reads with specific timestamp.
  */
-@Disabled("IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due to 
replica is timed out")
 public class ItReadOnlyTransactionTest extends ClusterPerClassIntegrationTest {
     /** Table name. */
     public static final String TABLE_NAME = "tbl";
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 37643df468..b39b64f61b 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -468,6 +468,7 @@ public class ItTxDistributedTestSingleNode extends 
TxAbstractTest {
                                 DummySchemaManagerImpl schemaManager = new 
DummySchemaManagerImpl(schemaDescriptor);
                                 replicaManagers.get(assignment).startReplica(
                                         new TablePartitionId(tblId, partId),
+                                        
CompletableFuture.completedFuture(null),
                                         new PartitionReplicaListener(
                                                 testMpPartStorage,
                                                 raftSvc,
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 83f6dbb59a..2668b59a00 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -130,6 +130,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                     try {
                         replicaManager.startReplica(
                                 tablePartitionId,
+                                CompletableFuture.completedFuture(null),
                                 request0 -> 
CompletableFuture.completedFuture(null),
                                 mock(TopologyAwareRaftGroupService.class),
                                 new PendingComparableValuesTracker<>(0L)
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index a8ec9e2644..35b5c5f456 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -268,6 +268,19 @@ public class TableImpl implements Table {
         };
     }
 
+    /**
+     * The future completes when the primary key index is ready to use.
+     *
+     * @return Future which completes when a primary index for the table is ready to use.
+     */
+    public CompletableFuture<Void> pkIndexesReadyFuture() {
+        var fut = new CompletableFuture<Void>();
+
+        pkId.whenComplete((uuid, throwable) -> fut.complete(null));
+
+        return fut;
+    }
+
     /**
      * Register the index with given id in a table.
      *
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 17fdced54e..f93a15331a 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -57,7 +57,6 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionException;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -721,8 +720,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                 CompletableFuture<Void> startGroupFut;
 
-                CountDownLatch storageReadyLatch = new CountDownLatch(1);
-
                 // start new nodes, only if it is table creation, other cases 
will be covered by rebalance logic
                 if (oldPartAssignment.isEmpty() && localMemberAssignment != 
null) {
                     startGroupFut = 
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
@@ -771,8 +768,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                 internalTbl.storage(),
                                                 internalTbl.txStateStorage(),
                                                 partitionKey(internalTbl, 
partId),
-                                                storageUpdateHandler,
-                                                storageReadyLatch
+                                                storageUpdateHandler
                                         );
 
                                         Peer serverPeer = 
newConfiguration.peer(localMemberName);
@@ -834,7 +830,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                         TxStateStorage txStateStorage = 
partitionStorages.getTxStateStorage();
 
                                         try {
-                                            
replicaMgr.startReplica(replicaGrpId,
+                                            replicaMgr.startReplica(
+                                                    replicaGrpId,
+                                                    allOf(
+                                                            ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
+                                                            
table.pkIndexesReadyFuture()
+                                                    ),
                                                     new 
PartitionReplicaListener(
                                                             partitionStorage,
                                                             
updatedRaftGroupService,
@@ -852,8 +853,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                                             placementDriver,
                                                             
storageUpdateHandler,
                                                             this::isLocalPeer,
-                                                            
schemaManager.schemaRegistry(causalityToken, tblId),
-                                                            storageReadyLatch
+                                                            
schemaManager.schemaRegistry(causalityToken, tblId)
                                                     ),
                                                     updatedRaftGroupService,
                                                     storageIndexTracker
@@ -926,8 +926,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
             MvTableStorage mvTableStorage,
             TxStateTableStorage txStateTableStorage,
             PartitionKey partitionKey,
-            StorageUpdateHandler storageUpdateHandler,
-            CountDownLatch storageReadyLatch
+            StorageUpdateHandler storageUpdateHandler
     ) {
         RaftGroupOptions raftGroupOptions;
 
@@ -953,8 +952,6 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                 incomingSnapshotsExecutor
         ));
 
-        raftGroupOptions.setStorageReadyLatch(storageReadyLatch);
-
         return raftGroupOptions;
     }
 
@@ -2013,8 +2010,7 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
                                 internalTable.storage(),
                                 internalTable.txStateStorage(),
                                 partitionKey(internalTable, partId),
-                                storageUpdateHandler,
-                                null
+                                storageUpdateHandler
                         );
 
                         RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -2052,7 +2048,12 @@ public class TableManager extends Producer<TableEvent, 
TableEventParameters> imp
 
                             UUID tblId = tbl.tableId();
 
-                            replicaMgr.startReplica(replicaGrpId,
+                            replicaMgr.startReplica(
+                                    replicaGrpId,
+                                    allOf(
+                                            ((Loza) 
raftMgr).raftNodeReadyFuture(replicaGrpId),
+                                            tbl.pkIndexesReadyFuture()
+                                    ),
                                     new PartitionReplicaListener(
                                             mvPartitionStorage,
                                             
internalTable.partitionRaftGroupService(partId),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0da68a8da7..19e26f25c4 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.ConcurrentNavigableMap;
 import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.Executor;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
@@ -205,75 +204,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     private final CompletableFuture<SchemaRegistry> schemaFut;
 
-    /** Nullable latch that will be completed when storage is ready to process 
user requests. */
-    @Nullable
-    private final CountDownLatch storageReadyLatch;
-
-    /**
-     * The constructor.
-     *
-     * @param mvDataStorage Data storage.
-     * @param raftClient Raft client.
-     * @param txManager Transaction manager.
-     * @param lockManager Lock manager.
-     * @param partId Partition id.
-     * @param tableId Table id.
-     * @param indexesLockers Index lock helper objects.
-     * @param pkIndexStorage Pk index storage.
-     * @param secondaryIndexStorages Secondary index storages.
-     * @param hybridClock Hybrid clock.
-     * @param safeTime Safe time clock.
-     * @param txStateStorage Transaction state storage.
-     * @param placementDriver Placement driver.
-     * @param storageUpdateHandler Handler that processes updates writing them 
to storage.
-     * @param isLocalPeerChecker Function for checking that the given peer is 
local.
-     * @param schemaFut Table schema.
-     * @param storageReadyLatch Nullable latch that will be completed when 
storage is ready to process user requests.
-     */
-    public PartitionReplicaListener(
-            MvPartitionStorage mvDataStorage,
-            RaftGroupService raftClient,
-            TxManager txManager,
-            LockManager lockManager,
-            Executor scanRequestExecutor,
-            int partId,
-            UUID tableId,
-            Supplier<Map<UUID, IndexLocker>> indexesLockers,
-            Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
-            Supplier<Map<UUID, TableSchemaAwareIndexStorage>> 
secondaryIndexStorages,
-            HybridClock hybridClock,
-            PendingComparableValuesTracker<HybridTimestamp> safeTime,
-            TxStateStorage txStateStorage,
-            PlacementDriver placementDriver,
-            StorageUpdateHandler storageUpdateHandler,
-            Function<Peer, Boolean> isLocalPeerChecker,
-            CompletableFuture<SchemaRegistry> schemaFut,
-            CountDownLatch storageReadyLatch
-    ) {
-        this.mvDataStorage = mvDataStorage;
-        this.raftClient = raftClient;
-        this.txManager = txManager;
-        this.lockManager = lockManager;
-        this.scanRequestExecutor = scanRequestExecutor;
-        this.partId = partId;
-        this.tableId = tableId;
-        this.indexesLockers = indexesLockers;
-        this.pkIndexStorage = pkIndexStorage;
-        this.secondaryIndexStorages = secondaryIndexStorages;
-        this.hybridClock = hybridClock;
-        this.safeTime = safeTime;
-        this.txStateStorage = txStateStorage;
-        this.placementDriver = placementDriver;
-        this.isLocalPeerChecker = isLocalPeerChecker;
-        this.storageUpdateHandler = storageUpdateHandler;
-        this.schemaFut = schemaFut;
-        this.storageReadyLatch = storageReadyLatch;
-
-        this.replicationGroupId = new TablePartitionId(tableId, partId);
-
-        cursors = new 
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
-    }
-
     /**
      * The constructor.
      *
@@ -330,7 +260,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         this.isLocalPeerChecker = isLocalPeerChecker;
         this.storageUpdateHandler = storageUpdateHandler;
         this.schemaFut = schemaFut;
-        this.storageReadyLatch = null;
 
         this.replicationGroupId = new TablePartitionId(tableId, partId);
 
@@ -339,15 +268,6 @@ public class PartitionReplicaListener implements 
ReplicaListener {
 
     @Override
     public CompletableFuture<?> invoke(ReplicaRequest request) {
-        try {
-            if (storageReadyLatch != null) {
-                storageReadyLatch.await();
-            }
-        } catch (InterruptedException e) {
-            Thread.currentThread().interrupt();
-            throw new IgniteInternalException("Interrupted while awaiting the 
storage initialization.", e);
-        }
-
         if (request instanceof TxStateReplicaRequest) {
             return processTxStateReplicaRequest((TxStateReplicaRequest) 
request);
         }

Reply via email to