This is an automated email from the ASF dual-hosted git repository. sdanilov pushed a commit to branch ignite-17720 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 28a299ddd14e6469fe815dbbee71fd84ad59316a Author: Semyon Danilov <[email protected]> AuthorDate: Tue Sep 27 16:19:07 2022 +0400 IGNITE-17720 Extend MvPartitionStorage scan API with write intent resolution capabilities --- .../internal/storage/MvPartitionStorage.java | 4 +- .../internal/storage/PartitionScanCursor.java | 35 ++++ .../apache/ignite/internal/storage/ReadResult.java | 8 + .../storage/AbstractMvPartitionStorageTest.java | 60 +++++-- .../TestConcurrentHashMapMvPartitionStorage.java | 67 +++++++- .../mv/AbstractPageMemoryMvPartitionStorage.java | 179 +++++++++++++++------ .../storage/rocksdb/RocksDbMvPartitionStorage.java | 153 +++++++++++++++++- .../distributed/storage/VersionedRowStore.java | 5 +- 8 files changed, 438 insertions(+), 73 deletions(-) diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java index de9742bf86..4d3dc080b3 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java @@ -228,7 +228,7 @@ public interface MvPartitionStorage extends AutoCloseable { * @deprecated Use {@link #scan(Predicate, HybridTimestamp)} */ @Deprecated - default Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException { + default PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, Timestamp timestamp) throws StorageException { return scan(keyFilter, convertTimestamp(timestamp)); } @@ -241,7 +241,7 @@ public interface MvPartitionStorage extends AutoCloseable { * @throws TxIdMismatchException If there's another pending update associated with different transaction id. * @throws StorageException If failed to read data from the storage. */ - Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException; + PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException; /** * Returns rows count belongs to current storage. diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java new file mode 100644 index 0000000000..8e76c3131b --- /dev/null +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/PartitionScanCursor.java @@ -0,0 +1,35 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.internal.storage; + +import org.apache.ignite.hlc.HybridTimestamp; +import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.util.Cursor; + +/** + * Partition cursor. + */ +public interface PartitionScanCursor extends Cursor<ReadResult> { + /** + * Returns a committed row within the current row id. + * + * @param timestamp Commit timestamp. + * @return Row or {@code null} if it doesn't exist. + */ + BinaryRow committed(HybridTimestamp timestamp); +} diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java index 4ebd74fe3a..a1d912c6b2 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ReadResult.java @@ -125,4 +125,12 @@ public class ReadResult { public int commitPartitionId() { return commitPartitionId; } + + public boolean isWriteIntent() { + return transactionId != null; + } + + public boolean isEmpty() { + return this == EMPTY; + } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index 21e01306c8..b88f388d56 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -48,7 +48,6 @@ import org.apache.ignite.internal.tx.Timestamp; import org.apache.ignite.internal.util.Cursor; import org.apache.ignite.internal.util.Pair; import org.apache.ignite.lang.IgniteBiTuple; -import org.junit.jupiter.api.Disabled; import org.junit.jupiter.api.Test; /** @@ -93,7 +92,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest /** * Scans partition inside of consistency closure. */ - protected Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) { + protected Cursor<ReadResult> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) { return storage.runConsistently(() -> storage.scan(filter, timestamp)); } @@ -159,7 +158,7 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest @Test public void testScanOverEmpty() throws Exception { assertEquals(List.of(), convert(scan(row -> true, newTransactionId()))); - assertEquals(List.of(), convert(scan(row -> true, clock.now()))); + assertEquals(List.of(), convert0(scan(row -> true, clock.now()))); } /** @@ -312,13 +311,13 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest HybridTimestamp ts5 = clock.now(); // Full scan with various timestamp values. - assertEquals(List.of(), convert(scan(row -> true, ts1))); + assertEquals(List.of(), convert0(scan(row -> true, ts1))); - assertEquals(List.of(value1), convert(scan(row -> true, ts2))); - assertEquals(List.of(value1), convert(scan(row -> true, ts3))); + assertEquals(List.of(value1), convert0(scan(row -> true, ts2))); + assertEquals(List.of(value1), convert0(scan(row -> true, ts3))); - assertEquals(List.of(value1, value2), convert(scan(row -> true, ts4))); - assertEquals(List.of(value1, value2), convert(scan(row -> true, ts5))); + assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts4))); + assertEquals(List.of(value1, value2), convert0(scan(row -> true, ts5))); } @Test @@ -364,6 +363,15 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest } } + private List<TestValue> convert0(Cursor<ReadResult> cursor) throws Exception { + try (cursor) { + return cursor.stream() + .map((ReadResult rs) -> BaseMvStoragesTest.value(rs.binaryRow())) + .sorted(Comparator.nullsFirst(Comparator.naturalOrder())) + .collect(Collectors.toList()); + } + } + @Test void readOfUncommittedRowWithCorrespondingTransactionIdReturnsTheRow() { RowId rowId = insert(binaryRow, txId); @@ -765,12 +773,11 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest } @Test - @Disabled("https://issues.apache.org/jira/browse/IGNITE-17720") void scanByTimestampWorksCorrectlyAfterCommitAndAbortFollowedByUncommittedWrite() throws Exception { commitAbortAndAddUncommitted(); - try (Cursor<BinaryRow> cursor = storage.scan(k -> true, clock.now())) { - BinaryRow foundRow = cursor.next(); + try (Cursor<ReadResult> cursor = storage.scan(k -> true, clock.now())) { + BinaryRow foundRow = cursor.next().binaryRow(); assertRowMatches(foundRow, binaryRow3); @@ -1025,6 +1032,37 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvStoragesTest } } + @Test + void testScanWithWriteIntent() throws Exception { + RowId rowId1 = new RowId(PARTITION_ID); + + HybridTimestamp commit1ts = clock.now(); + + storage.runConsistently(() -> { + addWrite(rowId1, binaryRow, newTransactionId()); + + commitWrite(rowId1, commit1ts); + + addWrite(rowId1, binaryRow2, newTransactionId()); + + return null; + }); + + try (PartitionScanCursor cursor = storage.scan(r -> true, clock.now())) { + assertTrue(cursor.hasNext()); + + ReadResult next = cursor.next(); + + assertTrue(next.isWriteIntent()); + + assertRowMatches(next.binaryRow(), binaryRow2); + + BinaryRow committedRow = cursor.committed(next.newestCommitTimestamp()); + + assertRowMatches(committedRow, binaryRow); + } + } + @Test void testScanVersionsWithWriteIntent() throws Exception { RowId rowId = new RowId(PARTITION_ID, 100, 0); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java index 65103d0ae9..be90f55363 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/chm/TestConcurrentHashMapMvPartitionStorage.java @@ -19,6 +19,7 @@ package org.apache.ignite.internal.storage.chm; import java.util.Iterator; import java.util.Map.Entry; +import java.util.NoSuchElementException; import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -30,6 +31,7 @@ import java.util.stream.Stream; import org.apache.ignite.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.PartitionScanCursor; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; @@ -327,14 +329,65 @@ public class TestConcurrentHashMapMvPartitionStorage implements MvPartitionStora /** {@inheritDoc} */ @Override - public Cursor<BinaryRow> scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) { - Iterator<BinaryRow> iterator = map.values().stream() - .map(versionChain -> read(versionChain, timestamp, null, filter)) - .map(ReadResult::binaryRow) - .filter(Objects::nonNull) - .iterator(); + public PartitionScanCursor scan(Predicate<BinaryRow> filter, HybridTimestamp timestamp) { + Iterator<VersionChain> iterator = map.values().iterator(); - return Cursor.fromIterator(iterator); + return new PartitionScanCursor() { + + private VersionChain currentChain; + + private ReadResult currentReadResult; + + @Override + public BinaryRow committed(HybridTimestamp timestamp) { + ReadResult read = read(currentChain, timestamp, null, filter); + + if (read.transactionId() == null) { + return read.binaryRow(); + } + + return null; + } + + @Override + public void close() { + // No-op. + } + + @Override + public boolean hasNext() { + if (currentReadResult != null) { + return true; + } + + while (iterator.hasNext()) { + VersionChain chain = iterator.next(); + ReadResult readResult = read(chain, timestamp, null, filter); + + if (!readResult.isEmpty()) { + currentChain = chain; + currentReadResult = readResult; + + return true; + } + } + + return false; + } + + @Override + public ReadResult next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ReadResult res = currentReadResult; + + currentReadResult = null; + + return res; + } + }; } /** {@inheritDoc} */ diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index 96d4843873..9cc60288cd 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -44,6 +44,7 @@ import org.apache.ignite.internal.pagememory.util.PageLockListenerNoOp; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.PartitionScanCursor; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; @@ -273,7 +274,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio return null; } - return findLatestRowVersion(versionChain, txId, MATCH_ALL).binaryRow(); + return findLatestRowVersion(versionChain, txId, MATCH_ALL); } @Override @@ -300,13 +301,13 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } } - private ReadResult findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) { + private @Nullable BinaryRow findLatestRowVersion(VersionChain versionChain, UUID txId, Predicate<BinaryRow> keyFilter) { RowVersion rowVersion = readRowVersion(versionChain.headLink(), ALWAYS_LOAD_VALUE); ByteBufferRow row = rowVersionToBinaryRow(rowVersion); if (!keyFilter.test(row)) { - return ReadResult.EMPTY; + return null; } if (versionChain.isUncommitted()) { @@ -316,11 +317,10 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio throwIfChainBelongsToAnotherTx(versionChain, txId); - return ReadResult.createFromWriteIntent(row, versionChain.transactionId(), versionChain.commitTableId(), null, - versionChain.commitPartitionId()); + return row; } - return ReadResult.createFromCommitted(row); + return row; } private RowVersion readRowVersion(long nextLink, Predicate<HybridTimestamp> loadValue) { @@ -351,36 +351,9 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio return new ByteBufferRow(rowVersion.value()); } - private @Nullable BinaryRow findRowVersionInChain( - VersionChain versionChain, - @Nullable UUID transactionId, - @Nullable HybridTimestamp timestamp, - Predicate<BinaryRow> keyFilter - ) { - assert transactionId != null ^ timestamp != null; - - if (transactionId != null) { - ReadResult res = findLatestRowVersion(versionChain, transactionId, keyFilter); - - if (res == null) { - return null; - } - - return res.binaryRow(); - } else { - ReadResult res = findRowVersionByTimestamp(versionChain, timestamp); - - if (res == null) { - return null; - } - - BinaryRow row = res.binaryRow(); - - return keyFilter.test(row) ? row : null; - } - } - private ReadResult findRowVersionByTimestamp(VersionChain versionChain, HybridTimestamp timestamp) { + assert timestamp != null; + long headLink = versionChain.headLink(); if (versionChain.isUncommitted()) { @@ -660,16 +633,26 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio @Override public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, UUID txId) throws TxIdMismatchException, StorageException { - return internalScan(keyFilter, txId, null); + return internalScan(keyFilter, txId); } @Override - public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException { - return internalScan(keyFilter, null, timestamp); + public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException { + assert timestamp != null; + + IgniteCursor<VersionChain> treeCursor; + + try { + treeCursor = versionChainTree.find(null, null); + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Find failed", e); + } + + return new TimestampCursor(treeCursor, keyFilter, timestamp); } - private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, @Nullable UUID txId, @Nullable HybridTimestamp timestamp) { - assert txId != null ^ timestamp != null; + private Cursor<BinaryRow> internalScan(Predicate<BinaryRow> keyFilter, UUID txId) { + assert txId != null; IgniteCursor<VersionChain> treeCursor; @@ -679,7 +662,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio throw new StorageException("Find failed", e); } - return new ScanCursor(treeCursor, keyFilter, txId, timestamp); + return new ScanCursor(treeCursor, keyFilter, txId); } @Override @@ -712,6 +695,110 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio // TODO: IGNITE-17132 Implement it } + private class TimestampCursor implements PartitionScanCursor { + private final IgniteCursor<VersionChain> treeCursor; + + private final Predicate<BinaryRow> keyFilter; + + private final HybridTimestamp timestamp; + + private ReadResult nextRead = null; + + private boolean iterationExhausted = false; + + public TimestampCursor( + IgniteCursor<VersionChain> treeCursor, + Predicate<BinaryRow> keyFilter, + HybridTimestamp timestamp + ) { + this.treeCursor = treeCursor; + this.keyFilter = keyFilter; + this.timestamp = timestamp; + } + + @Override + public boolean hasNext() { + if (nextRead != null) { + return true; + } + + if (iterationExhausted) { + return false; + } + + while (true) { + boolean positionedToNext = tryAdvanceTreeCursor(); + + if (!positionedToNext) { + iterationExhausted = true; + return false; + } + + VersionChain chain = getCurrentChainFromTreeCursor(); + ReadResult res = findRowVersionByTimestamp(chain, timestamp); + + if (res.isEmpty()) { + continue; + } + + if (!keyFilter.test(res.binaryRow())) { + continue; + } + + nextRead = res; + return true; + } + } + + private boolean tryAdvanceTreeCursor() { + try { + return treeCursor.next(); + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Error when trying to advance tree cursor", e); + } + } + + private VersionChain getCurrentChainFromTreeCursor() { + try { + return treeCursor.get(); + } catch (IgniteInternalCheckedException e) { + throw new StorageException("Failed to get element from tree cursor", e); + } + } + + @Override + public ReadResult next() { + if (!hasNext()) { + throw new NoSuchElementException("The cursor is exhausted"); + } + + assert nextRead != null; + + ReadResult res = nextRead; + nextRead = null; + + return res; + } + + @Override + public void close() { + // No-op. + } + + @Override + public BinaryRow committed(HybridTimestamp timestamp) { + if (iterationExhausted) { + throw new NoSuchElementException(); + } + + VersionChain chain = getCurrentChainFromTreeCursor(); + + ReadResult res = findRowVersionByTimestamp(chain, timestamp); + + return res.binaryRow(); + } + } + private class ScanCursor implements Cursor<BinaryRow> { private final IgniteCursor<VersionChain> treeCursor; @@ -719,8 +806,6 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio private final @Nullable UUID transactionId; - private final @Nullable HybridTimestamp timestamp; - private BinaryRow nextRow = null; private boolean iterationExhausted = false; @@ -728,13 +813,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio public ScanCursor( IgniteCursor<VersionChain> treeCursor, Predicate<BinaryRow> keyFilter, - @Nullable UUID transactionId, - @Nullable HybridTimestamp timestamp + @Nullable UUID transactionId ) { this.treeCursor = treeCursor; this.keyFilter = keyFilter; this.transactionId = transactionId; - this.timestamp = timestamp; } @Override @@ -756,7 +839,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } VersionChain chain = getCurrentChainFromTreeCursor(); - BinaryRow row = findRowVersionInChain(chain, transactionId, timestamp, keyFilter); + BinaryRow row = findLatestRowVersion(chain, transactionId, keyFilter); if (row != null) { nextRow = row; @@ -797,7 +880,7 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio @Override public void close() { - // no-op + // No-op. } } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 2d0d46e4be..3028d9afe2 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -42,6 +42,7 @@ import org.apache.ignite.internal.rocksdb.RocksIteratorAdapter; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.schema.ByteBufferRow; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.PartitionScanCursor; import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.StorageException; @@ -541,6 +542,10 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { // It is guaranteed by descending order of timestamps. seekIterator.seek(keyBuf.array()); + return processIterator(seekIterator, rowId, timestamp, keyBuf); + } + + private static ReadResult processIterator(RocksIterator seekIterator, RowId rowId, HybridTimestamp timestamp, ByteBuffer keyBuf) { // There's no guarantee that required key even exists. If it doesn't, then "seek" will point to a different key. // To avoid returning its value, we have to check that actual key matches what we need. // Here we prepare direct buffer to read key without timestamp. Shared direct buffer is used to avoid extra memory allocations. @@ -689,8 +694,150 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { /** {@inheritDoc} */ @Override - public Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException { - return scan(keyFilter, timestamp, null); + public PartitionScanCursor scan(Predicate<BinaryRow> keyFilter, HybridTimestamp timestamp) throws StorageException { + assert timestamp != null; + + // Set next partition as an upper bound. + ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true); + + RocksIterator it = db.newIterator(cf, options); + + // Here's seek buffer itself. Originally it contains a valid partition id, row id payload that's filled with zeroes, and maybe + // a timestamp value. Zero row id guarantees that it's lexicographically less than or equal to any other row id stored in the + // partition. + // Byte buffer from a thread-local field can't be used here, because of two reasons: + // - no one guarantees that there will only be a single cursor; + // - no one guarantees that returned cursor will not be used by other threads. + // The thing is, we need this buffer to preserve its content between invocations of "hasNext" method. + ByteBuffer seekKeyBuf = ByteBuffer.allocate(MAX_KEY_SIZE).order(BIG_ENDIAN).putShort((short) partitionId); + + return new PartitionScanCursor() { + + private RowId currentRowId; + + @Override + public BinaryRow committed(HybridTimestamp timestamp) { + if (currentRowId == null) { + throw new IllegalStateException(); + } + + seekKeyBuf.putLong(ROW_ID_OFFSET, currentRowId.mostSignificantBits()); + seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, currentRowId.leastSignificantBits()); + putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp); + + it.seek(seekKeyBuf.position(0)); + + ReadResult readResult = processIterator(it, currentRowId, timestamp, seekKeyBuf); + + if (readResult.isEmpty()) { + return null; + } + + return readResult.binaryRow(); + } + + /** Cached value for {@link #next()} method. Also optimizes the code of {@link #hasNext()}. */ + private ReadResult next; + + /** {@inheritDoc} */ + @Override + public boolean hasNext() { + // Fast-path for consecutive invocations. + if (next != null) { + return true; + } + + // Prepare direct buffer slice to read keys from the iterator. + ByteBuffer directBuffer = MV_KEY_BUFFER.get().position(0); + + while (true) { + it.seek(seekKeyBuf); + + // We should do after each seek. Here in particular it means one of two things: + // - partition is empty; + // - iterator exhausted all the data in partition. + if (invalid(it)) { + return false; + } + + it.key(directBuffer.position(0)); + + directBuffer.position(ROW_ID_OFFSET); + long msb = directBuffer.getLong(); + long lsb = directBuffer.getLong(); + + var rowId = new RowId(partitionId, msb, lsb); + + seekKeyBuf.putLong(ROW_ID_OFFSET, msb); + seekKeyBuf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb); + putTimestamp(seekKeyBuf.position(ROW_PREFIX_SIZE), timestamp); + + it.seek(seekKeyBuf.position(0)); + + ReadResult readResult = processIterator(it, rowId, timestamp, seekKeyBuf); + + if (readResult.isEmpty()) { + incrementRowId(seekKeyBuf); + + seekKeyBuf.position(0); + continue; + } + + next = readResult; + currentRowId = rowId; + + return true; + } + } + + /** {@inheritDoc} */ + @Override + public ReadResult next() { + if (!hasNext()) { + throw new NoSuchElementException(); + } + + ReadResult res = next; + + next = null; + + incrementRowId(seekKeyBuf); + + seekKeyBuf.position(0); + + return res; + } + + /** {@inheritDoc} */ + @Override + public void close() throws Exception { + IgniteUtils.closeAll(it, options); + } + + private void incrementRowId(ByteBuffer buf) { + long lsb = 1 + buf.getLong(ROW_ID_OFFSET + Long.BYTES); + + buf.putLong(ROW_ID_OFFSET + Long.BYTES, lsb); + + if (lsb != 0L) { + return; + } + + long msb = 1 + buf.getLong(ROW_ID_OFFSET); + + buf.putLong(ROW_ID_OFFSET, msb); + + if (msb != 0L) { + return; + } + + short partitionId = (short) (1 + buf.getShort(0)); + + assert partitionId != 0; + + buf.putShort(0, partitionId); + } + }; } private Cursor<BinaryRow> scan(Predicate<BinaryRow> keyFilter, @Nullable HybridTimestamp timestamp, @Nullable UUID txId) @@ -698,7 +845,7 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { assert timestamp == null ^ txId == null; // Set next partition as an upper bound. - var options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true); + ReadOptions options = new ReadOptions().setIterateUpperBound(upperBound).setTotalOrderSeek(true); RocksIterator it = db.newIterator(cf, options); diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java index 940e8047f9..aa09719b8c 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/VersionedRowStore.java @@ -33,6 +33,7 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.function.Predicate; import org.apache.ignite.internal.schema.BinaryRow; import org.apache.ignite.internal.storage.MvPartitionStorage; +import org.apache.ignite.internal.storage.ReadResult; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.tx.Timestamp; import org.apache.ignite.internal.tx.TxManager; @@ -542,7 +543,7 @@ public class VersionedRowStore { */ public Cursor<BinaryRow> scan(Predicate<BinaryRow> pred) { // TODO <MUTED> https://issues.apache.org/jira/browse/IGNITE-17309 Transactional support for partition scans - Cursor<BinaryRow> delegate = storage.scan(pred, Timestamp.nextVersion()); + Cursor<ReadResult> delegate = storage.scan(pred, Timestamp.nextVersion()); // TODO asch add tx support IGNITE-15087. return new Cursor<BinaryRow>() { @@ -560,7 +561,7 @@ public class VersionedRowStore { } if (delegate.hasNext()) { - cur = delegate.next(); + cur = delegate.next().binaryRow(); return cur != null ? true : hasNext(); // Skip tombstones. }
