This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 2c80e497793 IGNITE-26265 Fix flaky testTwoPhaseResetEqualLogIndex
(#6465)
2c80e497793 is described below
commit 2c80e49779343f0a6b334644cb4e27da40e6ece5
Author: Mirza Aliev <[email protected]>
AuthorDate: Mon Aug 25 11:57:18 2025 +0400
IGNITE-26265 Fix flaky testTwoPhaseResetEqualLogIndex (#6465)
---
.../disaster/DisasterRecoveryTestUtil.java | 61 ++++++++++++++++++++++
.../disaster/ItDisasterRecoveryManagerTest.java | 49 +----------------
.../ItDisasterRecoveryReconfigurationTest.java | 50 ++++++++++++++++--
3 files changed, 110 insertions(+), 50 deletions(-)
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
index 0d78cb38abf..1ee20b3821f 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/DisasterRecoveryTestUtil.java
@@ -17,13 +17,19 @@
package org.apache.ignite.internal.disaster;
+import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.stablePartitionAssignmentsKey;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static org.apache.ignite.internal.util.ByteUtils.toByteArray;
+import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.List;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
import java.util.function.BiPredicate;
import org.apache.ignite.internal.Cluster;
import org.apache.ignite.internal.TestWrappers;
+import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.lang.ByteArray;
import org.apache.ignite.internal.metastorage.command.MultiInvokeCommand;
import org.apache.ignite.internal.metastorage.dsl.Operation;
@@ -34,6 +40,11 @@ import org.apache.ignite.internal.network.NetworkMessage;
import org.apache.ignite.internal.partitiondistribution.Assignments;
import org.apache.ignite.internal.raft.WriteCommand;
import org.apache.ignite.internal.replicator.PartitionGroupId;
+import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
+import org.apache.ignite.internal.schema.row.Row;
+import org.apache.ignite.internal.schema.row.RowAssembler;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
/**
@@ -85,4 +96,54 @@ class DisasterRecoveryTestUtil {
return false;
}
+
+ static void assertValueOnSpecificNodes(
+ String tableName,
+ Set<IgniteImpl> nodes,
+ int id,
+ int val,
+ SchemaDescriptor schema
+ ) throws Exception {
+ for (IgniteImpl node : nodes) {
+ assertValueOnSpecificNode(tableName, node, id, val, schema);
+ }
+ }
+
+ static void assertValueOnSpecificNode(String tableName, IgniteImpl node,
int id, int val, SchemaDescriptor schema) throws Exception {
+ InternalTable internalTable =
unwrapTableViewInternal(node.tables().table(tableName)).internalTable();
+
+ Row keyValueRow0 = createKeyValueRow(schema, id, val);
+ Row keyRow0 = createKeyRow(schema, id);
+
+ assertTrue(waitForCondition(() -> {
+ try {
+ CompletableFuture<BinaryRow> getFut =
internalTable.get(keyRow0, node.clock().now(), node.node());
+
+ return compareRows(getFut.get(), keyValueRow0);
+ } catch (Exception e) {
+ return false;
+ }
+ }, 10_000), "Row comparison failed within the timeout.");
+ }
+
+ static Row createKeyValueRow(SchemaDescriptor schema, int id, int value) {
+ RowAssembler rowBuilder = new RowAssembler(schema, -1);
+
+ rowBuilder.appendInt(id);
+ rowBuilder.appendInt(value);
+
+ return Row.wrapBinaryRow(schema, rowBuilder.build());
+ }
+
+ private static boolean compareRows(BinaryRow row1, BinaryRow row2) {
+ return row1.schemaVersion() == row2.schemaVersion() &&
row1.tupleSlice().equals(row2.tupleSlice());
+ }
+
+ private static Row createKeyRow(SchemaDescriptor schema, int id) {
+ RowAssembler rowBuilder = new RowAssembler(schema.version(),
schema.keyColumns(), -1);
+
+ rowBuilder.appendInt(id);
+
+ return Row.wrapKeyOnlyBinaryRow(schema, rowBuilder.build());
+ }
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
index 148a0ed7c19..7bb1656c639 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryManagerTest.java
@@ -21,7 +21,6 @@ import static java.util.Collections.emptySet;
import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static org.apache.ignite.internal.TestWrappers.unwrapIgniteImpl;
-import static org.apache.ignite.internal.TestWrappers.unwrapTableViewInternal;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
import static
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
@@ -66,13 +65,9 @@ import
org.apache.ignite.internal.placementdriver.ReplicaMeta;
import org.apache.ignite.internal.replicator.ReplicationGroupId;
import org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
-import org.apache.ignite.internal.schema.BinaryRow;
import org.apache.ignite.internal.schema.Column;
import org.apache.ignite.internal.schema.SchemaDescriptor;
-import org.apache.ignite.internal.schema.row.Row;
-import org.apache.ignite.internal.schema.row.RowAssembler;
import org.apache.ignite.internal.sql.SqlCommon;
-import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.TableImpl;
import
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
import
org.apache.ignite.internal.table.distributed.disaster.GlobalPartitionState;
@@ -491,29 +486,6 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
.orElseThrow(() -> new IllegalStateException("No node found
that is a primary replica for the specified options."));
}
- private static void assertValueOnSpecificNodes(String tableName,
Set<IgniteImpl> nodes, int id, int val) throws Exception {
- for (IgniteImpl node : nodes) {
- assertValueOnSpecificNode(tableName, node, id, val);
- }
- }
-
- private static void assertValueOnSpecificNode(String tableName, IgniteImpl
node, int id, int val) throws Exception {
- InternalTable internalTable =
unwrapTableViewInternal(node.tables().table(tableName)).internalTable();
-
- Row keyValueRow0 = createKeyValueRow(id, val);
- Row keyRow0 = createKeyRow(id);
-
- assertTrue(waitForCondition(() -> {
- try {
- CompletableFuture<BinaryRow> getFut =
internalTable.get(keyRow0, node.clock().now(), node.node());
-
- return compareRows(getFut.get(), keyValueRow0);
- } catch (Exception e) {
- return false;
- }
- }, 10_000), "Row comparison failed within the timeout.");
- }
-
@Test
@WithSystemProperty(key = IgniteSystemProperties.COLOCATION_FEATURE_FLAG,
value = "true")
void testRestartZonePartitions() {
@@ -820,24 +792,7 @@ public class ItDisasterRecoveryManagerTest extends
ClusterPerTestIntegrationTest
node(0).sql().executeScript(String.format("ALTER ZONE \"%s\"SET (AUTO
SCALE UP 0)", defaultZone.name()));
}
- private static Row createKeyRow(int id) {
- RowAssembler rowBuilder = new RowAssembler(SCHEMA.version(),
SCHEMA.keyColumns(), -1);
-
- rowBuilder.appendInt(id);
-
- return Row.wrapKeyOnlyBinaryRow(SCHEMA, rowBuilder.build());
- }
-
- private static Row createKeyValueRow(int id, int value) {
- RowAssembler rowBuilder = new RowAssembler(SCHEMA, -1);
-
- rowBuilder.appendInt(id);
- rowBuilder.appendInt(value);
-
- return Row.wrapBinaryRow(SCHEMA, rowBuilder.build());
- }
-
- private static boolean compareRows(BinaryRow row1, BinaryRow row2) {
- return row1.schemaVersion() == row2.schemaVersion() &&
row1.tupleSlice().equals(row2.tupleSlice());
+ private static void assertValueOnSpecificNodes(String tableName,
Set<IgniteImpl> nodes, int id, int val) throws Exception {
+ DisasterRecoveryTestUtil.assertValueOnSpecificNodes(tableName, nodes,
id, val, SCHEMA);
}
}
diff --git
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
index 23a29daf917..3f42b74ba71 100644
---
a/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
+++
b/modules/transactions/src/integrationTest/java/org/apache/ignite/internal/disaster/ItDisasterRecoveryReconfigurationTest.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.TestWrappers.unwrapTableImpl;
import static org.apache.ignite.internal.TestWrappers.unwrapTableManager;
import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_PROFILE;
import static
org.apache.ignite.internal.catalog.commands.CatalogUtils.INFINITE_TIMER_VALUE;
+import static
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.assertValueOnSpecificNode;
import static
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.blockMessage;
import static
org.apache.ignite.internal.disaster.DisasterRecoveryTestUtil.stableKeySwitchMessage;
import static
org.apache.ignite.internal.distributionzones.DistributionZonesTestUtil.pendingPartitionAssignmentsKey;
@@ -59,6 +60,7 @@ import java.lang.annotation.RetentionPolicy;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
@@ -75,6 +77,7 @@ import java.util.stream.IntStream;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.ClusterConfiguration.Builder;
import org.apache.ignite.internal.ClusterPerTestIntegrationTest;
+import org.apache.ignite.internal.TestWrappers;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.descriptors.ConsistencyMode;
@@ -105,6 +108,8 @@ import
org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import
org.apache.ignite.internal.replicator.configuration.ReplicationConfiguration;
import
org.apache.ignite.internal.replicator.configuration.ReplicationExtensionConfiguration;
+import org.apache.ignite.internal.schema.Column;
+import org.apache.ignite.internal.schema.SchemaDescriptor;
import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import
org.apache.ignite.internal.table.distributed.disaster.DisasterRecoveryManager;
@@ -116,6 +121,7 @@ import
org.apache.ignite.internal.table.distributed.disaster.LocalTablePartition
import
org.apache.ignite.internal.table.distributed.disaster.TestDisasterRecoveryUtils;
import
org.apache.ignite.internal.testframework.failure.FailureManagerExtension;
import
org.apache.ignite.internal.testframework.failure.MuteFailureManagerLogging;
+import org.apache.ignite.internal.type.NativeTypes;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.ErrorGroups.Replicator;
import org.apache.ignite.lang.IgniteException;
@@ -166,6 +172,14 @@ public class ItDisasterRecoveryReconfigurationTest extends
ClusterPerTestIntegra
/** ID of the table with name {@link #TABLE_NAME} in zone {@link #zoneId}.
*/
private int tableId;
+ private static final SchemaDescriptor SCHEMA = new SchemaDescriptor(
+ 1,
+ new Column[]{new Column("id", NativeTypes.INT32, false)},
+ new Column[]{
+ new Column("val", NativeTypes.INT32, false),
+ }
+ );
+
@Override
protected int initialNodes() {
return INITIAL_NODES;
@@ -1078,7 +1092,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertThat(updateFuture, willCompleteSuccessfully());
// It's important to unblock appendEntries requests after
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index
- // and thus GroupUpdateRequestHandler#nextAssignment may evaluate
second or third node as reset first phaze target instead of
+ // and thus GroupUpdateRequestHandler#nextAssignment may evaluate
second or third node as reset first phase target instead of
// expected within test leaderName = findLeader(1, partId);
// Unblock raft.
@@ -1125,7 +1139,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
*/
@Test
@ZoneParams(nodes = 7, replicas = 7, partitions = 1)
- void testThoPhaseResetEqualLogIndex() throws Exception {
+ void testTwoPhaseResetEqualLogIndex() throws Exception {
int partId = 0;
IgniteImpl node0 = igniteImpl(0);
@@ -1186,6 +1200,8 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
errors = insertValues(table, partId, 10);
assertThat(errors, is(empty()));
+ assertInsertedValuesOnSpecificNodes(table.name(), followerNodes,
partId, 10);
+
// Disable scale down.
executeSql(format("ALTER ZONE %s SET (auto scale down %d)", zoneName,
INFINITE_TIMER_VALUE));
@@ -1221,7 +1237,7 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
assertThat(updateFuture, willCompleteSuccessfully());
// It's important to unblock appendEntries requests after
resetPartitions, otherwise 2 or even all 3 nodes may align by data/index
- // and thus GroupUpdateRequestHandler#nextAssignment may evaluate
second or third node as reset first phaze target instead of
+ // and thus GroupUpdateRequestHandler#nextAssignment may evaluate
second or third node as reset first phase target instead of
// expected within test leaderName = findLeader(1, partId);
// Unblock raft.
@@ -2160,4 +2176,32 @@ public class ItDisasterRecoveryReconfigurationTest
extends ClusterPerTestIntegra
ConsistencyMode consistencyMode() default
ConsistencyMode.STRONG_CONSISTENCY;
}
+
+ private void assertInsertedValuesOnSpecificNodes(
+ String tableName,
+ Collection<String> nodesNames,
+ int partitionId,
+ int offset
+ ) throws Exception {
+ Set<IgniteImpl> nodes = cluster.runningNodes()
+ .map(TestWrappers::unwrapIgniteImpl)
+ .filter(node -> nodesNames.contains(node.name()))
+ .collect(Collectors.toSet());;
+
+ for (IgniteImpl node : nodes) {
+ Table table = node.tables().table(tableName);
+
+ for (int i = 0, created = 0; created < ENTRIES; i++) {
+ Tuple key = Tuple.create(of("id", i));
+ if ((unwrapTableImpl(table)).partitionId(key) != partitionId) {
+ continue;
+ }
+
+ //noinspection AssignmentToForLoopParameter
+ created++;
+
+ assertValueOnSpecificNode(tableName, node, i, i + offset,
SCHEMA);
+ }
+ }
+ }
}