This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26998
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a8a6c83d11ec133388b10cf74145d46f3467bdd9
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Nov 28 11:50:35 2025 +0300

    IGNITE-26998 wip
---
 .../storage/AbstractMvPartitionStorageTest.java    | 92 ++++++++++++++++++++++
 .../storage/impl/TestMvPartitionStorage.java       | 23 +++---
 .../mv/AbstractPageMemoryMvPartitionStorage.java   | 68 +++++++++++-----
 .../internal/storage/rocksdb/GarbageCollector.java | 40 +++++-----
 .../storage/rocksdb/RocksDbMvPartitionStorage.java | 10 +--
 5 files changed, 180 insertions(+), 53 deletions(-)

diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ec0eaf8a3dc..ae4e1e5922a 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -25,6 +25,7 @@ import static 
org.apache.ignite.internal.storage.AddWriteResultMatcher.equalsToA
 import static 
org.apache.ignite.internal.storage.CommitResultMatcher.equalsToCommitResult;
 import static 
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.empty;
 import static org.hamcrest.Matchers.equalTo;
@@ -52,9 +53,15 @@ import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.lang.IgniteBiTuple;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.apache.ignite.internal.storage.lease.LeaseInfo;
 import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
 import org.apache.ignite.internal.util.Cursor;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.EnumSource;
@@ -2074,6 +2081,48 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
         assertThat(storage.estimatedSize(), is(1L));
     }
 
