This is an automated email from the ASF dual-hosted git repository.

rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 8ffab2ef72e IGNITE-26595 Add tests for WI resolution of abandoned txs 
during index build (#6693)
8ffab2ef72e is described below

commit 8ffab2ef72e7b0af701ba25c6360815aff070d4d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 3 19:44:46 2025 +0400

    IGNITE-26595 Add tests for WI resolution of abandoned txs during index 
build (#6693)
---
 modules/index/build.gradle                         |   1 +
 .../ignite/internal/index/ItBuildIndexTest.java    | 192 ++++++++++++++++++---
 2 files changed, 167 insertions(+), 26 deletions(-)

diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 28ad9d127fe..f8c4f7c1627 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -56,6 +56,7 @@ dependencies {
     testImplementation project(':ignite-replicator')
     testImplementation project(':ignite-cluster-management')
 
+    integrationTestImplementation libs.awaitility
     integrationTestImplementation project(':ignite-api')
     integrationTestImplementation project(':ignite-catalog')
     integrationTestImplementation project(':ignite-core')
diff --git 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index b170e61a98e..b7d7b0753f8 100644
--- 
a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ 
b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.index;
 
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
 import static java.util.concurrent.TimeUnit.SECONDS;
 import static java.util.stream.Collectors.joining;
 import static java.util.stream.Collectors.toList;
@@ -27,16 +28,17 @@ import static 
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
 import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
 import static 
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
 import static 
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
 import static 
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static 
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
 import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import java.util.HashMap;
 import java.util.List;
@@ -55,6 +57,7 @@ import 
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
 import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
+import 
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
 import org.apache.ignite.internal.network.NetworkMessage;
 import 
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
@@ -67,15 +70,22 @@ import 
org.apache.ignite.internal.replicator.TablePartitionId;
 import org.apache.ignite.internal.replicator.ZonePartitionId;
 import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
 import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.IndexRow;
 import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
 import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.NodeUtils;
 import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.util.Cursor;
 import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
 import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.Transaction;
 import org.jetbrains.annotations.Nullable;
 import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.ValueSource;
@@ -213,6 +223,130 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
         assertThat(sendBuildIndexCommandFuture, willSucceedFast());
     }
 
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546";)
+    void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() throws Exception 
{
+        createTable(1, 1);
+
+        disableWriteIntentSwitchExecution();
+
+        // Create and abandon a transaction.
+        int txCoordinatorOrdinal = 2;
+        Transaction tx = 
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+        insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new 
Object[]{1, 1});
+
+        CLUSTER.restartNode(txCoordinatorOrdinal);
+
+        createIndex(INDEX_NAME);
+        await("Index did not become available in time")
+                .atMost(10, SECONDS)
+                .until(() -> 
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
+
+        verifyNoNodesHaveAnythingInIndex();
+    }
+
+    @Test
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546";)
+    void 
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
 throws Exception {
+        createTable(1, 1);
+
+        // Both disable write intent switch execution and track when we start 
waiting for transactions to finish before index build.
+        CompletableFuture<Void> startedWaitForPreIndexTxsToFinish = new 
CompletableFuture<>();
+        CLUSTER.nodes().forEach(node -> {
+            unwrapIgniteImpl(node).dropMessages((recipientId, message) -> {
+                if (message instanceof WriteIntentSwitchReplicaRequest) {
+                    return true;
+                }
+
+                if (message instanceof 
IsNodeFinishedRwTransactionsStartedBeforeRequest) {
+                    startedWaitForPreIndexTxsToFinish.complete(null);
+                }
+
+                return false;
+            });
+        });
+
+        // Create and abandon a transaction.
+        int txCoordinatorOrdinal = 2;
+        Transaction tx = 
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+        insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new 
Object[]{1, 1});
+
+        createIndex(INDEX_NAME);
+        assertThat(startedWaitForPreIndexTxsToFinish, 
willCompleteSuccessfully());
+
+        // The index pre-build wait has started, let's restart the coordinator 
to abandon the transaction and abruptly terminate
+        // the pre-build wait.
+        CLUSTER.restartNode(txCoordinatorOrdinal);
+
+        await("Index did not become available in time")
+                .atMost(10, SECONDS)
+                .until(() -> 
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
+
+        verifyNoNodesHaveAnythingInIndex();
+    }
+
+    private void verifyNoNodesHaveAnythingInIndex() {
+        for (int nodeIndex = 0; nodeIndex < initialNodes(); nodeIndex++) {
+            IgniteImpl ignite = unwrapIgniteImpl(node(nodeIndex));
+
+            CatalogIndexDescriptor indexDescriptor = 
indexDescriptor(INDEX_NAME, ignite);
+            SortedIndexStorage indexStorage = (SortedIndexStorage) 
indexStorage(indexDescriptor, 0, ignite);
+
+            if (indexStorage != null) {
+                try (Cursor<IndexRow> indexRows = 
indexStorage.readOnlyScan(null, null, 0)) {
+                    assertFalse(indexRows.hasNext(), "Nothing should have been 
put to the index, but it was found on node " + nodeIndex);
+                }
+            }
+        }
+    }
+
+    private static void disableWriteIntentSwitchExecution() {
+        CLUSTER.runningNodes().forEach(ignite -> {
+            unwrapIgniteImpl(ignite).dropMessages((recipientId, message) -> 
message instanceof WriteIntentSwitchReplicaRequest);
+        });
+    }
+
+    private static CatalogIndexDescriptor indexDescriptor(String indexName, 
IgniteImpl ignite) {
+        return getIndexStrict(ignite.catalogManager(), indexName, 
ignite.clock().nowLong());
+    }
+
+    private static @Nullable IndexStorage indexStorage(CatalogIndexDescriptor 
indexDescriptor, int partitionId, IgniteImpl ignite) {
+        TableViewInternal tableViewInternal = 
tableViewInternal(indexDescriptor.tableId(), ignite);
+
+        int indexId = indexDescriptor.id();
+
+        IndexStorage indexStorage;
+        try {
+            indexStorage = 
tableViewInternal.internalTable().storage().getIndex(partitionId, indexId);
+        } catch (StorageException e) {
+            if (e.getMessage().contains("Partition ID " + partitionId + " does 
not exist")) {
+                return null;
+            }
+
+            throw e;
+        }
+
+        assertNotNull(indexStorage, String.format("No index storage exists for 
indexId=%s, partitionId=%s", indexId, partitionId));
+
+        return indexStorage;
+    }
+
+    private static TableViewInternal tableViewInternal(int tableId, Ignite 
ignite) {
+        CompletableFuture<List<Table>> tablesFuture = 
ignite.tables().tablesAsync();
+
+        assertThat(tablesFuture, willCompleteSuccessfully());
+
+        TableViewInternal tableViewInternal = tablesFuture.join().stream()
+                .map(TestWrappers::unwrapTableViewInternal)
+                .filter(table -> table.tableId() == tableId)
+                .findFirst()
+                .orElse(null);
+
+        assertNotNull(tableViewInternal, "No table object found for tableId=" 
+ tableId);
+
+        return tableViewInternal;
+    }
+
     @SafeVarargs
     private static String toValuesString(List<Object>... values) {
         return Stream.of(values)
@@ -222,6 +356,15 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
     }
 
     private static void createAndPopulateTable(int replicas, int partitions) {
+        createTable(replicas, partitions);
+
+        sql(format(
+                "INSERT INTO {} VALUES {}",
+                TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2), 
List.of(3, 3), List.of(4, 4), List.of(5, 5))
+        ));
+    }
+
+    private static void createTable(int replicas, int partitions) {
         sql(format("CREATE ZONE IF NOT EXISTS {} (REPLICAS {}, PARTITIONS {}) 
STORAGE PROFILES ['{}']",
                 ZONE_NAME, replicas, partitions, DEFAULT_STORAGE_PROFILE
         ));
@@ -230,19 +373,19 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
                 "CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) ZONE {}",
                 TABLE_NAME, ZONE_NAME
         ));
-
-        sql(format(
-                "INSERT INTO {} VALUES {}",
-                TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2), 
List.of(3, 3), List.of(4, 4), List.of(5, 5))
-        ));
     }
 
