This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 36b119e37f IGNITE-22797 Fix estimated size for insert-after-delete
scenario (#4125)
36b119e37f is described below
commit 36b119e37f8f3511148654cee00a62ef1d41c820
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Tue Jul 23 17:04:03 2024 +0300
IGNITE-22797 Fix estimated size for insert-after-delete scenario (#4125)
---
.../storage/AbstractMvPartitionStorageTest.java | 35 +++++++++++++
.../storage/impl/TestMvPartitionStorage.java | 12 +++--
.../mv/AddWriteCommittedInvokeClosure.java | 21 +++++---
.../pagememory/mv/CommitWriteInvokeClosure.java | 58 +++++++++++++---------
.../rocksdb/RocksDbMvPartitionStorageTest.java | 12 +++++
5 files changed, 103 insertions(+), 35 deletions(-)
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index 92adba279a..32e168a959 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -1490,6 +1490,41 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
assertThat(storage.estimatedSize(), is(0L));
}
+ @Test
+ public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
+ addWriteCommitted(ROW_ID, binaryRow, clock.now());
+
+ assertThat(storage.estimatedSize(), is(1L));
+
+ addWriteCommitted(ROW_ID, null, clock.now());
+
+ assertThat(storage.estimatedSize(), is(0L));
+
+ addWriteCommitted(ROW_ID, binaryRow, clock.now());
+
+ assertThat(storage.estimatedSize(), is(1L));
+ }
+
+ @Test
+ public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
+ UUID txId = UUID.randomUUID();
+
+ addWrite(ROW_ID, binaryRow, txId);
+ commitWrite(ROW_ID, clock.now());
+
+ assertThat(storage.estimatedSize(), is(1L));
+
+ addWrite(ROW_ID, null, txId);
+ commitWrite(ROW_ID, clock.now());
+
+ assertThat(storage.estimatedSize(), is(0L));
+
+ addWrite(ROW_ID, binaryRow, txId);
+ commitWrite(ROW_ID, clock.now());
+
+ assertThat(storage.estimatedSize(), is(1L));
+ }
+
@Test
public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {
assertThat(storage.estimatedSize(), is(0L));
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 058b8574b3..b3a2721074 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -320,13 +320,19 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
private @Nullable VersionChain resolveCommittedVersionChain(VersionChain
committedVersionChain) {
VersionChain nextChain = committedVersionChain.next;
+ boolean isNewValueTombstone = committedVersionChain.row == null;
+
if (nextChain != null) {
- if (committedVersionChain.row == null) {
- if (nextChain.row == null) {
+ boolean isOldValueTombstone = nextChain.row == null;
+
+ if (isOldValueTombstone) {
+ if (isNewValueTombstone) {
// Avoid creating tombstones for tombstones.
return nextChain;
}
+ ESTIMATED_SIZE_UPDATER.incrementAndGet(this);
+ } else if (isNewValueTombstone) {
ESTIMATED_SIZE_UPDATER.decrementAndGet(this);
}
@@ -334,7 +340,7 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
// several times, the same tuple will be inserted into the GC
queue (timestamp and rowId don't change in this case).
gcQueue.add(committedVersionChain);
} else {
- if (committedVersionChain.row == null) {
+ if (isNewValueTombstone) {
// If there is only one version, and it is a tombstone, then
remove the chain.
return null;
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
index a2d4ebf103..ef60f80557 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AddWriteCommittedInvokeClosure.java
@@ -63,6 +63,9 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
*/
private long rowLinkForAddToGcQueue = NULL_LINK;
+ @Nullable
+ private RowVersion prevRowVersion;
+
AddWriteCommittedInvokeClosure(
RowId rowId,
@Nullable BinaryRow row,
@@ -97,10 +100,10 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
newRow = VersionChain.createCommitted(rowId, newVersion.link(),
newVersion.nextLink());
} else {
- RowVersion current = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
+ prevRowVersion = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
// If the current and new version are tombstones, then there is no
need to add a new version.
- if (current.isTombstone() && row == null) {
+ if (prevRowVersion.isTombstone() && row == null) {
operationType = OperationType.NOOP;
} else {
operationType = OperationType.PUT;
@@ -145,12 +148,14 @@ class AddWriteCommittedInvokeClosure implements
InvokeClosure<VersionChain> {
}
if (operationType == OperationType.PUT) {
- if (row == null) {
- storage.decrementEstimatedSize();
- } else if (rowLinkForAddToGcQueue == NULL_LINK) {
- // Checking for NULL_LINK allows us to distinguish if a new
version chain was created or not. In other words if this is
- // an insert or an update to an existing row.
- storage.incrementEstimatedSize();
+ if (prevRowVersion == null || prevRowVersion.isTombstone()) {
+ if (row != null) {
+ storage.incrementEstimatedSize();
+ }
+ } else {
+ if (row == null) {
+ storage.decrementEstimatedSize();
+ }
}
}
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
index 2a7565da3c..e52cb498cb 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/CommitWriteInvokeClosure.java
@@ -73,10 +73,11 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
private final UpdateTimestampHandler updateTimestampHandler;
- /**
- * Flag indicating that we are committing a tombstone.
- */
- private boolean isCurrentRowTombstone = false;
+ @Nullable
+ private RowVersion currentRowVersion;
+
+ @Nullable
+ private RowVersion prevRowVersion;
CommitWriteInvokeClosure(
RowId rowId,
@@ -129,30 +130,33 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
operationType = OperationType.PUT;
- RowVersion current = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
- RowVersion next = oldRow.hasNextLink() ?
storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
+ currentRowVersion = storage.readRowVersion(oldRow.headLink(),
DONT_LOAD_VALUE);
- isCurrentRowTombstone = current.isTombstone();
+ assert currentRowVersion != null;
- if (next == null && isCurrentRowTombstone) {
+ prevRowVersion = oldRow.hasNextLink() ?
storage.readRowVersion(oldRow.nextLink(), DONT_LOAD_VALUE) : null;
+
+ if (prevRowVersion == null && currentRowVersion.isTombstone()) {
// If there is only one version, and it is a tombstone, then
remove the chain.
operationType = OperationType.REMOVE;
return;
}
+ boolean isPreviousRowTombstone = prevRowVersion != null &&
prevRowVersion.isTombstone();
+
// If the previous and current version are tombstones, then delete the
current version.
- if (next != null && isCurrentRowTombstone && next.isTombstone()) {
- toRemove = current;
+ if (isPreviousRowTombstone && currentRowVersion.isTombstone()) {
+ toRemove = currentRowVersion;
- newRow = VersionChain.createCommitted(oldRow.rowId(), next.link(),
next.nextLink());
+ newRow = VersionChain.createCommitted(rowId,
prevRowVersion.link(), prevRowVersion.nextLink());
} else {
- updateTimestampLink = oldRow.headLink();
+ updateTimestampLink = currentRowVersion.link();
- newRow = VersionChain.createCommitted(oldRow.rowId(),
oldRow.headLink(), oldRow.nextLink());
+ newRow = VersionChain.createCommitted(rowId,
currentRowVersion.link(), currentRowVersion.nextLink());
- if (oldRow.hasNextLink()) {
- rowLinkForAddToGcQueue = oldRow.headLink();
+ if (currentRowVersion.hasNextLink()) {
+ rowLinkForAddToGcQueue = currentRowVersion.link();
}
}
}
@@ -194,6 +198,12 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
void afterCompletion() {
assert operationType == OperationType.PUT || toRemove == null :
"toRemove=" + toRemove + ", op=" + operationType;
+ if (operationType == OperationType.NOOP) {
+ return;
+ }
+
+ assert currentRowVersion != null;
+
if (toRemove != null) {
storage.removeRowVersion(toRemove);
}
@@ -202,15 +212,15 @@ class CommitWriteInvokeClosure implements
InvokeClosure<VersionChain> {
gcQueue.add(rowId, timestamp, rowLinkForAddToGcQueue);
}
- // We need to check the "toRemove" field in order to avoid a situation
when we are committing a tombstone
- // over an existing tombstone.
- if (operationType == OperationType.PUT && toRemove == null) {
- if (isCurrentRowTombstone) {
- storage.decrementEstimatedSize();
- } else if (rowLinkForAddToGcQueue == NULL_LINK) {
- // Checking for NULL_LINK allows us to distinguish if a new
version chain was created or not. In other words if this is
- // an insert or an update to an existing row.
- storage.incrementEstimatedSize();
+ if (operationType == OperationType.PUT) {
+ if (prevRowVersion == null || prevRowVersion.isTombstone()) {
+ if (!currentRowVersion.isTombstone()) {
+ storage.incrementEstimatedSize();
+ }
+ } else {
+ if (currentRowVersion.isTombstone()) {
+ storage.decrementEstimatedSize();
+ }
}
}
}
diff --git
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
index 8648722bbe..0364ac9dc1 100644
---
a/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
+++
b/modules/storage-rocksdb/src/test/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorageTest.java
@@ -108,6 +108,18 @@ public class RocksDbMvPartitionStorageTest extends
AbstractMvPartitionStorageTes
super.estimatedSizeNeverFallsBelowZeroUsingCommitWrite();
}
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
+ @Override
+ public void estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted() {
+ super.estimatedSizeIncreasedAfterTombstoneUsingWriteCommitted();
+ }
+
+ @Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
+ @Override
+ public void estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite() {
+ super.estimatedSizeIncreasedAfterTombstoneUsingCommiteWrite();
+ }
+
@Disabled("https://issues.apache.org/jira/browse/IGNITE-22617")
@Override
public void estimatedSizeShowsLatestRowsNumberUsingWriteCommited() {