This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 0786de36f9 IGNITE-20367 ItTableRaftSnapshotsTest times out with high
flaky rate (#2642)
0786de36f9 is described below
commit 0786de36f99820f1eabe4f3d613bb262caec5367
Author: Alexander Lapin <[email protected]>
AuthorDate: Mon Oct 2 12:32:22 2023 +0300
IGNITE-20367 ItTableRaftSnapshotsTest times out with high flaky rate (#2642)
---
.../java/org/apache/ignite/lang/ErrorGroups.java | 12 +++++
.../internal/placementdriver/PlacementDriver.java | 17 +++++--
.../placementdriver/TestPlacementDriver.java | 8 +++-
.../internal/placementdriver/LeaseUpdater.java | 18 ++++++--
.../PrimaryReplicaAwaitException.java | 47 +++++++++++++++++++
.../PrimaryReplicaAwaitTimeoutException.java | 53 ++++++++++++++++++++++
.../placementdriver/leases/LeaseTracker.java | 21 ++++++++-
.../placementdriver/PlacementDriverTest.java | 27 +++++++----
.../ItPrimaryReplicaChoiceTest.java | 23 ++++++++--
.../ignite/internal/table/ItTableScanTest.java | 3 +-
.../replicator/PartitionReplicaListener.java | 3 +-
.../distributed/storage/InternalTableImpl.java | 22 ++++++---
12 files changed, 219 insertions(+), 35 deletions(-)
diff --git a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
index bf28673f7e..963280289a 100755
--- a/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
+++ b/modules/api/src/main/java/org/apache/ignite/lang/ErrorGroups.java
@@ -499,4 +499,16 @@ public class ErrorGroups {
/** Command to the catalog has not passed the validation. See
exception message for details. */
public static final int VALIDATION_ERR =
CATALOG_ERR_GROUP.registerErrorCode((short) 1);
}
+
+ /** Placement driver error group. */
+ public static class PlacementDriver {
+ /** Placement driver error group. */
+ public static final ErrorGroup PLACEMENT_DRIVER_ERR_GROUP =
registerGroup("PLACEMENTDRIVER", (short) 18);
+
+ /** Primary replica await timeout error. */
+ public static final int PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR =
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 1);
+
+ /** Primary replica await error. */
+ public static final int PRIMARY_REPLICA_AWAIT_ERR =
PLACEMENT_DRIVER_ERR_GROUP.registerErrorCode((short) 2);
+ }
}
diff --git
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
index 934ee63737..81292626c6 100644
---
a/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
+++
b/modules/placement-driver-api/src/main/java/org/apache/ignite/internal/placementdriver/PlacementDriver.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.placementdriver;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.event.EventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -32,18 +33,24 @@ public interface PlacementDriver extends
EventProducer<PrimaryReplicaEvent, Prim
* Returns a future for the primary replica for the specified replication
group whose expiration time (the right border of the
* corresponding lease interval) is greater than or equal to the timestamp
passed as a parameter. Please pay attention that there are
* no restriction on the lease start time (left border), it can either be
less or greater than or equal to proposed timestamp.
- * Given method will await for an appropriate primary replica appearance
if there's no already existing one. Such awaiting logic is
- * unbounded, so it's mandatory to use explicit await termination like
{@code orTimeout}.
+ * Given method will await for an appropriate primary replica appearance
if there's no already existing one.
*
* @param groupId Replication group id.
* @param timestamp Timestamp reference value.
+ * @param timeout How long to wait before completing exceptionally with
a timeout exception, in units of {@code unit}.
+ * @param unit A {@link TimeUnit} determining how to interpret the {@code timeout}
parameter.
* @return Primary replica future.
*/
- CompletableFuture<ReplicaMeta> awaitPrimaryReplica(ReplicationGroupId
groupId, HybridTimestamp timestamp);
+ CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit unit
+ );
/**
- * Same as {@link #awaitPrimaryReplica(ReplicationGroupId,
HybridTimestamp)} despite the fact that given method await logic is bounded.
- * It will wait for a primary replica for a reasonable period of time, and
complete a future with null if a matching
+ * Same as {@link #awaitPrimaryReplica(ReplicationGroupId,
HybridTimestamp, long, TimeUnit)} despite the fact that given method await
+ * logic is bounded. It will wait for a primary replica for a reasonable
period of time, and complete a future with null if a matching
* lease isn't found. Generally speaking reasonable here means enough for
distribution across cluster nodes.
*
* @param replicationGroupId Replication group id.
diff --git
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
index 99c9ce71dd..1c4d728072 100644
---
a/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
+++
b/modules/placement-driver-api/src/testFixtures/java/org/apache/ignite/internal/placementdriver/TestPlacementDriver.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
import org.apache.ignite.internal.event.EventListener;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
@@ -40,7 +41,12 @@ public class TestPlacementDriver implements PlacementDriver {
}
@Override
- public CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
+ public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit unit
+ ) {
return completedFuture(primaryReplica);
}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
index f5b95e34d0..91e9720bbb 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseUpdater.java
@@ -249,19 +249,27 @@ public class LeaseUpdater {
*/
private @Nullable ClusterNode nextLeaseHolder(Set<Assignment> assignments,
@Nullable String proposedConsistentId) {
//TODO: IGNITE-18879 Implement more intellectual algorithm to choose a
node.
- String consistentId = null;
+ ClusterNode primaryCandidate = null;
for (Assignment assignment : assignments) {
+ // Check whether the given assignment is actually available in the
logical topology. It's a best-effort check because it's possible
+ // for the proposed primary candidate to leave the topology at any
time. In that case the primary candidate will be recalculated.
+ ClusterNode candidateNode =
topologyTracker.nodeByConsistentId(assignment.consistentId());
+
+ if (candidateNode == null) {
+ continue;
+ }
+
if (assignment.consistentId().equals(proposedConsistentId)) {
- consistentId = proposedConsistentId;
+ primaryCandidate = candidateNode;
break;
- } else if (consistentId == null || consistentId.hashCode() >
assignment.consistentId().hashCode()) {
- consistentId = assignment.consistentId();
+ } else if (primaryCandidate == null ||
primaryCandidate.name().hashCode() > assignment.consistentId().hashCode()) {
+ primaryCandidate = candidateNode;
}
}
- return consistentId == null ? null :
topologyTracker.nodeByConsistentId(consistentId);
+ return primaryCandidate;
}
/** Returns {@code true} if active. */
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
new file mode 100644
index 0000000000..c0497f6b19
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitException.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups;
+
+/**
+ * The exception is thrown when a primary replica await process has failed.
Please note that there is a dedicated
+ * {@link PrimaryReplicaAwaitTimeoutException} for the primary replica await
timeout.
+ */
+public class PrimaryReplicaAwaitException extends IgniteInternalException {
+
+ /**
+ * The constructor.
+ *
+ * @param replicationGroupId Replication group id.
+ * @param referenceTimestamp Timestamp reference value.
+ * @param cause Cause exception.
+ */
+ public PrimaryReplicaAwaitException(ReplicationGroupId replicationGroupId,
HybridTimestamp referenceTimestamp, Throwable cause) {
+ super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_ERR,
+ IgniteStringFormatter.format(
+ "The primary replica await exception
[replicationGroupId={}, referenceTimestamp={}]",
+ replicationGroupId, referenceTimestamp
+ ),
+ cause);
+ }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
new file mode 100644
index 0000000000..cbe33d5702
--- /dev/null
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/PrimaryReplicaAwaitTimeoutException.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.placementdriver;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalException;
+import org.apache.ignite.internal.lang.IgniteStringFormatter;
+import org.apache.ignite.internal.placementdriver.leases.Lease;
+import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.lang.ErrorGroups;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * The exception is thrown when a primary replica await process has timed out.
+ */
+public class PrimaryReplicaAwaitTimeoutException extends
IgniteInternalException {
+
+ /**
+ * The constructor.
+ *
+ * @param replicationGroupId Replication group id.
+ * @param referenceTimestamp Timestamp reference value.
+ * @param cause Cause exception.
+ */
+ public PrimaryReplicaAwaitTimeoutException(
+ ReplicationGroupId replicationGroupId,
+ HybridTimestamp referenceTimestamp,
+ @Nullable Lease currentLease,
+ Throwable cause
+ ) {
+ super(ErrorGroups.PlacementDriver.PRIMARY_REPLICA_AWAIT_TIMEOUT_ERR,
+ IgniteStringFormatter.format(
+ "The primary replica await timed out
[replicationGroupId={}, referenceTimestamp={}, currentLease={}]",
+ replicationGroupId, referenceTimestamp, currentLease
+ ),
+ cause);
+ }
+}
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
index ae385f3bc4..df20b3d946 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/leases/LeaseTracker.java
@@ -42,6 +42,8 @@ import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.ignite.internal.event.AbstractEventProducer;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -53,6 +55,8 @@ import
org.apache.ignite.internal.metastorage.MetaStorageManager;
import org.apache.ignite.internal.metastorage.WatchEvent;
import org.apache.ignite.internal.metastorage.WatchListener;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
+import org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitException;
+import
org.apache.ignite.internal.placementdriver.PrimaryReplicaAwaitTimeoutException;
import org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEvent;
import
org.apache.ignite.internal.placementdriver.event.PrimaryReplicaEventParameters;
@@ -212,8 +216,21 @@ public class LeaseTracker extends
AbstractEventProducer<PrimaryReplicaEvent, Pri
}
@Override
- public CompletableFuture<ReplicaMeta>
awaitPrimaryReplica(ReplicationGroupId groupId, HybridTimestamp timestamp) {
- return inBusyLockAsync(busyLock, () ->
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp));
+ public CompletableFuture<ReplicaMeta> awaitPrimaryReplica(
+ ReplicationGroupId groupId,
+ HybridTimestamp timestamp,
+ long timeout,
+ TimeUnit unit
+ ) {
+ return inBusyLockAsync(busyLock, () ->
getOrCreatePrimaryReplicaWaiter(groupId).waitFor(timestamp)
+ .orTimeout(timeout, unit)
+ .exceptionally(e -> {
+ if (e instanceof TimeoutException) {
+ throw new PrimaryReplicaAwaitTimeoutException(groupId,
timestamp, leases.leaseByGroupId().get(groupId), e);
+ }
+
+ throw new PrimaryReplicaAwaitException(groupId, timestamp,
e);
+ }));
}
@Override
diff --git
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
index be03826af1..49cf533c2a 100644
---
a/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
+++
b/modules/placement-driver/src/test/java/org/apache/ignite/internal/placementdriver/PlacementDriverTest.java
@@ -18,6 +18,7 @@
package org.apache.ignite.internal.placementdriver;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.metastorage.dsl.Operations.noop;
import static org.apache.ignite.internal.metastorage.dsl.Operations.put;
import static
org.apache.ignite.internal.placementdriver.PlacementDriverManager.PLACEMENTDRIVER_LEASES_KEY;
@@ -99,6 +100,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
private static final int
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES = 1_000;
+ private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
private VaultManager vault;
private MetaStorageManager metastore;
@@ -158,7 +161,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaInInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 5].
@@ -195,7 +199,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testAwaitPrimaryReplicaBeforeInterval() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 5].
@@ -236,7 +241,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
AWAIT_PERIOD_FOR_LOCAL_NODE_TO_BE_NOTIFIED_ABOUT_LEASE_UPDATES));
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
// Assert that primary waiter is completed.
assertTrue(primaryReplicaFuture.isDone());
@@ -257,8 +263,10 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTime() throws Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture1.isDone());
assertFalse(primaryReplicaFuture2.isDone());
@@ -291,8 +299,10 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testTwoWaitersSameTimeFirstTimedOutSecondSucceed() throws
Exception {
// Await primary replica for time 10 twice.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
- CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture1 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture2 =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture1.isDone());
assertFalse(primaryReplicaFuture2.isDone());
@@ -329,7 +339,8 @@ public class PlacementDriverTest extends
BaseIgniteAbstractTest {
@Test
public void testGetPrimaryReplica() throws Exception {
// Await primary replica for time 10.
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(GROUP_1, AWAIT_TIME_10_000,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS);
assertFalse(primaryReplicaFuture.isDone());
// Publish primary replica for an interval [1, 15].
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
index 0190c629f4..b3148dc83b 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/placementdriver/ItPrimaryReplicaChoiceTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.placementdriver;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.SessionUtils.executeUpdate;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -47,6 +48,8 @@ import org.junit.jupiter.api.TestInfo;
* The test class checks invariant of a primary replica choice.
*/
public class ItPrimaryReplicaChoiceTest extends ClusterPerTestIntegrationTest {
+ private static final int AWAIT_PRIMARY_REPLICA_TIMEOUT = 10;
+
/** Table name. */
private static final String TABLE_NAME = "test_table";
@@ -72,7 +75,9 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now()
+ node(0).clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -102,7 +107,9 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now()
+ node(0).clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -140,7 +147,9 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now()
+ node(0).clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -192,7 +201,9 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
CompletableFuture<ReplicaMeta> primaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now()
+ node(0).clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
);
assertThat(primaryReplicaFut, willCompleteSuccessfully());
@@ -249,7 +260,9 @@ public class ItPrimaryReplicaChoiceTest extends
ClusterPerTestIntegrationTest {
assertTrue(IgniteTestUtils.waitForCondition(() -> {
CompletableFuture<ReplicaMeta> newPrimaryReplicaFut =
node(0).placementDriver().awaitPrimaryReplica(
tblReplicationGrp,
- node(0).clock().now()
+ node(0).clock().now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
);
assertThat(newPrimaryReplicaFut, willCompleteSuccessfully());
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
index 8b2fdc8192..56abeab107 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/table/ItTableScanTest.java
@@ -38,6 +38,7 @@ import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import java.util.function.IntFunction;
@@ -880,7 +881,7 @@ public class ItTableScanTest extends
ClusterPerClassIntegrationTest {
PlacementDriver placementDriver = ((IgniteImpl)
ignite).placementDriver();
ReplicaMeta primaryReplica = IgniteTestUtils.await(
- placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl)
ignite).clock().now()));
+ placementDriver.awaitPrimaryReplica(tblPartId, ((IgniteImpl)
ignite).clock().now(), 30, TimeUnit.SECONDS));
tx.enlist(
tblPartId,
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
index 397e416a39..58d672f7ff 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/replicator/PartitionReplicaListener.java
@@ -21,6 +21,7 @@ import static java.util.Objects.requireNonNull;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toList;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_CREATE;
import static
org.apache.ignite.internal.catalog.events.CatalogEvent.INDEX_DROP;
@@ -2496,7 +2497,7 @@ public class PartitionReplicaListener implements
ReplicaListener {
} else if (request instanceof BuildIndexReplicaRequest) {
// TODO: IGNITE-20330 Possibly replaced by
placementDriver#getPrimaryReplica and should also be added to the documentation
// about PrimaryReplicaMissException
- return placementDriver.awaitPrimaryReplica(replicationGroupId, now)
+ return placementDriver.awaitPrimaryReplica(replicationGroupId,
now, 30, SECONDS)
.thenCompose(replicaMeta -> {
if (isLocalPeer(replicaMeta.getLeaseholder())) {
return completedFuture(null);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index e520eee115..da8fadab24 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.table.distributed.storage;
import static it.unimi.dsi.fastutil.ints.Int2ObjectMaps.emptyMap;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.table.distributed.storage.RowBatch.allResultFutures;
import static org.apache.ignite.internal.util.ExceptionUtils.withCause;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -48,7 +49,6 @@ import java.util.concurrent.CompletionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
import java.util.concurrent.Flow.Subscription;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
@@ -1203,8 +1203,12 @@ public class InternalTableImpl implements InternalTable {
entries.sort(Comparator.comparingInt(Entry::getIntKey));
for (Entry<RaftGroupService> e : entries) {
- CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(e.getValue().groupId(), clock.now())
- .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT,
TimeUnit.SECONDS);
+ CompletableFuture<ReplicaMeta> f =
placementDriver.awaitPrimaryReplica(
+ e.getValue().groupId(),
+ clock.now(),
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
result.add(f.thenApply(primaryReplica -> {
ClusterNode node =
clusterNodeResolver.apply(primaryReplica.getLeaseholder());
@@ -1390,8 +1394,12 @@ public class InternalTableImpl implements InternalTable {
HybridTimestamp now = clock.now();
- CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(tablePartitionId, now)
- .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT, TimeUnit.SECONDS);
+ CompletableFuture<ReplicaMeta> primaryReplicaFuture =
placementDriver.awaitPrimaryReplica(
+ tablePartitionId,
+ now,
+ AWAIT_PRIMARY_REPLICA_TIMEOUT,
+ SECONDS
+ );
return primaryReplicaFuture.handle((primaryReplica, e) -> {
if (e != null) {
@@ -1597,8 +1605,8 @@ public class InternalTableImpl implements InternalTable {
protected CompletableFuture<ClusterNode> evaluateReadOnlyRecipientNode(int
partId) {
TablePartitionId tablePartitionId = new TablePartitionId(tableId,
partId);
- return placementDriver.awaitPrimaryReplica(tablePartitionId,
clock.now())
- .orTimeout(AWAIT_PRIMARY_REPLICA_TIMEOUT,
TimeUnit.SECONDS).handle((res, e) -> {
+ return placementDriver.awaitPrimaryReplica(tablePartitionId,
clock.now(), AWAIT_PRIMARY_REPLICA_TIMEOUT, SECONDS)
+ .handle((res, e) -> {
if (e != null) {
throw withCause(TransactionException::new,
REPLICA_UNAVAILABLE_ERR, e);
} else {