-    private static void createIndex(String indexName) throws Exception {
+    private void createIndex(String indexName) throws Exception {
         // We execute this operation asynchronously, because some tests block 
network messages, which makes the underlying code
         // stuck with timeouts. We don't need to wait for the operation to 
complete, as we wait for the necessary invariants further
         // below.
         CLUSTER.aliveNode().sql()
-                .executeAsync(null, format("CREATE INDEX {} ON {} (i1)", 
indexName, TABLE_NAME));
+                .executeAsync(null, format("CREATE INDEX {} ON {} (i1)", 
indexName, TABLE_NAME))
+                .whenComplete((res, ex) -> {
+                    if (ex != null) {
+                        log.error("Failed to create index", ex);
+                    }
+                });
 
         waitForIndex(indexName);
     }
@@ -253,12 +396,11 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
      * @param indexName Name of an index to wait for.
      */
     private static void waitForIndex(String indexName) throws 
InterruptedException {
-        assertTrue(waitForCondition(
+        await().atMost(10, SECONDS).until(
                 () -> CLUSTER.runningNodes()
                         .map(TestWrappers::unwrapIgniteImpl)
                         .map(node -> getIndexDescriptor(node, indexName))
-                        .allMatch(Objects::nonNull),
-                10_000)
+                        .allMatch(Objects::nonNull)
         );
     }
 
@@ -305,7 +447,7 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
             );
         }
 
