This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-17968 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 64f82fe38372889419c2cf99b054df30fc54da8b Author: Semyon Danilov <[email protected]> AuthorDate: Wed Oct 26 13:37:49 2022 +0400 IGNITE-17968 Fix write-intents being filtered out in case if it's a tombstone --- .../apache/ignite/internal/storage/ReadResult.java | 3 +- .../storage/AbstractMvPartitionStorageTest.java | 18 ++++++++- .../storage/impl/TestMvPartitionStorage.java | 2 +- .../mv/AbstractPageMemoryMvPartitionStorage.java | 13 ++++-- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 46 +++++++++++++++++----- 5 files changed, 65 insertions(+), 17 deletions(-) diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java index cca33ed69f..90f01bb21f 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java @@ -130,7 +130,8 @@ public class ReadResult { } /** - * Returns timestamp of the most recent commit of the row. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)} + * Returns timestamp of the most recent commit of the row. Not {@code null} if committed version exists, this is a + * write-intent and read was made with a timestamp. Might be {@code null} for {@link MvPartitionStorage#scanVersions(RowId)} * even for write intents having a preceding committed version. * * @return Timestamp of the most recent commit of the row. diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index 76c6db0b00..7576f92cef 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -1269,8 +1269,22 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest @ParameterizedTest @EnumSource(ScanTimestampProvider.class) - public void scanDoesNotSeeTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception { - testScanDoesNotSeeTombstones(tsProvider, false); + public void scanSeesTombstonesWhenTombstoneIsNotCommitted(ScanTimestampProvider tsProvider) throws Exception { + RowId rowId = insert(binaryRow, txId); + HybridTimestamp commitTs = clock.now(); + commitWrite(rowId, commitTs); + + addWrite(rowId, null, newTransactionId()); + + try (PartitionTimestampCursor cursor = scan(tsProvider.scanTimestamp(clock))) { + assertTrue(cursor.hasNext()); + + ReadResult next = cursor.next(); + assertNull(next.binaryRow()); + assertEquals(commitTs, next.newestCommitTimestamp()); + + assertFalse(cursor.hasNext()); + } } @ParameterizedTest diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index f6043ae9a8..50e27bd297 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -351,7 +351,7 @@ public class TestMvPartitionStorage implements MvPartitionStorage { VersionChain chain = iterator.next(); ReadResult readResult = read(chain, timestamp, null); - if (!readResult.isEmpty()) { + if (!readResult.isEmpty() || readResult.isWriteIntent()) { currentChain = chain; currentReadResult = readResult; diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index 3297c8cbc6..d05f62ed46 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -296,7 +296,14 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio if (versionChain.isUncommitted()) { assert versionChain.transactionId() != null; - return writeIntentToResult(versionChain, rowVersion, null); + HybridTimestamp newestCommitTs = null; + + if (versionChain.hasCommittedVersions()) { + long newestCommitLink = versionChain.newestCommittedLink(); + newestCommitTs = readRowVersion(newestCommitLink, ALWAYS_LOAD_VALUE).timestamp(); + } + + return writeIntentToResult(versionChain, rowVersion, newestCommitTs); } else { ByteBufferRow row = rowVersionToBinaryRow(rowVersion); @@ -798,7 +805,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio VersionChain chain = treeCursor.next(); ReadResult result = findRowVersionByTimestamp(chain, timestamp); - if (result.isEmpty()) { + if (result.isEmpty() && !result.isWriteIntent()) { continue; } @@ -841,7 +848,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio VersionChain chain = treeCursor.next(); ReadResult result = findLatestRowVersion(chain); - if (result.isEmpty()) { + if (result.isEmpty() && !result.isWriteIntent()) { continue; } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 1a598edd3b..ea5524377f 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -1115,7 +1115,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { currentRowId = null; // Prepare direct buffer slice to read keys from the iterator. - ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0); + ByteBuffer currentKeyBuffer = MV_KEY_BUFFER.get().position(0); while (true) { // At this point, seekKeyBuf should contain row id that's above the one we already scanned, but not greater than any @@ -1140,15 +1140,15 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } // Read the actual key into a direct buffer. - int keyLength = it.key(directBuffer.limit(MAX_KEY_SIZE)); + int keyLength = it.key(currentKeyBuffer.limit(MAX_KEY_SIZE)); boolean isWriteIntent = keyLength == ROW_PREFIX_SIZE; - directBuffer.limit(ROW_PREFIX_SIZE); + currentKeyBuffer.limit(ROW_PREFIX_SIZE); // Copy actual row id into a "seekKeyBuf" buffer. - seekKeyBuf.putLong(ROW_ID_OFFSET, directBuffer.getLong(ROW_ID_OFFSET)); - seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, directBuffer.getLong(ROW_ID_OFFSET + Long.BYTES)); + seekKeyBuf.putLong(ROW_ID_OFFSET, currentKeyBuffer.getLong(ROW_ID_OFFSET)); + seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentKeyBuffer.getLong(ROW_ID_OFFSET + Long.BYTES)); // This one might look tricky. We finished processing next row. There are three options: // - "found" flag is false - there's no fitting version of the row. We'll continue to next iteration; @@ -1168,12 +1168,38 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { // Cache row and return "true" if it's found and not a tombstone. byte[] valueBytes = it.value(); - directBuffer.limit(keyLength); - ReadResult readResult = readResultFromKeyAndValue(isWriteIntent, directBuffer, valueBytes); + RowId rowId = getRowId(currentKeyBuffer); + HybridTimestamp nextCommitTimestamp = null; + + if (isWriteIntent) { + it.next(); + + if (!invalid(it)) { + ByteBuffer key = ByteBuffer.wrap(it.key()).order(KEY_BYTE_ORDER); + + if (matches(rowId, key)) { + // This is a next version of current row. + nextCommitTimestamp = readTimestamp(key); + } + } + } + + currentKeyBuffer.limit(keyLength); + + assert valueBytes != null; + + ReadResult readResult; + + if (!isWriteIntent) { + // There is no write-intent, return latest committed row. + readResult = wrapCommittedValue(valueBytes, readTimestamp(currentKeyBuffer)); + } else { + readResult = wrapUncommittedValue(valueBytes, nextCommitTimestamp); + } - if (!readResult.isEmpty()) { + if (!readResult.isEmpty() || readResult.isWriteIntent()) { next = readResult; - currentRowId = getRowId(directBuffer); + currentRowId = rowId; return true; } @@ -1224,7 +1250,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { ReadResult readResult = handleReadByTimestampIterator(it, rowId, timestamp, seekKeyBuf); - if (readResult.isEmpty()) { + if (readResult.isEmpty() && !readResult.isWriteIntent()) { // Seek to next row id as we found nothing that matches. incrementRowId(seekKeyBuf);
