This is an automated email from the ASF dual-hosted git repository.

apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 36b119e37f IGNITE-22797 Fix estimated size for insert-after-delete scenario (#4125)
36b119e37f is described below

commit 36b119e37f8f3511148654cee00a62ef1d41c820
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Jul 23 17:04:03 2024 +0300

    IGNITE-22797 Fix estimated size for insert-after-delete scenario (#4125)
---
 .../storage/AbstractMvPartitionStorageTest.java    | 35 +++++++++++++
 .../storage/impl/TestMvPartitionStorage.java       | 12 +++--
 .../mv/AddWriteCommittedInvokeClosure.java         | 21 +++++---
 .../pagememory/mv/CommitWriteInvokeClosure.java    | 58 +++++++++++++---------
 .../rocksdb/RocksDbMvPartitionStorageTest.java     | 12 +++++
 5 files changed, 103 insertions(+), 35 deletions(-)

diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 92adba279a..32e168a959 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1490,6 +1490,41 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor
         assertThat(storage.estimatedSize(), is(0L));
     }
 
+    @Test
+    public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
+        addWriteCommitted(ROW_ID, binaryRow, clock.now());
+
+        assertThat(storage.estimatedSize(), is(1L));
+
+        addWriteCommitted(ROW_ID, null, clock.now());
+
+        assertThat(storage.estimatedSize(), is(0L));
+
+        addWriteCommitted(ROW_ID, binaryRow, clock.now());
+
+        assertThat(storage.estimatedSize(), is(1L));
+    }
+
+    @Test
+    public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
+        UUID txId = UUID.randomUUID();
+
+        addWrite(ROW_ID, binaryRow, txId);
+        commitWrite(ROW_ID, clock.now());
+
+        assertThat(storage.estimatedSize(), is(1L));
+
+        addWrite(ROW_ID, null, txId);
+        commitWrite(ROW_ID, clock.now());
+
+        assertThat(storage.estimatedSize(), is(0L));
+
+        addWrite(ROW_ID, binaryRow, txId);
+        commitWrite(ROW_ID, clock.now());
+
+        assertThat(storage.estimatedSize(), is(1L));
+    }
+
     @Test
     public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {
         assertThat(storage.estimatedSize(), is(0L));
diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 058b8574b3..b3a2721074 100644
--- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -320,13 +320,19 @@ public class TestMvPartitionStorage implements MvPartitionStorage {
     private @Nullable VersionChain resolveCommittedVersionChain(VersionChain 
committedVersionChain) {
         VersionChain nextChain = committedVersionChain.next;
 
+        boolean isNewValueTombstone = committedVersionChain.row == null;
+
         if (nextChain != null) {
-            if (committedVersionChain.row == null) {
-                if (nextChain.row == null) {
+            boolean isOldValueTombstone = nextChain.row == null;
+
+            if (isOldValueTombstone) {
+                if (isNewValueTombstone) {
                     // Avoid creating tombstones for tombstones.
                     return nextChain;
                 }
 
+                ESTIMATED_SIZE_UPDATER.incrementAndGet(this);
+            } else if (isNewValueTombstone) {
                 ESTIMATED_SIZE_UPDATER.decrementAndGet(this);
             }
 
@@ -334,7 +340,7 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
             // several times, the same tuple will be inserted into the GC 
queue (timestamp and rowId don't change in this case).
             gcQueue.add(committedVersionChain);
         } else {
-            if (committedVersionChain.row == null) {
+            if (isNewValueTombstone) {
                 // If there is only one version, and it is a tombstone, then 
remove the chain.
                 return null;
             }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
index a2d4ebf103..ef60f80557 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -63,6 +63,9 @@ class AddWriteCommittedInvokeClosure implements InvokeClosure<VersionChain> {
      */
     private long rowLinkForAddToGcQueue = NULL_LINK;
 
+    @Nullable
+    private RowVersion prevRowVersion;
+
     AddWriteCommittedInvokeClosure(
             RowId rowId,
             @Nullable BinaryRow row,
@@ -97,10 +100,10 @@ class AddWriteCommittedInvokeClosure implements 
InvokeClosure<VersionChain> {
 
             newRow = VersionChain.createCommitted(rowId, newVersion.link(), 
newVersion.nextLink());
         } else {
-            RowVersion current = storage.readRowVersion(oldRow.headLink(), 
DONT_LOAD_VALUE);
+            prevRowVersion = storage.readRowVersion(oldRow.headLink(), 
DONT_LOAD_VALUE);
 
             // If the current and new version are tombstones, then there is no 
need to add a new version.
-            if (current.isTombstone() && row == null) {
+            if (prevRowVersion.isTombstone() && row == null) {
                 operationType = OperationType.NOOP;
             } else {
                 operationType = OperationType.PUT;
@@ -145,12 +148,14 @@ class AddWriteCommittedInvokeClosure implements 
InvokeClosure<VersionChain> {
         }
 
         if (operationType == OperationType.PUT) {
-            if (row == null) {
-                storage.decrementEstimatedSize();
-            } else if (rowLinkForAddToGcQueue == NULL_LINK) {
-                // Checking for NULL_LINK allows us to distinguish if a new 
version chain was created or not. In other words if this is
-                // an insert or an update to an existing row.
-                storage.incrementEstimatedSize();
+            if (prevRowVersion == null || prevRowVersion.isTombstone()) {
+                if (row != null) {
+                    storage.incrementEstimatedSize();
+                }
+            } else {
+                if (row == null) {
+                    storage.decrementEstimatedSize();
+                }
             }
         }
     }
diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
index 2a7565da3c..e52cb498cb 100644
--- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
+++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
@@ -73,10 +73,11 @@ class CommitWriteInvokeClosure implements InvokeClosure<VersionChain> {
 
     private final UpdateTimestampHandler updateTimestampHandler;
 
-    /**
-     * Flag indicating that we are committing a tombstone.
-     */
-    private boolean isCurrentRowTombstone = false;
+    @Nullable
+    private RowVersion currentRowVersion;
+
+    @Nullable
+    private RowVersion prevRowVersion;
 
     CommitWriteInvokeClosure(
             RowId rowId,
@@ -129,30 +130,33 @@ class CommitWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
 
         operationType = OperationType.PUT;
 
-        RowVersion current = storage.readRowVersion(oldRow.headLink(), 
DONT_LOAD_VALUE);
-        RowVersion next = oldRow.hasNextLink() ? 
storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
+        currentRowVersion = storage.readRowVersion(oldRow.headLink(), 
DONT_LOAD_VALUE);
 
-        isCurrentRowTombstone = current.isTombstone();
+        assert currentRowVersion != null;
 
-        if (next == null && isCurrentRowTombstone) {
+        prevRowVersion = oldRow.hasNextLink() ? 
storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
+
+        if (prevRowVersion == null && currentRowVersion.isTombstone()) {
             // If there is only one version, and it is a tombstone, then 
remove the chain.
             operationType = OperationType.REMOVE;
 
             return;
         }
 
+        boolean isPreviousRowTombstone = prevRowVersion != null && 
prevRowVersion.isTombstone();
+
         // If the previous and current version are tombstones, then delete the 
current version.
-        if (next != null && isCurrentRowTombstone && next.isTombstone()) {
-            toRemove = current;
+        if (isPreviousRowTombstone && currentRowVersion.isTombstone()) {
+            toRemove = currentRowVersion;
 
-            newRow = VersionChain.createCommitted(oldRow.rowId(), next.link(), 
next.nextLink());
+            newRow = VersionChain.createCommitted(rowId, 
prevRowVersion.link(), prevRowVersion.nextLink());
         } else {
-            updateTimestampLink = oldRow.headLink();
+            updateTimestampLink = currentRowVersion.link();
 
-            newRow = VersionChain.createCommitted(oldRow.rowId(), 
oldRow.headLink(), oldRow.nextLink());
+            newRow = VersionChain.createCommitted(rowId, 
currentRowVersion.link(), currentRowVersion.nextLink());
 
-            if (oldRow.hasNextLink()) {
-                rowLinkForAddToGcQueue = oldRow.headLink();
+            if (currentRowVersion.hasNextLink()) {
+                rowLinkForAddToGcQueue = currentRowVersion.link();
             }
         }
     }
@@ -194,6 +198,12 @@ class CommitWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
     void afterCompletion() {
         assert operationType == OperationType.PUT || toRemove == null : 
"toRemove=" + toRemove + ", op=" + operationType;
 
+        if (operationType == OperationType.NOOP) {
+            return;
+        }
+
+        assert currentRowVersion != null;
+
         if (toRemove != null) {
             storage.removeRowVersion(toRemove);
         }
@@ -202,15 +212,15 @@ class CommitWriteInvokeClosure implements 
InvokeClosure<VersionChain> {
             gcQueue.add(rowId, timestamp, rowLinkForAddToGcQueue);
         }
 
-        // We need to check the "toRemove" field in order to avoid a situation 
when we are committing a tombstone
-        // over an existing tombstone.
-        if (operationType == OperationType.PUT && toRemove == null) {
-            if (isCurrentRowTombstone) {
-                storage.decrementEstimatedSize();
-            } else if (rowLinkForAddToGcQueue == NULL_LINK) {
-                // Checking for NULL_LINK allows us to distinguish if a new 
version chain was created or not. In other words if this is
-                // an insert or an update to an existing row.
-                storage.incrementEstimatedSize();
+        if (operationType == OperationType.PUT) {
+            if (prevRowVersion == null || prevRowVersion.isTombstone()) {
+                if (!currentRowVersion.isTombstone()) {
+                    storage.incrementEstimatedSize();
+                }
+            } else {
+                if (currentRowVersion.isTombstone()) {
+                    storage.decrementEstimatedSize();
+                }
             }
         }
     }
diff --git a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 8648722bbe..0364ac9dc1 100644
--- a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++ b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -108,6 +108,18 @@ public class RocksDbMvPartitionStorageTest extends AbstractMvPartitionStorageTes
         super.estimatedSizeNeverFallsBelowZeroUsingCommitWrite();
     }
 
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
+    @Override
+    public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
+        super.estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted();
+    }
+
+    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
+    @Override
+    public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
+        super.estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite();
+    }
+
     @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
     @Override
     public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {

Reply via email to