This is an automated email from the ASF dual-hosted git repository.

tkalkirill pushed a commit to branch ignite-26998
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit 1372be5be5542fadade4a66905c1c92aa3b3179d
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Thu Nov 27 18:13:53 2025 +0300

    IGNITE-26998 wip
---
 .../replicator/raft/snapshot/PartitionDataStorage.java    |  7 ++++---
 .../ignite/internal/storage/MvPartitionStorage.java       |  9 +++++----
 .../storage/ThreadAssertingMvPartitionStorage.java        |  4 ++--
 .../internal/storage/BaseMvPartitionStorageTest.java      |  7 +++++--
 .../internal/storage/impl/TestMvPartitionStorage.java     |  9 +++++----
 .../mv/AbstractPageMemoryMvPartitionStorage.java          |  9 +++++----
 .../ignite/internal/storage/rocksdb/GarbageCollector.java | 15 ++++++++-------
 .../storage/rocksdb/RocksDbMvPartitionStorage.java        |  3 ++-
 .../internal/table/distributed/gc/GcUpdateHandler.java    |  9 ++++++---
 .../raft/snapshot/SnapshotAwarePartitionDataStorage.java  |  4 ++--
 .../table/distributed/gc/AbstractGcUpdateHandlerTest.java |  6 +++---
 .../ignite/distributed/TestPartitionDataStorage.java      |  5 +++--
 12 files changed, 50 insertions(+), 37 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
