This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 82dd62fb1dd IGNITE-27703 Remove waiting for cluster time during
assignments waiting (#7531)
82dd62fb1dd is described below
commit 82dd62fb1ddf1a049c6ebd3ec171dfd8b792d479
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Feb 5 22:30:36 2026 +0200
IGNITE-27703 Remove waiting for cluster time during assignments waiting
(#7531)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 3 +
.../ignite/client/handler/FakePlacementDriver.java | 2 +-
.../ignite/internal/util/ExceptionUtils.java | 19 ++++
.../rebalance/ItRebalanceDistributedTest.java | 2 +
.../rebalance/ZoneRebalanceUtil.java | 21 ++++
.../ignite/internal/index/TestPlacementDriver.java | 6 +-
.../partitiondistribution/Assignments.java | 16 +++
.../partition/replicator/fixtures/Node.java | 2 +
.../replicator/fixtures/TestPlacementDriver.java | 8 +-
.../PartitionReplicaLifecycleManagerTest.java | 1 +
.../AssignmentsPlacementDriver.java | 2 -
.../wrappers/DelegatingPlacementDriver.java | 3 +-
.../wrappers/ExecutorInclinedPlacementDriver.java | 8 +-
.../placementdriver/TestPlacementDriver.java | 6 +-
.../placementdriver/AssignmentsTracker.java | 11 +-
.../placementdriver/PlacementDriverManager.java | 4 +-
.../placementdriver/AssignmentsTrackerTest.java | 4 +-
modules/platforms/cpp/ignite/common/error_codes.h | 1 +
modules/platforms/cpp/ignite/odbc/common_types.cpp | 1 +
.../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs | 3 +
modules/replicator/build.gradle | 1 +
.../ItPlacementDriverReplicaSideTest.java | 2 +
.../ignite/internal/replicator/ReplicaManager.java | 78 ++++++++-----
.../ignite/internal/replicator/ReplicaService.java | 17 ++-
.../exception/ReplicaUnavailableException.java | 13 +--
.../internal/replicator/ReplicaManagerTest.java | 125 ++++++++++++++++++++-
.../runner/app/ItIgniteNodeRestartTest.java | 2 +
.../org/apache/ignite/internal/app/IgniteImpl.java | 2 +
.../sql/engine/exec/fsm/QueryExecutionProgram.java | 8 +-
.../mapping/ExecutionDistributionProviderImpl.java | 8 +-
.../engine/exec/mapping/MappingServiceImpl.java | 2 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 +
.../distributed/storage/InternalTableImpl.java | 2 +
.../distributed/TableManagerRecoveryTest.java | 1 +
.../apache/ignite/distributed/ItTxTestCluster.java | 2 +
.../tx/impl/PersistentTxStateVacuumizer.java | 4 +-
36 files changed, 317 insertions(+), 75 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 51db3a334a9..25a7c7048ab 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -496,6 +496,9 @@ public class ErrorGroups {
/** Replication group unavailable exception code. */
public static final int GROUP_UNAVAILABLE_ERR =
REPLICATOR_ERR_GROUP.registerErrorCode((short) 10);
+
+ /** Replica is absent on the node and the node is not in assignments
for this replica. */
+ public static final int REPLICA_ABSENT_ERR =
REPLICATOR_ERR_GROUP.registerErrorCode((short) 11);
}
/** Storage error group. */
diff --git
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index d4afe93e403..7714422d85f 100644
---
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -132,7 +132,7 @@ public class FakePlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
@Override
public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait, long timeoutMillis
+ long timeoutMillis
) {
return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
FakePlacementDriver yet."));
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index e7d52116b5c..07c61e925f1 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -754,6 +754,25 @@ public final class ExceptionUtils {
return false;
}
+ /**
+ * Determines whether the error code extracted from the given throwable matches any of the passed error codes.
+ *
+ * @param t Unwrapped throwable.
+ * @param codes Error codes to match against.
+ * @return {@code true} if the error code of {@code t} is equal to any of {@code codes}.
+ */
+ public static boolean matchAny(Throwable t, List<Integer> codes) {
+ int errCode = extractCodeFrom(t);
+
+ for (int c0 : codes) {
+ if (c0 == errCode) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
// TODO: https://issues.apache.org/jira/browse/IGNITE-19870
// This method should be removed or re-worked and usages should be changed
to IgniteExceptionMapperUtil.mapToPublicException.
/**
diff --git
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a0b39880129..02b5cbc15c2 100644
---
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rebalance;
import static java.util.Collections.reverse;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
@@ -1471,6 +1472,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
name,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
clockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriver,
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 2053a21083b..645bae843e7 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/**
* Util class for methods needed for the rebalance process.
@@ -686,6 +687,7 @@ public class ZoneRebalanceUtil {
* @param partitionId Partition identifier.
* @return Pending partition assignments.
*/
+ @TestOnly
public static CompletableFuture<Set<Assignment>>
pendingPartitionAssignments(
MetaStorageManager metaStorageManager,
int zoneId,
@@ -797,4 +799,23 @@ public class ZoneRebalanceUtil {
return e != null ? AssignmentsChain.fromBytes(e.value()) : null;
}
+
+ /**
+ * Returns stable partition assignments from meta storage.
+ *
+ * @param metaStorageManager Meta storage manager.
+ * @param zonePartitionId Zone partition id.
+ * @return Future with partition assignments as a value.
+ */
+ public static CompletableFuture<Assignments>
zonePartitionStableAssignments(
+ MetaStorageManager metaStorageManager,
+ ZonePartitionId zonePartitionId
+ ) {
+ return metaStorageManager
+ .get(stablePartAssignmentsKey(zonePartitionId))
+ .thenApply(e -> e == null || e.value() == null || e.empty() ||
e.tombstone()
+ ? Assignments.EMPTY
+ : Assignments.fromBytes(e.value())
+ );
+ }
}
diff --git
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index 53df45122c8..d78113e3a86 100644
---
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -72,8 +72,10 @@ class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
@Override
- public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ long timeoutMillis
+ ) {
return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
TestPlacementDriver yet."));
}
diff --git
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index 06198b3be41..ba5639cc202 100644
---
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -157,6 +157,22 @@ public class Assignments {
return nodes.isEmpty();
}
+ /**
+ * Checks whether the assignments contain an assignment for the node with
the given consistent ID, regardless of whether it is a peer assignment.
+ *
+ * @param consistentId Consistent ID of the node.
+ * @return {@code true} if the assignments contain an assignment for a
node with the given consistent ID.
+ */
+ public boolean contains(String consistentId) {
+ for (Assignment a : nodes) {
+ if (a.consistentId().equals(consistentId)) {
+ return true;
+ }
+ }
+
+ return false;
+ }
+
/**
* Serializes the instance into an array of bytes.
*/
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index f7b22413146..7aeaa3351cf 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -123,6 +123,7 @@ import
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.raft.Loza;
@@ -641,6 +642,7 @@ public class Node {
name,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
clockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriverManager.placementDriver(),
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
index b8bff4db7a6..1d068bf6325 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
@@ -128,8 +128,10 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
@Override
- public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ long timeoutMillis
+ ) {
List<TokenizedAssignments> assignments = tokenizedAssignments;
if (assignments == null) {
@@ -148,7 +150,7 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
throw new IllegalStateException("Primary replica is not defined in
test PlacementDriver");
}
- return CompletableFuture.completedFuture(primary);
+ return completedFuture(primary);
}
@Override
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 0394c11a97f..7d6b1e216e3 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -226,6 +226,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
nodeName,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
clockService,
Set.of(),
placementDriver,
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index c07bb7d2d59..62578e17bf5 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -68,13 +68,11 @@ public interface AssignmentsPlacementDriver {
* {@link EmptyAssignmentsException} is thrown.
*
* @param replicationGroupIds List of replication group Ids.
- * @param clusterTimeToAwait Cluster time to await.
* @param timeoutMillis Timeout in milliseconds.
* @return List of tokenized assignments.
*/
CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignments(
List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait,
long timeoutMillis
);
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
index c60d9b0e5e7..f07790bf7c8 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
@@ -69,10 +69,9 @@ abstract class DelegatingPlacementDriver implements
PlacementDriver {
@Override
public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait,
long timeoutMillis
) {
- return delegate.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis);
+ return delegate.awaitNonEmptyAssignments(replicationGroupIds,
timeoutMillis);
}
@Override
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
index b514be9b950..0c8fae58737 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
@@ -70,8 +70,10 @@ public class ExecutorInclinedPlacementDriver extends
DelegatingPlacementDriver {
}
@Override
- public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
- return
decorateFuture(super.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis));
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ long timeoutMillis
+ ) {
+ return
decorateFuture(super.awaitNonEmptyAssignments(replicationGroupIds,
timeoutMillis));
}
}
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 3f5b5c7827a..80442d123b9 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -107,8 +107,10 @@ public class TestPlacementDriver extends
AbstractEventProducer<PrimaryReplicaEve
}
@Override
- public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+ public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
+ List<? extends ReplicationGroupId> replicationGroupIds,
+ long timeoutMillis
+ ) {
return failedFuture(new
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in
TestPlacementDriver yet."));
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 4a446996021..7f4f6b57ee5 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -176,17 +176,10 @@ public class AssignmentsTracker implements
AssignmentsPlacementDriver {
@Override
public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
List<? extends ReplicationGroupId> replicationGroupIds,
- HybridTimestamp clusterTimeToAwait,
long timeoutMillis
) {
- return msManager
- .clusterTime()
- .waitFor(clusterTimeToAwait)
- .thenCompose(ignored -> inBusyLock(busyLock, () -> {
- long now = coarseCurrentTimeMillis();
- return
awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now,
timeoutMillis);
- }))
- .thenApply(identity());
+ long now = coarseCurrentTimeMillis();
+ return
awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now,
timeoutMillis);
}
private CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignmentsWithCheckMostRecent(
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 3d351993642..6e3a89c1d89 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -334,8 +334,8 @@ public class PlacementDriverManager implements
IgniteComponent {
@Override
public CompletableFuture<List<TokenizedAssignments>>
awaitNonEmptyAssignments(
- List<? extends ReplicationGroupId> replicationGroupIds,
HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
- return
assignmentsTracker.awaitNonEmptyAssignments(replicationGroupIds,
clusterTimeToAwait, timeoutMillis);
+ List<? extends ReplicationGroupId> replicationGroupIds,
long timeoutMillis) {
+ return
assignmentsTracker.awaitNonEmptyAssignments(replicationGroupIds, timeoutMillis);
}
@Override
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
index 1cec7d4a00a..e69c05c7929 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
@@ -94,7 +94,7 @@ public class AssignmentsTrackerTest extends
BaseIgniteAbstractTest {
@Test
public void testInitialEmptyAssignmentsWithSuccessfulWaiting() {
CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture =
assignmentsTracker
- .awaitNonEmptyAssignments(List.of(groupId0),
metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+ .awaitNonEmptyAssignments(List.of(groupId0), 10_000);
assertFalse(assignmentsListFuture.isDone());
@@ -108,7 +108,7 @@ public class AssignmentsTrackerTest extends
BaseIgniteAbstractTest {
@Test
public void testChangeAssignmentsForOneGroupWhileWaitingForAnother() {
CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture =
assignmentsTracker
- .awaitNonEmptyAssignments(List.of(groupId0, groupId1),
metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+ .awaitNonEmptyAssignments(List.of(groupId0, groupId1), 10_000);
assertFalse(assignmentsListFuture.isDone());
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h
b/modules/platforms/cpp/ignite/common/error_codes.h
index af1bec65073..23f91076ba3 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -151,6 +151,7 @@ enum class code : underlying_t {
REPLICA_STOPPING = 0x80008,
GROUP_OVERLOADED = 0x80009,
GROUP_UNAVAILABLE = 0x8000a,
+ REPLICA_ABSENT = 0x8000b,
// Storage group. Group code: 9
INDEX_NOT_BUILT = 0x90001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 70316a61f56..5e6581e7378 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -225,6 +225,7 @@ sql_state error_code_to_sql_state(error::code code) {
case error::code::REPLICA_STOPPING:
case error::code::GROUP_OVERLOADED:
case error::code::GROUP_UNAVAILABLE:
+ case error::code::REPLICA_ABSENT:
return sql_state::SHY000_GENERAL_ERROR;
// Storage group. Group code: 9
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 6ac80fd68b1..478c47335a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -413,6 +413,9 @@ namespace Apache.Ignite
/// <summary> GroupUnavailable error. </summary>
public const int GroupUnavailable = (GroupCode << 16) | (10 &
0xFFFF);
+
+ /// <summary> ReplicaAbsent error. </summary>
+ public const int ReplicaAbsent = (GroupCode << 16) | (11 & 0xFFFF);
}
/// <summary> Storage errors. </summary>
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index f37aa4a4207..4b7eacdcec6 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -49,6 +49,7 @@ dependencies {
integrationTestImplementation project(':ignite-metrics')
integrationTestImplementation project(':ignite-runner')
integrationTestImplementation project(':ignite-configuration-system')
+ integrationTestImplementation project(':ignite-partition-distribution')
integrationTestImplementation testFixtures(project)
integrationTestImplementation testFixtures(project(':ignite-core'))
integrationTestImplementation
testFixtures(project(':ignite-configuration'))
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 98f87cfa53a..d510cc27271 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.StaticNodeFinder;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverActorMessage;
import
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
@@ -225,6 +226,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
nodeName,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
testClockService,
Set.of(ReplicaMessageTestGroup.class),
new TestPlacementDriver(primaryReplicaSupplier),
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 64b8c7cd83b..2a9b362cfc1 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -36,11 +36,13 @@ import static
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE
import static org.apache.ignite.internal.util.CompletableFutures.allOf;
import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
import java.io.IOException;
import java.util.ArrayList;
@@ -176,6 +178,9 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Cluster group manager. */
private final ClusterManagementGroupManager cmgMgr;
+ /** Supplier of stable assignments, used for replica absence handling. */
+ private final Function<ZonePartitionId, CompletableFuture<Assignments>>
stableAssignmentsSupplier;
+
/** Replica message handler. */
private final NetworkMessageHandler handler;
@@ -237,6 +242,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param nodeName Node name.
* @param clusterNetSvc Cluster network service.
* @param cmgMgr Cluster group manager.
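+ * @param stableAssignmentsSupplier Function returning the stable assignments of a zone partition; used to check whether the local node is in the assignments of a replica that is absent on it.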
* @param clockService Clock service.
* @param messageGroupsToHandle Message handlers.
* @param placementDriver A placement driver.
@@ -257,6 +263,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
String nodeName,
ClusterService clusterNetSvc,
ClusterManagementGroupManager cmgMgr,
+ Function<ZonePartitionId, CompletableFuture<Assignments>>
stableAssignmentsSupplier,
ClockService clockService,
Set<Class<?>> messageGroupsToHandle,
PlacementDriver placementDriver,
@@ -274,6 +281,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
) {
this.clusterNetSvc = clusterNetSvc;
this.cmgMgr = cmgMgr;
+ this.stableAssignmentsSupplier = stableAssignmentsSupplier;
this.clockService = clockService;
this.messageGroupsToHandle = messageGroupsToHandle;
this.volatileLogStorageFactoryCreator =
volatileLogStorageFactoryCreator;
@@ -374,7 +382,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
HybridTimestamp requestTimestamp = extractTimestamp(request);
if (replicaFut == null || !replicaFut.isDone()) {
- sendReplicaUnavailableErrorResponse(senderConsistentId,
correlationId, groupId, requestTimestamp);
+ sendReplicaUnavailableErrorResponse(senderConsistentId,
correlationId, groupId, requestTimestamp, replicaFut == null);
return;
}
@@ -1048,34 +1056,49 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
String senderConsistentId,
long correlationId,
ReplicationGroupId groupId,
- @Nullable HybridTimestamp requestTimestamp
+ @Nullable HybridTimestamp requestTimestamp,
+ boolean replicaIsAbsent
) {
- if (requestTimestamp != null) {
- clusterNetSvc.messagingService().respond(
- senderConsistentId,
- REPLICA_MESSAGES_FACTORY
- .errorTimestampAwareReplicaResponse()
- .throwable(
- new ReplicaUnavailableException(
- groupId,
-
clusterNetSvc.topologyService().localMember())
- )
-
.timestamp(clockService.updateClock(requestTimestamp))
- .build(),
- correlationId);
+ CompletableFuture<Boolean> replicaInAssignmentsFuture;
+
+ if (replicaIsAbsent && (groupId instanceof ZonePartitionId)) {
+ replicaInAssignmentsFuture =
stableAssignmentsSupplier.apply((ZonePartitionId) groupId)
+ .thenApply(assignments ->
assignments.contains(localNodeConsistentId));
} else {
- clusterNetSvc.messagingService().respond(
- senderConsistentId,
- REPLICA_MESSAGES_FACTORY
- .errorReplicaResponse()
- .throwable(
- new ReplicaUnavailableException(
- groupId,
-
clusterNetSvc.topologyService().localMember())
- )
- .build(),
- correlationId);
+ replicaInAssignmentsFuture = trueCompletedFuture();
}
+
+ replicaInAssignmentsFuture.thenAccept(isInAssignments -> {
+ ReplicaUnavailableException e;
+
+ if (replicaIsAbsent && !isInAssignments) {
+ e = new ReplicaUnavailableException(
+ REPLICA_ABSENT_ERR,
+ format("Replica is absent on this node and not in
assignments, the request should be retried on another node "
+ + "[groupId={}, nodeName={}]",
groupId, localNodeConsistentId
+ )
+ );
+ } else {
+ e = new ReplicaUnavailableException(groupId,
clusterNetSvc.topologyService().localMember());
+ }
+
+ NetworkMessage msg;
+
+ if (requestTimestamp != null) {
+ msg = REPLICA_MESSAGES_FACTORY
+ .errorTimestampAwareReplicaResponse()
+ .throwable(e)
+ .timestamp(clockService.updateClock(requestTimestamp))
+ .build();
+ } else {
+ msg = REPLICA_MESSAGES_FACTORY
+ .errorReplicaResponse()
+ .throwable(e)
+ .build();
+ }
+
+ clusterNetSvc.messagingService().respond(senderConsistentId, msg,
correlationId);
+ });
}
/**
@@ -1193,7 +1216,8 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
ComponentStoppingException.class,
// Not a problem, there will be a retry.
TimeoutException.class,
- GroupOverloadedException.class
+ GroupOverloadedException.class,
+ ReplicaUnavailableException.class
)) {
failureProcessor.process(
new FailureContext(ex, String.format("Could not
advance safe time for %s", replica.groupId())));
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index e6871e3ff48..3831ddc8539 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -23,11 +23,13 @@ import static
org.apache.ignite.internal.util.ExceptionUtils.matchAny;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
+import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -57,6 +59,13 @@ import org.jetbrains.annotations.TestOnly;
/** The service is intended to execute requests on replicas. */
public class ReplicaService {
+ private static final List<Integer> RETRIABLE_ERRORS = List.of(
+ ACQUIRE_LOCK_ERR,
+ REPLICA_MISS_ERR,
+ GROUP_OVERLOADED_ERR,
+ REPLICA_ABSENT_ERR
+ );
+
/** Message service. */
private final MessagingService messagingService;
@@ -170,7 +179,11 @@ public class ReplicaService {
if (response instanceof ErrorReplicaResponse) {
var errResp = (ErrorReplicaResponse) response;
- if (errResp.throwable() instanceof
ReplicaUnavailableException) {
+ // If replica is absent, it means it is not in assignments
and won't appear on that node, no need to wait.
+ boolean shouldWaitForReplica = errResp.throwable()
instanceof ReplicaUnavailableException
+ && ((ReplicaUnavailableException)
errResp.throwable()).code() != REPLICA_ABSENT_ERR;
+
+ if (shouldWaitForReplica) {
CompletableFuture<NetworkMessage> requestFuture = new
CompletableFuture<>();
CompletableFuture<NetworkMessage> awaitReplicaFut =
pendingInvokes.computeIfAbsent(
@@ -248,7 +261,7 @@ public class ReplicaService {
} else {
int replicaOperationRetryInterval =
replicationConfiguration.replicaOperationRetryIntervalMillis().value();
if (retryExecutor != null
- && matchAny(unwrapCause(errResp.throwable()),
ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR, GROUP_OVERLOADED_ERR)
+ && matchAny(unwrapCause(errResp.throwable()),
RETRIABLE_ERRORS)
&& replicaOperationRetryInterval > 0) {
retryExecutor.schedule(
// Need to resubmit again to pool which is
valid for synchronous IO execution.
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
index d8de9144003..67e2b45af26 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.replicator.exception;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
-import java.util.UUID;
import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -27,6 +26,8 @@ import
org.apache.ignite.internal.replicator.ReplicationGroupId;
* The exception is thrown when a replica is not ready to handle a request.
*/
public class ReplicaUnavailableException extends ReplicationException {
+ private static final long serialVersionUID = 9142077461528136559L;
+
/**
* The constructor.
*
@@ -34,18 +35,16 @@ public class ReplicaUnavailableException extends
ReplicationException {
* @param node Node.
*/
public ReplicaUnavailableException(ReplicationGroupId groupId,
InternalClusterNode node) {
- super(REPLICA_UNAVAILABLE_ERR, "Replica is not ready
[replicationGroupId=" + groupId + ", nodeName=" + node.name() + ']');
+ this(REPLICA_UNAVAILABLE_ERR, "Replica is not ready
[replicationGroupId=" + groupId + ", nodeName=" + node.name() + ']');
}
/**
- * The constructor is used for creating an exception instance that is
thrown from a remote server.
+ * The constructor.
*
- * @param traceId Trace id.
* @param code Error code.
* @param message Error message.
- * @param cause Cause exception.
*/
- public ReplicaUnavailableException(UUID traceId, int code, String message,
Throwable cause) {
- super(traceId, code, message, cause);
+ public ReplicaUnavailableException(int code, String message) {
+ super(code, message);
}
}
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 69ae5c362b6..eb774efdd2e 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -19,16 +19,22 @@ package org.apache.ignite.internal.replicator;
import static java.util.UUID.randomUUID;
import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
@@ -36,12 +42,15 @@ import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
+import java.util.HashMap;
+import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -49,10 +58,15 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lang.NodeStoppingException;
import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ChannelType;
import org.apache.ignite.internal.network.ClusterNodeImpl;
import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Marshaller;
@@ -65,12 +79,19 @@ import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.ExecutorChooser;
import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.network.NetworkAddress;
+import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -84,6 +105,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
*/
@ExtendWith(MockitoExtension.class)
public class ReplicaManagerTest extends BaseIgniteAbstractTest {
+ private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new
ReplicaMessagesFactory();
+
private ExecutorService requestsExecutor;
private ReplicaManager replicaManager;
@@ -91,18 +114,25 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
@Mock
private Loza raftManager;
+ private final AtomicReference<NetworkMessageHandler> msgHandlerRef = new
AtomicReference<>();
+
+ private final Map<Long, NetworkMessage> messagingResponses = new
HashMap<>();
+
+ private MessagingService messagingService = new TestMessagingService();
+
@BeforeEach
void startReplicaManager(
TestInfo testInfo,
@Mock ClusterService clusterService,
@Mock ClusterManagementGroupManager cmgManager,
@Mock PlacementDriver placementDriver,
- @Mock MessagingService messagingService,
@Mock TopologyService topologyService,
@Mock Marshaller marshaller,
@Mock TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
@Mock VolatileLogStorageFactoryCreator
volatileLogStorageFactoryCreator
) {
+ messagingResponses.clear();
+
String nodeName = testNodeName(testInfo, 0);
when(clusterService.messagingService()).thenReturn(messagingService);
@@ -125,6 +155,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
nodeName,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
new TestClockService(clock),
Set.of(),
placementDriver,
@@ -225,4 +256,96 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
verify(createReplicaListener).notify(eq(expectedCreateParams));
verify(removeReplicaListener).notify(eq(expectedCreateParams));
}
+
+ @Test
+ public void testReplicaAbsence() {
+ ReplicaSafeTimeSyncRequest replicaRequest =
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+ .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY,
new ZonePartitionId(-1, -1)))
+ .build();
+
+ long correlationId = 1L;
+
+ msgHandlerRef.get().onReceived(replicaRequest,
mock(InternalClusterNode.class), correlationId);
+
+ Awaitility.await()
+ .timeout(5, TimeUnit.SECONDS)
+ .until(() -> messagingResponses.get(correlationId) != null);
+
+ NetworkMessage resp = messagingResponses.get(correlationId);
+
+ assertNotNull(resp);
+ assertInstanceOf(ErrorReplicaResponse.class, resp);
+
+ ErrorReplicaResponse errorResp = (ErrorReplicaResponse) resp;
+
+ assertInstanceOf(ReplicationException.class, errorResp.throwable());
+
+ ReplicationException e = (ReplicationException) errorResp.throwable();
+ assertEquals(REPLICA_ABSENT_ERR, e.code());
+ }
+
+ private class TestMessagingService implements MessagingService {
+ @Override
+ public void weakSend(InternalClusterNode recipient, NetworkMessage
msg) {
+ // No-op.
+ }
+
+ @Override
+ public CompletableFuture<Void> send(InternalClusterNode recipient,
ChannelType channelType, NetworkMessage msg) {
+ // No-op.
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> send(String recipientConsistentId,
ChannelType channelType, NetworkMessage msg) {
+ // No-op.
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> send(NetworkAddress
recipientNetworkAddress, ChannelType channelType, NetworkMessage msg) {
+ // No-op.
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<Void> respond(InternalClusterNode recipient,
ChannelType channelType, NetworkMessage msg,
+ long correlationId) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public CompletableFuture<Void> respond(String recipientConsistentId,
ChannelType channelType, NetworkMessage msg,
+ long correlationId) {
+ messagingResponses.put(correlationId, msg);
+ return completedFuture(null);
+ }
+
+ @Override
+ public CompletableFuture<NetworkMessage> invoke(InternalClusterNode
recipient, ChannelType channelType, NetworkMessage msg,
+ long timeout) {
+ // No-op.
+ return null;
+ }
+
+ @Override
+ public CompletableFuture<NetworkMessage> invoke(String
recipientConsistentId, ChannelType channelType, NetworkMessage msg,
+ long timeout) {
+ // No-op.
+ return null;
+ }
+
+ @Override
+ public void addMessageHandler(Class<?> messageGroup,
NetworkMessageHandler handler) {
+ if (messageGroup.equals(ReplicaMessageGroup.class)) {
+ msgHandlerRef.set(handler);
+ }
+ }
+
+ @Override
+ public void addMessageHandler(Class<?> messageGroup,
ExecutorChooser<NetworkMessage> executorChooser,
+ NetworkMessageHandler handler) {
+ addMessageHandler(messageGroup, handler);
+ }
+ }
}
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 729742a9dfd..22fb3e072e9 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesTest
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionStableAssignments;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultChannelTypeRegistry;
import static
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
@@ -600,6 +601,7 @@ public class ItIgniteNodeRestartTest extends
BaseIgniteRestartTest {
name,
clusterSvc,
cmgManager,
+ groupId -> zonePartitionStableAssignments(metaStorageMgr,
groupId),
clockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriverManager.placementDriver(),
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e09eab22191..33687ab6e40 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -24,6 +24,7 @@ import static
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePa
import static
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
import static org.apache.ignite.internal.configuration.IgnitePaths.vaultPath;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionStableAssignments;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -917,6 +918,7 @@ public class IgniteImpl implements Ignite {
name,
clusterSvc,
cmgMgr,
+ groupId -> zonePartitionStableAssignments(metaStorageMgr,
groupId),
clockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriverMgr.placementDriver(),
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index 5e71c6a485e..8ceef87b6c4 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -125,7 +125,7 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
return true;
}
- return lockConflict(th) || replicaMiss(th) || groupOverloaded(th);
+ return lockConflict(th) || replicaMiss(th) || groupOverloaded(th)
|| replicaAbsent(th);
}
return false;
@@ -144,7 +144,7 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
return false;
}
- return nodeLeft(th) || lockConflict(th) || replicaMiss(th) ||
groupOverloaded(th)
+ return nodeLeft(th) || lockConflict(th) || replicaMiss(th) ||
groupOverloaded(th) || replicaAbsent(th)
|| multiStepPlanOutdated(th) || incompatibleSchemaChange(th)
|| fastPlanSchemaVersionMismatch(th);
}
@@ -164,6 +164,10 @@ class QueryExecutionProgram extends
Program<AsyncSqlCursor<InternalSqlRow>> {
return ExceptionUtils.extractCodeFrom(th) ==
Replicator.GROUP_OVERLOADED_ERR;
}
+ private static boolean replicaAbsent(Throwable th) {
+ return ExceptionUtils.extractCodeFrom(th) ==
Replicator.REPLICA_ABSENT_ERR;
+ }
+
private static boolean multiStepPlanOutdated(Throwable th) {
return th instanceof SqlPlanOutdatedException;
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
index 8f4acb3ab17..675a7a8f450 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
@@ -91,7 +91,7 @@ public class ExecutionDistributionProviderImpl implements
ExecutionDistributionP
replicationGroupIds.add(new ZonePartitionId(table.zoneId(),
partitionIndex));
}
- return allReplicas(replicationGroupIds, operationTime);
+ return allReplicas(replicationGroupIds);
}
List<CompletableFuture<TokenizedAssignments>> result = new
ArrayList<>(partitions);
@@ -140,13 +140,9 @@ public class ExecutionDistributionProviderImpl implements
ExecutionDistributionP
});
}
- private CompletableFuture<List<TokenizedAssignments>> allReplicas(
- List<ZonePartitionId> replicationGroupIds,
- HybridTimestamp operationTime
- ) {
+ private CompletableFuture<List<TokenizedAssignments>>
allReplicas(List<ZonePartitionId> replicationGroupIds) {
return placementDriver.awaitNonEmptyAssignments(
replicationGroupIds,
- operationTime,
AWAIT_NON_EMPTY_ASSIGNMENTS_TIMEOUT_MILLIS
);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index a5aa72032b9..bb75d8b378f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -220,7 +220,7 @@ public class MappingServiceImpl implements MappingService,
LogicalTopologyEventL
for (IgniteTable tbl : tables) {
CompletableFuture<List<TokenizedAssignments>> assignments =
distributionProvider
- .forTable(clock.now(), tbl, mapOnBackups);
+ .forTable(clock.current(), tbl, mapOnBackups);
tablesAssignments.put(tbl.id(), assignments);
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 152913d0df4..5f229335e8e 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
import
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
import
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -209,6 +210,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
NODE_NAME,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
testClockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
new
TestPlacementDriver(clusterService.topologyService().localMember()),
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 1059074a828..700c06e6711 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -56,6 +56,7 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR;
import static
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
import static
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
@@ -2296,6 +2297,7 @@ public class InternalTableImpl implements InternalTable {
GROUP_OVERLOADED_ERR,
REPLICA_MISS_ERR,
REPLICA_UNAVAILABLE_ERR,
+ REPLICA_ABSENT_ERR,
PRIMARY_REPLICA_AWAIT_ERR,
PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR
);
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 54eb46cdc9e..25c1309e1c3 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -438,6 +438,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
NODE_NAME,
clusterService,
mock(ClusterManagementGroupManager.class, RETURNS_DEEP_STUBS),
+ groupId -> completedFuture(Assignments.EMPTY),
clockService,
Set.of(),
placementDriver,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2118a46bf90..5038bc62959 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -107,6 +107,7 @@ import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDa
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots;
import
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
@@ -490,6 +491,7 @@ public class ItTxTestCluster {
nodeName,
clusterService,
cmgManager,
+ groupId -> completedFuture(Assignments.EMPTY),
clockService,
Set.of(PartitionReplicationMessageGroup.class,
TxMessageGroup.class),
placementDriver,
diff --git
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 6ec051c40fb..f03ee4f7ba2 100644
---
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.exception.AwaitReplicaTimeoutException;
import
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
import org.apache.ignite.internal.tx.message.TxMessagesFactory;
import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
@@ -186,7 +187,8 @@ public class PersistentTxStateVacuumizer {
// the persistent tx state.
// Also, replica calls from PersistentTxStateVacuumizer are
local, so retry with new primary replica most likely will
// happen on another node.
- AwaitReplicaTimeoutException.class
+ AwaitReplicaTimeoutException.class,
+ ReplicaUnavailableException.class
);
}