This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26998 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a8a6c83d11ec133388b10cf74145d46f3467bdd9 Author: Kirill Tkalenko <[email protected]> AuthorDate: Fri Nov 28 11:50:35 2025 +0300 IGNITE-26998 wip --- .../storage/AbstractMvPartitionStorageTest.java | 92 ++++++++++++++++++++++ .../storage/impl/TestMvPartitionStorage.java | 23 +++--- .../mv/AbstractPageMemoryMvPartitionStorage.java | 68 +++++++++++----- .../internal/storage/rocksdb/GarbageCollector.java | 40 +++++----- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 10 +-- 5 files changed, 180 insertions(+), 53 deletions(-) diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java index ec0eaf8a3dc..ae4e1e5922a 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java @@ -25,6 +25,7 @@ import static org.apache.ignite.internal.storage.AddWriteResultMatcher.equalsToA import static org.apache.ignite.internal.storage.CommitResultMatcher.equalsToCommitResult; import static org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.empty; import static org.hamcrest.Matchers.equalTo; @@ -52,9 +53,15 @@ import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.lang.IgniteBiTuple; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.gc.GcEntry; import org.apache.ignite.internal.storage.lease.LeaseInfo; import org.apache.ignite.internal.testframework.IgniteTestUtils; +import org.apache.ignite.internal.tostring.IgniteToStringInclude; +import org.apache.ignite.internal.tostring.S; import org.apache.ignite.internal.util.Cursor; +import org.hamcrest.Description; +import org.hamcrest.Matcher; +import org.hamcrest.TypeSafeMatcher; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.EnumSource; @@ -2074,6 +2081,48 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor assertThat(storage.estimatedSize(), is(1L)); } + @Test + void testPeekEmptyStorage() { + assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 1), empty()); + } + + @Test + void testPeek() { + var commitTimestamp = new HybridTimestamp(10, 0); + + addWriteCommitted(ROW_ID, binaryRow, commitTimestamp); + addWriteCommitted(ROW_ID, binaryRow, commitTimestamp.addPhysicalTime(10)); + addWriteCommitted(ROW_ID, binaryRow, commitTimestamp.addPhysicalTime(20)); + + Matcher<GcEntry> expGcEntry0 = eqGcEntry(new TestGcEntry(ROW_ID, commitTimestamp.addPhysicalTime(10))); + Matcher<GcEntry> expGcEntry1 = eqGcEntry(new TestGcEntry(ROW_ID, commitTimestamp.addPhysicalTime(20))); + + assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 0), empty()); + assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 1), contains(expGcEntry0)); + assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 2), contains(expGcEntry0, expGcEntry1)); + assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 3), contains(expGcEntry0, expGcEntry1)); + + assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 0), empty()); + assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 1), empty()); + assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 2), empty()); + assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 3), empty()); + + assertThat(storage.peek(commitTimestamp, 0), empty()); + assertThat(storage.peek(commitTimestamp, 1), empty()); + assertThat(storage.peek(commitTimestamp, 2), empty()); + assertThat(storage.peek(commitTimestamp, 3), empty()); + + assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 0), empty()); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 1), contains(expGcEntry0)); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 2), contains(expGcEntry0)); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 3), contains(expGcEntry0)); + + assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 0), empty()); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 1), contains(expGcEntry0)); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 2), contains(expGcEntry0, expGcEntry1)); + assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 3), contains(expGcEntry0, expGcEntry1)); + } + /** * Returns row id that is lexicographically smaller (by the value of one) than the argument. * @@ -2109,4 +2158,47 @@ public abstract class AbstractMvPartitionStorageTest extends BaseMvPartitionStor abstract HybridTimestamp scanTimestamp(HybridClock clock); } + + protected static Matcher<GcEntry> eqGcEntry(GcEntry gcEntry) { + return new TypeSafeMatcher<>() { + @Override + protected boolean matchesSafely(GcEntry item) { + return gcEntry.getRowId().equals(item.getRowId()) && gcEntry.getTimestamp().equals(item.getTimestamp()); + } + + @Override + public void describeTo(Description description) { + description.appendValue(gcEntry); + } + }; + } + + /** Implementation for tests. */ + protected static class TestGcEntry implements GcEntry { + @IgniteToStringInclude + private final RowId rowId; + + @IgniteToStringInclude + private final HybridTimestamp timestamp; + + protected TestGcEntry(RowId rowId, HybridTimestamp timestamp) { + this.rowId = rowId; + this.timestamp = timestamp; + } + + @Override + public RowId getRowId() { + return rowId; + } + + @Override + public HybridTimestamp getTimestamp() { + return timestamp; + } + + @Override + public String toString() { + return S.toString(this); + } + } } diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index 0f6b8a7d3a2..14bae494659 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -698,21 +698,26 @@ public class TestMvPartitionStorage implements MvPartitionStorage { } @Override - // TODO: IGNITE-26998 Переделать public synchronized List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { - assert THREAD_LOCAL_LOCKER.get() != null; + if (count <= 0) { + return List.of(); + } - try { - VersionChain versionChain = gcQueue.first(); + var res = new ArrayList<GcEntry>(count); + + Iterator<VersionChain> it = gcQueue.iterator(); + + for (int i = 0; i < count && it.hasNext(); i++) { + VersionChain next = it.next(); - if (versionChain == null || versionChain.ts.compareTo(lowWatermark) > 0) { - return List.of(); + if (next.ts.compareTo(lowWatermark) > 0) { + break; } - return List.of(versionChain); - } catch (NoSuchElementException e) { - return null; + res.add(next); } + + return res; } @Override diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index 3964c85eedb..d2e9539901a 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -962,28 +962,18 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } @Override - // TODO: IGNITE-26998 Исправить реализацию public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { - assert THREAD_LOCAL_LOCKER.get() != null; - - // Assertion above guarantees that we're in "runConsistently" closure. - throwExceptionIfStorageNotInRunnableState(); - - GcRowVersion head = renewableState.gcQueue().getFirst(); - - // Garbage collection queue is empty. - if (head == null) { - return List.of(); - } - - HybridTimestamp rowTimestamp = head.getTimestamp(); + return busy(() -> { + throwExceptionIfStorageNotInRunnableState(); - // There are no versions in the garbage collection queue before watermark. - if (rowTimestamp.compareTo(lowWatermark) > 0) { - return List.of(); - } + if (count <= 0) { + return List.of(); + } else if (count == 1) { + return peekSingleGcEntryBusy(lowWatermark); + } - return List.of(head); + return peekGcEntriesBusy(lowWatermark, count); + }); } @Override @@ -1069,4 +1059,44 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio * @see MvPartitionStorage#estimatedSize */ public abstract void decrementEstimatedSize(); + + private List<GcEntry> peekSingleGcEntryBusy(HybridTimestamp lowWatermark) { + GcRowVersion head = renewableState.gcQueue().getFirst(); + + // Garbage collection queue is empty. + if (head == null) { + return List.of(); + } + + HybridTimestamp rowTimestamp = head.getTimestamp(); + + // There are no versions in the garbage collection queue before watermark. + if (rowTimestamp.compareTo(lowWatermark) > 0) { + return List.of(); + } + + return List.of(head); + } + + private List<GcEntry> peekGcEntriesBusy(HybridTimestamp lowWatermark, int count) { + var res = new ArrayList<GcEntry>(count); + + try (Cursor<GcRowVersion> cursor = renewableState.gcQueue().find(null, null)) { + while (res.size() < count && cursor.hasNext()) { + GcRowVersion next = cursor.next(); + + if (next.getTimestamp().compareTo(lowWatermark) > 0) { + break; + } + + res.add(next); + } + } catch (IgniteInternalCheckedException e) { + throwStorageExceptionIfItCause(e); + + throw new StorageException("Row version lookup failed: [rowId={}, {}]", e, lowWatermark, createStorageInfo()); + } + + return res; + } } diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java index 8b2624d174c..dff8da44c0b 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java @@ -35,6 +35,7 @@ import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TABLE_ID_SIZE; import java.nio.ByteBuffer; +import java.util.ArrayList; import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; @@ -177,35 +178,36 @@ class GarbageCollector { /** * Polls an elements for vacuum. See {@link MvPartitionStorage#peek}. * - * @param writeBatch Current Write Batch. * @param lowWatermark Low watermark. + * @param count Requested count of entries. */ - List<GcEntry> peek(WriteBatchWithIndex writeBatch, HybridTimestamp lowWatermark) { - // We retrieve the first element of the GC queue and seek for it in the data CF. - // However, the element that we need to garbage collect is the next (older one) element. - // First we check if there's anything to garbage collect. If the element is a tombstone we remove it. - // If the next element exists, that should be the element that we want to garbage collect. - try (RocksIterator gcIt = newWrappedIterator(writeBatch, gcQueueCf, helper.upperBoundReadOpts)) { + List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { + if (count <= 0) { + return List.of(); + } + + var res = new ArrayList<GcEntry>(count); + + try (RocksIterator gcIt = db.newIterator(gcQueueCf, helper.upperBoundReadOpts)) { gcIt.seek(helper.partitionStartPrefix()); - if (invalid(gcIt)) { - // GC queue is empty. - return List.of(); - } + while (res.size() < count && !invalid(gcIt)) { + ByteBuffer gcKeyBuffer = readGcKey(gcIt); - ByteBuffer gcKeyBuffer = readGcKey(gcIt); + GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer); - GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer); + if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) { + break; + } - if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) { - // No elements to garbage collect. - return List.of(); - } + res.add(gcRowVersion); - return List.of(gcRowVersion); + gcIt.next(); + } } - } + return res; + } /** * Polls an element for vacuum. See {@link MvPartitionStorage#vacuum(GcEntry)}. diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 2ca56d14e98..af1319f5f1b 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -1290,14 +1290,12 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } @Override - // TODO: IGNITE-26988 Переделать public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { - WriteBatchWithIndex batch = requireWriteBatch(); - - // No busy lock required, we're already in "runConsistently" closure. - throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo); + return busy(() -> { + throwExceptionIfStorageInProgressOfRebalance(state.get(), this::createStorageInfo); - return gc.peek(batch, lowWatermark); + return gc.peek(lowWatermark, count); + }); } @Override