index 7ce61504bf1..4f183843a17 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.partition.replicator.raft.snapshot;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -246,11 +247,11 @@ public interface PartitionDataStorage extends 
ManuallyCloseable {
     PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException;
 
     /**
-     * Returns the head of GC queue.
+     * Returns entries from the GC queue starting from the head.
      *
-     * @see MvPartitionStorage#peek(HybridTimestamp)
+     * @see MvPartitionStorage#peek
      */
-    @Nullable GcEntry peek(HybridTimestamp lowWatermark);
+    List<GcEntry> peek(HybridTimestamp lowWatermark, int count);
 
     /**
      * Delete GC entry from the GC queue and corresponding version chain.
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index cdbc31fe921..da4cf7dc8ba 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -302,17 +302,18 @@ public interface MvPartitionStorage extends 
ManuallyCloseable {
     List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId 
upperBoundInclusive, int limit) throws StorageException;
 
     /**
-     * Returns the head of GC queue.
+     * Returns entries from the GC queue starting from the head.
      *
      * @param lowWatermark Upper bound for commit timestamp of GC entry, 
inclusive.
-     * @return Queue head or {@code null} if there are no entries below passed 
low watermark.
+     * @param count Requested count of entries.
+     * @return First entries in the GC queue whose commit timestamps are less than or equal to the 
passed low watermark.
      */
-    @Nullable GcEntry peek(HybridTimestamp lowWatermark);
+    List<GcEntry> peek(HybridTimestamp lowWatermark, int count);
 
     /**
      * Delete GC entry from the GC queue and corresponding version chain. Row 
ID of the entry must be locked to call this method.
      *
-     * @param entry Entry, previously returned by {@link 
#peek(HybridTimestamp)}.
+     * @param entry Entry, previously returned by {@link #peek}.
      * @return Polled binary row, or {@code null} if the entry has already 
been deleted by another thread.
      *
      * @see Locker#lock(RowId)
diff --git 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 93b58b9b577..9d83df999d9 100644
--- 
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++ 
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -166,10 +166,10 @@ public class ThreadAssertingMvPartitionStorage implements 
MvPartitionStorage, Wr
     }
 
     @Override
-    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+    public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
         assertThreadAllowsToRead();
 
-        return partitionStorage.peek(lowWatermark);
+        return partitionStorage.peek(lowWatermark, count);
     }
 
     @Override
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index c5d6ba98e63..0959e136c6d 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.storage;
 
+import java.util.List;
 import java.util.UUID;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -159,12 +160,14 @@ public abstract class BaseMvPartitionStorageTest extends 
BaseMvStoragesTest {
     @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
         while (true) {
             BinaryRowAndRowId binaryRowAndRowId = 
storage.runConsistently(locker -> {
-                GcEntry gcEntry = storage.peek(lowWatermark);
+                List<GcEntry> gcEntries = storage.peek(lowWatermark, 1);
 
-                if (gcEntry == null) {
+                if (gcEntries.isEmpty()) {
                     return null;
                 }
 
+                GcEntry gcEntry = gcEntries.get(0);
+
                 locker.lock(gcEntry.getRowId());
 
                 return new BinaryRowAndRowId(storage.vacuum(gcEntry), 
gcEntry.getRowId());
diff --git 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 4666c5e59e3..0f6b8a7d3a2 100644
--- 
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++ 
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -698,17 +698,18 @@ public class TestMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public synchronized @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+    // TODO: IGNITE-26998 Rework
+    public synchronized List<GcEntry> peek(HybridTimestamp lowWatermark, int 
count) {
         assert THREAD_LOCAL_LOCKER.get() != null;
 
         try {
             VersionChain versionChain = gcQueue.first();
 
-            if (versionChain.ts.compareTo(lowWatermark) > 0) {
-                return null;
+            if (versionChain == null || 
versionChain.ts.compareTo(lowWatermark) > 0) {
+                return List.of();
             }
 
-            return versionChain;
+            return List.of(versionChain);
         } catch (NoSuchElementException e) {
             return null;
         }
diff --git 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index c5b4c2a4e78..3964c85eedb 100644
--- 
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++ 
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -962,7 +962,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage 
implements MvPartitio
     }
 
     @Override
-    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+    // TODO: IGNITE-26998 Fix the implementation
+    public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
         assert THREAD_LOCAL_LOCKER.get() != null;
 
         // Assertion above guarantees that we're in "runConsistently" closure.
@@ -972,17 +973,17 @@ public abstract class 
AbstractPageMemoryMvPartitionStorage implements MvPartitio
 
         // Garbage collection queue is empty.
         if (head == null) {
-            return null;
+            return List.of();
         }
 
         HybridTimestamp rowTimestamp = head.getTimestamp();
 
         // There are no versions in the garbage collection queue before 
watermark.
         if (rowTimestamp.compareTo(lowWatermark) > 0) {
-            return null;
+            return List.of();
         }
 
-        return head;
+        return List.of(head);
     }
 
     @Override
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index d6eeac69064..8b2624d174c 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -35,8 +35,10 @@ import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW
 import static 
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TABLE_ID_SIZE;
 
 import java.nio.ByteBuffer;
+import java.util.List;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
 import org.apache.ignite.internal.storage.RowId;
 import org.apache.ignite.internal.storage.gc.GcEntry;
 import org.jetbrains.annotations.Nullable;
@@ -173,13 +175,12 @@ class GarbageCollector {
     }
 
     /**
-     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#peek(HybridTimestamp)}.
+     * Polls elements for vacuum. See {@link MvPartitionStorage#peek}.
      *
      * @param writeBatch Current Write Batch.
      * @param lowWatermark Low watermark.
-     * @return Garbage collected element descriptor.
      */
-    @Nullable GcEntry peek(WriteBatchWithIndex writeBatch, HybridTimestamp 
lowWatermark) {
+    List<GcEntry> peek(WriteBatchWithIndex writeBatch, HybridTimestamp 
lowWatermark) {
         // We retrieve the first element of the GC queue and seek for it in 
the data CF.
         // However, the element that we need to garbage collect is the next 
(older one) element.
         // First we check if there's anything to garbage collect. If the 
element is a tombstone we remove it.
@@ -189,7 +190,7 @@ class GarbageCollector {
 
             if (invalid(gcIt)) {
                 // GC queue is empty.
-                return null;
+                return List.of();
             }
 
             ByteBuffer gcKeyBuffer = readGcKey(gcIt);
@@ -198,16 +199,16 @@ class GarbageCollector {
 
             if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
                 // No elements to garbage collect.
-                return null;
+                return List.of();
             }
 
-            return gcRowVersion;
+            return List.of(gcRowVersion);
         }
     }
 
 
     /**
-     * Polls an element for vacuum. See {@link 
org.apache.ignite.internal.storage.MvPartitionStorage#vacuum(GcEntry)}.
+     * Polls an element for vacuum. See {@link 
MvPartitionStorage#vacuum(GcEntry)}.
      *
      * @param batch Write batch.
      * @param entry Entry, previously returned by {@link #peek}.
diff --git 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 38676f31b88..2ca56d14e98 100644
--- 
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++ 
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1290,7 +1290,8 @@ public class RocksDbMvPartitionStorage implements 
MvPartitionStorage {
     }
 
     @Override
-    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+    // TODO: IGNITE-26998 Rework
+    public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
         WriteBatchWithIndex batch = requireWriteBatch();
 
         // No busy lock required, we're already in "runConsistently" closure.
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
index 5b239514e23..c5ab546b188 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.internal.table.distributed.gc;
 
+import java.util.List;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
 import 
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
 import org.apache.ignite.internal.schema.BinaryRow;
@@ -75,7 +76,7 @@ public class GcUpdateHandler {
             return true;
         }
 
-        IntHolder countHolder = new IntHolder(count);
+        var countHolder = new IntHolder(count);
 
         while (countHolder.get() > 0) {
             VacuumResult vacuumResult = internalVacuumBatch(lowWatermark, 
countHolder);
@@ -131,12 +132,14 @@ public class GcUpdateHandler {
                 return VacuumResult.SHOULD_RELEASE;
             }
 
-            GcEntry gcEntry = storage.peek(lowWatermark);
+            // TODO: IGNITE-26998 Rework, this is a stub for now
+            List<GcEntry> gcEntries = storage.peek(lowWatermark, 1);
 
-            if (gcEntry == null) {
+            if (gcEntries.isEmpty()) {
                 return VacuumResult.NO_GARBAGE_LEFT;
             }
 
+            GcEntry gcEntry = gcEntries.get(0);
             RowId rowId = gcEntry.getRowId();
 
             if (useTryLock) {
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 6eaeea5b58d..507f0ea49e2 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -244,8 +244,8 @@ public class SnapshotAwarePartitionDataStorage implements 
PartitionDataStorage {
     }
 
     @Override
-    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
-        return partitionStorage.peek(lowWatermark);
+    public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+        return partitionStorage.peek(lowWatermark, count);
     }
 
     @Override
diff --git 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index 0f8fcd8db21..61b130b62a1 100644
--- 
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++ 
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -90,7 +90,7 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
         HybridTimestamp lowWatermark = HybridTimestamp.MAX_VALUE;
 
         assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 1));
-        verify(partitionStorage).peek(lowWatermark);
+        verify(partitionStorage).peek(eq(lowWatermark), eq(1));
 
         // Let's check that StorageUpdateHandler#vacuumBatch returns true.
         clearInvocations(partitionStorage);
@@ -102,7 +102,7 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
         addWriteCommitted(partitionStorage, rowId, row, clock.now());
 
         assertTrue(gcUpdateHandler.vacuumBatch(lowWatermark, 1));
-        verify(partitionStorage).peek(lowWatermark);
+        verify(partitionStorage).peek(eq(lowWatermark), eq(1));
         verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId), 
any(), isNull());
     }
 
@@ -129,7 +129,7 @@ abstract class AbstractGcUpdateHandlerTest extends 
BaseMvStoragesTest {
 
         assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 5));
 
-        verify(partitionStorage, times(3)).peek(lowWatermark);
+        verify(partitionStorage, times(3)).peek(eq(lowWatermark), eq(5));
         verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId0), 
any(), isNull());
         verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId1), 
any(), isNull());
     }
diff --git 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index d446e5445db..4d68eb2a087 100644
--- 
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++ 
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -17,6 +17,7 @@
 
 package org.apache.ignite.distributed;
 
+import java.util.List;
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -167,8 +168,8 @@ public class TestPartitionDataStorage implements 
PartitionDataStorage {
     }
 
     @Override
-    public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
-        return partitionStorage.peek(lowWatermark);
+    public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+        return partitionStorage.peek(lowWatermark, count);
     }
 
     @Override

Reply via email to