This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 0786de36f9 IGNITE-20367 ItTableRaftSnapshotsTest times out with high 
flaky rate (#2642)
0786de36f9 is described below

commit 0786de36f99820f1eabe4f3d613bb262caec5367
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Oct 2 12:32:22 2023 +0300

    IGNITE-20367 ItTableRaftSnapshotsTest times out with high flaky rate (#2642)
---
 .../java/org/apache/ignite/lang/ErrorGroups.java   | 12 +++++
 .../internal/placementdriver/PlacementDriver.java  | 17 +++++--
 .../placementdriver/TestPlacementDriver.java       |  8 +++-
 .../internal/placementdriver/LeaseUpdater.java     | 18 ++++++--
 .../PrimaryReplicaAwaitException.java              | 47 +++++++++++++++++++
 .../PrimaryReplicaAwaitTimeoutException.java       | 53 ++++++++++++++++++++++
 .../placementdriver/leases/LeaseTracker.java       | 21 ++++++++-
 .../placementdriver/PlacementDriverTest.java       | 27 +++++++----
 .../ItPrimaryReplicaChoiceTest.java                | 23 ++++++++--
 .../ignite/internal/table/ItTableScanTest.java     |  3 +-
 .../replicator/PartitionReplicaListener.java       |  3 +-
 .../distributed/storage/InternalTableImpl.java     | 22 ++++++---
 12 files changed, 219 insertions(+), 35 deletions(-)

diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java 
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index bf28673f7e..963280289a 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -499,4 +499,16 @@ public class ErrorGroups {
         /** Command to the catalog has not passed the validation. See 
exception message for details. */
         public static final int VALIDATION_ERR = 
CATALOG_ERR_GROUP.registerErrorCode((short) 1);
     }
+
+    /** Placement driver error group. */
+    public static class PlacementDriver {
+        /** Placement driver error group. */
+        public static final ErrorGroup PLACEMENT_DRIVER_ERR_GROUP = 
registerGroup("PLACEMENTDRIVER", (short) 18);
+
+        /** Primary replica await timeout error. */
+        public static final int PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR = 
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 1);
+
+        /** Primary replica await error. */
+        public static final int PRIMARY_REPLICA_AWAIT_ERR = 
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 2);
+    }
 }
diff --git 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index 934ee63737..81292626c6 100644
--- 
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++ 
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.placementdriver;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.event.EventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -32,18 +33,24 @@ public interface PlacementDriver extends 
EventProducer<PrimaryReplicaEvent, Prim
      * Returns a future for the primary replica for the specified replication 
group whose expiration time (the right border of the
      * corresponding lease interval) is greater than or equal to the timestamp 
passed as a parameter. Please pay attention that there are
      * no restriction on the lease start time (left border), it can either be 
less or greater than or equal to proposed timestamp.
-     * Given method will await for an appropriate primary replica appearance 
if there's no already existing one. Such awaiting logic is
-     * unbounded, so it's mandatory to use explicit await termination like 
{@code orTimeout}.
+     * Given method will await for an appropriate primary replica appearance 
if there's no already existing one.
      *
      * @param groupId Replication group id.
      * @param timestamp Timestamp reference value.
+     * @param timeout How long to wait before completing exceptionally with 
a {@code TimeoutException}, in units of {@code unit}.
+     * @param unit A {@code TimeUnit} determining how to interpret the {@code timeout} 
parameter.
      * @return Primary replica future.
      */
