This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new a2b909f6345 IGNITE-25543 Implement retry mechanism for reset (#6053)
a2b909f6345 is described below
commit a2b909f63451dab3d3db321552ae6b6fcb75d7de
Author: Cyrill <[email protected]>
AuthorDate: Thu Nov 20 19:24:49 2025 +0300
IGNITE-25543 Implement retry mechanism for reset (#6053)
Co-authored-by: Kirill Sizov <[email protected]>
---
.../ignite/internal/util/IgniteBusyLock.java | 35 ++++++
.../ignite/internal/util/IgniteSpinBusyLock.java | 4 +-
.../internal/util/IgniteStripedBusyLock.java | 4 +-
.../apache/ignite/internal/util/IgniteUtils.java | 20 ++--
.../rebalance/PartitionMover.java | 2 +-
.../distributionzones/rebalance/RebalanceUtil.java | 11 --
.../metastorage/impl/MetaStorageManagerImpl.java | 2 +-
.../impl/StandaloneMetaStorageManager.java | 16 +++
.../PartitionReplicaLifecycleManager.java | 49 ++++----
.../PartitionReplicaLifecycleManagerTest.java | 3 +-
.../internal/raft/rebalance/ExceptionUtils.java | 43 +++++++
.../raft/rebalance/RaftStaleUpdateException.java | 30 +++++
.../java/org/apache/ignite/internal/raft/Loza.java | 5 +-
.../ignite/internal/raft/RaftGroupServiceImpl.java | 6 +
.../internal/raft/server/impl/JraftServerImpl.java | 4 +-
.../ItPlacementDriverReplicaSideTest.java | 2 +-
.../ignite/internal/replicator/ReplicaManager.java | 96 +++++++++++++--
.../internal/replicator/ReplicaStateManager.java | 2 +-
.../internal/replicator/ReplicaManagerTest.java | 2 +-
.../ignite/internal/app/ThreadPoolsManager.java | 6 +-
.../ignite/distributed/ReplicaUnavailableTest.java | 2 +-
.../internal/table/distributed/TableManager.java | 13 +-
.../distributed/TableManagerRecoveryTest.java | 132 +++++++++++++++++----
.../apache/ignite/distributed/ItTxTestCluster.java | 2 +-
modules/transactions/build.gradle | 1 +
.../disaster/DisasterRecoveryTestUtil.java | 7 +-
.../ItDisasterRecoveryReconfigurationTest.java | 117 +++++++++++-------
27 files changed, 478 insertions(+), 138 deletions(-)
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java
new file mode 100644
index 00000000000..b093bc25af3
--- /dev/null
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteBusyLock.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.util;
+
+/**
+ * Synchronization aid to track "busy" state of a subsystem that owns it.
+ */
+public interface IgniteBusyLock {
+ /**
+ * Enters "busy" state.
+ *
+ * @return {@code true} if entered to busy state.
+ */
+ boolean enterBusy();
+
+ /**
+ * Leaves "busy" state.
+ */
+ void leaveBusy();
+}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
index 80dfb3bee19..692f04228d6 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteSpinBusyLock.java
@@ -32,7 +32,7 @@ import java.util.concurrent.TimeUnit;
*
* @see IgniteSpinReadWriteLock
*/
-public class IgniteSpinBusyLock {
+public class IgniteSpinBusyLock implements IgniteBusyLock {
/** Underlying read-write lock. */
private final IgniteSpinReadWriteLock lock = new IgniteSpinReadWriteLock();
@@ -41,6 +41,7 @@ public class IgniteSpinBusyLock {
*
* @return {@code true} if entered to busy state.
*/
+ @Override
public boolean enterBusy() {
return !lock.writeLockedByCurrentThread() && lock.tryReadLock();
}
@@ -57,6 +58,7 @@ public class IgniteSpinBusyLock {
/**
* Leaves "busy" state.
*/
+ @Override
public void leaveBusy() {
lock.readUnlock();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
index 53dec8e2d4b..96e4141ee23 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteStripedBusyLock.java
@@ -21,7 +21,7 @@ package org.apache.ignite.internal.util;
* Busy lock implementation based on {@link IgniteStripedReadWriteLock}. API
is analogous to {@link IgniteSpinBusyLock}, but without the
* ability to unblock it.
*/
-public class IgniteStripedBusyLock {
+public class IgniteStripedBusyLock implements IgniteBusyLock {
/** Underlying read-write lock. */
private final IgniteStripedReadWriteLock lock = new
IgniteStripedReadWriteLock();
@@ -32,6 +32,7 @@ public class IgniteStripedBusyLock {
*
* @return {@code true} if entered to busy state.
*/
+ @Override
public boolean enterBusy() {
if (!lock.readLock().tryLock()) {
return false;
@@ -49,6 +50,7 @@ public class IgniteStripedBusyLock {
/**
* Leaves "busy" state.
*/
+ @Override
public void leaveBusy() {
lock.readLock().unlock();
}
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
index 896af3ba7b5..b32ec82a249 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/IgniteUtils.java
@@ -873,9 +873,9 @@ public class IgniteUtils {
* @param fn Function to run.
* @param <T> Type of returned value from {@code fn}.
* @return Result of the provided function.
- * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
*/
- public static <T> T inBusyLock(IgniteSpinBusyLock busyLock, Supplier<T>
fn) {
+ public static <T> T inBusyLock(IgniteBusyLock busyLock, Supplier<T> fn) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -892,9 +892,9 @@ public class IgniteUtils {
* @param busyLock Component's busy lock.
* @param fn Function to run.
* @return Result of the provided function.
- * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
*/
- public static int inBusyLock(IgniteSpinBusyLock busyLock, IntSupplier fn) {
+ public static int inBusyLock(IgniteBusyLock busyLock, IntSupplier fn) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -910,9 +910,9 @@ public class IgniteUtils {
*
* @param busyLock Component's busy lock.
* @param fn Runnable to run.
- * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteSpinBusyLock#enterBusy()} failed.
+ * @throws IgniteInternalException with cause {@link
NodeStoppingException} if {@link IgniteBusyLock#enterBusy()} failed.
*/
- public static void inBusyLock(IgniteSpinBusyLock busyLock, Runnable fn) {
+ public static void inBusyLock(IgniteBusyLock busyLock, Runnable fn) {
if (!busyLock.enterBusy()) {
throw new IgniteInternalException(NODE_STOPPING_ERR, new
NodeStoppingException());
}
@@ -930,9 +930,9 @@ public class IgniteUtils {
* @param busyLock Component's busy lock.
* @param fn Function to run.
* @return Future returned from the {@code fn}, or future with the {@link
NodeStoppingException} if
- * {@link IgniteSpinBusyLock#enterBusy()} failed or with runtime
exception/error while executing the {@code fn}.
+ * {@link IgniteBusyLock#enterBusy()} failed or with runtime
exception/error while executing the {@code fn}.
*/
- public static <T> CompletableFuture<T> inBusyLockAsync(IgniteSpinBusyLock
busyLock, Supplier<CompletableFuture<T>> fn) {
+ public static <T> CompletableFuture<T> inBusyLockAsync(IgniteBusyLock
busyLock, Supplier<CompletableFuture<T>> fn) {
if (!busyLock.enterBusy()) {
return failedFuture(new NodeStoppingException());
}
@@ -947,13 +947,13 @@ public class IgniteUtils {
}
/**
- * Method that runs the provided {@code fn} in {@code busyLock} if {@link
IgniteSpinBusyLock#enterBusy()} succeed. Otherwise it just
+ * Method that runs the provided {@code fn} in {@code busyLock} if {@link
IgniteBusyLock#enterBusy()} succeed. Otherwise it just
* silently returns.
*
* @param busyLock Component's busy lock.
* @param fn Runnable to run.
*/
- public static void inBusyLockSafe(IgniteSpinBusyLock busyLock, Runnable
fn) {
+ public static void inBusyLockSafe(IgniteBusyLock busyLock, Runnable fn) {
if (!busyLock.enterBusy()) {
return;
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
index be5fc2b2cd7..05d6bb66a4f 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/PartitionMover.java
@@ -18,7 +18,7 @@
package org.apache.ignite.internal.distributionzones.rebalance;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.recoverable;
+import static
org.apache.ignite.internal.raft.rebalance.ExceptionUtils.recoverable;
import static org.apache.ignite.internal.util.CompletableFutures.copyStateTo;
import static org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 74b03b41394..1d938e18408 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -581,17 +581,6 @@ public class RebalanceUtil {
return Integer.parseInt(toStringWithoutPrefix(key, prefix.length));
}
- /**
- * Checks if an error is recoverable, so we can retry a rebalance intent.
- *
- * @param t The throwable.
- * @return {@code True} if this is a recoverable exception.
- */
- public static boolean recoverable(Throwable t) {
- // As long as we don't have a general failure handler, we assume that
all errors are recoverable.
- return true;
- }
-
/**
* Removes nodes from set of nodes.
*
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index 97ba74e3eae..5b9bb88e17e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -1136,7 +1136,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager, MetastorageGr
/**
* Saves processed Meta Storage revision to the {@link #appliedRevision}.
*/
- private void onRevisionApplied(long revision) {
+ protected void onRevisionApplied(long revision) {
appliedRevision = revision;
}
diff --git
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
index 381b4eb04a4..0eb1cc990f7 100644
---
a/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
+++
b/modules/metastorage/src/testFixtures/java/org/apache/ignite/internal/metastorage/impl/StandaloneMetaStorageManager.java
@@ -95,6 +95,9 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
@Nullable
private Consumer<Boolean> afterInvokeInterceptor;
+ @Nullable
+ private Consumer<Long> onRevisionAppliedInterceptor;
+
/** Creates standalone MetaStorage manager. */
public static StandaloneMetaStorageManager create() {
return create(TEST_NODE_NAME);
@@ -239,6 +242,19 @@ public class StandaloneMetaStorageManager extends
MetaStorageManagerImpl {
this.afterInvokeInterceptor = afterInvokeInterceptor;
}
+ public void setOnRevisionAppliedInterceptor(@Nullable Consumer<Long>
onRevisionAppliedInterceptor) {
+ this.onRevisionAppliedInterceptor = onRevisionAppliedInterceptor;
+ }
+
+ @Override
+ protected void onRevisionApplied(long revision) {
+ super.onRevisionApplied(revision);
+
+ if (onRevisionAppliedInterceptor != null) {
+ onRevisionAppliedInterceptor.accept(revision);
+ }
+ }
+
@Override
public CompletableFuture<Boolean> invoke(Condition cond, Operation
success, Operation failure) {
return super.invoke(cond, success, failure)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index b53c7d1a315..c98a7a1ae03 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -22,7 +22,6 @@ import static java.util.Collections.emptySet;
import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.failedFuture;
-import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
@@ -1306,6 +1305,13 @@ public class PartitionReplicaLifecycleManager extends
zonePartitionId,
pendingAssignments.nodes(),
revision);
+ }).whenComplete((v, ex) -> {
+ if (ex != null) {
+ LOG.debug(
+ "Failed to handle change pending assignment event "
+ + "[zonePartitionId={},
stableAssignments={}, pendingAssignments={}, revision={}, isRecovery={}].",
+ zonePartitionId, stableAssignments,
pendingAssignments, revision, isRecovery, ex);
+ }
});
} finally {
busyLock.leaveBusy();
@@ -1350,22 +1356,7 @@ public class PartitionReplicaLifecycleManager extends
// For regular pending assignments we use (old) stable set, so that
none of new nodes would be able to propose itself as a leader.
// For forced assignments, we should do the same thing, but only for
the subset of stable set that is alive right now. Dead nodes
// are excluded. It is calculated precisely as an intersection between
forced assignments and (old) stable assignments.
- Assignments computedStableAssignments;
-
- // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the
second condition
- // when we will have a proper handling of empty stable assignments
- if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
- // This condition can only pass if all stable nodes are dead, and
we start new raft group from scratch.
- // In this case new initial configuration must match new forced
assignments.
- computedStableAssignments =
Assignments.forced(pendingAssignmentsNodes, pendingAssignments.timestamp());
- } else if (pendingAssignmentsAreForced) {
- // In case of forced assignments we need to remove nodes that are
present in the stable set but are missing from the
- // pending set. Such operation removes dead stable nodes from the
resulting stable set, which guarantees that we will
- // have a live majority.
- computedStableAssignments = pendingAssignments;
- } else {
- computedStableAssignments = stableAssignments;
- }
+ Assignments computedStableAssignments =
getComputedStableAssignments(stableAssignments, pendingAssignments);
CompletableFuture<?> localServicesStartFuture;
@@ -1388,11 +1379,8 @@ public class PartitionReplicaLifecycleManager extends
false
);
} else if (pendingAssignmentsAreForced && localAssignmentInPending !=
null) {
- localServicesStartFuture = runAsync(() -> {
- inBusyLock(busyLock,
- () -> replicaMgr.resetPeers(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()), revision)
- );
- }, ioExecutor);
+ localServicesStartFuture =
+ replicaMgr.resetWithRetry(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()), revision);
} else {
localServicesStartFuture = nullCompletedFuture();
}
@@ -1442,6 +1430,23 @@ public class PartitionReplicaLifecycleManager extends
}), ioExecutor);
}
+ private static Assignments getComputedStableAssignments(@Nullable
Assignments stableAssignments, Assignments pendingAssignments) {
+ // TODO: https://issues.apache.org/jira/browse/IGNITE-22600 remove the
second condition
+ // when we will have a proper handling of empty stable assignments
+ if (stableAssignments == null || stableAssignments.nodes().isEmpty()) {
+ // This condition can only pass if all stable nodes are dead, and
we start new raft group from scratch.
+ // In this case new initial configuration must match new forced
assignments.
+ return Assignments.forced(pendingAssignments.nodes(),
pendingAssignments.timestamp());
+ } else if (pendingAssignments.force()) {
+ // In case of forced assignments we need to remove nodes that are
present in the stable set but are missing from the
+ // pending set. Such operation removes dead stable nodes from the
resulting stable set, which guarantees that we will
+ // have a live majority.
+ return pendingAssignments;
+ } else {
+ return stableAssignments;
+ }
+ }
+
private CatalogZoneDescriptor zoneDescriptorAt(int zoneId, long timestamp)
{
Catalog catalog = catalogService.activeCatalog(timestamp);
assert catalog != null : "Catalog is not available at " +
nullableHybridTimestamp(timestamp);
diff --git
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
index 9a6493d7e1c..d1cccbc6e64 100644
---
a/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
+++
b/modules/partition-replicator/src/test/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManagerTest.java
@@ -56,7 +56,6 @@ import static org.mockito.Mockito.when;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
@@ -178,7 +177,7 @@ class PartitionReplicaLifecycleManagerTest extends
BaseIgniteAbstractTest {
@Mock DataStorageManager dataStorageManager,
@Mock CatalogService catalogService,
@Mock OutgoingSnapshotsManager outgoingSnapshotsManager,
- @InjectExecutorService ExecutorService executorService,
+ @InjectExecutorService ScheduledExecutorService executorService,
@InjectExecutorService ScheduledExecutorService
scheduledExecutorService,
@InjectConfiguration SystemDistributedConfiguration
systemDistributedConfiguration
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
new file mode 100644
index 00000000000..5b9e836cec6
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/ExceptionUtils.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.rebalance;
+
+import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
+
+import org.apache.ignite.internal.lang.ComponentStoppingException;
+import org.apache.ignite.internal.lang.NodeStoppingException;
+
+/**
+ * Helper class for exception handling.
+ */
+public class ExceptionUtils {
+
+ /**
+ * Checks if an error is recoverable, so we can retry a rebalance intent.
+ *
+ * @param t The throwable.
+ * @return {@code True} if this is a recoverable exception.
+ */
+ public static boolean recoverable(Throwable t) {
+ if (hasCause(t, NodeStoppingException.class,
ComponentStoppingException.class, RaftStaleUpdateException.class)) {
+ return false;
+ }
+
+ return true;
+ }
+}
diff --git
a/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java
new file mode 100644
index 00000000000..d75150403a1
--- /dev/null
+++
b/modules/raft-api/src/main/java/org/apache/ignite/internal/raft/rebalance/RaftStaleUpdateException.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.raft.rebalance;
+
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+
+/**
+ * Special type of exception used when proposed data is stale and will not be
applied.
+ */
+public class RaftStaleUpdateException extends IgniteInternalCheckedException {
+
+ public RaftStaleUpdateException(String message) {
+ super(message);
+ }
+}
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
index 046115cccd4..18515206f9f 100644
--- a/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
+++ b/modules/raft/src/main/java/org/apache/ignite/internal/raft/Loza.java
@@ -63,6 +63,7 @@ import org.apache.ignite.internal.thread.IgniteThreadFactory;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.raft.jraft.RaftMessagesFactory;
+import org.apache.ignite.raft.jraft.Status;
import org.apache.ignite.raft.jraft.option.NodeOptions;
import org.apache.ignite.raft.jraft.rpc.impl.ActionRequestInterceptor;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
@@ -643,11 +644,11 @@ public class Loza implements RaftManager {
* @param peersAndLearners New node configuration.
* @param sequenceToken Sequence token.
*/
- public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners
peersAndLearners, long sequenceToken) {
+ public Status resetPeers(RaftNodeId raftNodeId, PeersAndLearners
peersAndLearners, long sequenceToken) {
LOG.warn("Reset peers for raft group {}, new configuration is {},
sequence token {}",
raftNodeId, peersAndLearners, sequenceToken);
- raftServer.resetPeers(raftNodeId, peersAndLearners, sequenceToken);
+ return raftServer.resetPeers(raftNodeId, peersAndLearners,
sequenceToken);
}
/**
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
index 424d02cbad0..d264f700527 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/RaftGroupServiceImpl.java
@@ -65,6 +65,7 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.network.RecipientLeftException;
import org.apache.ignite.internal.network.TopologyEventHandler;
import org.apache.ignite.internal.raft.configuration.RaftConfiguration;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
import org.apache.ignite.internal.raft.service.LeaderWithTerm;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
@@ -874,6 +875,11 @@ public class RaftGroupServiceImpl implements
RaftGroupService {
break;
}
+ case ESTALE:
+ fut.completeExceptionally(new
RaftStaleUpdateException(resp.errorMsg()));
+
+ break;
+
default:
fut.completeExceptionally(new RaftException(error,
resp.errorMsg()));
diff --git
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
index 84099e9694a..f4e375a883b 100644
---
a/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
+++
b/modules/raft/src/main/java/org/apache/ignite/internal/raft/server/impl/JraftServerImpl.java
@@ -705,14 +705,14 @@ public class JraftServerImpl implements RaftServer {
* @param raftNodeId Raft node ID.
* @param peersAndLearners New node configuration.
*/
- public void resetPeers(RaftNodeId raftNodeId, PeersAndLearners
peersAndLearners, long sequenceToken) {
+ public Status resetPeers(RaftNodeId raftNodeId, PeersAndLearners
peersAndLearners, long sequenceToken) {
RaftGroupService raftGroupService = nodes.get(raftNodeId);
List<PeerId> peerIds =
peersAndLearners.peers().stream().map(PeerId::fromPeer).collect(toList());
List<PeerId> learnerIds =
peersAndLearners.learners().stream().map(PeerId::fromPeer).collect(toList());
- raftGroupService.getRaftNode().resetPeers(new Configuration(peerIds,
learnerIds, sequenceToken));
+ return raftGroupService.getRaftNode().resetPeers(new
Configuration(peerIds, learnerIds, sequenceToken));
}
/**
diff --git
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
index b3916cacbae..98f87cfa53a 100644
---
a/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
+++
b/modules/replicator/src/integrationTest/java/org/apache/ignite/internal/replicator/ItPlacementDriverReplicaSideTest.java
@@ -237,7 +237,7 @@ public class ItPlacementDriverReplicaSideTest extends
IgniteAbstractTest {
raftManager,
partitionsConfigurer,
new VolatileLogStorageFactoryCreator(nodeName,
workDir.resolve("volatile-log-spillout")),
- ForkJoinPool.commonPool(),
+ Executors.newSingleThreadScheduledExecutor(),
replicaGrpId -> nullCompletedFuture(),
ForkJoinPool.commonPool()
);
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
index d6089f26a2c..1d92e8ff09f 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaManager.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.replicator;
import static java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.CompletableFuture.delayedExecutor;
import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.runAsync;
import static java.util.concurrent.CompletableFuture.supplyAsync;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.toSet;
import static java.util.stream.Collectors.toUnmodifiableSet;
import static org.apache.ignite.internal.failure.FailureType.CRITICAL_ERROR;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
+import static
org.apache.ignite.internal.raft.rebalance.ExceptionUtils.recoverable;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.AFTER_REPLICA_STARTED;
import static
org.apache.ignite.internal.replicator.LocalReplicaEvent.BEFORE_REPLICA_STOPPED;
import static
org.apache.ignite.internal.replicator.message.ReplicaMessageUtils.toTablePartitionIdMessage;
@@ -38,6 +40,7 @@ import static
org.apache.ignite.internal.util.CompletableFutures.isCompletedSucc
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.ExceptionUtils.hasCause;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
import static
org.apache.ignite.internal.util.IgniteUtils.shouldSwitchToRequestsExecutor;
import static
org.apache.ignite.internal.util.IgniteUtils.shutdownAndAwaitTermination;
import static org.apache.ignite.lang.ErrorGroups.Common.INTERNAL_ERR;
@@ -98,6 +101,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.configuration.LogStorageBudgetView;
+import org.apache.ignite.internal.raft.rebalance.RaftStaleUpdateException;
import org.apache.ignite.internal.raft.server.RaftGroupOptions;
import org.apache.ignite.internal.raft.service.RaftGroupListener;
import org.apache.ignite.internal.raft.service.RaftGroupService;
@@ -127,8 +131,11 @@ import org.apache.ignite.internal.util.IgniteUtils;
import org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.apache.ignite.internal.util.TrackerClosedException;
import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.raft.jraft.Status;
+import org.apache.ignite.raft.jraft.error.RaftError;
import org.jetbrains.annotations.Nullable;
import org.jetbrains.annotations.TestOnly;
+import org.jetbrains.annotations.VisibleForTesting;
/**
* Replica manager maintains {@link Replica} instances on an Ignite node.
@@ -183,7 +190,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
/** Creator for {@link
org.apache.ignite.internal.raft.storage.LogStorageFactory} for volatile tables.
*/
private final LogStorageFactoryCreator volatileLogStorageFactoryCreator;
- private final Executor replicaStartStopExecutor;
+ private final ScheduledExecutorService replicaLifecycleExecutor;
/** Raft command marshaller for raft server endpoints starting. */
private final Marshaller raftCommandsMarshaller;
@@ -243,7 +250,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param partitionRaftConfigurer Configurer of raft options on raft group
creation.
* @param volatileLogStorageFactoryCreator Creator for {@link
org.apache.ignite.internal.raft.storage.LogStorageFactory} for
* volatile tables.
- * @param replicaStartStopExecutor Executor for asynchronous replicas
lifecycle management.
+ * @param replicaLifecycleExecutor Executor for asynchronous replicas
lifecycle management.
* @param getPendingAssignmentsSupplier The supplier of pending
assignments for rebalance failover purposes.
* @param throttledLogExecutor Executor to clean up the throttled logger
cache.
*/
@@ -262,7 +269,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
RaftManager raftManager,
RaftGroupOptionsConfigurer partitionRaftConfigurer,
LogStorageFactoryCreator volatileLogStorageFactoryCreator,
- Executor replicaStartStopExecutor,
+ ScheduledExecutorService replicaLifecycleExecutor,
Function<ReplicationGroupId,
CompletableFuture<VersionedAssignments>> getPendingAssignmentsSupplier,
Executor throttledLogExecutor
) {
@@ -282,10 +289,10 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
this.raftManager = raftManager;
this.partitionRaftConfigurer = partitionRaftConfigurer;
this.getPendingAssignmentsSupplier = getPendingAssignmentsSupplier;
- this.replicaStartStopExecutor = replicaStartStopExecutor;
+ this.replicaLifecycleExecutor = replicaLifecycleExecutor;
this.replicaStateManager = new ReplicaStateManager(
- replicaStartStopExecutor,
+ replicaLifecycleExecutor,
placementDriver,
this,
failureProcessor
@@ -788,9 +795,84 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
* @param peersAndLearners New node configuration.
* @param sequenceToken Sequence token.
*/
+ @VisibleForTesting
public void resetPeers(ReplicationGroupId replicaGrpId, PeersAndLearners
peersAndLearners, long sequenceToken) {
RaftNodeId raftNodeId = new RaftNodeId(replicaGrpId, new
Peer(localNodeConsistentId));
- ((Loza) raftManager).resetPeers(raftNodeId, peersAndLearners,
sequenceToken);
+ Loza loza = (Loza) raftManager;
+ Status status = loza.resetPeers(raftNodeId, peersAndLearners,
sequenceToken);
+
+ // Stale configuration change will not be retried: an ESTALE status is surfaced as a
+ // RaftStaleUpdateException, which the resetWithRetry Javadoc describes as non-recoverable.
+ // NOTE(review): any other non-OK status is not surfaced here and the method returns normally,
+ // so resetWithRetry will treat such an attempt as a success. Confirm this is intended.
+ if (!status.isOk() && status.getRaftError() == RaftError.ESTALE) {
+ throw new IgniteException(INTERNAL_ERR, new
RaftStaleUpdateException(status.getErrorMsg()));
+ }
+ }
+
+    /**
+     * Performs a {@code resetPeers} operation on the local raft node, retrying on recoverable failures.
+     *
+     * <p>Retries continue as long as the received exception is recoverable, which is any exception other than
+     * node or component stopping exceptions or raft stale exceptions caused by a stale sequence token.
+     *
+     * <p>This method is safe to retry even without chaining on it: it will either perform the reset as expected when no other
+     * reconfigurations happened on the cluster, or complete with an exception because someone has already applied a configuration
+     * that is newer than the one we want to reset to.
+     *
+     * @param replicaGrpId Replication group ID.
+     * @param peersAndLearners New peers and learners.
+     * @param sequenceToken Sequence token.
+     * @return Future that completes when a reset attempt succeeds, or completes exceptionally with the first non-recoverable
+     *     error, including a rejection of an attempt by the replica lifecycle executor (e.g. when it has been shut down).
+     * @see org.apache.ignite.internal.raft.rebalance.ExceptionUtils#recoverable(Throwable)
+     */
+    public CompletableFuture<Void> resetWithRetry(ReplicationGroupId replicaGrpId, PeersAndLearners peersAndLearners, long sequenceToken) {
+        var result = new CompletableFuture<Void>();
+
+        resetWithRetry(replicaGrpId, peersAndLearners, result, sequenceToken, 1);
+
+        return result;
+    }
+
+    /** Runs one reset attempt on the lifecycle executor, then completes {@code result} or schedules the next attempt. */
+    private void resetWithRetry(
+            ReplicationGroupId replicaGrpId,
+            PeersAndLearners peersAndLearners,
+            CompletableFuture<Void> result,
+            long sequenceToken,
+            int iteration
+    ) {
+        if (iteration % 1000 == 0) {
+            LOG.info("Retrying reset [iter={}, groupId={}, peersAndLearners={}]", iteration, replicaGrpId, peersAndLearners);
+        }
+
+        CompletableFuture<Void> attempt;
+
+        try {
+            attempt = runAsync(() -> inBusyLock(busyLock, () -> {
+                assert isReplicaStarted(replicaGrpId) : "The local node is outside of the replication group: " + replicaGrpId;
+
+                resetPeers(replicaGrpId, peersAndLearners, sequenceToken);
+            }), replicaLifecycleExecutor);
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+            // The executor no longer accepts tasks. When this runs from a scheduled retry nobody would observe
+            // a thrown exception, so fail the caller's future instead of leaving it incomplete forever.
+            result.completeExceptionally(e);
+
+            return;
+        }
+
+        attempt.whenComplete((ignored, ex) -> {
+            if (ex == null) {
+                result.complete(null);
+            } else if (recoverable(ex)) {
+                LOG.debug("Failed to reset peers. Retrying [groupId={}]. ", replicaGrpId, ex);
+
+                resetWithRetryThrottling(replicaGrpId, peersAndLearners, result, sequenceToken, iteration);
+            } else {
+                result.completeExceptionally(ex);
+            }
+        });
+    }
+
+    /** Schedules the next reset attempt after a fixed 500 ms delay. */
+    private void resetWithRetryThrottling(
+            ReplicationGroupId replicaGrpId,
+            PeersAndLearners peersAndLearners,
+            CompletableFuture<Void> result,
+            long sequenceToken,
+            int iteration
+    ) {
+        try {
+            replicaLifecycleExecutor.schedule(
+                    () -> resetWithRetry(replicaGrpId, peersAndLearners, result, sequenceToken, iteration + 1),
+                    500,
+                    TimeUnit.MILLISECONDS
+            );
+        } catch (java.util.concurrent.RejectedExecutionException e) {
+            // This is invoked from a whenComplete callback, where a thrown exception would be silently dropped.
+            result.completeExceptionally(e);
+        }
 }
private RaftGroupOptions groupOptionsForPartition(boolean
isVolatileStorage, @Nullable SnapshotStorageFactory snapshotFactory) {
@@ -906,7 +988,7 @@ public class ReplicaManager extends
AbstractEventProducer<LocalReplicaEvent, Loc
}
return true;
- }, replicaStartStopExecutor);
+ }, replicaLifecycleExecutor);
}
/** {@inheritDoc} */
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
index aa1c3ec35a3..066e33319a5 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ReplicaStateManager.java
@@ -186,7 +186,7 @@ class ReplicaStateManager {
assert forcedAssignments.force() :
format("Unexpected assignments to force
[assignments={}, groupId={}].", forcedAssignments, groupId);
- replicaManager.resetPeers(groupId,
fromAssignments(forcedAssignments.nodes()), revision);
+ replicaManager.resetWithRetry(groupId,
fromAssignments(forcedAssignments.nodes()), revision);
}
// Telling the caller that the replica is started.
diff --git
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
index 9a83d085ca8..69ae5c362b6 100644
---
a/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
+++
b/modules/replicator/src/test/java/org/apache/ignite/internal/replicator/ReplicaManagerTest.java
@@ -136,7 +136,7 @@ public class ReplicaManagerTest extends
BaseIgniteAbstractTest {
raftManager,
partitionsConfigurer,
volatileLogStorageFactoryCreator,
- ForkJoinPool.commonPool(),
+ Executors.newSingleThreadScheduledExecutor(),
replicaGrpId -> nullCompletedFuture(),
ForkJoinPool.commonPool()
);
diff --git
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
index e64e9bfedd8..3f8f9bb5c4e 100644
---
a/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
+++
b/modules/runner/src/main/java/org/apache/ignite/internal/app/ThreadPoolsManager.java
@@ -51,7 +51,7 @@ public class ThreadPoolsManager implements IgniteComponent {
* Separate executor for IO operations like partition storage
initialization, partition raft group meta data persisting,
* index storage creation...
*/
- private final ExecutorService tableIoExecutor;
+ private final ScheduledExecutorService tableIoExecutor;
/**
* Executor on which partition operations are executed. Might do storage
reads and writes (so it's expected to execute disk I/O).
@@ -73,7 +73,7 @@ public class ThreadPoolsManager implements IgniteComponent {
public ThreadPoolsManager(String nodeName, MetricManager metricManager) {
int cpus = Runtime.getRuntime().availableProcessors();
- tableIoExecutor = Executors.newFixedThreadPool(
+ tableIoExecutor = Executors.newScheduledThreadPool(
Math.min(cpus * 3, 25),
IgniteThreadFactory.create(nodeName, "tableManager-io", LOG,
STORAGE_READ, STORAGE_WRITE)
);
@@ -130,7 +130,7 @@ public class ThreadPoolsManager implements IgniteComponent {
/**
* Returns executor used to create/destroy storages, start partition Raft
groups, create index storages...
+ *
+ * <p>The executor is created with {@code Executors.newScheduledThreadPool}, so it can also run delayed tasks.
*/
- public ExecutorService tableIoExecutor() {
+ public ScheduledExecutorService tableIoExecutor() {
return tableIoExecutor;
}
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
index 45e416252a9..d862c0cf09b 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/distributed/ReplicaUnavailableTest.java
@@ -219,7 +219,7 @@ public class ReplicaUnavailableTest extends
IgniteAbstractTest {
raftManager,
RaftGroupOptionsConfigurer.EMPTY,
view -> new LocalLogStorageFactory(),
- ForkJoinPool.commonPool(),
+ Executors.newSingleThreadScheduledExecutor(),
replicaGrpId -> nullCompletedFuture(),
ForkJoinPool.commonPool()
);
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 255117828c6..03e9855cf4d 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -2333,7 +2333,7 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
};
}
- private CompletableFuture<Void> handleChangePendingAssignmentEvent(
+ protected CompletableFuture<Void> handleChangePendingAssignmentEvent(
Entry pendingAssignmentsEntry,
long revision,
boolean isRecovery
@@ -2482,12 +2482,11 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
computedStableAssignments
), ioExecutor);
} else if (pendingAssignmentsAreForced && localAssignmentInPending !=
null) {
- localServicesStartFuture = runAsync(() -> inBusyLock(busyLock, ()
-> {
- assert replicaMgr.isReplicaStarted(replicaGrpId) : "The local
node is outside of the replication group: " + replicaGrpId;
-
- // Sequence token for data partitions is MS revision.
- replicaMgr.resetPeers(replicaGrpId,
fromAssignments(computedStableAssignments.nodes()), revision);
- }), ioExecutor);
+ localServicesStartFuture = replicaMgr.resetWithRetry(
+ replicaGrpId,
+ fromAssignments(computedStableAssignments.nodes()),
+ revision
+ );
} else {
localServicesStartFuture = nullCompletedFuture();
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
index f92f200d26b..534d1b8fad4 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/TableManagerRecoveryTest.java
@@ -22,8 +22,12 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static java.util.concurrent.TimeUnit.SECONDS;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.CatalogTestUtils.createTestCatalogManager;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsQueueKey;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.COLOCATION_FEATURE_FLAG;
+import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignments;
+import static
org.apache.ignite.internal.partitiondistribution.PendingAssignmentsCalculator.pendingAssignmentsCalculator;
+import static
org.apache.ignite.internal.replicator.ReplicatorConstants.DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS;
import static org.apache.ignite.internal.sql.SqlCommon.DEFAULT_SCHEMA_NAME;
import static org.apache.ignite.internal.table.TableTestUtils.createHashIndex;
import static
org.apache.ignite.internal.table.TableTestUtils.createSimpleTable;
@@ -48,6 +52,7 @@ import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
+import static org.mockito.Mockito.RETURNS_DEEP_STUBS;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
@@ -68,6 +73,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.ScheduledExecutorService;
import java.util.function.Supplier;
@@ -75,6 +81,7 @@ import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
+import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import org.apache.ignite.internal.components.LogSyncer;
import org.apache.ignite.internal.components.LongJvmPauseDetector;
import org.apache.ignite.internal.components.SystemPropertiesNodeProperties;
@@ -85,6 +92,7 @@ import
org.apache.ignite.internal.configuration.testframework.ConfigurationExten
import
org.apache.ignite.internal.configuration.testframework.InjectConfiguration;
import org.apache.ignite.internal.distributionzones.DistributionZoneManager;
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
+import
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil;
import org.apache.ignite.internal.failure.FailureManager;
import org.apache.ignite.internal.failure.FailureProcessor;
import org.apache.ignite.internal.failure.NoOpFailureManager;
@@ -96,7 +104,6 @@ import org.apache.ignite.internal.hlc.HybridTimestampTracker;
import org.apache.ignite.internal.hlc.TestClockService;
import org.apache.ignite.internal.lowwatermark.TestLowWatermark;
import org.apache.ignite.internal.manager.ComponentContext;
-import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import
org.apache.ignite.internal.metastorage.impl.MetaStorageRevisionListenerRegistry;
import
org.apache.ignite.internal.metastorage.impl.StandaloneMetaStorageManager;
import
org.apache.ignite.internal.metastorage.server.ReadOperationForCompactionTracker;
@@ -111,17 +118,25 @@ import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import
org.apache.ignite.internal.partition.replicator.ZonePartitionReplicaListener;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.outgoing.OutgoingSnapshotsManager;
+import org.apache.ignite.internal.partitiondistribution.Assignment;
+import org.apache.ignite.internal.partitiondistribution.Assignments;
+import org.apache.ignite.internal.partitiondistribution.AssignmentsQueue;
import
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils;
import org.apache.ignite.internal.placementdriver.PlacementDriver;
import org.apache.ignite.internal.placementdriver.TestPlacementDriver;
import org.apache.ignite.internal.raft.Loza;
import org.apache.ignite.internal.raft.Peer;
import org.apache.ignite.internal.raft.RaftGroupEventsListener;
+import org.apache.ignite.internal.raft.RaftGroupOptionsConfigurer;
import org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupService;
+import
org.apache.ignite.internal.raft.client.TopologyAwareRaftGroupServiceFactory;
import org.apache.ignite.internal.raft.service.RaftGroupService;
+import
org.apache.ignite.internal.raft.storage.impl.VolatileLogStorageFactoryCreator;
import org.apache.ignite.internal.replicator.Replica;
import org.apache.ignite.internal.replicator.ReplicaManager;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
+import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import org.apache.ignite.internal.schema.AlwaysSyncedSchemaSyncService;
import org.apache.ignite.internal.schema.SchemaDescriptor;
@@ -150,6 +165,7 @@ import
org.apache.ignite.internal.tx.impl.RemotelyTriggeredResourceRegistry;
import org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.storage.state.TxStateStorage;
import
org.apache.ignite.internal.tx.storage.state.rocksdb.TxStateRocksDbSharedStorage;
+import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.sql.IgniteSql;
import org.jetbrains.annotations.Nullable;
@@ -170,16 +186,16 @@ import org.mockito.quality.Strictness;
@MockitoSettings(strictness = Strictness.LENIENT)
public class TableManagerRecoveryTest extends IgniteAbstractTest {
private static final String NODE_NAME = "testNode1";
+ private static final String NODE_NAME2 = "testNode2";
private static final String ZONE_NAME = "zone1";
private static final String TABLE_NAME = "testTable";
private static final String INDEX_NAME = "testIndex1";
private static final String INDEXED_COLUMN_NAME = "columnName";
private static final int PARTITIONS = 8;
- private static final InternalClusterNode node = new ClusterNodeImpl(
- UUID.randomUUID(),
- NODE_NAME,
- new NetworkAddress("127.0.0.1", 2245)
- );
+ private static final InternalClusterNode node =
+ new ClusterNodeImpl(UUID.randomUUID(), NODE_NAME, new
NetworkAddress("127.0.0.1", 2245));
+ private static final InternalClusterNode node2 =
+ new ClusterNodeImpl(UUID.randomUUID(), NODE_NAME2, new
NetworkAddress("127.0.0.1", 2246));
private static final long WAIT_TIMEOUT = SECONDS.toMillis(10);
// Configuration
@@ -197,7 +213,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
// Table manager dependencies.
private SchemaManager sm;
private CatalogManager catalogManager;
- private MetaStorageManagerImpl metaStorageManager;
+ private StandaloneMetaStorageManager metaStorageManager;
private TxStateRocksDbSharedStorage sharedTxStateStorage;
private TableManager tableManager;
@InjectExecutorService(threadCount = 4, allowedOperations = {STORAGE_READ,
STORAGE_WRITE})
@@ -212,7 +228,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
private IndexMetaStorage indexMetaStorage;
// Table internal components
- @Mock
private ReplicaManager replicaMgr;
@Mock
private LogSyncer logSyncer;
@@ -333,6 +348,60 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
.loadTableListenerToZoneReplica(any(), anyInt(), any(), any(),
any(), eq(true));
}
+ /**
+ * Checks that a forced pending assignment triggers a peers reset that is retried on failures.
+ *
+ * <p>{@code resetPeers} is scripted to throw an {@link IllegalStateException}, then an {@link IgniteException},
+ * and to succeed on the third call. The test expects exactly three invocations.
+ */
+ @Test
+ public void testResetPeersRetry() {
+ createSimpleTable(catalogManager, TABLE_NAME);
+
+ int tableId =
catalogManager.activeCatalog(clock.nowLong()).table(DEFAULT_SCHEMA_NAME,
TABLE_NAME).id();
+ TablePartitionId tablePartitionId = new TablePartitionId(tableId, 0);
+
+ int zoneId =
catalogManager.activeCatalog(clock.nowLong()).table(DEFAULT_SCHEMA_NAME,
TABLE_NAME).zoneId();
+ ZonePartitionId zonePartitionId = new ZonePartitionId(zoneId, 0);
+
+ long assignmentsTimestamp =
catalogManager.catalog(catalogManager.latestCatalogVersion()).time();
+
+ // Stable: both nodes; target: forced assignments containing only the local node.
+ AssignmentsQueue assignmentsQueue = pendingAssignmentsCalculator()
+ .stable(Assignments.of(Set.of(Assignment.forPeer(node.name()),
Assignment.forPeer(node2.name())), assignmentsTimestamp))
+
.target(Assignments.forced(Set.of(Assignment.forPeer(node.name())),
assignmentsTimestamp))
+ .toQueue();
+
+ doReturn(true).when(replicaMgr).isReplicaStarted(any());
+ doReturn(completedFuture(mock(Replica.class,
RETURNS_DEEP_STUBS))).when(replicaMgr).replica(any());
+
+ // Two failing attempts followed by a successful one.
+ doAnswer(invocation -> {
+ throw new IllegalStateException("Test exception");
+ })
+ .doAnswer(invocation -> {
+ throw new IgniteException(0);
+ })
+ .doAnswer(invocation -> null)
+ .when(replicaMgr).resetPeers(any(), any(), anyLong());
+
+ // This is to wait until handleChangePendingAssignments is finished.
+ // NOTE(review): assumes the revision-applied callback fires only after the pending-assignments handler
+ // (and its resetWithRetry future) has completed; otherwise the verify below could race. Confirm.
+ CompletableFuture<Void> assignmentsHandled = new CompletableFuture<>();
+ metaStorageManager.setOnRevisionAppliedInterceptor(rev -> {
+ assignmentsHandled.complete(null);
+ });
+
+ // The pending-assignments key depends on whether zone-based colocation is enabled.
+ if (colocationEnabled()) {
+ CompletableFuture<Void> putReset = metaStorageManager.put(
+
ZoneRebalanceUtil.pendingPartAssignmentsQueueKey(zonePartitionId),
+ assignmentsQueue.toBytes()
+ );
+ assertThat(putReset, willCompleteSuccessfully());
+ } else {
+ CompletableFuture<Void> putReset = metaStorageManager.put(
+ pendingPartAssignmentsQueueKey(tablePartitionId),
+ assignmentsQueue.toBytes()
+ );
+ assertThat(putReset, willCompleteSuccessfully());
+ }
+
+ assertThat(assignmentsHandled, willCompleteSuccessfully());
+
+ verify(replicaMgr, times(3)).resetPeers(any(), any(), anyLong());
+ }
+
/**
* Creates and starts TableManage and dependencies.
*/
@@ -364,20 +433,44 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
when(topologyService.localMember()).thenReturn(node);
when(distributionZoneManager.dataNodes(any(), anyInt(),
anyInt())).thenReturn(completedFuture(Set.of(NODE_NAME)));
+ PlacementDriver placementDriver = new TestPlacementDriver(node);
+ ClockService clockService = new TestClockService(clock);
+ FailureProcessor failureProcessor = mock(FailureProcessor.class);
+
+ replicaMgr = spy(new ReplicaManager(
+ NODE_NAME,
+ clusterService,
+ mock(ClusterManagementGroupManager.class, RETURNS_DEEP_STUBS),
+ clockService,
+ Set.of(),
+ placementDriver,
+ partitionOperationsExecutor,
+ () -> DEFAULT_IDLE_SAFE_TIME_PROPAGATION_PERIOD_MILLISECONDS,
+ failureProcessor,
+ null,
+ mock(TopologyAwareRaftGroupServiceFactory.class),
+ rm,
+ RaftGroupOptionsConfigurer.EMPTY,
+ new VolatileLogStorageFactoryCreator(NODE_NAME,
workDir.resolve("volatile-log-spillout")),
+ Executors.newSingleThreadScheduledExecutor(),
+ replicaGrpId -> nullCompletedFuture(),
+ ForkJoinPool.commonPool()
+ ));
+
doReturn(nullCompletedFuture())
.when(replicaMgr).startReplica(any(RaftGroupEventsListener.class), any(),
anyBoolean(), any(), any(), any(), any(), any());
ZonePartitionReplicaListener zonePartitionReplicaListener =
mock(ZonePartitionReplicaListener.class);
Replica replica = mock(Replica.class);
- when(replicaMgr.startReplica(any(ReplicationGroupId.class), any(),
any(), any(), any(), any(), anyBoolean(), any(), any()))
- .then(invocation -> {
- partitionReplicaLifecycleManager
- .zonePartitionResources(invocation.getArgument(0))
- .replicaListenerFuture()
- .complete(zonePartitionReplicaListener);
- return completedFuture(replica);
- });
+ doAnswer(invocation -> {
+ partitionReplicaLifecycleManager
+ .zonePartitionResources(invocation.getArgument(0))
+ .replicaListenerFuture()
+ .complete(zonePartitionReplicaListener);
+
+ return completedFuture(replica);
+ }).when(replicaMgr).startReplica(any(ReplicationGroupId.class), any(),
any(), any(), any(), any(), anyBoolean(), any(), any());
doReturn(trueCompletedFuture()).when(replicaMgr).stopReplica(any());
doAnswer(invocation -> {
@@ -408,11 +501,8 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
var revisionUpdater = new
MetaStorageRevisionListenerRegistry(metaStorageManager);
- PlacementDriver placementDriver = new TestPlacementDriver(node);
-
lowWatermark = new TestLowWatermark();
lowWatermark.updateWithoutNotify(savedWatermark);
- ClockService clockService = new TestClockService(clock);
indexMetaStorage = new IndexMetaStorage(catalogManager, lowWatermark,
metaStorageManager);
@@ -424,8 +514,6 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
sm = new SchemaManager(revisionUpdater, catalogManager);
- FailureProcessor failureProcessor = mock(FailureProcessor.class);
-
sharedTxStateStorage = new TxStateRocksDbSharedStorage(
node.name(),
workDir.resolve("tx-state"),
@@ -538,6 +626,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
sm,
indexMetaStorage,
sharedTxStateStorage,
+ replicaMgr,
partitionReplicaLifecycleManager,
tableManager
);
@@ -564,6 +653,7 @@ public class TableManagerRecoveryTest extends
IgniteAbstractTest {
new ComponentContext(),
tableManager,
partitionReplicaLifecycleManager,
+ replicaMgr,
sharedTxStateStorage,
dsm,
sm,
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
index 991b2b5e5d7..2a6447fffaa 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/ItTxTestCluster.java
@@ -505,7 +505,7 @@ public class ItTxTestCluster {
raftSrv,
partitionRaftConfigurer,
new VolatileLogStorageFactoryCreator(nodeName,
workDir.resolve("volatile-log-spillout")),
- ForkJoinPool.commonPool(),
+ Executors.newSingleThreadScheduledExecutor(),
replicaGrpId -> nullCompletedFuture(),
ForkJoinPool.commonPool()
);
diff --git a/modules/transactions/build.gradle
b/modules/transactions/build.gradle
index ea2bd4285a7..fae253133f6 100644
--- a/modules/transactions/build.gradle
+++ b/modules/transactions/build.gradle
@@ -95,6 +95,7 @@ dependencies {
integrationTestImplementation testFixtures(project(':ignite-runner'))
integrationTestImplementation testFixtures(project(':ignite-catalog'))
integrationTestImplementation libs.netty.transport
+ integrationTestImplementation libs.awaitility
testFixturesImplementation project(':ignite-configuration')
testFixturesImplementation project(':ignite-core')
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
index dad4fa57c70..a492367a781 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -23,6 +23,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import java.nio.ByteBuffer;
import java.util.List;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
@@ -95,7 +96,11 @@ class DisasterRecoveryTestUtil {
ByteArray stablePartAssignmentsKey =
stablePartAssignmentsKey(partId);
for (Operation operation : operations) {
- ByteArray opKey = new
ByteArray(toByteArray(operation.key()));
+ ByteBuffer operationKey = operation.key();
+ if (operationKey == null) {
+ continue;
+ }
+ ByteArray opKey = new
ByteArray(toByteArray(operationKey));
if (operation.type() == OperationType.PUT &&
opKey.equals(stablePartAssignmentsKey)) {
boolean equals =
blockedAssignments.equals(Assignments.fromBytes(toByteArray(operation.value())));
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 430246fb27c..2395c0e8353 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -34,6 +34,7 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.pendingPartAssignmentsQueueKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.ZoneRebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.replicator.configuration.ReplicationConfigurationSchema.DEFAULT_IDLE_SAFE_TIME_PROP_DURATION;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.assertThrows;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.runRace;
@@ -42,6 +43,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedIn;
import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.anEmptyMap;
import static org.hamcrest.Matchers.empty;
@@ -60,6 +62,7 @@ import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -565,7 +568,6 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
* <li>We execute "resetPartitions" and expect that data from node 0
will be available after that.</li>
* </ul>
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-23783")
@Test
@ZoneParams(nodes = 6, replicas = 3, partitions = 1)
public void testIncompleteRebalanceAfterResetPartitions() throws Exception
{
@@ -632,6 +634,15 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
assertPendingAssignments(node0, partId, assignmentsPending);
+ // Need to verify that the other nodes managed to switch to the new configuration.
+ // If the leader is stopped before the group has switched to the new configuration, the other nodes will never
+ // progress, as they stay on the old configuration (in our case, the first one seen: [1,4,5]).
+ // In other words, we need to wait for:
+ // [StateMachineAdapter] onConfigurationCommitted: idrrt_tirarp_0,idrrt_tirarp_3,idrrt_tirarp_1.
+ List<String> expectedPeers = List.of(node(0).name(), node(1).name(),
node(3).name());
+ assertConfigurationApplied(node0, partId, expectedPeers);
+
stopNode(1);
waitForScale(node0, 3);
@@ -809,7 +820,6 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
* disaster recovery API, but with manual flag set to false. We expect
that in this replica factor won't be restored.
* In this test, assignments will be (1, 3, 4), according to {@link
RendezvousDistributionFunction}.
*/
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-23783")
@Test
@ZoneParams(nodes = 5, replicas = 3, partitions = 1)
void testAutomaticRebalanceIfMajorityIsLost() throws Exception {
@@ -1004,15 +1014,18 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
node(6).name());
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
- Assignments allAssignments = Assignments.of(Set.of(
- Assignment.forPeer(node(0).name()),
- Assignment.forLearner(node(1).name()),
- Assignment.forPeer(node(2).name()),
- Assignment.forPeer(node(3).name()),
- Assignment.forPeer(node(4).name()),
- Assignment.forLearner(node(5).name()),
- Assignment.forPeer(node(6).name())
- ), timestamp);
+ CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+ Collection<String> dataNodes = new HashSet<>();
+ for (int i = 0; i < 7; i++) {
+ dataNodes.add(node(i).name());
+ }
+
+ logger().info("Zone {}", zone);
+
+ Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+ dataNodes, partId, zone.partitions(), zone.replicas(),
zone.consensusGroupSize());
+
+ Assignments allAssignments = Assignments.of(allAssignmentsSet,
timestamp);
assertStableAssignments(node0, partId, allAssignments);
@@ -1153,15 +1166,18 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
node(6).name());
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
- Assignments allAssignments = Assignments.of(Set.of(
- Assignment.forLearner(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(2).name()),
- Assignment.forPeer(node(3).name()),
- Assignment.forPeer(node(4).name()),
- Assignment.forLearner(node(5).name()),
- Assignment.forPeer(node(6).name())
- ), timestamp);
+ CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+ Collection<String> dataNodes = new HashSet<>();
+ for (int i = 0; i < 7; i++) {
+ dataNodes.add(node(i).name());
+ }
+
+ logger().info("Zone {}", zone);
+
+ Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+ dataNodes, partId, zone.partitions(), zone.replicas(),
zone.consensusGroupSize());
+
+ Assignments allAssignments = Assignments.of(allAssignmentsSet,
timestamp);
assertStableAssignments(node0, partId, allAssignments);
// Write data(1) to all seven nodes.
@@ -1452,7 +1468,23 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
}, 10_000));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-24160")
+    /**
+     * Waits (up to 10 seconds) until the committed RAFT group configuration of the given partition, as read from the MV partition
+     * storage of the table {@code TABLE_NAME} on {@code node0}, has exactly the expected set of peers.
+     *
+     * <p>Peer order is ignored. Only {@code peers()} of the committed configuration is compared; learners are not checked.
+     *
+     * @param node0 Node whose local partition storage is inspected.
+     * @param partId Partition ID.
+     * @param peers Expected peers (node names).
+     */
+    private void assertConfigurationApplied(IgniteImpl node0, int partId, List<String> peers) {
+        // Compare as sets: a plain subset check ("peers.containsAll(actual)") would also accept an empty or partially
+        // applied configuration, letting the wait finish before the group has really switched.
+        Set<String> expectedPeers = new HashSet<>(peers);
+
+        // Loop-invariant: no need to recreate/look these up on every poll.
+        RaftGroupConfigurationConverter raftGroupConfigurationConverter = new RaftGroupConfigurationConverter();
+        TableManager tableManager = node0.distributedTableManager();
+
+        await().atMost(10, SECONDS)
+                .until(() -> {
+                    RaftGroupConfiguration raftGroupConfiguration = raftGroupConfigurationConverter.fromBytes(
+                            tableManager.cachedTable(TABLE_NAME).internalTable().storage().getMvPartition(partId)
+                                    .committedGroupConfiguration()
+                    );
+
+                    logger().info("Configuration Peers: {}", raftGroupConfiguration.peers());
+                    return expectedPeers.equals(new HashSet<>(raftGroupConfiguration.peers()));
+                });
+    }
+
@Test
@ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
void testAssignmentsChainUpdatedOnAutomaticReset() throws Exception {
@@ -1470,17 +1502,18 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
- Assignments allAssignments = Assignments.of(Set.of(
- Assignment.forPeer(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(2).name()),
- Assignment.forPeer(node(3).name()),
- Assignment.forPeer(node(4).name()),
- Assignment.forPeer(node(5).name()),
- Assignment.forPeer(node(6).name())
- ), timestamp);
+ CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+ Collection<String> dataNodes = new HashSet<>();
+ for (int i = 0; i < 7; i++) {
+ dataNodes.add(node(i).name());
+ }
- assertStableAssignments(node0, partId, allAssignments);
+ logger().info("Zone {}", zone);
+
+ Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+ dataNodes, partId, zone.partitions(), zone.replicas(),
zone.consensusGroupSize());
+
+ Assignments allAssignments = Assignments.of(allAssignmentsSet,
timestamp);
// Assignments chain is equal to the stable assignments.
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments));
@@ -1536,7 +1569,6 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertAssignmentsChain(node0, partId,
AssignmentsChain.of(allAssignments, link2Assignments, link3Assignments));
}
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-25285")
@Test
@ZoneParams(nodes = 7, replicas = 7, partitions = 1, consistencyMode =
ConsistencyMode.HIGH_AVAILABILITY)
void testSecondResetRewritesUnfinishedFirstPhaseReset() throws Exception {
@@ -1556,15 +1588,18 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertRealAssignments(node0, partId, 0, 1, 2, 3, 4, 5, 6);
- Assignments allAssignments = Assignments.of(Set.of(
- Assignment.forPeer(node(0).name()),
- Assignment.forPeer(node(1).name()),
- Assignment.forPeer(node(2).name()),
- Assignment.forPeer(node(3).name()),
- Assignment.forPeer(node(4).name()),
- Assignment.forPeer(node(5).name()),
- Assignment.forPeer(node(6).name())
- ), timestamp);
+ CatalogZoneDescriptor zone =
node0.catalogManager().activeCatalog(node0.clock().nowLong()).zone(zoneName);
+ Collection<String> dataNodes = new HashSet<>();
+ for (int i = 0; i < 7; i++) {
+ dataNodes.add(node(i).name());
+ }
+
+ logger().info("Zone {}", zone);
+
+ Set<Assignment> allAssignmentsSet = calculateAssignmentForPartition(
+ dataNodes, partId, zone.partitions(), zone.replicas(),
zone.consensusGroupSize());
+
+ Assignments allAssignments = Assignments.of(allAssignmentsSet,
timestamp);
assertStableAssignments(node0, partId, allAssignments);