This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ffb8449b3b9 [IGNITE-24413] More HA tests (#5193)
ffb8449b3b9 is described below

commit ffb8449b3b9d37cea9638689e857d5c6c2112ff0
Author: Cyrill <[email protected]>
AuthorDate: Wed Feb 12 12:12:13 2025 +0300

    [IGNITE-24413] More HA tests (#5193)
---
 .../rebalance/ItRebalanceDistributedTest.java      |  4 +-
 .../ignite/internal/rebalance/ItRebalanceTest.java |  4 +-
 .../distributionzones/rebalance/RebalanceUtil.java |  2 +-
 .../internal/index/ItIndexAndRebalanceTest.java    |  4 +-
 ...bstractHighAvailablePartitionsRecoveryTest.java | 94 +++++++++++++++++++++-
 .../ItHighAvailablePartitionsRecoveryTest.java     | 62 ++++++++++++++
 .../ItDisasterRecoveryReconfigurationTest.java     |  4 +-
 7 files changed, 164 insertions(+), 10 deletions(-)

diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index c7fa4fdd2fd..8995391ae1f 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -32,9 +32,9 @@ import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.REBALANCE_SCHEDULER_POOL_SIZE;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.STABLE_ASSIGNMENTS_PREFIX_BYTES;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.extractTablePartitionId;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
 import static 
org.apache.ignite.internal.partitiondistribution.PartitionDistributionUtils.calculateAssignmentForPartition;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
@@ -1056,7 +1056,7 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
 
     private static Set<Assignment> getPartitionClusterNodes(Node node, String 
tableName, int partNum) {
         return Optional.ofNullable(getTableId(node, tableName))
-                .map(tableId -> partitionAssignments(node.metaStorageManager, 
tableId, partNum).join())
+                .map(tableId -> 
stablePartitionAssignments(node.metaStorageManager, tableId, partNum).join())
                 .orElse(Set.of());
     }
 
diff --git 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
index 1d312c3795b..58958f6c87e 100644
--- 
a/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
+++ 
b/modules/distribution-zones/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceTest.java
@@ -20,7 +20,7 @@ package org.apache.ignite.internal.rebalance;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
 import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.executeUpdate;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.testNodeName;
@@ -168,7 +168,7 @@ public class ItRebalanceTest extends 
ClusterPerTestIntegrationTest {
 
         assertTrue(waitForCondition(() -> {
             Set<String> assignments =
-                    
await(partitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(),
 table, 0))
+                    
await(stablePartitionAssignments(unwrapIgniteImpl(cluster.aliveNode()).metaStorageManager(),
 table, 0))
                             .stream()
                             .map(Assignment::consistentId)
                             .collect(Collectors.toSet());
diff --git 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
index 1f1ec672ac9..129c58ce5a0 100644
--- 
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
+++ 
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/rebalance/RebalanceUtil.java
@@ -611,7 +611,7 @@ public class RebalanceUtil {
      * @param partitionId Partition ID.
      * @return Future with partition assignments as a value.
      */
-    public static CompletableFuture<Set<Assignment>> partitionAssignments(
+    public static CompletableFuture<Set<Assignment>> 
stablePartitionAssignments(
             MetaStorageManager metaStorageManager,
             int tableId,
             int partitionId
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
index 9e7f51f9a1e..08fc1c73057 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItIndexAndRebalanceTest.java
@@ -19,7 +19,7 @@ package org.apache.ignite.internal.index;
 
 import static java.util.stream.Collectors.toList;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableIdStrict;
@@ -102,7 +102,7 @@ public class ItIndexAndRebalanceTest extends 
BaseSqlIntegrationTest {
         Set<Assignment>[] actualAssignmentsHolder = new Set[]{Set.of()};
 
         assertTrue(waitForCondition(() -> {
-            CompletableFuture<Set<Assignment>> partitionAssignmentsFuture = 
partitionAssignments(
+            CompletableFuture<Set<Assignment>> partitionAssignmentsFuture = 
stablePartitionAssignments(
                     node.metaStorageManager(),
                     tableId,
                     partitionId
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
index 970edf0d7e6..753da2324e2 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/AbstractHighAvailablePartitionsRecoveryTest.java
@@ -18,24 +18,30 @@
 package org.apache.ignite.internal.table.distributed.disaster;
 
 import static java.util.Collections.emptySet;
+import static java.util.Map.of;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.DEFAULT_FILTER;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesUtil.PARTITION_DISTRIBUTION_RESET_TIMEOUT;
-import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.partitionAssignments;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.pendingPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.plannedPartAssignmentsKey;
 import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartAssignmentsKey;
+import static 
org.apache.ignite.internal.distributionzones.rebalance.RebalanceUtil.stablePartitionAssignments;
 import static org.apache.ignite.internal.table.TableTestUtils.getTableId;
 import static 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager.RECOVERY_TRIGGER_KEY;
 import static org.apache.ignite.internal.testframework.IgniteTestUtils.await;
 import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
+import static org.apache.ignite.internal.util.ExceptionUtils.unwrapCause;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.fail;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
@@ -44,6 +50,7 @@ import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.concurrent.atomic.AtomicLong;
 import java.util.concurrent.atomic.AtomicReference;
@@ -58,6 +65,7 @@ import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfig
 import org.apache.ignite.internal.distributionzones.Node;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridClockImpl;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.lang.IgniteStringFormatter;
 import org.apache.ignite.internal.lang.NodeStoppingException;
@@ -65,8 +73,22 @@ import org.apache.ignite.internal.metastorage.Entry;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.replicator.TablePartitionId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaRegistry;
+import org.apache.ignite.internal.schema.marshaller.TupleMarshallerImpl;
+import org.apache.ignite.internal.schema.row.Row;
 import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.table.InternalTable;
+import org.apache.ignite.internal.table.TableImpl;
+import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.internal.versioned.VersionedSerialization;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
+import org.apache.ignite.lang.IgniteException;
+import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
+import org.apache.ignite.table.Tuple;
+import org.apache.ignite.tx.TransactionException;
 
 /** Parent for tests of HA zones feature. */
 public abstract class AbstractHighAvailablePartitionsRecoveryTest extends 
ClusterPerTestIntegrationTest {
@@ -82,6 +104,8 @@ public abstract class 
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
 
     private static final int PARTITIONS_NUMBER = 2;
 
+    private static final int ENTRIES = 2;
+
     static Set<Integer> PARTITION_IDS = IntStream
             .range(0, PARTITIONS_NUMBER)
             .boxed()
@@ -233,7 +257,7 @@ public abstract class 
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
 
     private Set<Assignment> getPartitionClusterNodes(IgniteImpl node, String 
tableName, int partNum) {
         return Optional.ofNullable(getTableId(node.catalogManager(), 
tableName, clock.nowLong()))
-                .map(tableId -> 
partitionAssignments(node.metaStorageManager(), tableId, partNum).join())
+                .map(tableId -> 
stablePartitionAssignments(node.metaStorageManager(), tableId, partNum).join())
                 .orElse(Set.of());
     }
 
@@ -416,4 +440,70 @@ public abstract class 
AbstractHighAvailablePartitionsRecoveryTest extends Cluste
                 .map(i -> node(i).name())
                 .collect(Collectors.toUnmodifiableSet());
     }
+
+    static List<Throwable> insertValues(Table table, int offset) {
+        KeyValueView<Tuple, Tuple> keyValueView = table.keyValueView();
+
+        List<Throwable> errors = new ArrayList<>();
+
+        for (int i = 0; i < ENTRIES; i++) {
+            Tuple key = Tuple.create(of("id", i));
+
+            CompletableFuture<Void> insertFuture = keyValueView.putAsync(null, 
key, Tuple.create(of("val", i + offset)));
+
+            try {
+                assertThat(insertFuture, willCompleteSuccessfully());
+
+                Tuple value = keyValueView.get(null, key);
+
+                assertNotNull(value);
+            } catch (Throwable e) {
+                Throwable cause = unwrapCause(e);
+
+                if (cause instanceof IgniteException && 
isPrimaryReplicaHasChangedException((IgniteException) cause)
+                        || cause instanceof TransactionException
+                        || cause instanceof TimeoutException
+                ) {
+                    errors.add(cause);
+                } else {
+                    fail("Unexpected exception", e);
+                }
+            }
+        }
+
+        return errors;
+    }
+
+    void assertValuesPresentOnNodes(HybridTimestamp ts, Table table, 
Integer... indexes) {
+        for (Integer index : indexes) {
+            assertValuesPresentOnNode(table, ts, index);
+        }
+    }
+
+    private void assertValuesPresentOnNode(Table table, HybridTimestamp ts, 
int targetNodeIndex) {
+        IgniteImpl targetNode = unwrapIgniteImpl(node(targetNodeIndex));
+
+        TableImpl tableImpl = unwrapTableImpl(table);
+        InternalTable internalTable = tableImpl.internalTable();
+
+        for (int i = 0; i < ENTRIES; i++) {
+            CompletableFuture<BinaryRow> fut =
+                    internalTable.get(marshalKey(tableImpl, 
Tuple.create(of("id", i))), ts, targetNode.node());
+            assertThat(fut, willCompleteSuccessfully());
+
+            assertNotNull(fut.join());
+        }
+    }
+
+    private static Row marshalKey(TableViewInternal table, Tuple key) {
+        SchemaRegistry schemaReg = table.schemaView();
+
+        var marshaller = new TupleMarshallerImpl(schemaReg.lastKnownSchema());
+
+        return marshaller.marshal(key, null);
+    }
+
+    private static boolean isPrimaryReplicaHasChangedException(IgniteException 
cause) {
+        return ExceptionUtils.extractCodeFrom(cause) == 
Replicator.REPLICA_MISS_ERR;
+    }
 }
diff --git 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
index f6671972686..5b2424b24a5 100644
--- 
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
+++ 
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/table/distributed/disaster/ItHighAvailablePartitionsRecoveryTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.disaster;
 
+import static java.lang.String.format;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.IMMEDIATE_TIMER_VALUE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
@@ -26,6 +27,8 @@ import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCo
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureExceptionMatcher.willThrowWithCauseOrSuppressed;
 import static 
org.apache.ignite.internal.util.ByteUtils.bytesToLongKeepingOrder;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.is;
 import static org.junit.jupiter.api.Assertions.assertNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
@@ -43,6 +46,7 @@ import 
org.apache.ignite.internal.configuration.SystemDistributedExtensionConfig
 import org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil;
 import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
 import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
+import org.apache.ignite.table.Table;
 import org.junit.jupiter.api.Test;
 
 /** Test for the HA zones recovery. */
@@ -297,6 +301,64 @@ public class ItHighAvailablePartitionsRecoveryTest extends 
AbstractHighAvailable
         waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, 
PARTITION_IDS, allNodes);
     }
 
+    @Test
+    void testRebalanceInHaZone() throws InterruptedException {
+        createHaZoneWithTable();
+
+        startNode(3);
+
+        IgniteImpl node = igniteImpl(0);
+        Table table = node.tables().table(HA_TABLE_NAME);
+
+        List<Throwable> errors = insertValues(table, 0);
+        assertThat(errors, is(empty()));
+
+        Set<String> fourNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        executeSql(format("ALTER ZONE %s SET REPLICAS=%d", HA_ZONE_NAME, 4));
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, 
PARTITION_IDS, fourNodes);
+
+        assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2, 3);
+
+        stopNode(3);
+
+        Set<String> threeNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        executeSql(format("ALTER ZONE %s SET 
data_nodes_auto_adjust_scale_down=%d", HA_ZONE_NAME, 1));
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, 
PARTITION_IDS, threeNodes);
+
+        assertValuesPresentOnNodes(node.clock().now(), table, 0, 1, 2);
+    }
+
+    @Test
+    void testRestartNodesOneByOne() throws InterruptedException {
+        startNode(3);
+        startNode(4);
+
+        createHaZoneWithTable();
+
+        IgniteImpl node = igniteImpl(0);
+
+        Table table = node.tables().table(HA_TABLE_NAME);
+
+        List<Throwable> errors = insertValues(table, 0);
+        assertThat(errors, is(empty()));
+
+        for (int i = 0; i < 5; i++) {
+            restartNode(i);
+        }
+
+        Set<String> allNodes = 
runningNodes().map(Ignite::name).collect(Collectors.toUnmodifiableSet());
+
+        node = igniteImpl(0);
+
+        waitAndAssertStableAssignmentsOfPartitionEqualTo(node, HA_TABLE_NAME, 
PARTITION_IDS, allNodes);
+
+        assertValuesPresentOnNodes(node.clock().now(), 
node.tables().table(HA_TABLE_NAME), 0, 1, 2, 3, 4);
+    }
+
     @Test
     void testScaleDownTimerIsWorkingForHaZone() throws InterruptedException {
         IgniteImpl node = igniteImpl(0);
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 7d2540c3f60..9533647be90 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -109,6 +109,8 @@ import 
org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
 import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionStateEnum;
 import 
org.apache.ignite.internal.table.distributed.disaster.LocalPartitionStateByNode;
+import org.apache.ignite.internal.util.ExceptionUtils;
+import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteException;
 import org.apache.ignite.raft.jraft.RaftGroupService;
 import org.apache.ignite.raft.jraft.Status;
@@ -1929,7 +1931,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
     }
 
     private static boolean isPrimaryReplicaHasChangedException(IgniteException 
cause) {
-        return cause.getMessage() != null && cause.getMessage().contains("The 
primary replica has changed");
+        return ExceptionUtils.extractCodeFrom(cause) == 
Replicator.REPLICA_MISS_ERR;
     }
 
     private void startNodesInParallel(int... nodeIndexes) {

Reply via email to