This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 2c80e497793 IGNITE-26265 Fix flaky testTwoPhaseResetEqualLogIndex 
(#6465)
2c80e497793 is described below

commit 2c80e49779343f0a6b334644cb4e27da40e6ece5
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Aug 25 11:57:18 2025 +0400

    IGNITE-26265 Fix flaky testTwoPhaseResetEqualLogIndex (#6465)
---
 .../disaster/DisasterRecoveryTestUtil.java         | 61 ++++++++++++++++++++++
 .../disaster/ItDisasterRecoveryManagerTest.java    | 49 +----------------
 .../ItDisasterRecoveryReconfigurationTest.java     | 50 ++++++++++++++++--
 3 files changed, 110 insertions(+), 50 deletions(-)

diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
index 0d78cb38abf..1ee20b3821f 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -17,13 +17,19 @@
 
 package org.apache.ignite.internal.disaster;
 
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
+import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
 import java.util.function.BiPredicate;
 import org.apache.ignite.internal.Cluster;
 import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.lang.ByteArray;
 import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
 import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -34,6 +40,11 @@ import org.apache.ignite.internal.network.NetworkMessage;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
 import org.apache.ignite.internal.raft.WriteCommand;
 import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 
 /**
@@ -85,4 +96,54 @@ class DisasterRecoveryTestUtil {
 
         return false;
     }
+
+    static void assertValueOnSpecificNodes(
+            String tableName,
+            Set<IgniteImpl> nodes,
+            int id,
+            int val,
+            SchemaDescriptor schema
+    ) throws Exception {
+        for (IgniteImpl node : nodes) {
+            assertValueOnSpecificNode(tableName, node, id, val, schema);
+        }
+    }
+
+    static void assertValueOnSpecificNode(String tableName, IgniteImpl node, 
int id, int val, SchemaDescriptor schema) throws Exception {
+        InternalTable internalTable = 
unwrapTableViewInternal(node.tables().table(tableName)).internalTable();
+
+        Row keyValueRow0 = createKeyValueRow(schema, id, val);
+        Row keyRow0 = createKeyRow(schema, id);
+
+        assertTrue(waitForCondition(() -> {
+            try {
+                CompletableFuture<BinaryRow> getFut = 
internalTable.get(keyRow0, node.clock().now(), node.node());
+
+                return compareRows(getFut.get(), keyValueRow0);
+            } catch (Exception e) {
+                return false;
+            }
+        }, 10_000), "Row comparison failed within the timeout.");
+    }
+
+    static Row createKeyValueRow(SchemaDescriptor schema, int id, int value) {
+        RowAssembler rowBuilder = new RowAssembler(schema, -1);
+
+        rowBuilder.appendInt(id);
+        rowBuilder.appendInt(value);
+
+        return Row.wrapBinaryRow(schema, rowBuilder.build());
+    }
+
+    private static boolean compareRows(BinaryRow row1, BinaryRow row2) {
+        return row1.schemaVersion() == row2.schemaVersion() && 
row1.tupleSlice().equals(row2.tupleSlice());
+    }
+
+    private static Row createKeyRow(SchemaDescriptor schema, int id) {
+        RowAssembler rowBuilder = new RowAssembler(schema.version(), 
schema.keyColumns(), -1);
+
+        rowBuilder.appendInt(id);
+
+        return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build());
+    }
 }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 148a0ed7c19..7bb1656c639 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -21,7 +21,6 @@ import static java.util.Collections.emptySet;
 import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
 import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
@@ -66,13 +65,9 @@ import 
org.apache.ignite.internal.placementdriver.ReplicaMeta;
 import org.apache.ignite.internal.replicator.ReplicationGroupId;
 import org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.schema.BinaryRow;
 import org.apache.ignite.internal.schema.Column;
 import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.schema.row.RowAssembler;
 import org.apache.ignite.internal.sql.SqlCommon;
-import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.TableImpl;
 import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
@@ -491,29 +486,6 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
                 .orElseThrow(() -> new IllegalStateException("No node found 
that is a primary replica for the specified options."));
     }
 
-    private static void assertValueOnSpecificNodes(String tableName, 
Set<IgniteImpl> nodes, int id, int val) throws Exception {
-        for (IgniteImpl node : nodes) {
-            assertValueOnSpecificNode(tableName, node, id, val);
-        }
-    }
-
-    private static void assertValueOnSpecificNode(String tableName, IgniteImpl 
node, int id, int val) throws Exception {
-        InternalTable internalTable = 
unwrapTableViewInternal(node.tables().table(tableName)).internalTable();
-
-        Row keyValueRow0 = createKeyValueRow(id, val);
-        Row keyRow0 = createKeyRow(id);
-
-        assertTrue(waitForCondition(() -> {
-            try {
-                CompletableFuture<BinaryRow> getFut = 
internalTable.get(keyRow0, node.clock().now(), node.node());
-
-                return compareRows(getFut.get(), keyValueRow0);
-            } catch (Exception e) {
-                return false;
-            }
-        }, 10_000), "Row comparison failed within the timeout.");
-    }
-
     @Test
     @WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG, 
value = "true")
     void testRestartZonePartitions() {
@@ -820,24 +792,7 @@ public class ItDisasterRecoveryManagerTest extends 
ClusterPerTestIntegrationTest
         node(0).sql().executeScript(String.format("ALTER ZONE \"%s\"SET (AUTO 
SCALE UP 0)", defaultZone.name()));
     }
 
-    private static Row createKeyRow(int id) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA.version(), 
SCHEMA.keyColumns(), -1);
-
-        rowBuilder.appendInt(id);
-
-        return Row.wrapKeyOnlyBinaryRow(SCHEMA, rowBuilder.build());
-    }
-
-    private static Row createKeyValueRow(int id, int value) {
-        RowAssembler rowBuilder = new RowAssembler(SCHEMA, -1);
-
-        rowBuilder.appendInt(id);
-        rowBuilder.appendInt(value);
-
-        return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
-    }
-
-    private static boolean compareRows(BinaryRow row1, BinaryRow row2) {
-        return row1.schemaVersion() == row2.schemaVersion() && 
row1.tupleSlice().equals(row2.tupleSlice());
+    private static void assertValueOnSpecificNodes(String tableName, 
Set<IgniteImpl> nodes, int id, int val) throws Exception {
+        DisasterRecoveryTestUtil.assertValueOnSpecificNodes(tableName, nodes, 
id, val, SCHEMA);
     }
 }
diff --git 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 23a29daf917..3f42b74ba71 100644
--- 
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++ 
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,7 @@ import static 
org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
 import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
 import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
 import static 
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.assertValueOnSpecificNode;
 import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
 import static 
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
 import static 
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.pendingPartitionAssignmentsKey;
@@ -59,6 +60,7 @@ import java.lang.annotation.RetentionPolicy;
 import java.lang.reflect.Method;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.Collection;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
@@ -75,6 +77,7 @@ import java.util.stream.IntStream;
 import org.apache.ignite.Ignite;
 import org.apache.ignite.internal.ClusterConfiguration.Builder;
 import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
 import org.apache.ignite.internal.app.IgniteImpl;
 import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
@@ -105,6 +108,8 @@ import 
org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
 import 
org.apache.ignite.internal.replicator.configuration.ReplicationExtensionConfiguration;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
 import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import 
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
@@ -116,6 +121,7 @@ import 
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartition
 import 
org.apache.ignite.internal.table.distributed.disaster.TestDisasterRecoveryUtils;
 import 
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
 import 
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
+import org.apache.ignite.internal.type.NativeTypes;
 import org.apache.ignite.internal.util.ExceptionUtils;
 import org.apache.ignite.lang.ErrorGroups.Replicator;
 import org.apache.ignite.lang.IgniteException;
@@ -166,6 +172,14 @@ public class ItDisasterRecoveryReconfigurationTest extends 
ClusterPerTestIntegra
     /** ID of the table with name {@link #TABLE_NAME} in zone {@link #zoneId}. 
*/
     private int tableId;
 
+    private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
+            1,
+            new Column[]{new Column("id", NativeTypes.INT32, false)},
+            new Column[]{
+                    new Column("val", NativeTypes.INT32, false),
+            }
+    );
+
     @Override
     protected int initialNodes() {
         return INITIAL_NODES;
@@ -1078,7 +1092,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertThat(updateFuture, willCompleteSuccessfully());
 
         // It's important to unblock appendEntries requests after 
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index
-        // and thus GroupUpdateRequestHandler#nextAssignment may evaluate 
second or third node as reset first phaze target instead of
+        // and thus GroupUpdateRequestHandler#nextAssignment may evaluate 
second or third node as reset first phase target instead of
         // expected within test leaderName = findLeader(1, partId);
 
         // Unblock raft.
@@ -1125,7 +1139,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
      */
     @Test
     @ZoneParams(nodes = 7, replicas = 7, partitions = 1)
-    void testThoPhaseResetEqualLogIndex() throws Exception {
+    void testTwoPhaseResetEqualLogIndex() throws Exception {
         int partId = 0;
 
         IgniteImpl node0 = igniteImpl(0);
@@ -1186,6 +1200,8 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         errors = insertValues(table, partId, 10);
         assertThat(errors, is(empty()));
 
+        assertInsertedValuesOnSpecificNodes(table.name(), followerNodes, 
partId, 10);
+
         // Disable scale down.
         executeSql(format("ALTER ZONE %s SET (auto scale down %d)", zoneName, 
INFINITE_TIMER_VALUE));
 
@@ -1221,7 +1237,7 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
         assertThat(updateFuture, willCompleteSuccessfully());
 
         // It's important to unblock appendEntries requests after 
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index
-        // and thus GroupUpdateRequestHandler#nextAssignment may evaluate 
second or third node as reset first phaze target instead of
+        // and thus GroupUpdateRequestHandler#nextAssignment may evaluate 
second or third node as reset first phase target instead of
         // expected within test leaderName = findLeader(1, partId);
 
         // Unblock raft.
@@ -2160,4 +2176,32 @@ public class ItDisasterRecoveryReconfigurationTest 
extends ClusterPerTestIntegra
 
         ConsistencyMode consistencyMode() default 
ConsistencyMode.STRONG_CONSISTENCY;
     }
+
+    private void assertInsertedValuesOnSpecificNodes(
+            String tableName,
+            Collection<String> nodesNames,
+            int partitionId,
+            int offset
+    ) throws Exception {
+        Set<IgniteImpl> nodes = cluster.runningNodes()
+                .map(TestWrappers::unwrapIgniteImpl)
+                .filter(node -> nodesNames.contains(node.name()))
+                .collect(Collectors.toSet());;
+
+        for (IgniteImpl node : nodes) {
+            Table table = node.tables().table(tableName);
+
+            for (int i = 0, created = 0; created < ENTRIES; i++) {
+                Tuple key = Tuple.create(of("id", i));
+                if ((unwrapTableImpl(table)).partitionId(key) != partitionId) {
+                    continue;
+                }
+
+                //noinspection AssignmentToForLoopParameter
+                created++;
+
+                assertValueOnSpecificNode(tableName, node, i, i + offset, 
SCHEMA);
+            }
+        }
+    }
 }

Reply via email to