+    /** Checks that {@code peek} returns nothing when no data has been written to the storage. */
+    @Test
+    void testPeekEmptyStorage() {
+        var peeked = storage.peek(HybridTimestamp.MAX_VALUE, 1);
+
+        assertThat(peeked, empty());
+    }
+
+    /** Checks which garbage collection entries {@code peek} returns for different low watermarks and requested counts. */
+    @Test
+    void testPeek() {
+        HybridTimestamp firstCommitTs = new HybridTimestamp(10, 0);
+        HybridTimestamp secondCommitTs = firstCommitTs.addPhysicalTime(10);
+        HybridTimestamp thirdCommitTs = firstCommitTs.addPhysicalTime(20);
+
+        addWriteCommitted(ROW_ID, binaryRow, firstCommitTs);
+        addWriteCommitted(ROW_ID, binaryRow, secondCommitTs);
+        addWriteCommitted(ROW_ID, binaryRow, thirdCommitTs);
+
+        // The expected entries carry the timestamps of the second and the third commits.
+        Matcher<GcEntry> secondCommitEntry = eqGcEntry(new TestGcEntry(ROW_ID, secondCommitTs));
+        Matcher<GcEntry> thirdCommitEntry = eqGcEntry(new TestGcEntry(ROW_ID, thirdCommitTs));
+
+        // Maximum watermark: only the requested count limits the result.
+        assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 0), empty());
+        assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 1), contains(secondCommitEntry));
+        assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 2), contains(secondCommitEntry, thirdCommitEntry));
+        assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 3), contains(secondCommitEntry, thirdCommitEntry));
+
+        // Watermarks below the second commit: nothing is expected regardless of the requested count.
+        for (int count = 0; count <= 3; count++) {
+            assertThat(storage.peek(HybridTimestamp.MIN_VALUE, count), empty());
+        }
+
+        for (int count = 0; count <= 3; count++) {
+            assertThat(storage.peek(firstCommitTs, count), empty());
+        }
+
+        // Watermark equal to the second commit: only its entry is expected.
+        assertThat(storage.peek(secondCommitTs, 0), empty());
+
+        for (int count = 1; count <= 3; count++) {
+            assertThat(storage.peek(secondCommitTs, count), contains(secondCommitEntry));
+        }
+
+        // Watermark equal to the third commit: both entries are expected, limited by the requested count.
+        assertThat(storage.peek(thirdCommitTs, 0), empty());
+        assertThat(storage.peek(thirdCommitTs, 1), contains(secondCommitEntry));
+        assertThat(storage.peek(thirdCommitTs, 2), contains(secondCommitEntry, thirdCommitEntry));
+        assertThat(storage.peek(thirdCommitTs, 3), contains(secondCommitEntry, thirdCommitEntry));
+    }
+
     /**
      * Returns row id that is lexicographically smaller (by the value of one) 
than the argument.
      *
@@ -2109,4 +2158,47 @@ public abstract class AbstractMvPartitionStorageTest 
extends BaseMvPartitionStor
 
         abstract HybridTimestamp scanTimestamp(HybridClock clock);
     }
+
+    /**
+     * Creates a matcher that accepts a {@link GcEntry} whose row ID and timestamp are equal to those of the expected entry.
+     *
+     * @param gcEntry Expected entry; it is also the value appended to the matcher description.
+     */
+    protected static Matcher<GcEntry> eqGcEntry(GcEntry gcEntry) {
+        return new TypeSafeMatcher<>() {
+            @Override
+            public void describeTo(Description description) {
+                description.appendValue(gcEntry);
+            }
+
+            @Override
+            protected boolean matchesSafely(GcEntry actual) {
+                if (!gcEntry.getRowId().equals(actual.getRowId())) {
+                    return false;
+                }
+
+                return gcEntry.getTimestamp().equals(actual.getTimestamp());
+            }
+        };
+    }
+
+    /** {@link GcEntry} implementation for tests: holds the row ID and the timestamp passed to its constructor. */
+    protected static class TestGcEntry implements GcEntry {
+        /** Row ID returned by {@link #getRowId()}. */
+        @IgniteToStringInclude
+        private final RowId rowId;
+
+        /** Timestamp returned by {@link #getTimestamp()}. */
+        @IgniteToStringInclude
+        private final HybridTimestamp timestamp;
+
+        /**
+         * Constructor.
+         *
+         * @param rowId Row ID of the entry.
+         * @param timestamp Timestamp of the entry.
+         */
+        protected TestGcEntry(RowId rowId, HybridTimestamp timestamp) {
+            this.rowId = rowId;
+            this.timestamp = timestamp;
+        }
+
+        @Override
+        public RowId getRowId() {
+            return rowId;
+        }
+
+        @Override
+        public HybridTimestamp getTimestamp() {
+            return timestamp;
+        }
+
+        // NOTE(review): presumably prints the fields marked with @IgniteToStringInclude - confirm against S.
+        @Override
+        public String toString() {
+            return S.toString(this);
+        }
+    }
 }
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 0f6b8a7d3a2..14bae494659 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -698,21 +698,26 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    // TODO: IGNITE-26998 Переделать
     public synchronized List<GcEntry> peek(HybridTimestamp lowWatermark, int 
count) {
-        assert THREAD_LOCAL_LOCKER.get() != null;
+        if (count <= 0) {
+            return List.of();
+        }
 
-        try {
-            VersionChain versionChain = gcQueue.first();
+        var res = new ArrayList<GcEntry>(count);
+
+        Iterator<VersionChain> it = gcQueue.iterator();
+
+        for (int i = 0; i < count && it.hasNext(); i++) {
+            VersionChain next = it.next();
 
-            if (versionChain == null || 
versionChain.ts.compareTo(lowWatermark) > 0) {
-                return List.of();
+            if (next.ts.compareTo(lowWatermark) > 0) {
+                break;
             }
 
-            return List.of(versionChain);
-        } catch (NoSuchElementException e) {
-            return null;
+            res.add(next);
         }
+
+        return res;
     }
 
     @Override
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 3964c85eedb..d2e9539901a 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -962,28 +962,18 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
     }
 
     @Override
-    // TODO: IGNITE-26998 Исправить реализацию
     public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
-        assert THREAD_LOCAL_LOCKER.get() != null;
-
-        // Assertion above guarantees that we're in "runConsistently" closure.
-        throwExceptionIfStorageNotInRunnableState();
-
-        GcRowVersion head = renewableState.gcQueue().getFirst();
-
-        // Garbage collection queue is empty.
-        if (head == null) {
-            return List.of();
-        }
-
-        HybridTimestamp rowTimestamp = head.getTimestamp();
+        return busy(() -> {
+            throwExceptionIfStorageNotInRunnableState();
 
-        // There are no versions in the garbage collection queue before 
watermark.
-        if (rowTimestamp.compareTo(lowWatermark) > 0) {
-            return List.of();
-        }
+            if (count <= 0) {
+                return List.of();
+            } else if (count == 1) {
+                return peekSingleGcEntryBusy(lowWatermark);
+            }
 
-        return List.of(head);
+            return peekGcEntriesBusy(lowWatermark, count);
+        });
     }
 
     @Override
@@ -1069,4 +1059,44 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
      * @see MvPartitionStorage#estimatedSize
      */
     public abstract void decrementEstimatedSize();
