This is an automated email from the ASF dual-hosted git repository.
rpuch pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 8ffab2ef72e IGNITE-26595 Add tests for WI resolution of abandoned txs during index build (#6693)
8ffab2ef72e is described below
commit 8ffab2ef72e7b0af701ba25c6360815aff070d4d
Author: Roman Puchkovskiy <[email protected]>
AuthorDate: Fri Oct 3 19:44:46 2025 +0400
IGNITE-26595 Add tests for WI resolution of abandoned txs during index build (#6693)
---
modules/index/build.gradle | 1 +
.../ignite/internal/index/ItBuildIndexTest.java | 192 ++++++++++++++++++---
2 files changed, 167 insertions(+), 26 deletions(-)
diff --git a/modules/index/build.gradle b/modules/index/build.gradle
index 28ad9d127fe..f8c4f7c1627 100644
--- a/modules/index/build.gradle
+++ b/modules/index/build.gradle
@@ -56,6 +56,7 @@ dependencies {
testImplementation project(':ignite-replicator')
testImplementation project(':ignite-cluster-management')
+ integrationTestImplementation libs.awaitility
integrationTestImplementation project(':ignite-api')
integrationTestImplementation project(':ignite-catalog')
integrationTestImplementation project(':ignite-core')
diff --git a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
index b170e61a98e..b7d7b0753f8 100644
--- a/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
+++ b/modules/index/src/integrationTest/java/org/apache/ignite/internal/index/ItBuildIndexTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.index;
+import static java.util.concurrent.TimeUnit.MILLISECONDS;
import static java.util.concurrent.TimeUnit.SECONDS;
import static java.util.stream.Collectors.joining;
import static java.util.stream.Collectors.toList;
@@ -27,16 +28,17 @@ import static
org.apache.ignite.internal.catalog.CatalogService.DEFAULT_STORAGE_
import static org.apache.ignite.internal.lang.IgniteStringFormatter.format;
import static
org.apache.ignite.internal.lang.IgniteSystemProperties.colocationEnabled;
import static
org.apache.ignite.internal.sql.engine.util.QueryChecker.containsIndexScan;
+import static org.apache.ignite.internal.table.TableTestUtils.getIndexStrict;
import static
org.apache.ignite.internal.table.distributed.storage.InternalTableImpl.AWAIT_PRIMARY_REPLICA_TIMEOUT;
-import static
org.apache.ignite.internal.testframework.IgniteTestUtils.waitForCondition;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willBe;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willSucceedFast;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
import java.util.HashMap;
import java.util.List;
@@ -55,6 +57,7 @@ import
org.apache.ignite.internal.catalog.descriptors.CatalogIndexDescriptor;
import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
+import
org.apache.ignite.internal.index.message.IsNodeFinishedRwTransactionsStartedBeforeRequest;
import org.apache.ignite.internal.network.NetworkMessage;
import
org.apache.ignite.internal.partition.replicator.network.command.BuildIndexCommand;
import org.apache.ignite.internal.partitiondistribution.Assignment;
@@ -67,15 +70,22 @@ import
org.apache.ignite.internal.replicator.TablePartitionId;
import org.apache.ignite.internal.replicator.ZonePartitionId;
import org.apache.ignite.internal.sql.BaseSqlIntegrationTest;
import org.apache.ignite.internal.sql.SqlCommon;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.index.IndexRow;
import org.apache.ignite.internal.storage.index.IndexStorage;
+import org.apache.ignite.internal.storage.index.SortedIndexStorage;
import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.NodeUtils;
import org.apache.ignite.internal.table.TableViewInternal;
+import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.util.Cursor;
import org.apache.ignite.raft.jraft.rpc.WriteActionRequest;
import org.apache.ignite.table.Table;
+import org.apache.ignite.tx.Transaction;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;
@@ -213,6 +223,130 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
assertThat(sendBuildIndexCommandFuture, willSucceedFast());
}
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
+ void writeIntentFromTxAbandonedBeforeShouldNotBeIndexed() throws Exception
{
+ createTable(1, 1);
+
+ disableWriteIntentSwitchExecution();
+
+ // Create and abandon a transaction.
+ int txCoordinatorOrdinal = 2;
+ Transaction tx =
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+ insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new
Object[]{1, 1});
+
+ CLUSTER.restartNode(txCoordinatorOrdinal);
+
+ createIndex(INDEX_NAME);
+ await("Index did not become available in time")
+ .atMost(10, SECONDS)
+ .until(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
+
+ verifyNoNodesHaveAnythingInIndex();
+ }
+
+ @Test
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-21546")
+ void
writeIntentFromTxAbandonedWhileWaitingForTransactionsToFinishShouldNotBeIndexed()
throws Exception {
+ createTable(1, 1);
+
+ // Both disable write intent switch execution and track when we start
waiting for transactions to finish before index build.
+ CompletableFuture<Void> startedWaitForPreIndexTxsToFinish = new
CompletableFuture<>();
+ CLUSTER.nodes().forEach(node -> {
+ unwrapIgniteImpl(node).dropMessages((recipientId, message) -> {
+ if (message instanceof WriteIntentSwitchReplicaRequest) {
+ return true;
+ }
+
+ if (message instanceof
IsNodeFinishedRwTransactionsStartedBeforeRequest) {
+ startedWaitForPreIndexTxsToFinish.complete(null);
+ }
+
+ return false;
+ });
+ });
+
+ // Create and abandon a transaction.
+ int txCoordinatorOrdinal = 2;
+ Transaction tx =
CLUSTER.node(txCoordinatorOrdinal).transactions().begin();
+ insertDataInTransaction(tx, TABLE_NAME, List.of("I0", "I1"), new
Object[]{1, 1});
+
+ createIndex(INDEX_NAME);
+ assertThat(startedWaitForPreIndexTxsToFinish,
willCompleteSuccessfully());
+
+ // The index pre-build wait has started, let's restart the coordinator
to abandon the transaction and abruptly terminate
+ // the pre-build wait.
+ CLUSTER.restartNode(txCoordinatorOrdinal);
+
+ await("Index did not become available in time")
+ .atMost(10, SECONDS)
+ .until(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
+
+ verifyNoNodesHaveAnythingInIndex();
+ }
+
+ private void verifyNoNodesHaveAnythingInIndex() {
+ for (int nodeIndex = 0; nodeIndex < initialNodes(); nodeIndex++) {
+ IgniteImpl ignite = unwrapIgniteImpl(node(nodeIndex));
+
+ CatalogIndexDescriptor indexDescriptor =
indexDescriptor(INDEX_NAME, ignite);
+ SortedIndexStorage indexStorage = (SortedIndexStorage)
indexStorage(indexDescriptor, 0, ignite);
+
+ if (indexStorage != null) {
+ try (Cursor<IndexRow> indexRows =
indexStorage.readOnlyScan(null, null, 0)) {
+ assertFalse(indexRows.hasNext(), "Nothing should have been
put to the index, but it was found on node " + nodeIndex);
+ }
+ }
+ }
+ }
+
+ private static void disableWriteIntentSwitchExecution() {
+ CLUSTER.runningNodes().forEach(ignite -> {
+ unwrapIgniteImpl(ignite).dropMessages((recipientId, message) ->
message instanceof WriteIntentSwitchReplicaRequest);
+ });
+ }
+
+ private static CatalogIndexDescriptor indexDescriptor(String indexName,
IgniteImpl ignite) {
+ return getIndexStrict(ignite.catalogManager(), indexName,
ignite.clock().nowLong());
+ }
+
+ private static @Nullable IndexStorage indexStorage(CatalogIndexDescriptor
indexDescriptor, int partitionId, IgniteImpl ignite) {
+ TableViewInternal tableViewInternal =
tableViewInternal(indexDescriptor.tableId(), ignite);
+
+ int indexId = indexDescriptor.id();
+
+ IndexStorage indexStorage;
+ try {
+ indexStorage =
tableViewInternal.internalTable().storage().getIndex(partitionId, indexId);
+ } catch (StorageException e) {
+ if (e.getMessage().contains("Partition ID " + partitionId + " does
not exist")) {
+ return null;
+ }
+
+ throw e;
+ }
+
+ assertNotNull(indexStorage, String.format("No index storage exists for
indexId=%s, partitionId=%s", indexId, partitionId));
+
+ return indexStorage;
+ }
+
+ private static TableViewInternal tableViewInternal(int tableId, Ignite
ignite) {
+ CompletableFuture<List<Table>> tablesFuture =
ignite.tables().tablesAsync();
+
+ assertThat(tablesFuture, willCompleteSuccessfully());
+
+ TableViewInternal tableViewInternal = tablesFuture.join().stream()
+ .map(TestWrappers::unwrapTableViewInternal)
+ .filter(table -> table.tableId() == tableId)
+ .findFirst()
+ .orElse(null);
+
+ assertNotNull(tableViewInternal, "No table object found for tableId="
+ tableId);
+
+ return tableViewInternal;
+ }
+
@SafeVarargs
private static String toValuesString(List<Object>... values) {
return Stream.of(values)
@@ -222,6 +356,15 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
}
private static void createAndPopulateTable(int replicas, int partitions) {
+ createTable(replicas, partitions);
+
+ sql(format(
+ "INSERT INTO {} VALUES {}",
+ TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2),
List.of(3, 3), List.of(4, 4), List.of(5, 5))
+ ));
+ }
+
+ private static void createTable(int replicas, int partitions) {
sql(format("CREATE ZONE IF NOT EXISTS {} (REPLICAS {}, PARTITIONS {})
STORAGE PROFILES ['{}']",
ZONE_NAME, replicas, partitions, DEFAULT_STORAGE_PROFILE
));
@@ -230,19 +373,19 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
"CREATE TABLE {} (i0 INTEGER PRIMARY KEY, i1 INTEGER) ZONE {}",
TABLE_NAME, ZONE_NAME
));
-
- sql(format(
- "INSERT INTO {} VALUES {}",
- TABLE_NAME, toValuesString(List.of(1, 1), List.of(2, 2),
List.of(3, 3), List.of(4, 4), List.of(5, 5))
- ));
}
- private static void createIndex(String indexName) throws Exception {
+ private void createIndex(String indexName) throws Exception {
// We execute this operation asynchronously, because some tests block
network messages, which makes the underlying code
// stuck with timeouts. We don't need to wait for the operation to
complete, as we wait for the necessary invariants further
// below.
CLUSTER.aliveNode().sql()
- .executeAsync(null, format("CREATE INDEX {} ON {} (i1)",
indexName, TABLE_NAME));
+ .executeAsync(null, format("CREATE INDEX {} ON {} (i1)",
indexName, TABLE_NAME))
+ .whenComplete((res, ex) -> {
+ if (ex != null) {
+ log.error("Failed to create index", ex);
+ }
+ });
waitForIndex(indexName);
}
@@ -253,12 +396,11 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
* @param indexName Name of an index to wait for.
*/
private static void waitForIndex(String indexName) throws
InterruptedException {
- assertTrue(waitForCondition(
+ await().atMost(10, SECONDS).until(
() -> CLUSTER.runningNodes()
.map(TestWrappers::unwrapIgniteImpl)
.map(node -> getIndexDescriptor(node, indexName))
- .allMatch(Objects::nonNull),
- 10_000)
+ .allMatch(Objects::nonNull)
);
}
@@ -305,7 +447,7 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
);
}
- assertTrue(waitForCondition(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME), 10_000));
+ await().atMost(10, SECONDS).until(() ->
isIndexAvailable(unwrapIgniteImpl(CLUSTER.aliveNode()), INDEX_NAME));
waitForReadTimestampThatObservesMostRecentCatalog();
}
@@ -336,23 +478,21 @@ public class ItBuildIndexTest extends
BaseSqlIntegrationTest {
int indexId = indexId(indexName);
CLUSTER.runningNodes().forEach(node -> {
- try {
- InternalTable internalTable = internalTable(node, tableName);
+ InternalTable internalTable = internalTable(node, tableName);
- for (Entry<Integer, Set<String>> entry :
partitionIdToNodes.entrySet()) {
- // Let's check if there is a node in the partition
assignments.
- if (!entry.getValue().contains(node.name())) {
- continue;
- }
+ for (Entry<Integer, Set<String>> entry :
partitionIdToNodes.entrySet()) {
+ // Let's check if there is a node in the partition assignments.
+ if (!entry.getValue().contains(node.name())) {
+ continue;
+ }
- IndexStorage index =
internalTable.storage().getIndex(entry.getKey(), indexId);
+ IndexStorage index =
internalTable.storage().getIndex(entry.getKey(), indexId);
- assertNotNull(index, String.format("No index %d for
partition %d", indexId, entry.getKey()));
+ assertNotNull(index, String.format("No index %d for partition
%d", indexId, entry.getKey()));
- assertTrue(waitForCondition(() ->
index.getNextRowIdToBuild() == null, 10, SECONDS.toMillis(10)));
- }
- } catch (InterruptedException e) {
- throw new RuntimeException("Node operation failed: node=" +
node.name(), e);
+ await().atMost(10, SECONDS)
+ .pollInterval(10, MILLISECONDS)
+ .until(() -> index.getNextRowIdToBuild() == null);
}
});