-    CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId 
groupId, HybridTimestamp timestamp);
+    CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+            ReplicationGroupId groupId,
+            HybridTimestamp timestamp,
+            long timeout,
+            TimeUnit unit
+    );
 
     /**
-     * Same as {@link #awaitPrimaryReplica(ReplicationGroupId, 
HybridTimestamp)} despite the fact that given method await logic is bounded.
-     * It will wait for a primary replica for a reasonable period of time, and 
complete a future with null if a matching
+     * Same as {@link #awaitPrimaryReplica(ReplicationGroupId, 
HybridTimestamp, long, TimeUnit)} except that the await logic of this method
+     * is bounded. It will wait for a primary replica for a reasonable 
period of time, and complete a future with null if a matching
      * lease isn't found. Generally speaking reasonable here means enough for 
distribution across cluster nodes.
      *
      * @param replicationGroupId Replication group id.
diff --git 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 99c9ce71dd..1c4d728072 100644
--- 
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++ 
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.placementdriver;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
 import org.apache.ignite.internal.event.EventListener;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -40,7 +41,12 @@ public class TestPlacementDriver implements PlacementDriver {
     }
 
     @Override
-    public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
+    public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+            ReplicationGroupId groupId,
+            HybridTimestamp timestamp,
+            long timeout,
+            TimeUnit unit
+    ) {
         return completedFuture(primaryReplica);
     }
 
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index f5b95e34d0..91e9720bbb 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -249,19 +249,27 @@ public class LeaseUpdater {
      */
     private @Nullable ClusterNode nextLeaseHolder(Set<Assignment> assignments, 
@Nullable String proposedConsistentId) {
         //TODO: IGNITE-18879 Implement more intellectual algorithm to choose a 
node.
-        String consistentId = null;
+        ClusterNode primaryCandidate = null;
 
         for (Assignment assignment : assignments) {
+            // Check whether the given assignment is actually available in the 
logical topology. It's a best-effort check because it's possible
+            // for the proposed primary candidate to leave the topology at any 
time. In that case the primary candidate will be recalculated.
+            ClusterNode candidateNode = 
topologyTracker.nodeByConsistentId(assignment.consistentId());
+
+            if (candidateNode == null) {
+                continue;
+            }
+
             if (assignment.consistentId().equals(proposedConsistentId)) {
-                consistentId = proposedConsistentId;
+                primaryCandidate = candidateNode;
 
                 break;
-            } else if (consistentId == null || consistentId.hashCode() > 
assignment.consistentId().hashCode()) {
-                consistentId = assignment.consistentId();
+            } else if (primaryCandidate == null || 
primaryCandidate.name().hashCode() > assignment.consistentId().hashCode()) {
+                primaryCandidate = candidateNode;
             }
         }
 
-        return consistentId == null ? null : 
topologyTracker.nodeByConsistentId(consistentId);
+        return primaryCandidate;
     }
 
     /** Returns {@code true} if active. */
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
new file mode 100644
index 0000000000..c0497f6b19
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups;
+
+/**
+ * The exception is thrown when a primary replica await process has failed. 
Please pay attention that there is a specific
+ * {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await 
timeout.
+ */
+public class PrimaryReplicaAwaitException extends IgniteInternalException {
+
+    /**
+     * The constructor.
+     *
+     * @param replicationGroupId Replication group id.
+     * @param referenceTimestamp Timestamp reference value.
+     * @param cause Cause exception.
+     */
+    public PrimaryReplicaAwaitException(ReplicationGroupId replicationGroupId, 
HybridTimestamp referenceTimestamp, Throwable cause) {
+        super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR,
+                IgniteStringFormatter.format(
+                        "The primary replica await exception 
[replicationGroupId={}, referenceTimestamp={}]",
+                        replicationGroupId, referenceTimestamp
+                ),
+                cause);
+    }
+}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
new file mode 100644
index 0000000000..cbe33d5702
--- /dev/null
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The exception is thrown when a primary replica await process has timed out.
+ */
+public class PrimaryReplicaAwaitTimeoutException extends 
IgniteInternalException {
+
+    /**
+     * The constructor.
+     *
+     * @param replicationGroupId Replication group id.
+     * @param referenceTimestamp Timestamp reference value.
+     * @param cause Cause exception.
+     */
+    public PrimaryReplicaAwaitTimeoutException(
+            ReplicationGroupId replicationGroupId,
+            HybridTimestamp referenceTimestamp,
+            @Nullable Lease currentLease,
+            Throwable cause
+    ) {
+        super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR,
+                IgniteStringFormatter.format(
+                        "The primary replica await timed out 
[replicationGroupId={}, referenceTimestamp={}, currentLease={}]",
+                        replicationGroupId, referenceTimestamp, currentLease
+                ),
+                cause);
+    }
+}
diff --git 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ae385f3bc4..df20b3d946 100644
--- 
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++ 
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -42,6 +42,8 @@ import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import org.apache.ignite.internal.event.AbstractEventProducer;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -53,6 +55,8 @@ import 
org.apache.ignite.internal.metastorage.MetaStorageManager;
 import org.apache.ignite.internal.metastorage.WatchEvent;
 import org.apache.ignite.internal.metastorage.WatchListener;
 import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitException;