+
+    /**
+     * Peeks only the first entry of the garbage collection queue.
+     *
+     * @param lowWatermark Upper bound (inclusive) for the timestamp of the returned entry.
+     * @return Single-element list with the first queue entry, or an empty list if there is no such entry or its timestamp is
+     *      greater than the low watermark.
+     */
+    private List<GcEntry> peekSingleGcEntryBusy(HybridTimestamp lowWatermark) {
+        GcRowVersion first = renewableState.gcQueue().getFirst();
+
+        // Either the queue is empty, or there are no versions in it at or below the low watermark.
+        boolean nothingToPeek = first == null || first.getTimestamp().compareTo(lowWatermark) > 0;
+
+        return nothingToPeek ? List.of() : List.of(first);
+    }
+
+    /**
+     * Peeks up to {@code count} entries from the beginning of the garbage collection queue, stopping at the first entry whose
+     * timestamp is greater than the low watermark.
+     *
+     * @param lowWatermark Upper bound (inclusive) for timestamps of the returned entries.
+     * @param count Maximum number of entries to return.
+     * @return Entries in the order the queue cursor produced them, possibly empty.
+     * @throws StorageException If iterating over the queue fails.
+     */
+    private List<GcEntry> peekGcEntriesBusy(HybridTimestamp lowWatermark, int count) {
+        var res = new ArrayList<GcEntry>(count);
+
+        try (Cursor<GcRowVersion> cursor = renewableState.gcQueue().find(null, null)) {
+            while (res.size() < count && cursor.hasNext()) {
+                GcRowVersion next = cursor.next();
+
+                // Stop at the first entry above the low watermark instead of scanning the rest of the queue.
+                if (next.getTimestamp().compareTo(lowWatermark) > 0) {
+                    break;
+                }
+
+                res.add(next);
+            }
+        } catch (IgniteInternalCheckedException e) {
+            throwStorageExceptionIfItCause(e);
+
+            // The failed operation is a scan of the garbage collection queue bounded by the low watermark; no row ID is involved,
+            // so the message must not label the watermark as "rowId".
+            throw new StorageException(
+                    "Garbage collection queue peek failed: [lowWatermark={}, {}]",
+                    e, lowWatermark, createStorageInfo()
+            );
+        }
+
+        return res;
+    }
 }
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index 8b2624d174c..dff8da44c0b 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -35,6 +35,7 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TABLE_ID_SIZE;
 
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.List;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -177,35 +178,36 @@ class GarbageCollector {
     /**
      * Polls an elements for vacuum. See {@link MvPartitionStorage#peek}.
      *
-     * @param writeBatch Current Write Batch.
      * @param lowWatermark Low watermark.
+     * @param count Requested count of entries.
      */
-    List<GcEntry> peek(WriteBatchWithIndex writeBatch, HybridTimestamp 
lowWatermark) {
-        // We retrieve the first element of the GC queue and seek for it in 
the data CF.
-        // However, the element that we need to garbage collect is the next 
(older one) element.
-        // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
-        // If the next element exists, that should be the element that we want 
to garbage collect.
-        try (RocksIterator gcIt = newWrappedIterator(writeBatch, gcQueueCf, 
helper.upperBoundReadOpts)) {
+    List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+        if (count <= 0) {
+            return List.of();
+        }
+
+        var res = new ArrayList<GcEntry>(count);
+
+        try (RocksIterator gcIt = db.newIterator(gcQueueCf, 
helper.upperBoundReadOpts)) {
             gcIt.seek(helper.partitionStartPrefix());
 
-            if (invalid(gcIt)) {
-                // GC queue is empty.
-                return List.of();
-            }
+            while (res.size() < count && !invalid(gcIt)) {
+                ByteBuffer gcKeyBuffer = readGcKey(gcIt);
 
-            ByteBuffer gcKeyBuffer = readGcKey(gcIt);
+                GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
 
-            GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
+                if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
+                    break;
+                }
 
-            if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
-                // No elements to garbage collect.
-                return List.of();
-            }
+                res.add(gcRowVersion);
 
-            return List.of(gcRowVersion);
+                gcIt.next();
+            }
         }
-    }
 
+        return res;
+    }
 
     /**
      * Polls an element for vacuum. See {@link 
MvPartitionStorage#vacuum(GcEntry)}.
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 2ca56d14e98..af1319f5f1b 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1290,14 +1290,12 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    // TODO: IGNITE-26988 Переделать
     public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
-        WriteBatchWithIndex batch = requireWriteBatch();
-
-        // No busy lock required, we're already in "runConsistently" closure.
-        throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
+        return busy(() -> {
+            throwExceptionIfStorageInProgressOfRebalance(state.get(), 
this::createStorageInfo);
 
-        return gc.peek(batch, lowWatermark);
+            return gc.peek(lowWatermark, count);
+        });
     }
 
     @Override

Reply via email to