This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 82dd62fb1dd IGNITE-27703 Remove waiting for cluster time during 
assignments waiting (#7531)
82dd62fb1dd is described below

commit 82dd62fb1ddf1a049c6ebd3ec171dfd8b792d479
Author: Denis Chudov <[email protected]>
AuthorDate: Thu Feb 5 22:30:36 2026 +0200

    IGNITE-27703 Remove waiting for cluster time during assignments waiting 
(#7531)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   |   3 +
 .../ignite/client/handler/FakePlacementDriver.java |   2 +-
 .../ignite/internal/util/ExceptionUtils.java       |  19 ++++
 .../rebalance/ItRebalanceDistributedTest.java      |   2 +
 .../rebalance/ZoneRebalanceUtil.java               |  21 ++++
 .../ignite/internal/index/TestPlacementDriver.java |   6 +-
 .../partitiondistribution/Assignments.java         |  16 +++
 .../partition/replicator/fixtures/Node.java        |   2 +
 .../replicator/fixtures/TestPlacementDriver.java   |   8 +-
 .../PartitionReplicaLifecycleManagerTest.java      |   1 +
 .../AssignmentsPlacementDriver.java                |   2 -
 .../wrappers/DelegatingPlacementDriver.java        |   3 +-
 .../wrappers/ExecutorInclinedPlacementDriver.java  |   8 +-
 .../placementdriver/TestPlacementDriver.java       |   6 +-
 .../placementdriver/AssignmentsTracker.java        |  11 +-
 .../placementdriver/PlacementDriverManager.java    |   4 +-
 .../placementdriver/AssignmentsTrackerTest.java    |   4 +-
 modules/platforms/cpp/ignite/common/error_codes.h  |   1 +
 modules/platforms/cpp/ignite/odbc/common_types.cpp |   1 +
 .../platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs |   3 +
 modules/replicator/build.gradle                    |   1 +
 .../ItPlacementDriverReplicaSideTest.java          |   2 +
 .../ignite/internal/replicator/ReplicaManager.java |  78 ++++++++-----
 .../ignite/internal/replicator/ReplicaService.java |  17 ++-
 .../exception/ReplicaUnavailableException.java     |  13 +--
 .../internal/replicator/ReplicaManagerTest.java    | 125 ++++++++++++++++++++-
 .../runner/app/ItIgniteNodeRestartTest.java        |   2 +
 .../org/apache/ignite/internal/app/IgniteImpl.java |   2 +
 .../sql/engine/exec/fsm/QueryExecutionProgram.java |   8 +-
 .../mapping/ExecutionDistributionProviderImpl.java |   8 +-
 .../engine/exec/mapping/MappingServiceImpl.java    |   2 +-
 .../ignite/distributed/ReplicaUnavailableTest.java |   2 +
 .../distributed/storage/InternalTableImpl.java     |   2 +
 .../distributed/TableManagerRecoveryTest.java      |   1 +
 .../apache/ignite/distributed/ItTxTestCluster.java |   2 +
 .../tx/impl/PersistentTxStateVacuumizer.java       |   4 +-
 36 files changed, 317 insertions(+), 75 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index 51db3a334a9..25a7c7048ab 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -496,6 +496,9 @@ public class ErrorGroups {
 
         /** Replication group unavailable exception code. */
         public static final int GROUP_UNAVAILABLE_ERR = 
REPLICATOR_ERR_GROUP.registerErrorCode((short) 10);
+
+        /** Replica is absent on the node and the node is not in assignments 
for this replica. */
+        public static final int REPLICA_ABSENT_ERR = 
REPLICATOR_ERR_GROUP.registerErrorCode((short) 11);
     }
 
     /** Storage error group. */
diff --git 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
index d4afe93e403..7714422d85f 100644
--- 
a/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
+++ 
b/modules/client-handler/src/testFixtures/java/org/apache/ignite/client/handler/FakePlacementDriver.java
@@ -132,7 +132,7 @@ public class FakePlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     @Override
     public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
             List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait, long timeoutMillis
+            long timeoutMillis
     ) {
         return failedFuture(new 
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in 
FakePlacementDriver yet."));
     }
diff --git 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
index e7d52116b5c..07c61e925f1 100644
--- 
a/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
+++ 
b/modules/core/src/main/java/org/apache/ignite/internal/util/ExceptionUtils.java
@@ -754,6 +754,25 @@ public final class ExceptionUtils {
         return false;
     }
 
+    /**
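+     * Determines whether the error code extracted from the given throwable matches any of the passed error codes.
+     *
+     * @param t Unwrapped throwable.
+     * @param codes The codes list.
+     * @return {@code true} if the error code of the throwable is present in {@code codes}, {@code false} otherwise.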
+     */
+    public static boolean matchAny(Throwable t, List<Integer> codes) {
+        int errCode = extractCodeFrom(t);
+
+        for (int c0 : codes) {
+            if (c0 == errCode) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     // TODO: https://issues.apache.org/jira/browse/IGNITE-19870
     // This method should be removed or re-worked and usages should be changed 
to IgniteExceptionMapperUtil.mapToPublicException.
     /**
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index a0b39880129..02b5cbc15c2 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -19,6 +19,7 @@ package org.apache.ignite.internal.rebalance;
 
 import static java.util.Collections.reverse;
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
@@ -1471,6 +1472,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     name,
                     clusterService,
                     cmgManager,
+                    groupId -> completedFuture(Assignments.EMPTY),
                     clockService,
                     Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                     placementDriver,
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
index 2053a21083b..645bae843e7 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/ZoneRebalanceUtil.java
@@ -71,6 +71,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.util.IgniteSpinBusyLock;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /**
  * Util class for methods needed for the rebalance process.
@@ -686,6 +687,7 @@ public class ZoneRebalanceUtil {
      * @param partitionId Partition identifier.
      * @return Pending partition assignments.
      */
+    @TestOnly
     public static CompletableFuture<Set<Assignment>> 
pendingPartitionAssignments(
             MetaStorageManager metaStorageManager,
             int zoneId,
@@ -797,4 +799,23 @@ public class ZoneRebalanceUtil {
 
         return e != null ? AssignmentsChain.fromBytes(e.value()) : null;
     }
+
+    /**
+     * Returns stable partition assignments from meta storage.
+     *
+     * @param metaStorageManager Meta storage manager.
+     * @param zonePartitionId Zone partition id.
+     * @return Future with the stable assignments, or {@link Assignments#EMPTY} if the entry is absent, empty or a tombstone.
+     */
+    public static CompletableFuture<Assignments> 
zonePartitionStableAssignments(
+            MetaStorageManager metaStorageManager,
+            ZonePartitionId zonePartitionId
+    ) {
+        return metaStorageManager
+                .get(stablePartAssignmentsKey(zonePartitionId))
+                .thenApply(e -> e == null || e.value() == null || e.empty() || 
e.tombstone()
+                        ? Assignments.EMPTY
+                        : Assignments.fromBytes(e.value())
+                );
+    }
 }
diff --git 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
index 53df45122c8..d78113e3a86 100644
--- 
a/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
+++ 
b/modules/index/src/test/java/org/apache/ignite/internal/index/TestPlacementDriver.java
@@ -72,8 +72,10 @@ class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     }
 
     @Override
-    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            long timeoutMillis
+    ) {
         return failedFuture(new 
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in 
TestPlacementDriver yet."));
     }
 
diff --git 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
index 06198b3be41..ba5639cc202 100644
--- 
a/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
+++ 
b/modules/partition-distribution/src/main/java/org/apache/ignite/internal/partitiondistribution/Assignments.java
@@ -157,6 +157,22 @@ public class Assignments {
         return nodes.isEmpty();
     }
 
+    /**
+     * Checks whether the assignments contain an assignment for a node with 
the given consistent ID, either peer assignment or not.
+     *
+     * @param consistentId Consistent ID of the node.
+     * @return {@code true} if the assignments contain an assignment for a 
node with the given consistent ID.
+     */
+    public boolean contains(String consistentId) {
+        for (Assignment a : nodes) {
+            if (a.consistentId().equals(consistentId)) {
+                return true;
+            }
+        }
+
+        return false;
+    }
+
     /**
      * Serializes the instance into an array of bytes.
      */
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
index f7b22413146..7aeaa3351cf 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/Node.java
@@ -123,6 +123,7 @@ import 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriverManager;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.raft.Loza;
@@ -641,6 +642,7 @@ public class Node {
                 name,
                 clusterService,
                 cmgManager,
+                groupId -> completedFuture(Assignments.EMPTY),
                 clockService,
                 Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                 placementDriverManager.placementDriver(),
diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
index b8bff4db7a6..1d068bf6325 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/fixtures/TestPlacementDriver.java
@@ -128,8 +128,10 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            long timeoutMillis
+    ) {
         List<TokenizedAssignments> assignments = tokenizedAssignments;
 
         if (assignments == null) {
@@ -148,7 +150,7 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
             throw new IllegalStateException("Primary replica is not defined in 
test PlacementDriver");
         }
 
-        return CompletableFuture.completedFuture(primary);
+        return completedFuture(primary);
     }
 
     @Override
diff --git 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 0394c11a97f..7d6b1e216e3 100644
--- 
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++ 
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -226,6 +226,7 @@ class PartitionReplicaLifecycleManagerTest extends 
BaseIgniteAbstractTest {
                 nodeName,
                 clusterService,
                 cmgManager,
+                groupId -> completedFuture(Assignments.EMPTY),
                 clockService,
                 Set.of(),
                 placementDriver,
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
index c07bb7d2d59..62578e17bf5 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsPlacementDriver.java
@@ -68,13 +68,11 @@ public interface AssignmentsPlacementDriver {
      * {@link EmptyAssignmentsException} is thrown.
      *
      * @param replicationGroupIds List of replication group Ids.
-     * @param clusterTimeToAwait Cluster time to await.
      * @param timeoutMillis Timeout in milliseconds.
      * @return List of tokenized assignments.
      */
     CompletableFuture<List<TokenizedAssignments>> awaitNonEmptyAssignments(
             List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait,
             long timeoutMillis
     );
 }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
index c60d9b0e5e7..f07790bf7c8 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/DelegatingPlacementDriver.java
@@ -69,10 +69,9 @@ abstract class DelegatingPlacementDriver implements 
PlacementDriver {
     @Override
     public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
             List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait,
             long timeoutMillis
     ) {
-        return delegate.awaitNonEmptyAssignments(replicationGroupIds, 
clusterTimeToAwait, timeoutMillis);
+        return delegate.awaitNonEmptyAssignments(replicationGroupIds, 
timeoutMillis);
     }
 
     @Override
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
index b514be9b950..0c8fae58737 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/wrappers/ExecutorInclinedPlacementDriver.java
@@ -70,8 +70,10 @@ public class ExecutorInclinedPlacementDriver extends 
DelegatingPlacementDriver {
     }
 
     @Override
-    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
-        return 
decorateFuture(super.awaitNonEmptyAssignments(replicationGroupIds, 
clusterTimeToAwait, timeoutMillis));
+    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            long timeoutMillis
+    ) {
+        return 
decorateFuture(super.awaitNonEmptyAssignments(replicationGroupIds, 
timeoutMillis));
     }
 }
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 3f5b5c7827a..80442d123b9 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -107,8 +107,10 @@ public class TestPlacementDriver extends 
AbstractEventProducer<PrimaryReplicaEve
     }
 
     @Override
-    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
+    public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
+            List<? extends ReplicationGroupId> replicationGroupIds,
+            long timeoutMillis
+    ) {
         return failedFuture(new 
UnsupportedOperationException("awaitNonEmptyAssignments() is not supported in 
TestPlacementDriver yet."));
     }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index 4a446996021..7f4f6b57ee5 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -176,17 +176,10 @@ public class AssignmentsTracker implements 
AssignmentsPlacementDriver {
     @Override
     public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
             List<? extends ReplicationGroupId> replicationGroupIds,
-            HybridTimestamp clusterTimeToAwait,
             long timeoutMillis
     ) {
-        return msManager
-                .clusterTime()
-                .waitFor(clusterTimeToAwait)
-                .thenCompose(ignored -> inBusyLock(busyLock, () -> {
-                    long now = coarseCurrentTimeMillis();
-                    return 
awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now, 
timeoutMillis);
-                }))
-                .thenApply(identity());
+        long now = coarseCurrentTimeMillis();
+        return 
awaitNonEmptyAssignmentsWithCheckMostRecent(replicationGroupIds, now, 
timeoutMillis);
     }
 
     private CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignmentsWithCheckMostRecent(
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
index 3d351993642..6e3a89c1d89 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriverManager.java
@@ -334,8 +334,8 @@ public class PlacementDriverManager implements 
IgniteComponent {
 
             @Override
             public CompletableFuture<List<TokenizedAssignments>> 
awaitNonEmptyAssignments(
-                    List<? extends ReplicationGroupId> replicationGroupIds, 
HybridTimestamp clusterTimeToAwait, long timeoutMillis) {
-                return 
assignmentsTracker.awaitNonEmptyAssignments(replicationGroupIds, 
clusterTimeToAwait, timeoutMillis);
+                    List<? extends ReplicationGroupId> replicationGroupIds, 
long timeoutMillis) {
+                return 
assignmentsTracker.awaitNonEmptyAssignments(replicationGroupIds, timeoutMillis);
             }
 
             @Override
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
index 1cec7d4a00a..e69c05c7929 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/AssignmentsTrackerTest.java
@@ -94,7 +94,7 @@ public class AssignmentsTrackerTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testInitialEmptyAssignmentsWithSuccessfulWaiting() {
         CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture = 
assignmentsTracker
-                .awaitNonEmptyAssignments(List.of(groupId0), 
metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+                .awaitNonEmptyAssignments(List.of(groupId0), 10_000);
 
         assertFalse(assignmentsListFuture.isDone());
 
@@ -108,7 +108,7 @@ public class AssignmentsTrackerTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testChangeAssignmentsForOneGroupWhileWaitingForAnother() {
         CompletableFuture<List<TokenizedAssignments>> assignmentsListFuture = 
assignmentsTracker
-                .awaitNonEmptyAssignments(List.of(groupId0, groupId1), 
metaStorageManager.clusterTime().currentSafeTime(), 10_000);
+                .awaitNonEmptyAssignments(List.of(groupId0, groupId1), 10_000);
 
         assertFalse(assignmentsListFuture.isDone());
 
diff --git a/modules/platforms/cpp/ignite/common/error_codes.h 
b/modules/platforms/cpp/ignite/common/error_codes.h
index af1bec65073..23f91076ba3 100644
--- a/modules/platforms/cpp/ignite/common/error_codes.h
+++ b/modules/platforms/cpp/ignite/common/error_codes.h
@@ -151,6 +151,7 @@ enum class code : underlying_t {
     REPLICA_STOPPING = 0x80008,
     GROUP_OVERLOADED = 0x80009,
     GROUP_UNAVAILABLE = 0x8000a,
+    REPLICA_ABSENT = 0x8000b,
 
     // Storage group. Group code: 9
     INDEX_NOT_BUILT = 0x90001,
diff --git a/modules/platforms/cpp/ignite/odbc/common_types.cpp 
b/modules/platforms/cpp/ignite/odbc/common_types.cpp
index 70316a61f56..5e6581e7378 100644
--- a/modules/platforms/cpp/ignite/odbc/common_types.cpp
+++ b/modules/platforms/cpp/ignite/odbc/common_types.cpp
@@ -225,6 +225,7 @@ sql_state error_code_to_sql_state(error::code code) {
         case error::code::REPLICA_STOPPING:
         case error::code::GROUP_OVERLOADED:
         case error::code::GROUP_UNAVAILABLE:
+        case error::code::REPLICA_ABSENT:
             return sql_state::SHY000_GENERAL_ERROR;
 
         // Storage group. Group code: 9
diff --git a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs 
b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
index 6ac80fd68b1..478c47335a2 100644
--- a/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
+++ b/modules/platforms/dotnet/Apache.Ignite/ErrorCodes.g.cs
@@ -413,6 +413,9 @@ namespace Apache.Ignite
 
             /// <summary> GroupUnavailable error. </summary>
             public const int GroupUnavailable = (GroupCode << 16) | (10 & 
0xFFFF);
+
+            /// <summary> ReplicaAbsent error. </summary>
+            public const int ReplicaAbsent = (GroupCode << 16) | (11 & 0xFFFF);
         }
 
         /// <summary> Storage errors. </summary>
diff --git a/modules/replicator/build.gradle b/modules/replicator/build.gradle
index f37aa4a4207..4b7eacdcec6 100644
--- a/modules/replicator/build.gradle
+++ b/modules/replicator/build.gradle
@@ -49,6 +49,7 @@ dependencies {
     integrationTestImplementation project(':ignite-metrics')
     integrationTestImplementation project(':ignite-runner')
     integrationTestImplementation project(':ignite-configuration-system')
+    integrationTestImplementation project(':ignite-partition-distribution')
     integrationTestImplementation testFixtures(project)
     integrationTestImplementation testFixtures(project(':ignite-core'))
     integrationTestImplementation 
testFixtures(project(':ignite-configuration'))
diff --git 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index 98f87cfa53a..d510cc27271 100644
--- 
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++ 
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -78,6 +78,7 @@ import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.network.StaticNodeFinder;
 import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverActorMessage;
 import 
org.apache.ignite.internal.placementdriver.message.PlacementDriverMessageGroup;
@@ -225,6 +226,7 @@ public class ItPlacementDriverReplicaSideTest extends 
IgniteAbstractTest {
                     nodeName,
                     clusterService,
                     cmgManager,
+                    groupId -> completedFuture(Assignments.EMPTY),
                     testClockService,
                     Set.of(ReplicaMessageTestGroup.class),
                     new TestPlacementDriver(primaryReplicaSupplier),
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index 64b8c7cd83b..2a9b362cfc1 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -36,11 +36,13 @@ import static 
org.apache.ignite.internal.thread.ThreadOperation.TX_STATE_STORAGE
 import static org.apache.ignite.internal.util.CompletableFutures.allOf;
 import static 
org.apache.ignite.internal.util.CompletableFutures.isCompletedSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static 
org.apache.ignite.internal.util.CompletableFutures.trueCompletedFuture;
 import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
 import static 
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -176,6 +178,9 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     /** Cluster group manager. */
     private final ClusterManagementGroupManager cmgMgr;
 
+    /** Supplier of stable assignments, used for replica absence handling. */
+    private final Function<ZonePartitionId, CompletableFuture<Assignments>> 
stableAssignmentsSupplier;
+
     /** Replica message handler. */
     private final NetworkMessageHandler handler;
 
@@ -237,6 +242,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
      * @param nodeName Node name.
      * @param clusterNetSvc Cluster network service.
      * @param cmgMgr Cluster group manager.
+     * @param stableAssignmentsSupplier Supplier of stable assignments.
      * @param clockService Clock service.
      * @param messageGroupsToHandle Message handlers.
      * @param placementDriver A placement driver.
@@ -257,6 +263,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             String nodeName,
             ClusterService clusterNetSvc,
             ClusterManagementGroupManager cmgMgr,
+            Function<ZonePartitionId, CompletableFuture<Assignments>> 
stableAssignmentsSupplier,
             ClockService clockService,
             Set<Class<?>> messageGroupsToHandle,
             PlacementDriver placementDriver,
@@ -274,6 +281,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
     ) {
         this.clusterNetSvc = clusterNetSvc;
         this.cmgMgr = cmgMgr;
+        this.stableAssignmentsSupplier = stableAssignmentsSupplier;
         this.clockService = clockService;
         this.messageGroupsToHandle = messageGroupsToHandle;
         this.volatileLogStorageFactoryCreator = 
volatileLogStorageFactoryCreator;
@@ -374,7 +382,7 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             HybridTimestamp requestTimestamp = extractTimestamp(request);
 
             if (replicaFut == null || !replicaFut.isDone()) {
-                sendReplicaUnavailableErrorResponse(senderConsistentId, 
correlationId, groupId, requestTimestamp);
+                sendReplicaUnavailableErrorResponse(senderConsistentId, 
correlationId, groupId, requestTimestamp, replicaFut == null);
 
                 return;
             }
@@ -1048,34 +1056,49 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
             String senderConsistentId,
             long correlationId,
             ReplicationGroupId groupId,
-            @Nullable HybridTimestamp requestTimestamp
+            @Nullable HybridTimestamp requestTimestamp,
+            boolean replicaIsAbsent
     ) {
-        if (requestTimestamp != null) {
-            clusterNetSvc.messagingService().respond(
-                    senderConsistentId,
-                    REPLICA_MESSAGES_FACTORY
-                            .errorTimestampAwareReplicaResponse()
-                            .throwable(
-                                    new ReplicaUnavailableException(
-                                            groupId,
-                                            
clusterNetSvc.topologyService().localMember())
-                            )
-                            
.timestamp(clockService.updateClock(requestTimestamp))
-                            .build(),
-                    correlationId);
+        CompletableFuture<Boolean> replicaInAssignmentsFuture;
+
+        if (replicaIsAbsent && (groupId instanceof ZonePartitionId)) {
+            replicaInAssignmentsFuture = 
stableAssignmentsSupplier.apply((ZonePartitionId) groupId)
+                    .thenApply(assignments -> 
assignments.contains(localNodeConsistentId));
         } else {
-            clusterNetSvc.messagingService().respond(
-                    senderConsistentId,
-                    REPLICA_MESSAGES_FACTORY
-                            .errorReplicaResponse()
-                            .throwable(
-                                    new ReplicaUnavailableException(
-                                            groupId,
-                                            
clusterNetSvc.topologyService().localMember())
-                            )
-                            .build(),
-                    correlationId);
+            replicaInAssignmentsFuture = trueCompletedFuture();
         }
+
+        replicaInAssignmentsFuture.thenAccept(isInAssignments -> {
+            ReplicaUnavailableException e;
+
+            if (replicaIsAbsent && !isInAssignments) {
+                e = new ReplicaUnavailableException(
+                        REPLICA_ABSENT_ERR,
+                        format("Replica is absent on this node and not in 
assignments, the request should be retried on another node "
+                                        + "[groupId={}, nodeName={}]", 
groupId, localNodeConsistentId
+                        )
+                );
+            } else {
+                e = new ReplicaUnavailableException(groupId, 
clusterNetSvc.topologyService().localMember());
+            }
+
+            NetworkMessage msg;
+
+            if (requestTimestamp != null) {
+                msg = REPLICA_MESSAGES_FACTORY
+                        .errorTimestampAwareReplicaResponse()
+                        .throwable(e)
+                        .timestamp(clockService.updateClock(requestTimestamp))
+                        .build();
+            } else {
+                msg = REPLICA_MESSAGES_FACTORY
+                        .errorReplicaResponse()
+                        .throwable(e)
+                        .build();
+            }
+
+            clusterNetSvc.messagingService().respond(senderConsistentId, msg, 
correlationId);
+        });
     }
 
     /**
@@ -1193,7 +1216,8 @@ public class ReplicaManager extends 
AbstractEventProducer<LocalReplicaEvent, Loc
                         ComponentStoppingException.class,
                         // Not a problem, there will be a retry.
                         TimeoutException.class,
-                        GroupOverloadedException.class
+                        GroupOverloadedException.class,
+                        ReplicaUnavailableException.class
                 )) {
                     failureProcessor.process(
                             new FailureContext(ex, String.format("Could not 
advance safe time for %s", replica.groupId())));
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
index e6871e3ff48..3831ddc8539 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaService.java
@@ -23,11 +23,13 @@ import static 
org.apache.ignite.internal.util.ExceptionUtils.matchAny;
 import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_COMMON_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_TIMEOUT_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
 
+import java.util.List;
 import java.util.Map;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -57,6 +59,13 @@ import org.jetbrains.annotations.TestOnly;
 
 /** The service is intended to execute requests on replicas. */
 public class ReplicaService {
+    private static final List<Integer> RETRIABLE_ERRORS = List.of(
+            ACQUIRE_LOCK_ERR,
+            REPLICA_MISS_ERR,
+            GROUP_OVERLOADED_ERR,
+            REPLICA_ABSENT_ERR
+    );
+
     /** Message service. */
     private final MessagingService messagingService;
 
@@ -170,7 +179,11 @@ public class ReplicaService {
                 if (response instanceof ErrorReplicaResponse) {
                     var errResp = (ErrorReplicaResponse) response;
 
-                    if (errResp.throwable() instanceof 
ReplicaUnavailableException) {
+                    // If replica is absent, it means it is not in assignments 
and won't appear on that node, no need to wait.
+                    boolean shouldWaitForReplica = errResp.throwable() 
instanceof ReplicaUnavailableException
+                            && ((ReplicaUnavailableException) 
errResp.throwable()).code() != REPLICA_ABSENT_ERR;
+
+                    if (shouldWaitForReplica) {
                         CompletableFuture<NetworkMessage> requestFuture = new 
CompletableFuture<>();
 
                         CompletableFuture<NetworkMessage> awaitReplicaFut = 
pendingInvokes.computeIfAbsent(
@@ -248,7 +261,7 @@ public class ReplicaService {
                     } else {
                         int replicaOperationRetryInterval = 
replicationConfiguration.replicaOperationRetryIntervalMillis().value();
                         if (retryExecutor != null
-                                && matchAny(unwrapCause(errResp.throwable()), 
ACQUIRE_LOCK_ERR, REPLICA_MISS_ERR, GROUP_OVERLOADED_ERR)
+                                && matchAny(unwrapCause(errResp.throwable()), 
RETRIABLE_ERRORS)
                                 && replicaOperationRetryInterval > 0) {
                             retryExecutor.schedule(
                                     // Need to resubmit again to pool which is 
valid for synchronous IO execution.
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
index d8de9144003..67e2b45af26 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/exception/ReplicaUnavailableException.java
@@ -19,7 +19,6 @@ package org.apache.ignite.internal.replicator.exception;
 
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 
-import java.util.UUID;
 import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 
@@ -27,6 +26,8 @@ import 
org.apache.ignite.internal.replicator.ReplicationGroupId;
  * The exception is thrown when a replica is not ready to handle a request.
  */
 public class ReplicaUnavailableException extends ReplicationException {
+    private static final long serialVersionUID = 9142077461528136559L;
+
     /**
      * The constructor.
      *
@@ -34,18 +35,16 @@ public class ReplicaUnavailableException extends 
ReplicationException {
      * @param node Node.
      */
     public ReplicaUnavailableException(ReplicationGroupId groupId, 
InternalClusterNode node) {
-        super(REPLICA_UNAVAILABLE_ERR, "Replica is not ready 
[replicationGroupId=" + groupId + ", nodeName=" + node.name() + ']');
+        this(REPLICA_UNAVAILABLE_ERR, "Replica is not ready 
[replicationGroupId=" + groupId + ", nodeName=" + node.name() + ']');
     }
 
     /**
-     * The constructor is used for creating an exception instance that is 
thrown from a remote server.
+     * The constructor.
      *
-     * @param traceId Trace id.
      * @param code Error code.
      * @param message Error message.
-     * @param cause Cause exception.
      */
-    public ReplicaUnavailableException(UUID traceId, int code, String message, 
Throwable cause) {
-        super(traceId, code, message, cause);
+    public ReplicaUnavailableException(int code, String message) {
+        super(code, message);
     }
 }
diff --git 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 69ae5c362b6..eb774efdd2e 100644
--- 
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++ 
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -19,16 +19,22 @@ package org.apache.ignite.internal.replicator;
 
 import static java.util.UUID.randomUUID;
 import static java.util.concurrent.CompletableFuture.allOf;
+import static java.util.concurrent.CompletableFuture.completedFuture;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
 import static 
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
 import static 
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
+import static 
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toZonePartitionIdMessage;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.util.CompletableFutures.emptySetCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.falseCompletedFuture;
 import static 
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertInstanceOf;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.mock;
@@ -36,12 +42,15 @@ import static org.mockito.Mockito.never;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.HashMap;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ForkJoinPool;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 import 
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -49,10 +58,15 @@ import org.apache.ignite.internal.hlc.HybridClockImpl;
 import org.apache.ignite.internal.hlc.TestClockService;
 import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.manager.ComponentContext;
+import org.apache.ignite.internal.network.ChannelType;
 import org.apache.ignite.internal.network.ClusterNodeImpl;
 import org.apache.ignite.internal.network.ClusterService;
+import org.apache.ignite.internal.network.InternalClusterNode;
 import org.apache.ignite.internal.network.MessagingService;
+import org.apache.ignite.internal.network.NetworkMessage;
+import org.apache.ignite.internal.network.NetworkMessageHandler;
 import org.apache.ignite.internal.network.TopologyService;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.Marshaller;
@@ -65,12 +79,19 @@ import 
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFacto
 import org.apache.ignite.internal.raft.server.RaftGroupOptions;
 import org.apache.ignite.internal.raft.service.RaftGroupListener;
 import 
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
+import org.apache.ignite.internal.replicator.exception.ReplicationException;
 import org.apache.ignite.internal.replicator.listener.ReplicaListener;
+import org.apache.ignite.internal.replicator.message.ErrorReplicaResponse;
+import org.apache.ignite.internal.replicator.message.ReplicaMessageGroup;
+import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
+import 
org.apache.ignite.internal.replicator.message.ReplicaSafeTimeSyncRequest;
 import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.apache.ignite.internal.thread.ExecutorChooser;
 import org.apache.ignite.internal.thread.IgniteThreadFactory;
 import org.apache.ignite.internal.util.IgniteUtils;
 import org.apache.ignite.internal.util.PendingComparableValuesTracker;
 import org.apache.ignite.network.NetworkAddress;
+import org.awaitility.Awaitility;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -84,6 +105,8 @@ import org.mockito.junit.jupiter.MockitoExtension;
  */
 @ExtendWith(MockitoExtension.class)
 public class ReplicaManagerTest extends BaseIgniteAbstractTest {
+    private static final ReplicaMessagesFactory REPLICA_MESSAGES_FACTORY = new 
ReplicaMessagesFactory();
+
     private ExecutorService requestsExecutor;
 
     private ReplicaManager replicaManager;
@@ -91,18 +114,25 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
     @Mock
     private Loza raftManager;
 
+    private final AtomicReference<NetworkMessageHandler> msgHandlerRef = new 
AtomicReference<>();
+
+    private final Map<Long, NetworkMessage> messagingResponses = new 
HashMap<>();
+
+    private MessagingService messagingService = new TestMessagingService();
+
     @BeforeEach
     void startReplicaManager(
             TestInfo testInfo,
             @Mock ClusterService clusterService,
             @Mock ClusterManagementGroupManager cmgManager,
             @Mock PlacementDriver placementDriver,
-            @Mock MessagingService messagingService,
             @Mock TopologyService topologyService,
             @Mock Marshaller marshaller,
             @Mock TopologyAwareRaftGroupServiceFactory raftGroupServiceFactory,
             @Mock VolatileLogStorageFactoryCreator 
volatileLogStorageFactoryCreator
     ) {
+        messagingResponses.clear();
+
         String nodeName = testNodeName(testInfo, 0);
 
         when(clusterService.messagingService()).thenReturn(messagingService);
@@ -125,6 +155,7 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
                 nodeName,
                 clusterService,
                 cmgManager,
+                groupId -> completedFuture(Assignments.EMPTY),
                 new TestClockService(clock),
                 Set.of(),
                 placementDriver,
@@ -225,4 +256,96 @@ public class ReplicaManagerTest extends 
BaseIgniteAbstractTest {
         verify(createReplicaListener).notify(eq(expectedCreateParams));
         verify(removeReplicaListener).notify(eq(expectedCreateParams));
     }
+
+    @Test
+    public void testReplicaAbsence() {
+        ReplicaSafeTimeSyncRequest replicaRequest = 
REPLICA_MESSAGES_FACTORY.replicaSafeTimeSyncRequest()
+                .groupId(toZonePartitionIdMessage(REPLICA_MESSAGES_FACTORY, 
new ZonePartitionId(-1, -1)))
+                .build();
+
+        long correlationId = 1L;
+
+        msgHandlerRef.get().onReceived(replicaRequest, 
mock(InternalClusterNode.class), correlationId);
+
+        Awaitility.await()
+                .timeout(5, TimeUnit.SECONDS)
+                .until(() -> messagingResponses.get(correlationId) != null);
+
+        NetworkMessage resp = messagingResponses.get(correlationId);
+
+        assertNotNull(resp);
+        assertInstanceOf(ErrorReplicaResponse.class, resp);
+
+        ErrorReplicaResponse errorResp = (ErrorReplicaResponse) resp;
+
+        assertInstanceOf(ReplicationException.class, errorResp.throwable());
+
+        ReplicationException e = (ReplicationException) errorResp.throwable();
+        assertEquals(REPLICA_ABSENT_ERR, e.code());
+    }
+
+    private class TestMessagingService implements MessagingService {
+        @Override
+        public void weakSend(InternalClusterNode recipient, NetworkMessage 
msg) {
+            // No-op.
+        }
+
+        @Override
+        public CompletableFuture<Void> send(InternalClusterNode recipient, 
ChannelType channelType, NetworkMessage msg) {
+            // No-op.
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> send(String recipientConsistentId, 
ChannelType channelType, NetworkMessage msg) {
+            // No-op.
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> send(NetworkAddress 
recipientNetworkAddress, ChannelType channelType, NetworkMessage msg) {
+            // No-op.
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<Void> respond(InternalClusterNode recipient, 
ChannelType channelType, NetworkMessage msg,
+                long correlationId) {
+            throw new UnsupportedOperationException();
+        }
+
+        @Override
+        public CompletableFuture<Void> respond(String recipientConsistentId, 
ChannelType channelType, NetworkMessage msg,
+                long correlationId) {
+            messagingResponses.put(correlationId, msg);
+            return completedFuture(null);
+        }
+
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(InternalClusterNode 
recipient, ChannelType channelType, NetworkMessage msg,
+                long timeout) {
+            // No-op.
+            return null;
+        }
+
+        @Override
+        public CompletableFuture<NetworkMessage> invoke(String 
recipientConsistentId, ChannelType channelType, NetworkMessage msg,
+                long timeout) {
+            // No-op.
+            return null;
+        }
+
+        @Override
+        public void addMessageHandler(Class<?> messageGroup, 
NetworkMessageHandler handler) {
+            if (messageGroup.equals(ReplicaMessageGroup.class)) {
+                msgHandlerRef.set(handler);
+            }
+        }
+
+        @Override
+        public void addMessageHandler(Class<?> messageGroup, 
ExecutorChooser<NetworkMessage> executorChooser,
+                NetworkMessageHandler handler) {
+            addMessageHandler(messageGroup, handler);
+        }
+    }
 }
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
index 729742a9dfd..22fb3e072e9 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/runner/app/ItIgniteNodeRestartTest.java
@@ -30,6 +30,7 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTest
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.STABLE_ASSIGNMENTS_PREFIX;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionStableAssignments;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultChannelTypeRegistry;
 import static 
org.apache.ignite.internal.network.utils.ClusterServiceTestUtils.defaultSerializationRegistry;
 import static org.apache.ignite.internal.table.NodeUtils.transferPrimary;
@@ -600,6 +601,7 @@ public class ItIgniteNodeRestartTest extends 
BaseIgniteRestartTest {
                 name,
                 clusterSvc,
                 cmgManager,
+                groupId -> zonePartitionStableAssignments(metaStorageMgr, 
groupId),
                 clockService,
                 Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                 placementDriverManager.placementDriver(),
diff --git 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
index e09eab22191..33687ab6e40 100644
--- 
a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
+++ 
b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgniteImpl.java
@@ -24,6 +24,7 @@ import static 
org.apache.ignite.internal.configuration.IgnitePaths.metastoragePa
 import static 
org.apache.ignite.internal.configuration.IgnitePaths.partitionsPath;
 import static org.apache.ignite.internal.configuration.IgnitePaths.vaultPath;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.zonePartitionStableAssignments;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_READ;
 import static org.apache.ignite.internal.thread.ThreadOperation.STORAGE_WRITE;
 import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
@@ -917,6 +918,7 @@ public class IgniteImpl implements Ignite {
                 name,
                 clusterSvc,
                 cmgMgr,
+                groupId -> zonePartitionStableAssignments(metaStorageMgr, 
groupId),
                 clockService,
                 Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                 placementDriverMgr.placementDriver(),
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
index 5e71c6a485e..8ceef87b6c4 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/fsm/QueryExecutionProgram.java
@@ -125,7 +125,7 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
                 return true;
             }
 
-            return lockConflict(th) || replicaMiss(th) || groupOverloaded(th);
+            return lockConflict(th) || replicaMiss(th) || groupOverloaded(th) 
|| replicaAbsent(th);
         }
 
         return false;
@@ -144,7 +144,7 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
             return false;
         }
 
-        return nodeLeft(th) || lockConflict(th) || replicaMiss(th) || 
groupOverloaded(th)
+        return nodeLeft(th) || lockConflict(th) || replicaMiss(th) || 
groupOverloaded(th) || replicaAbsent(th)
                 || multiStepPlanOutdated(th) || incompatibleSchemaChange(th) 
|| fastPlanSchemaVersionMismatch(th);
     }
 
@@ -164,6 +164,10 @@ class QueryExecutionProgram extends 
Program<AsyncSqlCursor<InternalSqlRow>> {
         return ExceptionUtils.extractCodeFrom(th) == 
Replicator.GROUP_OVERLOADED_ERR;
     }
 
+    private static boolean replicaAbsent(Throwable th) {
+        return ExceptionUtils.extractCodeFrom(th) == 
Replicator.REPLICA_ABSENT_ERR;
+    }
+
     private static boolean multiStepPlanOutdated(Throwable th)  {
         return th instanceof SqlPlanOutdatedException;
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
index 8f4acb3ab17..675a7a8f450 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/ExecutionDistributionProviderImpl.java
@@ -91,7 +91,7 @@ public class ExecutionDistributionProviderImpl implements 
ExecutionDistributionP
                 replicationGroupIds.add(new ZonePartitionId(table.zoneId(), 
partitionIndex));
             }
 
-            return allReplicas(replicationGroupIds, operationTime);
+            return allReplicas(replicationGroupIds);
         }
 
         List<CompletableFuture<TokenizedAssignments>> result = new 
ArrayList<>(partitions);
@@ -140,13 +140,9 @@ public class ExecutionDistributionProviderImpl implements 
ExecutionDistributionP
         });
     }
 
-    private CompletableFuture<List<TokenizedAssignments>> allReplicas(
-            List<ZonePartitionId> replicationGroupIds,
-            HybridTimestamp operationTime
-    ) {
+    private CompletableFuture<List<TokenizedAssignments>> 
allReplicas(List<ZonePartitionId> replicationGroupIds) {
         return placementDriver.awaitNonEmptyAssignments(
                 replicationGroupIds,
-                operationTime,
                 AWAIT_NON_EMPTY_ASSIGNMENTS_TIMEOUT_MILLIS
         );
     }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
index a5aa72032b9..bb75d8b378f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/mapping/MappingServiceImpl.java
@@ -220,7 +220,7 @@ public class MappingServiceImpl implements MappingService, 
LogicalTopologyEventL
 
             for (IgniteTable tbl : tables) {
                 CompletableFuture<List<TokenizedAssignments>> assignments = 
distributionProvider
-                        .forTable(clock.now(), tbl, mapOnBackups);
+                        .forTable(clock.current(), tbl, mapOnBackups);
 
                 tablesAssignments.put(tbl.id(), assignments);
             }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 152913d0df4..5f229335e8e 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -69,6 +69,7 @@ import org.apache.ignite.internal.network.StaticNodeFinder;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessageGroup;
 import 
org.apache.ignite.internal.partition.replicator.network.PartitionReplicationMessagesFactory;
 import 
org.apache.ignite.internal.partition.replicator.network.replication.ReadWriteSingleRowReplicaRequest;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
 import org.apache.ignite.internal.raft.Loza;
 import org.apache.ignite.internal.raft.PeersAndLearners;
@@ -209,6 +210,7 @@ public class ReplicaUnavailableTest extends 
IgniteAbstractTest {
                 NODE_NAME,
                 clusterService,
                 cmgManager,
+                groupId -> completedFuture(Assignments.EMPTY),
                 testClockService,
                 Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                 new 
TestPlacementDriver(clusterService.topologyService().localMember()),
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index 1059074a828..700c06e6711 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -56,6 +56,7 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.GROUP_OVERLOADED_ERR;
+import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_ABSENT_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_MISS_ERR;
 import static 
org.apache.ignite.lang.ErrorGroups.Replicator.REPLICA_UNAVAILABLE_ERR;
 import static org.apache.ignite.lang.ErrorGroups.Transactions.ACQUIRE_LOCK_ERR;
@@ -2296,6 +2297,7 @@ public class InternalTableImpl implements InternalTable {
                 GROUP_OVERLOADED_ERR,
                 REPLICA_MISS_ERR,
                 REPLICA_UNAVAILABLE_ERR,
+                REPLICA_ABSENT_ERR,
                 PRIMARY_REPLICA_AWAIT_ERR,
                 PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR
         );
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index 54eb46cdc9e..25c1309e1c3 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -438,6 +438,7 @@ public class TableManagerRecoveryTest extends 
IgniteAbstractTest {
                 NODE_NAME,
                 clusterService,
                 mock(ClusterManagementGroupManager.class, RETURNS_DEEP_STUBS),
+                groupId -> completedFuture(Assignments.EMPTY),
                 clockService,
                 Set.of(),
                 placementDriver,
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 2118a46bf90..5038bc62959 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -107,6 +107,7 @@ import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDa
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.PartitionsSnapshots;
 import 
org.apache.ignite.internal.partition.replicator.schema.ValidationSchemasSource;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
@@ -490,6 +491,7 @@ public class ItTxTestCluster {
                     nodeName,
                     clusterService,
                     cmgManager,
+                    groupId -> completedFuture(Assignments.EMPTY),
                     clockService,
                     Set.of(PartitionReplicationMessageGroup.class, 
TxMessageGroup.class),
                     placementDriver,
diff --git 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
index 6ec051c40fb..f03ee4f7ba2 100644
--- 
a/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
+++ 
b/modules/transactions/src/main/java/org/apache/ignite/internal/tx/impl/PersistentTxStateVacuumizer.java
@@ -47,6 +47,7 @@ import org.apache.ignite.internal.replicator.ReplicaService;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.exception.AwaitReplicaTimeoutException;
 import 
org.apache.ignite.internal.replicator.exception.PrimaryReplicaMissException;
+import 
org.apache.ignite.internal.replicator.exception.ReplicaUnavailableException;
 import org.apache.ignite.internal.replicator.message.ReplicaMessagesFactory;
 import org.apache.ignite.internal.tx.message.TxMessagesFactory;
 import org.apache.ignite.internal.tx.message.VacuumTxStateReplicaRequest;
@@ -186,7 +187,8 @@ public class PersistentTxStateVacuumizer {
                 // the persistent tx state.
                 // Also, replica calls from PersistentTxStateVacuumizer are 
local, so retry with new primary replica most likely will
                 // happen on another node.
-                AwaitReplicaTimeoutException.class
+                AwaitReplicaTimeoutException.class,
+                ReplicaUnavailableException.class
         );
     }
 

Reply via email to