+import 
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
 import org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
 import 
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -212,8 +216,21 @@ public class LeaseTracker extends 
AbstractEventProducer<PrimaryReplicaEvent, Pri
     }
 
     @Override
-    public CompletableFuture<ReplicaMeta> 
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
-        return inBusyLockAsync(busyLock, () -> 
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp));
+    public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+            ReplicationGroupId groupId,
+            HybridTimestamp timestamp,
+            long timeout,
+            TimeUnit unit
+    ) {
+        return inBusyLockAsync(busyLock, () -> 
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp)
+                .orTimeout(timeout, unit)
+                .exceptionally(e -> {
+                    if (e instanceof TimeoutException) {
+                        throw new PrimaryReplicaAwaitTimeoutException(groupId, 
timestamp, leases.leaseByGroupId().get(groupId), e);
+                    }
+
+                    throw new PrimaryReplicaAwaitException(groupId, timestamp, 
e);
+                }));
     }
 
     @Override
diff --git 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index be03826af1..49cf533c2a 100644
--- 
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++ 
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -18,6 +18,7 @@
 package org.apache.ignite.internal.placementdriver;
 
 import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
 import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
 import static 
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
@@ -99,6 +100,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
 
     private static final int 
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES = 1_000;
 
+    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
     private VaultManager vault;
 
     private MetaStorageManager metastore;
@@ -158,7 +161,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testAwaitPrimaryReplicaInInterval() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
         // Publish primary replica for an interval [1, 5].
@@ -195,7 +199,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testAwaitPrimaryReplicaBeforeInterval() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
         // Publish primary replica for an interval [1, 5].
@@ -236,7 +241,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
                 
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES));
 
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         // Assert that primary waiter is completed.
         assertTrue(primaryReplicaFuture.isDone());
