This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ffb8449b3b9 [IGNITE-24413] More HA tests (#5193)
ffb8449b3b9 is described below
commit ffb8449b3b9d37cea9638689e857d5c6c2112ff0
Author: Cyrill <[email protected]>
AuthorDate: Wed Feb 12 12:12:13 2025 +0300
[IGNITE-24413] More HA tests (#5193)
---
.../rebalance/ItRebalanceDistributedTest.java | 4 +-
.../ignite/internal/rebalance/ItRebalanceTest.java | 4 +-
.../distributionzones/rebalance/RebalanceUtil.java | 2 +-
.../internal/index/ItIndexAndRebalanceTest.java | 4 +-
...bstractHighAvailablePartitionsRecoveryTest.java | 94 +++++++++++++++++++++-
.../ItHighAvailablePartitionsRecoveryTest.java | 62 ++++++++++++++
.../ItDisasterRecoveryReconfigurationTest.java | 4 +-
7 files changed, 164 insertions(+), 10 deletions(-)
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c7fa4fdd2fd..8995391ae1f 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -32,9 +32,9 @@ import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -1056,7 +1056,7 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
private static Set<Assignment> getPartitionClusterNodes(Node node, String
tableName, int partNum) {
return Optional.ofNullable(getTableId(node, tableName))
- .map(tableId -> partitionAssignments(node.metaStorageManager,
tableId, partNum).join())
+ .map(tableId ->
stablePartitionAssignments(node.metaStorageManager, tableId, partNum).join())
.orElse(Set.of());
}
diff --git a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 1d312c3795b..58958f6c87e 100644
--- a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.rebalance;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -168,7 +168,7 @@ public class ItRebalanceTest extends
ClusterPerTestIntegrationTest {
assertTrue(waitForCondition(() -> {
Set<String> assignments =
-
await(partitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(),
table, 0))
+
await(stablePartitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(),
table, 0))
.stream()
.map(Assignment::consistentId)
.collect(Collectors.toSet());
diff --git a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 1f1ec672ac9..129c58ce5a0 100644
--- a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -611,7 +611,7 @@ public class RebalanceUtil {
* @param partitionId Partition ID.
* @return Future with partition assignments as a value.
*/
- public static CompletableFuture<Set<Assignment>> partitionAssignments(
+ public static CompletableFuture<Set<Assignment>>
stablePartitionAssignments(
MetaStorageManager metaStorageManager,
int tableId,
int partitionId
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
index 9e7f51f9a1e..08fc1c73057 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.index;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
@@ -102,7 +102,7 @@ public class ItIndexAndRebalanceTest extends
BaseSqlIntegrationTest {
Set<Assignment>[] actualAssignmentsHolder = new Set[]{Set.of()};
assertTrue(waitForCondition(() -> {
- CompletableFuture<Set<Assignment>> partitionAssignmentsFuture =
partitionAssignments(
+ CompletableFuture<Set<Assignment>> partitionAssignmentsFuture =
stablePartitionAssignments(
node.metaStorageManager(),
tableId,
partitionId
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
index 970edf0d7e6..753da2324e2 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
@@ -18,24 +18,30 @@
package org.apache.ignite.internal.table.distributed.disaster;
import static java.util.Collections.emptySet;
+import static java.util.Map.of;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
-import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
import static
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
@@ -44,6 +50,7 @@ import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
@@ -58,6 +65,7 @@ import
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfig
import org.apache.ignite.internal.distributionzones.Node;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.lang.IgniteStringFormatter;
import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -65,8 +73,22 @@ import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.TransactionException;
/** Parent for tests of HA zones feature. */
public abstract class AbstractHighAvailablePartitionsRecoveryTest extends
ClusterPerTestIntegrationTest {
@@ -82,6 +104,8 @@ public abstract class
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
private static final int PARTITIONS_NUMBER = 2;
+ private static final int ENTRIES = 2;
+
static Set<Integer> PARTITION_IDS = IntStream
.range(0, PARTITIONS_NUMBER)
.boxed()
@@ -233,7 +257,7 @@ public abstract class
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String
tableName, int partNum) {
return Optional.ofNullable(getTableId(node.catalogManager(),
tableName, clock.nowLong()))
- .map(tableId ->
partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
+ .map(tableId ->
stablePartitionAssignments(node.metaStorageManager(), tableId, partNum).join())
.orElse(Set.of());
}
@@ -416,4 +440,70 @@ public abstract class
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
.map(i -> node(i).name())
.collect(Collectors.toUnmodifiableSet());
}
+
+ static List<Throwable> insertValues(Table table, int offset) {
+ KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+ List<Throwable> errors = new ArrayList<>();
+
+ for (int i = 0; i < ENTRIES; i++) {
+ Tuple key = Tuple.create(of("id", i));
+
+ CompletableFuture<Void> insertFuture = keyValueView.putAsync(null,
key, Tuple.create(of("val", i + offset)));
+
+ try {
+ assertThat(insertFuture, willCompleteSuccessfully());
+
+ Tuple value = keyValueView.get(null, key);
+
+ assertNotNull(value);
+ } catch (Throwable e) {
+ Throwable cause = unwrapCause(e);
+
+ if (cause instanceof IgniteException &&
isPrimaryReplicaHasChangedException((IgniteException) cause)
+ || cause instanceof TransactionException
+ || cause instanceof TimeoutException
+ ) {
+ errors.add(cause);
+ } else {
+ fail("Unexpected exception", e);
+ }
+ }
+ }
+
+ return errors;
+ }
+
+ void assertValuesPresentOnNodes(HybridTimestamp ts, Table table,
Integer... indexes) {
+ for (Integer index : indexes) {
+ assertValuesPresentOnNode(table, ts, index);
+ }
+ }
+
+ private void assertValuesPresentOnNode(Table table, HybridTimestamp ts,
int targetNodeIndex) {
+ IgniteImpl targetNode = unwrapIgniteImpl(node(targetNodeIndex));
+
+ TableImpl tableImpl = unwrapTableImpl(table);
+ InternalTable internalTable = tableImpl.internalTable();
+
+ for (int i = 0; i < ENTRIES; i++) {
+ CompletableFuture<BinaryRow> fut =
+ internalTable.get(marshalKey(tableImpl,
Tuple.create(of("id", i))), ts, targetNode.node());
+ assertThat(fut, willCompleteSuccessfully());
+
+ assertNotNull(fut.join());
+ }
+ }
+
+ private static Row marshalKey(TableViewInternal table, Tuple key) {
+ SchemaRegistry schemaReg = table.schemaView();
+
+ var marshaller = new TupleMarshallerImpl(schemaReg.lastKnownSchema());
+
+ return marshaller.marshal(key, null);
+ }
+
+ private static boolean isPrimaryReplicaHasChangedException(IgniteException
cause) {
+ return ExceptionUtils.extractCodeFrom(cause) ==
Replicator.REPLICA_MISS_ERR;
+ }
}
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index f6671972686..5b2424b24a5 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.disaster;
+import static java.lang.String.format;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
@@ -26,6 +27,8 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
import static
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -43,6 +46,7 @@ import
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfig
import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.table.Table;
import org.junit.jupiter.api.Test;
/** Test for the HA zones recovery. */
@@ -297,6 +301,64 @@ public class ItHighAvailablePartitionsRecoveryTest extends
AbstractHighAvailable
waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME,
PARTITION_IDS, allNodes);
}
+ @Test
+ void testRebalanceInHaZone() throws InterruptedException {
+ createHaZoneWithTable();
+
+ startNode(3);
+
+ IgniteImpl node = igniteImpl(0);
+ Table table = node.tables().table(HA_TABLE_NAME);
+
+ List<Throwable> errors = insertValues(table, 0);
+ assertThat(errors, is(empty()));
+
+ Set<String> fourNodes =
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+ executeSql(format("ALTER ZONE %s SET REPLICAS=%d", HA_ZONE_NAME, 4));
+
+ waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME,
PARTITION_IDS, fourNodes);
+
+ assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2, 3);
+
+ stopNode(3);
+
+ Set<String> threeNodes =
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+ executeSql(format("ALTER ZONE %s SET
data_nodes_auto_adjust_scale_down=%d", HA_ZONE_NAME, 1));
+
+ waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME,
PARTITION_IDS, threeNodes);
+
+ assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2);
+ }
+
+ @Test
+ void testRestartNodesOneByOne() throws InterruptedException {
+ startNode(3);
+ startNode(4);
+
+ createHaZoneWithTable();
+
+ IgniteImpl node = igniteImpl(0);
+
+ Table table = node.tables().table(HA_TABLE_NAME);
+
+ List<Throwable> errors = insertValues(table, 0);
+ assertThat(errors, is(empty()));
+
+ for (int i = 0; i < 5; i++) {
+ restartNode(i);
+ }
+
+ Set<String> allNodes =
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+ node = igniteImpl(0);
+
+ waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME,
PARTITION_IDS, allNodes);
+
+ assertValuesPresentOnNodes(node.clock().now(),
node.tables().table(HA_TABLE_NAME), 0, 1, 2, 3, 4);
+ }
+
@Test
void testScaleDownTimerIsWorkingForHaZone() throws InterruptedException {
IgniteImpl node = igniteImpl(0);
diff --git a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 7d2540c3f60..9533647be90 100644
--- a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -109,6 +109,8 @@ import
org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
import
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteException;
import org.apache.ignite.raft.jraft.RaftGroupService;
import org.apache.ignite.raft.jraft.Status;
@@ -1929,7 +1931,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
}
private static boolean isPrimaryReplicaHasChangedException(IgniteException
cause) {
- return cause.getMessage() != null && cause.getMessage().contains("The
primary replica has changed");
+ return ExceptionUtils.extractCodeFrom(cause) ==
Replicator.REPLICA_MISS_ERR;
}
private void startNodesInParallel(int... nodeIndexes) {