-        assertTrue(waitForCondition(() -> 
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME), 10_000));
+        await().atMost(10, SECONDS).until(() -> 
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
 
         waitForReadTimestampThatObservesMostRecentCatalog();
     }
@@ -336,23 +478,21 @@ public class ItBuildIndexTest extends 
BaseSqlIntegrationTest {
         int indexId = indexId(indexName);
 
         CLUSTER.runningNodes().forEach(node -> {
-            try {
-                InternalTable internalTable = internalTable(node, tableName);
+            InternalTable internalTable = internalTable(node, tableName);
 
-                for (Entry<Integer, Set<String>> entry : 
partitionIdToNodes.entrySet()) {
-                    // Let's check if there is a node in the partition 
assignments.
-                    if (!entry.getValue().contains(node.name())) {
-                        continue;
-                    }
+            for (Entry<Integer, Set<String>> entry : 
partitionIdToNodes.entrySet()) {
+                // Let's check if there is a node in the partition assignments.
+                if (!entry.getValue().contains(node.name())) {
+                    continue;
+                }
 
-                    IndexStorage index = 
internalTable.storage().getIndex(entry.getKey(), indexId);
+                IndexStorage index = 
internalTable.storage().getIndex(entry.getKey(), indexId);
 
-                    assertNotNull(index, String.format("No index %d for 
partition %d", indexId, entry.getKey()));
+                assertNotNull(index, String.format("No index %d for partition 
%d", indexId, entry.getKey()));
 
-                    assertTrue(waitForCondition(() -> 
index.getNextRowIdToBuild() == null, 10, SECONDS.toMillis(10)));
-                }
-            } catch (InterruptedException e) {
-                throw new RuntimeException("Node operation failed: node=" + 
node.name(), e);
+                await().atMost(10, SECONDS)
+                        .pollInterval(10, MILLISECONDS)
+                        .until(() -> index.getNextRowIdToBuild() == null);
             }
         });
 

Reply via email to