@@ -257,8 +263,10 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testTwoWaitersSameTime() throws Exception {
         // Await primary replica for time 10 twice.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         assertFalse(primaryReplicaFuture1.isDone());
         assertFalse(primaryReplicaFuture2.isDone());
@@ -291,8 +299,10 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testTwoWaitersSameTimeFirstTimedOutSecondSucceed() throws 
Exception {
         // Await primary replica for time 10 twice.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture1 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture2 = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
 
         assertFalse(primaryReplicaFuture1.isDone());
         assertFalse(primaryReplicaFuture2.isDone());
@@ -329,7 +339,8 @@ public class PlacementDriverTest extends 
BaseIgniteAbstractTest {
     @Test
     public void testGetPrimaryReplica() throws Exception {
         // Await primary replica for time 10.
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
         assertFalse(primaryReplicaFuture.isDone());
 
         // Publish primary replica for an interval [1, 15].
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 0190c629f4..b3148dc83b 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.placementdriver;
 
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.SessionUtils.executeUpdate;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
@@ -47,6 +48,8 @@ import org.junit.jupiter.api.TestInfo;
  * The test class checks invariant of a primary replica choice.
  */
 public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest {
+    private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
     /** Table name. */
     private static final String TABLE_NAME = "test_table";
 
@@ -72,7 +75,9 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
-                node(0).clock().now()
+                node(0).clock().now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
         );
 
         assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -102,7 +107,9 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
-                node(0).clock().now()
+                node(0).clock().now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
         );
 
         assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -140,7 +147,9 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
-                node(0).clock().now()
+                node(0).clock().now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
         );
 
         assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -192,7 +201,9 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
 
         CompletableFuture<ReplicaMeta> primaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                 tblReplicationGrp,
-                node(0).clock().now()
+                node(0).clock().now(),
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
         );
 
         assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -249,7 +260,9 @@ public class ItPrimaryReplicaChoiceTest extends 
ClusterPerTestIntegrationTest {
         assertTrue(IgniteTestUtils.waitForCondition(() -> {
             CompletableFuture<ReplicaMeta> newPrimaryReplicaFut = 
node(0).placementDriver().awaitPrimaryReplica(
                     tblReplicationGrp,
-                    node(0).clock().now()
+                    node(0).clock().now(),
+                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                    SECONDS
             );
 
             assertThat(newPrimaryReplicaFut, willCompleteSuccessfully());
diff --git 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 8b2fdc8192..56abeab107 100644
--- 
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++ 
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.function.Function;
 import java.util.function.IntFunction;
@@ -880,7 +881,7 @@ public class ItTableScanTest extends 
ClusterPerClassIntegrationTest {
 
         PlacementDriver placementDriver = ((IgniteImpl) 
ignite).placementDriver();
         ReplicaMeta primaryReplica = IgniteTestUtils.await(
-                placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl) 
ignite).clock().now()));
+                placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl) 
ignite).clock().now(), 30, TimeUnit.SECONDS));
 
         tx.enlist(
                 tblPartId,
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 397e416a39..58d672f7ff 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
 import static java.util.concurrent.CompletableFuture.allOf;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.toList;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
 import static 
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP;
@@ -2496,7 +2497,7 @@ public class PartitionReplicaListener implements 
ReplicaListener {
         } else if (request instanceof BuildIndexReplicaRequest) {
             // TODO: IGNITE-20330 Possibly replaced by 
placementDriver#getPrimaryReplica and should also be added to the documentation
             //  about PrimaryReplicaMissException
-            return placementDriver.awaitPrimaryReplica(replicationGroupId, now)
+            return placementDriver.awaitPrimaryReplica(replicationGroupId, 
now, 30, SECONDS)
                     .thenCompose(replicaMeta -> {
                         if (isLocalPeer(replicaMeta.getLeaseholder())) {
                             return completedFuture(null);
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index e520eee115..da8fadab24 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
 import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
 import static java.util.concurrent.CompletableFuture.completedFuture;
 import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
 import static 
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
 import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
 import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -48,7 +49,6 @@ import java.util.concurrent.CompletionException;
 import java.util.concurrent.Flow.Publisher;
 import java.util.concurrent.Flow.Subscriber;
 import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicLong;
@@ -1203,8 +1203,12 @@ public class InternalTableImpl implements InternalTable {
         entries.sort(Comparator.comparingInt(Entry::getIntKey));
 
         for (Entry<RaftGroupService> e : entries) {
-            CompletableFuture<ReplicaMeta> f = 
placementDriver.awaitPrimaryReplica(e.getValue().groupId(), clock.now())
-                    .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, 
TimeUnit.SECONDS);
+            CompletableFuture<ReplicaMeta> f = 
placementDriver.awaitPrimaryReplica(
+                    e.getValue().groupId(),
+                    clock.now(),
+                    AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                    SECONDS
+            );
 
             result.add(f.thenApply(primaryReplica -> {
                 ClusterNode node = 
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
@@ -1390,8 +1394,12 @@ public class InternalTableImpl implements InternalTable {
 
         HybridTimestamp now = clock.now();
 
-        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(tablePartitionId, now)
-                .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS);
+        CompletableFuture<ReplicaMeta> primaryReplicaFuture = 
placementDriver.awaitPrimaryReplica(
+                tablePartitionId,
+                now,
+                AWAIT_PRIMARY_REPLICA_TIMEOUT,
+                SECONDS
+        );
 
         return primaryReplicaFuture.handle((primaryReplica, e) -> {
             if (e != null) {
@@ -1597,8 +1605,8 @@ public class InternalTableImpl implements InternalTable {
     protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int 
partId) {
         TablePartitionId tablePartitionId = new TablePartitionId(tableId, 
partId);
 
-        return placementDriver.awaitPrimaryReplica(tablePartitionId, 
clock.now())
-                .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, 
TimeUnit.SECONDS).handle((res, e) -> {
+        return placementDriver.awaitPrimaryReplica(tablePartitionId, 
clock.now(), AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+                .handle((res, e) -> {
                     if (e != null) {
                         throw withCause(TransactionException::new, 
REPLICA_UNAVAILABLE_ERR, e);
                     } else {

Reply via email to