This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 055838ecede IGNITE-26411 Revised existing storage tests (#6711)
055838ecede is described below

commit 055838ecede395e8a85bd54ad82c38153d2398c2
Author: Egor <[email protected]>
AuthorDate: Thu Oct 9 21:17:44 2025 +0400

    IGNITE-26411 Revised existing storage tests (#6711)
    
    Co-authored-by: Egor Kuts <[email protected]>
---
 .../table/distributed/StorageUpdateHandler.java    |  33 ++++---
 .../internal/table/distributed/IndexBaseTest.java  |   2 +-
 .../table/distributed/StorageCleanupTest.java      | 104 +++++++++++++++++++--
 3 files changed, 121 insertions(+), 18 deletions(-)

diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
index 9a8dcf91e96..8868d7b1adc 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/StorageUpdateHandler.java
@@ -48,6 +48,7 @@ import 
org.apache.ignite.internal.table.distributed.index.IndexUpdateHandler;
 import org.apache.ignite.internal.table.distributed.replicator.PendingRows;
 import org.apache.ignite.internal.util.Cursor;
 import org.jetbrains.annotations.Nullable;
+import org.jetbrains.annotations.TestOnly;
 
 /** Handler for storage updates that can be performed on processing of primary 
replica requests and partition replication requests. */
 public class StorageUpdateHandler {
@@ -121,6 +122,9 @@ public class StorageUpdateHandler {
             @Nullable HybridTimestamp lastCommitTs,
             @Nullable List<Integer> indexIds
     ) {
+        // Either we track write intents for later commit (2PC) or commit 
immediately with timestamp (1PC).
+        assert trackWriteIntent || commitTs != null : "either trackWriteIntent 
must be true or commitTs must be non-null";
+
         storage.runConsistently(locker -> {
             RowId rowId = new RowId(partitionId, rowUuid);
 
@@ -138,11 +142,9 @@ public class StorageUpdateHandler {
 
             if (trackWriteIntent) {
                 pendingRows.addPendingRowId(txId, rowId);
-            } else
-                // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No 
need to check commiTs for null
-                if (commitTs != null) {
-                    modificationCounter.updateValue(1, commitTs);
-                }
+            } else {
+                modificationCounter.updateValue(1, commitTs);
+            }
 
             if (onApplication != null) {
                 onApplication.run();
@@ -202,6 +204,8 @@ public class StorageUpdateHandler {
             @Nullable HybridTimestamp commitTs,
             @Nullable List<Integer> indexIds
     ) {
+        // Either we track write intents for later commit (2PC) or commit 
immediately with timestamp (1PC).
+        assert trackWriteIntent || commitTs != null : "either trackWriteIntent 
must be true or commitTs must be non-null";
         if (nullOrEmpty(rowsToUpdate)) {
             return;
         }
@@ -273,12 +277,9 @@ public class StorageUpdateHandler {
 
             if (trackWriteIntent) {
                 pendingRows.addPendingRowIds(txId, processedRowIds);
-            } else
-                // TODO https://issues.apache.org/jira/browse/IGNITE-26411 No 
need to check commiTs for null
-                if (commitTs != null) {
-                    modificationCounter.updateValue(processedRowIds.size(), 
commitTs);
-                }
-
+            } else {
+                modificationCounter.updateValue(processedRowIds.size(), 
commitTs);
+            }
             if (entryToProcess == null && onApplication != null) {
                 onApplication.run();
             }
@@ -545,4 +546,14 @@ public class StorageUpdateHandler {
             performAbortWrite(writeIntentTxId, Set.of(rowId), indexIds);
         }
     }
+
+    /**
+     * Erases volatile state for a transaction to simulate node restart in 
tests.
+     * This creates a state where write intents are persisted in storage but 
no information
+     * about them exists in memory.
+     */
+    @TestOnly
+    public void eraseVolatileState(UUID txId) {
+        this.pendingRows.removePendingRowIds(txId);
+    }
 }
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
index 438bb3fe804..a10d582703e 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexBaseTest.java
@@ -201,7 +201,7 @@ public abstract class IndexBaseTest extends 
BaseMvStoragesTest {
     static void addWrite(StorageUpdateHandler handler, UUID rowUuid, @Nullable 
BinaryRow row, @Nullable HybridTimestamp lastCommitTime) {
         TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
 
-        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, false, null, 
null, lastCommitTime, null);
+        handler.handleUpdate(TX_ID, rowUuid, partitionId, row, true, null, 
null, lastCommitTime, null);
     }
 
     static BinaryRow defaultRow() {
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
index 502f85ad9e4..63c960b2651 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/StorageCleanupTest.java
@@ -51,6 +51,8 @@ import org.apache.ignite.internal.storage.BaseMvStoragesTest;
 import org.apache.ignite.internal.storage.PartitionTimestampCursor;
 import org.apache.ignite.internal.storage.ReadResult;
 import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.apache.ignite.internal.storage.TxIdMismatchException;
 import org.apache.ignite.internal.storage.impl.TestMvPartitionStorage;
 import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
 import 
org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor.StorageHashIndexColumnDescriptor;
@@ -335,9 +337,10 @@ public class StorageCleanupTest extends BaseMvStoragesTest 
{
         UUID row2Id = UUID.randomUUID();
         UUID row3Id = UUID.randomUUID();
 
-        storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, 
false, null, null, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, 
false, null, null, null, null);
-        storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, 
false, null, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row1Id, partitionId, row1, 
true, null, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row2Id, partitionId, row2, 
true, null, null, null, null);
+        storageUpdateHandler.handleUpdate(txUuid, row3Id, partitionId, row3, 
true, null, null, null, null);
+        storageUpdateHandler.eraseVolatileState(txUuid); // simulate the loss 
of a volatile state.
 
         // Now run cleanup.
         storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -403,7 +406,8 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
                 row3Id, tb3
         );
         // Do not track write intents to simulate the loss of a volatile state.
-        storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate, 
partitionId, false, null, null, null);
+        storageUpdateHandler.handleUpdateAll(txUuid, rowsToUpdate, 
partitionId, true, null, null, null);
+        storageUpdateHandler.eraseVolatileState(txUuid);
 
         // Now run cleanup.
         storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
@@ -460,7 +464,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         HybridTimestamp commitTs = CLOCK.now();
 
-        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, 
false, null, null, commitTs, null);
+        storageUpdateHandler.handleUpdate(runningTx, rowId, partitionId, row1, 
true, null, null, commitTs, null);
 
         verify(storage, never()).commitWrite(any(), any(), any());
         verify(storage, never()).abortWrite(any(), any());
@@ -956,7 +960,7 @@ public class StorageCleanupTest extends BaseMvStoragesTest {
 
         BinaryRow row3 = binaryRow(new TestKey(5, "foo5"), new TestValue(7, 
"zzu"));
 
-        // This should lead to an exception
+        // This should lead to an exception.
         HybridTimestamp lastCommitTs = commitTs.subtractPhysicalTime(100);
 
         // Last commit time is before the time of the previously committed 
value => this should not happen.
@@ -970,4 +974,92 @@ public class StorageCleanupTest extends BaseMvStoragesTest 
{
         verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any(), any());
     }
 
+    @Test
+    void testDeleteCommitCleanup() {
+        UUID txUuid = UUID.randomUUID();
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+        UUID rowId = UUID.randomUUID();
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(txUuid, rowId, partitionId, row1, 
true, null, null, null, null);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
+
+        UUID deleteTx = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(deleteTx, rowId, partitionId, null, 
true, null, null, null, null);
+
+        storageUpdateHandler.switchWriteIntents(deleteTx, true, CLOCK.now(), 
null);
+
+        ReadResult finalResult = storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE);
+        assertFalse(finalResult.isWriteIntent());
+        assertNull(finalResult.binaryRow());
+    }
+
+    @Test
+    void testDeleteAbortCleanup() {
+        UUID txUuid = UUID.randomUUID();
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+        UUID rowId = UUID.randomUUID();
+        HybridTimestamp commitTs = CLOCK.now();
+
+        storageUpdateHandler.handleUpdate(txUuid, rowId, partitionId, row1, 
true, null, null, null, null);
+        storageUpdateHandler.switchWriteIntents(txUuid, true, commitTs, null);
+
+        UUID deleteTx = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(deleteTx, rowId, partitionId, null, 
true, null, null, null, null);
+
+        storageUpdateHandler.switchWriteIntents(deleteTx, false, null, null);
+
+        ReadResult finalResult = storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE);
+        assertFalse(finalResult.isWriteIntent());
+        assertEquals(row1, finalResult.binaryRow());
+    }
+
+    @Test
+    void testCleanupWithNullLastCommitTsThrowsException() {
+        UUID tx1 = UUID.randomUUID();
+        UUID tx2 = UUID.randomUUID();
+        UUID tx3 = UUID.randomUUID();
+        TablePartitionId partitionId = new TablePartitionId(333, PARTITION_ID);
+
+        BinaryRow row1 = binaryRow(new TestKey(1, "foo1"), new TestValue(2, 
"bar"));
+        UUID rowId = UUID.randomUUID();
+
+        storageUpdateHandler.handleUpdate(tx1, rowId, partitionId, row1, true, 
null, null, null, null);
+        assertTrue(storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE).isWriteIntent());
+
+        // add write from tx2 without lastCommitTs.
+        BinaryRow row2 = binaryRow(new TestKey(3, "foo3"), new TestValue(4, 
"baz"));
+
+        // When handleUpdate tries to add write and encounters write intent 
with null lastCommitTs.
+        TxIdMismatchException exception = 
assertThrows(TxIdMismatchException.class, () ->
+                storageUpdateHandler.handleUpdate(tx2, rowId, partitionId, 
row2, true, null, null, null, null));
+        assertTrue(exception.getMessage().contains(tx1.toString()));
+        assertTrue(exception.getMessage().contains(tx2.toString()));
+
+        // Verify that no cleanup operations were attempted.
+        verify(storage, never()).commitWrite(any(), any(), any());
+        verify(storage, never()).abortWrite(any(), any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any(), any());
+
+        // Verify the original write intent is still there.
+        ReadResult result = storage.read(new RowId(PARTITION_ID, rowId), 
HybridTimestamp.MAX_VALUE);
+        assertTrue(result.isWriteIntent());
+        assertEquals(row1, result.binaryRow());
+
+        // Also check the same logic applies for the committed case.
+        HybridTimestamp commitTs = CLOCK.now();
+        StorageException storageException = 
assertThrows(StorageException.class, () ->
+                storageUpdateHandler.handleUpdate(tx3, rowId, partitionId, 
row2, false, null, commitTs, null, null));
+        assertTrue(storageException.getMessage().contains("Write intent 
exists"));
+
+        verify(storage, never()).commitWrite(any(), any(), any());
+        verify(storage, never()).abortWrite(any(), any());
+        verify(indexUpdateHandler, never()).tryRemoveFromIndexes(any(), any(), 
any(), any());
+    }
 }

Reply via email to