This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 055838ecede IGNITE-26411 Revised existing storage tests (#6711)
055838ecede is described below
commit 055838ecede395e8a85bd54ad82c38153d2398c2
Author: Egor <[email protected]>
AuthorDate: Thu Oct 9 21:17:44 2025 +0400
IGNITE-26411 Revised existing storage tests (#6711)
Co-authored-by: Egor Kuts <[email protected]>
---
.../table/distributed/StorageUpdateHandler.java | 33 ++++---
.../internal/table/distributed/IndexBaseTest.java | 2 +-
.../table/distributed/StorageCleanupTest.java | 104 +++++++++++++++++++--
3 files changed, 121 insertions(+), 18 deletions(-)
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 9a8dcf91e96..8868d7b1adc 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -48,6 +48,7 @@ import
org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
import org.apache.ignite.internal.table.distributed.replicator.PendingRows;
import org.apache.ignite.internal.util.Cursor;
import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
/** Handler for storage updates that can be performed on processing of primary
replica requests and partition replication requests. */
public class StorageUpdateHandler {
@@ -121,6 +122,9 @@ public class StorageUpdateHandler {
@Nullable HybridTimestamp lastCommitTs,
@Nullable List<Integer> indexIds
) {
+ // Either we track write intents for later commit (2PC) or commit
immediately with timestamp (1PC).
+ assert trackWriteIntent || commitTs != null : "either trackWriteIntent
must be true or commitTs must be non-null";
+
storage.runConsistently(locker -> {
RowId rowId = new RowId(partitionId, rowUuid);
@@ -138,11 +142,9 @@ public class StorageUpdateHandler {
if (trackWriteIntent) {
pendingRows.addPendingRowId(txId, rowId);
- } else
- // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No
need to check commiTs for null
- if (commitTs != null) {
- modificationCounter.updateValue(1, commitTs);
- }
+ } else {
+ modificationCounter.updateValue(1, commitTs);
+ }
if (onApplication != null) {
onApplication.run();
@@ -202,6 +204,8 @@ public class StorageUpdateHandler {
@Nullable HybridTimestamp commitTs,
@Nullable List<Integer> indexIds
) {
+ // Either we track write intents for later commit (2PC) or commit
immediately with timestamp (1PC).
+ assert trackWriteIntent || commitTs != null : "either trackWriteIntent
must be true or commitTs must be non-null";
if (nullOrEmpty(rowsToUpdate)) {
return;
}
@@ -273,12 +277,9 @@ public class StorageUpdateHandler {
if (trackWriteIntent) {
pendingRows.addPendingRowIds(txId, processedRowIds);
- } else
- // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No
need to check commiTs for null
- if (commitTs != null) {
- modificationCounter.updateValue(processedRowIds.size(),
commitTs);
- }
-
+ } else {
+ modificationCounter.updateValue(processedRowIds.size(),
commitTs);
+ }
if (entryToProcess == null && onApplication != null) {
onApplication.run();
}
@@ -545,4 +546,14 @@ public class StorageUpdateHandler {
performAbortWrite(writeIntentTxId, Set.of(rowId), indexIds);
}
}
+
+ /**
+ * Erases volatile state for a transaction to simulate node restart in
tests.
+ * This creates a state where write intents are persisted in storage but
no information
+ * about them exists in memory.
+ */
+ @TestOnly
+ public void eraseVolatileState(UUID txId) {
+ this.pendingRows.removePendingRowIds(txId);
+ }
}
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 438bb3fe804..a10d582703e 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -201,7 +201,7 @@ public abstract class IndexBaseTest extends
BaseMvStoragesTest {
static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable
BinaryRow row, @Nullable HybridTimestamp lastCommitTime) {
TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
- handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null,
null, lastCommitTime, null);
+ handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, null,
null, lastCommitTime, null);
}
static BinaryRow defaultRow() {
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 502f85ad9e4..63c960b2651 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -51,6 +51,8 @@ import org.apache.ignite.internal.storage.BaseMvStoragesTest;
import org.apache.ignite.internal.storage.PartitionTimestampCursor;
import org.apache.ignite.internal.storage.ReadResult;
import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
@@ -335,9 +337,10 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
UUID row2Id = UUID.randomUUID();
UUID row3Id = UUID.randomUUID();
- storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1,
false, null, null, null, null);
- storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2,
false, null, null, null, null);
- storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3,
false, null, null, null, null);
+ storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1,
true, null, null, null, null);
+ storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2,
true, null, null, null, null);
+ storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3,
true, null, null, null, null);
+ storageUpdateHandler.eraseVolatileState(txUuid); // simulate the loss
of a volatile state.
// Now run cleanup.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -403,7 +406,8 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
row3Id, tb3
);
// Do not track write intents to simulate the loss of a volatile state.
- storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, false, null, null, null);
+ storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate,
partitionId, true, null, null, null);
+ storageUpdateHandler.eraseVolatileState(txUuid);
// Now run cleanup.
storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -460,7 +464,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
HybridTimestamp commitTs = CLOCK.now();
- storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1,
false, null, null, commitTs, null);
+ storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1,
true, null, null, commitTs, null);
verify(storage, never()).commitWrite(any(), any(), any());
verify(storage, never()).abortWrite(any(), any());
@@ -956,7 +960,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7,
"zzu"));
- // This should lead to an exception
+ // This should lead to an exception.
HybridTimestamp lastCommitTs = commitTs.subtractPhysicalTime(100);
// Last commit time is before the time of the previously committed
value => this should not happen.
@@ -970,4 +974,92 @@ public class StorageCleanupTest extends BaseMvStoragesTest
{
verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any(), any());
}
+ @Test
+ void testDeleteCommitCleanup() {
+ UUID txUuid = UUID.randomUUID();
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+ UUID rowId = UUID.randomUUID();
+ HybridTimestamp commitTs = CLOCK.now();
+
+ storageUpdateHandler.handleUpdate(txUuid, rowId, partitionId, row1,
true, null, null, null, null);
+ storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
+
+ UUID deleteTx = UUID.randomUUID();
+
+ storageUpdateHandler.handleUpdate(deleteTx, rowId, partitionId, null,
true, null, null, null, null);
+
+ storageUpdateHandler.switchWriteIntents(deleteTx, true, CLOCK.now(),
null);
+
+ ReadResult finalResult = storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE);
+ assertFalse(finalResult.isWriteIntent());
+ assertNull(finalResult.binaryRow());
+ }
+
+ @Test
+ void testDeleteAbortCleanup() {
+ UUID txUuid = UUID.randomUUID();
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+ UUID rowId = UUID.randomUUID();
+ HybridTimestamp commitTs = CLOCK.now();
+
+ storageUpdateHandler.handleUpdate(txUuid, rowId, partitionId, row1,
true, null, null, null, null);
+ storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
+
+ UUID deleteTx = UUID.randomUUID();
+
+ storageUpdateHandler.handleUpdate(deleteTx, rowId, partitionId, null,
true, null, null, null, null);
+
+ storageUpdateHandler.switchWriteIntents(deleteTx, false, null, null);
+
+ ReadResult finalResult = storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE);
+ assertFalse(finalResult.isWriteIntent());
+ assertEquals(row1, finalResult.binaryRow());
+ }
+
+ @Test
+ void testCleanupWithNullLastCommitTsThrowsException() {
+ UUID tx1 = UUID.randomUUID();
+ UUID tx2 = UUID.randomUUID();
+ UUID tx3 = UUID.randomUUID();
+ TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+ BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2,
"bar"));
+ UUID rowId = UUID.randomUUID();
+
+ storageUpdateHandler.handleUpdate(tx1, rowId, partitionId, row1, true,
null, null, null, null);
+ assertTrue(storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+ // add write from tx2 without lastCommitTs.
+ BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4,
"baz"));
+
+ // When handleUpdate tries to add write and encounters write intent
with null lastCommitTs.
+ TxIdMismatchException exception =
assertThrows(TxIdMismatchException.class, () ->
+ storageUpdateHandler.handleUpdate(tx2, rowId, partitionId,
row2, true, null, null, null, null));
+ assertTrue(exception.getMessage().contains(tx1.toString()));
+ assertTrue(exception.getMessage().contains(tx2.toString()));
+
+ // Verify that no cleanup operations were attempted.
+ verify(storage, never()).commitWrite(any(), any(), any());
+ verify(storage, never()).abortWrite(any(), any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any(), any());
+
+ // Verify the original write intent is still there.
+ ReadResult result = storage.read(new RowId(PARTITION_ID, rowId),
HybridTimestamp.MAX_VALUE);
+ assertTrue(result.isWriteIntent());
+ assertEquals(row1, result.binaryRow());
+
+ // Also check the same logic applies for the committed case.
+ HybridTimestamp commitTs = CLOCK.now();
+ StorageException storageException =
assertThrows(StorageException.class, () ->
+ storageUpdateHandler.handleUpdate(tx3, rowId, partitionId,
row2, false, null, commitTs, null, null));
+ assertTrue(storageException.getMessage().contains("Write intent
exists"));
+
+ verify(storage, never()).commitWrite(any(), any(), any());
+ verify(storage, never()).abortWrite(any(), any());
+ verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(),
any(), any());
+ }
}