This is an automated email from the ASF dual-hosted git repository.
vpyatkov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new f189d00bde IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due
to replica is timed out (#1815)
f189d00bde is described below
commit f189d00bde83dd7186d956767a3adf1aeb19cd58
Author: Vladislav Pyatkov <[email protected]>
AuthorDate: Mon Mar 27 20:07:06 2023 +0400
IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due to replica is
timed out (#1815)
---
.../raft/server/ItJraftCounterServerTest.java | 2 +
.../java/org/apache/ignite/internal/raft/Loza.java | 19 ++++-
.../internal/raft/server/RaftGroupOptions.java | 22 ------
.../ignite/internal/raft/server/RaftServer.java | 9 +++
.../internal/raft/server/impl/JraftServerImpl.java | 12 ++-
.../apache/ignite/raft/jraft/RaftGroupService.java | 11 ++-
.../apache/ignite/raft/jraft/core/NodeImpl.java | 89 ++++++++++++----------
.../ignite/raft/jraft/option/NodeOptions.java | 13 ----
.../apache/ignite/internal/replicator/Replica.java | 15 ++++
.../ignite/internal/replicator/ReplicaManager.java | 38 +++++----
.../ignite/internal/replicator/ReplicaService.java | 81 ++++++++++----------
.../replicator/PlacementDriverReplicaSideTest.java | 1 +
.../internal/table/ItReadOnlyTransactionTest.java | 2 -
.../distributed/ItTxDistributedTestSingleNode.java | 1 +
.../ignite/distributed/ReplicaUnavailableTest.java | 1 +
.../apache/ignite/internal/table/TableImpl.java | 13 ++++
.../internal/table/distributed/TableManager.java | 31 ++++----
.../replicator/PartitionReplicaListener.java | 80 -------------------
18 files changed, 201 insertions(+), 239 deletions(-)
diff --git
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
index 2c66e52bb3..5882d2c303 100644
---
a/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
+++
b/modules/raft/src/integrationTest/java/org/apache/ignite/raft/server/ItJraftCounterServerTest.java
@@ -659,6 +659,8 @@ class ItJraftCounterServerTest extends JraftAbstractTest {
RaftGroupOptions opts = defaults().snapshotStorageFactory(new
SnapshotInMemoryStorageFactory(snapshotMetaStorage));
raftServer.startRaftNode(new RaftNodeId(grpId, serverPeer),
initialMembersConf, listener, opts);
+
+ raftServer.raftNodeReadyFuture(grpId).join();
}, opts -> {});
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index b452883af3..cde94c987a 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -174,7 +174,12 @@ public class Loza implements RaftManager {
RaftGroupListener lsnr,
RaftGroupEventsListener eventsLsnr
) throws NodeStoppingException {
- return startRaftGroupNode(nodeId, configuration, lsnr, eventsLsnr,
RaftGroupOptions.defaults());
+ CompletableFuture<RaftGroupService> fut = startRaftGroupNode(nodeId,
configuration, lsnr, eventsLsnr, RaftGroupOptions.defaults());
+
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
+ raftServer.raftNodeReadyFuture(nodeId.groupId()).join();
+
+ return fut;
}
@Override
@@ -311,6 +316,18 @@ public class Loza implements RaftManager {
);
}
+
+ /**
+ * Gets a future that completes when all committed updates are applied to
the state machine after the RAFT node starts.
+ * TODO: IGNITE-18273 The method should be defined in RaftManager and
take a RaftNodeId instead of a group ID.
+ *
+ * @param groupId Raft group id.
+ * @return Future to last applied revision.
+ */
+ public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId
groupId) {
+ return raftServer.raftNodeReadyFuture(groupId);
+ }
+
@Override
public boolean stopRaftNode(RaftNodeId nodeId) throws
NodeStoppingException {
if (!busyLock.enterBusy()) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
index dd6d218fde..d074ae35e8 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftGroupOptions.java
@@ -17,11 +17,9 @@
package org.apache.ignite.internal.raft.server;
-import java.util.concurrent.CountDownLatch;
import org.apache.ignite.internal.raft.storage.LogStorageFactory;
import org.apache.ignite.internal.raft.storage.RaftMetaStorageFactory;
import org.apache.ignite.internal.raft.storage.SnapshotStorageFactory;
-import org.jetbrains.annotations.Nullable;
/**
* Options specific to a Raft group that is being started.
@@ -39,10 +37,6 @@ public class RaftGroupOptions {
/** Raft meta storage factory. */
private RaftMetaStorageFactory raftMetaStorageFactory;
- /** Nullable latch that will be completed when storage is ready to process
user requests. */
- @Nullable
- private CountDownLatch storageReadyLatch;
-
/**
* Returns default options as defined by classic Raft (so stores are
persistent).
*
@@ -132,20 +126,4 @@ public class RaftGroupOptions {
return this;
}
-
- /**
- * Returns the latch that will be completed when storage is ready to
process user requests.
- */
- public CountDownLatch getStorageReadyLatch() {
- return storageReadyLatch;
- }
-
- /**
- * Adds storage ready latch to options.
- */
- public RaftGroupOptions setStorageReadyLatch(CountDownLatch
storageReadyLatch) {
- this.storageReadyLatch = storageReadyLatch;
-
- return this;
- }
}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
index ea1aea77ff..c6e2bcb4c1 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/RaftServer.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.raft.server;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.manager.IgniteComponent;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -75,6 +76,14 @@ public interface RaftServer extends IgniteComponent {
RaftGroupOptions groupOptions
);
+ /**
+ * Returns a future which completes when the raft node is ready and all
committed updates are applied.
+ *
+ * @param groupId Raft group ID.
+ * @return A future to last applied revision on start.
+ */
+ CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId groupId);
+
/**
* Stops a given local Raft node if it exists.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 7e38be811f..58fe7ec69b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -30,7 +30,9 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
+import java.util.Map.Entry;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutorService;
@@ -414,8 +416,6 @@ public class JraftServerImpl implements RaftServer {
nodeOptions.setRaftGrpEvtsLsnr(new
RaftGroupEventsListenerAdapter(nodeId.groupId(), serviceEventInterceptor,
evLsnr));
-
nodeOptions.setStorageReadyLatch(groupOptions.getStorageReadyLatch());
-
LogStorageFactory logStorageFactory =
groupOptions.getLogStorageFactory() == null
? this.logStorageFactory :
groupOptions.getLogStorageFactory();
@@ -457,6 +457,14 @@ public class JraftServerImpl implements RaftServer {
}
}
+ @Override
+ public CompletableFuture<Long> raftNodeReadyFuture(ReplicationGroupId
groupId) {
+ RaftGroupService jraftNode = nodes.entrySet().stream().filter(entry ->
entry.getKey().groupId().equals(groupId))
+ .map(Entry::getValue).findAny().get();
+
+ return jraftNode.getApplyCommittedFuture();
+ }
+
@Override
public boolean stopRaftNode(RaftNodeId nodeId) {
RaftGroupService svc = nodes.remove(nodeId);
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
index 2fbb14523a..42c60d7134 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/RaftGroupService.java
@@ -16,6 +16,7 @@
*/
package org.apache.ignite.raft.jraft;
+import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.logger.IgniteLogger;
import org.apache.ignite.internal.logger.Loggers;
import org.apache.ignite.lang.IgniteInternalException;
@@ -57,7 +58,7 @@ public class RaftGroupService {
/**
* The raft node.
*/
- private Node node;
+ private NodeImpl node;
/**
* The node manager.
@@ -123,6 +124,14 @@ public class RaftGroupService {
return this.node;
}
+ /**
+ * Gets a future which completes when all committed updates are applied to
the node's state machine on start.
+ * @return Future that completes when this node's applied index reaches the
committed index recorded at node start.
+ */
+ public CompletableFuture<Long> getApplyCommittedFuture() {
+ return node.getApplyCommittedFuture();
+ }
+
public synchronized void shutdown() {
// TODO asch remove handlers before shutting down raft node
https://issues.apache.org/jira/browse/IGNITE-14519
if (!this.started) {
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
index 3a4e9aebfe..f7f1326460 100644
--- a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
+++ b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/core/NodeImpl.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ScheduledFuture;
@@ -146,6 +147,9 @@ public class NodeImpl implements Node, RaftServerService {
.writeLock();
protected final Lock readLock = this.readWriteLock
.readLock();
+
+ /** The future completes when all committed actions are applied to the RAFT
state machine. */
+ private final CompletableFuture<Long> applyCommittedFuture;
private volatile State state;
private volatile CountDownLatch shutdownLatch;
private long currTerm;
@@ -557,6 +561,7 @@ public class NodeImpl implements Node, RaftServerService {
updateLastLeaderTimestamp(Utils.monotonicMs());
this.confCtx = new ConfigurationCtx(this);
this.wakingCandidate = null;
+ this.applyCommittedFuture = new CompletableFuture<>();
}
public HybridClock clock() {
@@ -1063,17 +1068,14 @@ public class NodeImpl implements Node,
RaftServerService {
// Wait committed.
long commitIdx = logManager.getLastLogIndex();
- boolean externalAwaitStorageLatch = opts.getStorageReadyLatch() !=
null;
+ CompletableFuture<Long> logApplyComplition = new CompletableFuture<>();
if (commitIdx > fsmCaller.getLastAppliedIndex()) {
- // TODO: https://issues.apache.org/jira/browse/IGNITE-19047 Meta
storage and cmg raft log re-application in async manner
- CountDownLatch applyCommitLatch = externalAwaitStorageLatch ?
opts.getStorageReadyLatch() : new CountDownLatch(1);
-
LastAppliedLogIndexListener lnsr = new
LastAppliedLogIndexListener() {
@Override
public void onApplied( long lastAppliedLogIndex) {
if (lastAppliedLogIndex >= commitIdx) {
- applyCommitLatch.countDown();
+ logApplyComplition.complete(lastAppliedLogIndex);
fsmCaller.removeLastAppliedLogIndexListener(this);
}
}
@@ -1082,20 +1084,8 @@ public class NodeImpl implements Node, RaftServerService
{
fsmCaller.addLastAppliedLogIndexListener(lnsr);
fsmCaller.onCommitted(commitIdx);
-
- try {
- if (!externalAwaitStorageLatch) {
- applyCommitLatch.await();
- }
- } catch (InterruptedException e) {
- LOG.error("Fail to apply committed updates.", e);
-
- return false;
- }
} else {
- if (externalAwaitStorageLatch) {
- opts.getStorageReadyLatch().countDown();
- }
+ logApplyComplition.complete(fsmCaller.getLastAppliedIndex());
}
if (!this.rpcClientService.init(this.options)) {
@@ -1116,38 +1106,53 @@ public class NodeImpl implements Node,
RaftServerService {
return false;
}
- // set state to follower
- this.state = State.STATE_FOLLOWER;
+ logApplyComplition.whenComplete((committedIdx, err) -> {
+ if (err != null) {
+ LOG.error("Fail to apply committed updates.", err);
+ }
- if (LOG.isInfoEnabled()) {
- LOG.info("Node {} init, term={}, lastLogId={}, conf={},
oldConf={}.", getNodeId(), this.currTerm,
- this.logManager.getLastLogId(false), this.conf.getConf(),
this.conf.getOldConf());
- }
+ // set state to follower
+ this.state = State.STATE_FOLLOWER;
- if (this.snapshotExecutor != null &&
this.options.getSnapshotIntervalSecs() > 0) {
- LOG.debug("Node {} start snapshot timer, term={}.", getNodeId(),
this.currTerm);
- this.snapshotTimer.start();
- }
+ if (LOG.isInfoEnabled()) {
+ LOG.info("Node {} init, term={}, lastLogId={}, conf={},
oldConf={}.", getNodeId(), this.currTerm,
+ this.logManager.getLastLogId(false), this.conf.getConf(),
this.conf.getOldConf());
+ }
- if (!this.conf.isEmpty()) {
- stepDown(this.currTerm, false, new Status());
- }
+ if (this.snapshotExecutor != null &&
this.options.getSnapshotIntervalSecs() > 0) {
+ LOG.debug("Node {} start snapshot timer, term={}.",
getNodeId(), this.currTerm);
+ this.snapshotTimer.start();
+ }
- // Now the raft node is started , have to acquire the writeLock to
avoid race
- // conditions
- this.writeLock.lock();
- if (this.conf.isStable() && this.conf.getConf().size() == 1 &&
this.conf.getConf().contains(this.serverId)) {
- // The group contains only this server which must be the LEADER,
trigger
- // the timer immediately.
- electSelf();
- }
- else {
- this.writeLock.unlock();
- }
+ if (!this.conf.isEmpty()) {
+ stepDown(this.currTerm, false, new Status());
+ }
+
+ // Now the raft node is started , have to acquire the writeLock to
avoid race
+ // conditions
+ this.writeLock.lock();
+ if (this.conf.isStable() && this.conf.getConf().size() == 1 &&
this.conf.getConf().contains(this.serverId)) {
+ // The group contains only this server which must be the
LEADER, trigger
+ // the timer immediately.
+ electSelf();
+ }
+ else {
+ this.writeLock.unlock();
+ }
+
+ applyCommittedFuture.complete(commitIdx);
+ });
return true;
}
+ /**
+ * Gets a future which completes when all committed updates are applied to
the node's state machine on start.
+ * @return Future that completes when this node's applied index reaches the
committed index recorded at node start.
+ */
+ public CompletableFuture<Long> getApplyCommittedFuture() {
+ return applyCommittedFuture;
+ }
/**
* Validates a required option if shared pools are enabled.
*
diff --git
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
index fe7394cec6..e9a051512a 100644
---
a/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
+++
b/modules/raft/src/main/java/org/apache/ignite/raft/jraft/option/NodeOptions.java
@@ -239,10 +239,6 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
/** A hybrid clock */
private HybridClock clock = new HybridClockImpl();
- /** Nullable latch that will be completed when storage is ready to process
user requests. */
- @Nullable
- private CountDownLatch storageReadyLatch;
-
/**
* Amount of Disruptors that will handle the RAFT server.
*/
@@ -641,7 +637,6 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
nodeOptions.setRpcInstallSnapshotTimeout(this.getRpcInstallSnapshotTimeout());
nodeOptions.setElectionTimeoutStrategy(this.getElectionTimeoutStrategy());
nodeOptions.setClock(this.getClock());
- nodeOptions.setStorageReadyLatch(this.getStorageReadyLatch());
return nodeOptions;
}
@@ -681,12 +676,4 @@ public class NodeOptions extends RpcOptions implements
Copiable<NodeOptions> {
public void setElectionTimeoutStrategy(TimeoutStrategy
electionTimeoutStrategy) {
this.electionTimeoutStrategy = electionTimeoutStrategy;
}
-
- public CountDownLatch getStorageReadyLatch() {
- return storageReadyLatch;
- }
-
- public void setStorageReadyLatch(CountDownLatch storageReadyLatch) {
- this.storageReadyLatch = storageReadyLatch;
- }
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
index df11a77ba3..4c5b4925e7 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/Replica.java
@@ -57,6 +57,9 @@ public class Replica {
/** Replica listener. */
private final ReplicaListener listener;
+ /** When this future completes, the replica becomes ready to process
requests. */
+ private final CompletableFuture<Void> whenReplicaReady;
+
/** Storage index tracker. */
private final PendingComparableValuesTracker<Long> storageIndexTracker;
@@ -79,6 +82,7 @@ public class Replica {
* The constructor of a replica server.
*
* @param replicaGrpId Replication group id.
+ * @param replicaReady Future that completes when the replica becomes ready.
* @param listener Replica listener.
* @param storageIndexTracker Storage index tracker.
* @param raftClient Topology aware Raft client.
@@ -86,12 +90,14 @@ public class Replica {
*/
public Replica(
ReplicationGroupId replicaGrpId,
+ CompletableFuture<Void> replicaReady,
ReplicaListener listener,
PendingComparableValuesTracker<Long> storageIndexTracker,
TopologyAwareRaftGroupService raftClient,
ClusterNode localNode
) {
this.replicaGrpId = replicaGrpId;
+ this.whenReplicaReady = replicaReady;
this.listener = listener;
this.storageIndexTracker = storageIndexTracker;
this.raftClient = raftClient;
@@ -124,6 +130,15 @@ public class Replica {
return replicaGrpId;
}
+ /**
+ * Gets a future that completes when the replica becomes ready.
+ *
+ * @return A future to check when the replica is ready.
+ */
+ public CompletableFuture<Void> ready() {
+ return whenReplicaReady;
+ }
+
private void onLeaderElected(ClusterNode clusterNode, Long term) {
leaderRef.set(clusterNode);
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 9a72bcea3a..8a569c1c99 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -139,20 +139,18 @@ public class ReplicaManager implements IgniteComponent {
}
if (!replicaFut.isDone()) {
- replicaFut.thenCompose(
- ignore -> {
- IgniteUtils.inBusyLock(
- busyLock,
- () ->
sendAwaitReplicaResponse(senderConsistentId, correlationId)
- );
-
- return null;
- }
+ replicaFut.thenAccept(createdReplica ->
+ createdReplica.ready().thenAccept(unused ->
+ IgniteUtils.inBusyLock(
+ busyLock,
+ () ->
sendAwaitReplicaResponse(senderConsistentId, correlationId)
+ )
+ )
);
return replicaFut;
} else {
- IgniteUtils.inBusyLock(busyLock, () ->
sendAwaitReplicaResponse(senderConsistentId, correlationId));
+ sendAwaitReplicaResponse(senderConsistentId,
correlationId);
return replicaFut;
}
@@ -165,7 +163,7 @@ public class ReplicaManager implements IgniteComponent {
HybridTimestamp requestTimestamp = extractTimestamp(request);
- if (replicaFut == null || !replicaFut.isDone()) {
+ if (replicaFut == null || !replicaFut.isDone() ||
!replicaFut.join().ready().isDone()) {
sendReplicaUnavailableErrorResponse(senderConsistentId,
correlationId, request.groupId(), requestTimestamp);
return;
@@ -226,16 +224,16 @@ public class ReplicaManager implements IgniteComponent {
* Starts a replica. If a replica with the same partition id already
exists, the method throws an exception.
*
* @param replicaGrpId Replication group id.
+ * @param whenReplicaReady Future that completes when the replica becomes
ready.
* @param listener Replica listener.
* @param raftClient Topology aware Raft client.
* @param storageIndexTracker Storage index tracker.
- *
- * @return New replica.
* @throws NodeStoppingException If node is stopping.
* @throws ReplicaIsAlreadyStartedException Is thrown when a replica with
the same replication group id has already been started.
*/
- public Replica startReplica(
+ public void startReplica(
ReplicationGroupId replicaGrpId,
+ CompletableFuture<Void> whenReplicaReady,
ReplicaListener listener,
TopologyAwareRaftGroupService raftClient,
PendingComparableValuesTracker<Long> storageIndexTracker
@@ -245,7 +243,7 @@ public class ReplicaManager implements IgniteComponent {
}
try {
- return startReplicaInternal(replicaGrpId, listener, raftClient,
storageIndexTracker);
+ startReplicaInternal(replicaGrpId, whenReplicaReady, listener,
raftClient, storageIndexTracker);
} finally {
busyLock.leaveBusy();
}
@@ -255,19 +253,21 @@ public class ReplicaManager implements IgniteComponent {
* Internal method for starting a replica.
*
* @param replicaGrpId Replication group id.
+ * @param whenReplicaReady Future that completes when the replica becomes
ready.
* @param listener Replica listener.
* @param raftClient Topology aware Raft client.
* @param storageIndexTracker Storage index tracker.
- * @return New replica.
*/
- private Replica startReplicaInternal(
+ private void startReplicaInternal(
ReplicationGroupId replicaGrpId,
+ CompletableFuture<Void> whenReplicaReady,
ReplicaListener listener,
TopologyAwareRaftGroupService raftClient,
PendingComparableValuesTracker<Long> storageIndexTracker
) {
ClusterNode localNode = clusterNetSvc.topologyService().localMember();
- Replica newReplica = new Replica(replicaGrpId, listener,
storageIndexTracker, raftClient, localNode);
+
+ Replica newReplica = new Replica(replicaGrpId, whenReplicaReady,
listener, storageIndexTracker, raftClient, localNode);
replicas.compute(replicaGrpId, (replicationGroupId, replicaFut) -> {
if (replicaFut == null) {
@@ -282,8 +282,6 @@ public class ReplicaManager implements IgniteComponent {
return replicaFut;
}
});
-
- return newReplica;
}
/**
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index 1cd3bf67e4..d18587a381 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -25,7 +25,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicReference;
import org.apache.ignite.internal.hlc.HybridClock;
import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.exception.ReplicationException;
@@ -84,8 +83,7 @@ public class ReplicaService {
* @see ReplicaUnavailableException If replica with given replication
group id doesn't exist or not started yet.
*/
private <R> CompletableFuture<R> sendToReplica(String
targetNodeConsistentId, ReplicaRequest req) {
-
- AtomicReference<CompletableFuture<R>> res = new AtomicReference<>(new
CompletableFuture<>());
+ CompletableFuture<R> res = new CompletableFuture<>();
// TODO: IGNITE-17824 Use named executor instead of default one in
order to process replica Response.
messagingService.invoke(targetNodeConsistentId, req,
RPC_TIMEOUT).whenCompleteAsync((response, throwable) -> {
@@ -95,9 +93,9 @@ public class ReplicaService {
}
if (throwable instanceof TimeoutException) {
- res.get().completeExceptionally(new
ReplicationTimeoutException(req.groupId()));
+ res.completeExceptionally(new
ReplicationTimeoutException(req.groupId()));
} else {
- res.get().completeExceptionally(withCause(
+ res.completeExceptionally(withCause(
ReplicationException::new,
REPLICA_COMMON_ERR,
"Failed to process replica request
[replicaGroupId=" + req.groupId() + ']',
@@ -114,55 +112,56 @@ public class ReplicaService {
var errResp = (ErrorReplicaResponse) response;
if (errResp.throwable() instanceof
ReplicaUnavailableException) {
- pendingInvokes.compute(targetNodeConsistentId,
(clusterNode, fut) -> {
- if (fut == null) {
- AwaitReplicaRequest awaitReplicaReq =
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
- .groupId(req.groupId())
- .build();
-
- fut =
messagingService.invoke(targetNodeConsistentId, awaitReplicaReq, RPC_TIMEOUT)
- .whenComplete((response0, throwable0)
-> {
-
pendingInvokes.remove(targetNodeConsistentId);
- });
- }
-
- fut.handle((response0, throwable0) -> {
- if (throwable0 != null) {
- if (throwable0 instanceof
CompletionException) {
- throwable0 = throwable0.getCause();
- }
+ CompletableFuture<NetworkMessage> awaitReplicaFut =
pendingInvokes.computeIfAbsent(
+ targetNodeConsistentId,
+ consistentId -> {
+ AwaitReplicaRequest awaitReplicaReq =
REPLICA_MESSAGES_FACTORY.awaitReplicaRequest()
+ .groupId(req.groupId())
+ .build();
+
+ return
messagingService.invoke(targetNodeConsistentId, awaitReplicaReq, RPC_TIMEOUT);
+ }
+ );
- if (throwable0 instanceof
TimeoutException) {
-
res.get().completeExceptionally(errResp.throwable());
- } else {
-
res.get().completeExceptionally(withCause(
- ReplicationException::new,
- REPLICA_COMMON_ERR,
- "Failed to process replica
request [replicaGroupId=" + req.groupId() + ']',
- throwable0));
- }
- } else {
- res.get().thenCompose(ignore ->
sendToReplica(targetNodeConsistentId, req));
+ awaitReplicaFut.handle((response0, throwable0) -> {
+ pendingInvokes.remove(targetNodeConsistentId,
awaitReplicaFut);
- res.get().complete(null);
+ if (throwable0 != null) {
+ if (throwable0 instanceof CompletionException)
{
+ throwable0 = throwable0.getCause();
}
- return null;
- });
+ if (throwable0 instanceof TimeoutException) {
+
res.completeExceptionally(errResp.throwable());
+ } else {
+ res.completeExceptionally(withCause(
+ ReplicationException::new,
+ REPLICA_COMMON_ERR,
+ "Failed to process replica request
[replicaGroupId=" + req.groupId() + ']',
+ throwable0));
+ }
+ } else {
+ sendToReplica(targetNodeConsistentId,
req).whenComplete((r, e) -> {
+ if (e != null) {
+ res.completeExceptionally(e);
+ } else {
+ res.complete((R) r);
+ }
+ });
+ }
- return fut;
+ return null;
});
-
} else {
- res.get().completeExceptionally(errResp.throwable());
+ res.completeExceptionally(errResp.throwable());
}
} else {
- res.get().complete((R) ((ReplicaResponse)
response).result());
+ res.complete((R) ((ReplicaResponse) response).result());
}
}
});
- return res.get();
+ return res;
}
/**
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
index da4e7a5b11..02b78bb59e 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/PlacementDriverReplicaSideTest.java
@@ -101,6 +101,7 @@ public class PlacementDriverReplicaSideTest {
Replica replica = new Replica(
GRP_ID,
+ completedFuture(null),
mock(ReplicaListener.class),
storageIndexTracker,
raftClient,
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
index f2974d7212..31c3dce14c 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItReadOnlyTransactionTest.java
@@ -41,13 +41,11 @@ import
org.apache.ignite.internal.sql.engine.ClusterPerClassIntegrationTest;
import org.apache.ignite.network.ClusterNode;
import org.apache.ignite.tx.Transaction;
import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
/**
* Test reads with specific timestamp.
*/
-@Disabled("IGNITE-19022 ItReadOnlyTransactionTest is flaky in TC due to
replica is timed out")
public class ItReadOnlyTransactionTest extends ClusterPerClassIntegrationTest {
/** Table name. */
public static final String TABLE_NAME = "tbl";
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
index 37643df468..b39b64f61b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ItTxDistributedTestSingleNode.java
@@ -468,6 +468,7 @@ public class ItTxDistributedTestSingleNode extends
TxAbstractTest {
DummySchemaManagerImpl schemaManager = new
DummySchemaManagerImpl(schemaDescriptor);
replicaManagers.get(assignment).startReplica(
new TablePartitionId(tblId, partId),
+
CompletableFuture.completedFuture(null),
new PartitionReplicaListener(
testMpPartStorage,
raftSvc,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 83f6dbb59a..2668b59a00 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -130,6 +130,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
try {
replicaManager.startReplica(
tablePartitionId,
+ CompletableFuture.completedFuture(null),
request0 ->
CompletableFuture.completedFuture(null),
mock(TopologyAwareRaftGroupService.class),
new PendingComparableValuesTracker<>(0L)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
index a8ec9e2644..35b5c5f456 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/TableImpl.java
@@ -268,6 +268,19 @@ public class TableImpl implements Table {
};
}
+ /**
+ * The future completes when the primary key index is ready to use.
+ *
+ * @return Future which completes once the table's primary key index ID is resolved (successfully or not).
+ */
+ public CompletableFuture<Void> pkIndexesReadyFuture() {
+ var fut = new CompletableFuture<Void>();
+
+ pkId.whenComplete((uuid, throwable) -> fut.complete(null));
+
+ return fut;
+ }
+
/**
* Register the index with given id in a table.
*
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 17fdced54e..f93a15331a 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -57,7 +57,6 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -721,8 +720,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
CompletableFuture<Void> startGroupFut;
- CountDownLatch storageReadyLatch = new CountDownLatch(1);
-
// start new nodes, only if it is table creation, other cases
will be covered by rebalance logic
if (oldPartAssignment.isEmpty() && localMemberAssignment !=
null) {
startGroupFut =
partitionStoragesFut.thenComposeAsync(partitionStorages -> {
@@ -771,8 +768,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTbl.storage(),
internalTbl.txStateStorage(),
partitionKey(internalTbl,
partId),
- storageUpdateHandler,
- storageReadyLatch
+ storageUpdateHandler
);
Peer serverPeer =
newConfiguration.peer(localMemberName);
@@ -834,7 +830,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
TxStateStorage txStateStorage =
partitionStorages.getTxStateStorage();
try {
-
replicaMgr.startReplica(replicaGrpId,
+ replicaMgr.startReplica(
+ replicaGrpId,
+ allOf(
+ ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
+
table.pkIndexesReadyFuture()
+ ),
new
PartitionReplicaListener(
partitionStorage,
updatedRaftGroupService,
@@ -852,8 +853,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
placementDriver,
storageUpdateHandler,
this::isLocalPeer,
-
schemaManager.schemaRegistry(causalityToken, tblId),
- storageReadyLatch
+
schemaManager.schemaRegistry(causalityToken, tblId)
),
updatedRaftGroupService,
storageIndexTracker
@@ -926,8 +926,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
MvTableStorage mvTableStorage,
TxStateTableStorage txStateTableStorage,
PartitionKey partitionKey,
- StorageUpdateHandler storageUpdateHandler,
- CountDownLatch storageReadyLatch
+ StorageUpdateHandler storageUpdateHandler
) {
RaftGroupOptions raftGroupOptions;
@@ -953,8 +952,6 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
incomingSnapshotsExecutor
));
- raftGroupOptions.setStorageReadyLatch(storageReadyLatch);
-
return raftGroupOptions;
}
@@ -2013,8 +2010,7 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
internalTable.storage(),
internalTable.txStateStorage(),
partitionKey(internalTable, partId),
- storageUpdateHandler,
- null
+ storageUpdateHandler
);
RaftGroupListener raftGrpLsnr = new PartitionListener(
@@ -2052,7 +2048,12 @@ public class TableManager extends Producer<TableEvent,
TableEventParameters> imp
UUID tblId = tbl.tableId();
- replicaMgr.startReplica(replicaGrpId,
+ replicaMgr.startReplica(
+ replicaGrpId,
+ allOf(
+ ((Loza)
raftMgr).raftNodeReadyFuture(replicaGrpId),
+ tbl.pkIndexesReadyFuture()
+ ),
new PartitionReplicaListener(
mvPartitionStorage,
internalTable.partitionRaftGroupService(partId),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 0da68a8da7..19e26f25c4 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -43,7 +43,6 @@ import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ConcurrentNavigableMap;
import java.util.concurrent.ConcurrentSkipListMap;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
@@ -205,75 +204,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
private final CompletableFuture<SchemaRegistry> schemaFut;
- /** Nullable latch that will be completed when storage is ready to process
user requests. */
- @Nullable
- private final CountDownLatch storageReadyLatch;
-
- /**
- * The constructor.
- *
- * @param mvDataStorage Data storage.
- * @param raftClient Raft client.
- * @param txManager Transaction manager.
- * @param lockManager Lock manager.
- * @param partId Partition id.
- * @param tableId Table id.
- * @param indexesLockers Index lock helper objects.
- * @param pkIndexStorage Pk index storage.
- * @param secondaryIndexStorages Secondary index storages.
- * @param hybridClock Hybrid clock.
- * @param safeTime Safe time clock.
- * @param txStateStorage Transaction state storage.
- * @param placementDriver Placement driver.
- * @param storageUpdateHandler Handler that processes updates writing them
to storage.
- * @param isLocalPeerChecker Function for checking that the given peer is
local.
- * @param schemaFut Table schema.
- * @param storageReadyLatch Nullable latch that will be completed when
storage is ready to process user requests.
- */
- public PartitionReplicaListener(
- MvPartitionStorage mvDataStorage,
- RaftGroupService raftClient,
- TxManager txManager,
- LockManager lockManager,
- Executor scanRequestExecutor,
- int partId,
- UUID tableId,
- Supplier<Map<UUID, IndexLocker>> indexesLockers,
- Lazy<TableSchemaAwareIndexStorage> pkIndexStorage,
- Supplier<Map<UUID, TableSchemaAwareIndexStorage>>
secondaryIndexStorages,
- HybridClock hybridClock,
- PendingComparableValuesTracker<HybridTimestamp> safeTime,
- TxStateStorage txStateStorage,
- PlacementDriver placementDriver,
- StorageUpdateHandler storageUpdateHandler,
- Function<Peer, Boolean> isLocalPeerChecker,
- CompletableFuture<SchemaRegistry> schemaFut,
- CountDownLatch storageReadyLatch
- ) {
- this.mvDataStorage = mvDataStorage;
- this.raftClient = raftClient;
- this.txManager = txManager;
- this.lockManager = lockManager;
- this.scanRequestExecutor = scanRequestExecutor;
- this.partId = partId;
- this.tableId = tableId;
- this.indexesLockers = indexesLockers;
- this.pkIndexStorage = pkIndexStorage;
- this.secondaryIndexStorages = secondaryIndexStorages;
- this.hybridClock = hybridClock;
- this.safeTime = safeTime;
- this.txStateStorage = txStateStorage;
- this.placementDriver = placementDriver;
- this.isLocalPeerChecker = isLocalPeerChecker;
- this.storageUpdateHandler = storageUpdateHandler;
- this.schemaFut = schemaFut;
- this.storageReadyLatch = storageReadyLatch;
-
- this.replicationGroupId = new TablePartitionId(tableId, partId);
-
- cursors = new
ConcurrentSkipListMap<>(IgniteUuid.globalOrderComparator());
- }
-
/**
* The constructor.
*
@@ -330,7 +260,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
this.isLocalPeerChecker = isLocalPeerChecker;
this.storageUpdateHandler = storageUpdateHandler;
this.schemaFut = schemaFut;
- this.storageReadyLatch = null;
this.replicationGroupId = new TablePartitionId(tableId, partId);
@@ -339,15 +268,6 @@ public class PartitionReplicaListener implements
ReplicaListener {
@Override
public CompletableFuture<?> invoke(ReplicaRequest request) {
- try {
- if (storageReadyLatch != null) {
- storageReadyLatch.await();
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- throw new IgniteInternalException("Interrupted while awaiting the
storage initialization.", e);
- }
-
if (request instanceof TxStateReplicaRequest) {
return processTxStateReplicaRequest((TxStateReplicaRequest)
request);
}