This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26998 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 1372be5be5542fadade4a66905c1c92aa3b3179d Author: Kirill Tkalenko <[email protected]> AuthorDate: Thu Nov 27 18:13:53 2025 +0300 IGNITE-26998 wip --- .../replicator/raft/snapshot/PartitionDataStorage.java | 7 ++++--- .../ignite/internal/storage/MvPartitionStorage.java | 9 +++++---- .../storage/ThreadAssertingMvPartitionStorage.java | 4 ++-- .../internal/storage/BaseMvPartitionStorageTest.java | 7 +++++-- .../internal/storage/impl/TestMvPartitionStorage.java | 9 +++++---- .../mv/AbstractPageMemoryMvPartitionStorage.java | 9 +++++---- .../ignite/internal/storage/rocksdb/GarbageCollector.java | 15 ++++++++------- .../storage/rocksdb/RocksDbMvPartitionStorage.java | 3 ++- .../internal/table/distributed/gc/GcUpdateHandler.java | 9 ++++++--- .../raft/snapshot/SnapshotAwarePartitionDataStorage.java | 4 ++-- .../table/distributed/gc/AbstractGcUpdateHandlerTest.java | 6 +++--- .../ignite/distributed/TestPartitionDataStorage.java | 5 +++-- 12 files changed, 50 insertions(+), 37 deletions(-) diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java index 7ce61504bf1..4f183843a17 100644 --- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java +++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.partition.replicator.raft.snapshot; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.close.ManuallyCloseable; @@ -246,11 +247,11 @@ public interface PartitionDataStorage extends ManuallyCloseable { PartitionTimestampCursor scan(HybridTimestamp timestamp) throws 
StorageException; /** - * Returns the head of GC queue. + * Returns entries from the queue starting from the head. * - * @see MvPartitionStorage#peek(HybridTimestamp) + * @see MvPartitionStorage#peek */ - @Nullable GcEntry peek(HybridTimestamp lowWatermark); + List<GcEntry> peek(HybridTimestamp lowWatermark, int count); /** * Delete GC entry from the GC queue and corresponding version chain. diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java index cdbc31fe921..da4cf7dc8ba 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java @@ -302,17 +302,18 @@ public interface MvPartitionStorage extends ManuallyCloseable { List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId upperBoundInclusive, int limit) throws StorageException; /** - * Returns the head of GC queue. + * Returns entries from the queue starting from the head. * * @param lowWatermark Upper bound for commit timestamp of GC entry, inclusive. - * @return Queue head or {@code null} if there are no entries below passed low watermark. + * @param count Requested count of entries. + * @return First entries in the GC queue that are less than or equal to passed low watermark. */ - @Nullable GcEntry peek(HybridTimestamp lowWatermark); + List<GcEntry> peek(HybridTimestamp lowWatermark, int count); /** * Delete GC entry from the GC queue and corresponding version chain. Row ID of the entry must be locked to call this method. * - * @param entry Entry, previously returned by {@link #peek(HybridTimestamp)}. + * @param entry Entry, previously returned by {@link #peek}. * @return Polled binary row, or {@code null} if the entry has already been deleted by another thread. 
* * @see Locker#lock(RowId) diff --git a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java index 93b58b9b577..9d83df999d9 100644 --- a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java +++ b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java @@ -166,10 +166,10 @@ public class ThreadAssertingMvPartitionStorage implements MvPartitionStorage, Wr } @Override - public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { + public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { assertThreadAllowsToRead(); - return partitionStorage.peek(lowWatermark); + return partitionStorage.peek(lowWatermark, count); } @Override diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java index c5d6ba98e63..0959e136c6d 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.storage; +import java.util.List; import java.util.UUID; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; @@ -159,12 +160,14 @@ public abstract class BaseMvPartitionStorageTest extends BaseMvStoragesTest { @Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) { while (true) { BinaryRowAndRowId binaryRowAndRowId = storage.runConsistently(locker -> { - GcEntry gcEntry = storage.peek(lowWatermark); + List<GcEntry> gcEntries = storage.peek(lowWatermark, 
1); - if (gcEntry == null) { + if (gcEntries.isEmpty()) { return null; } + GcEntry gcEntry = gcEntries.get(0); + locker.lock(gcEntry.getRowId()); return new BinaryRowAndRowId(storage.vacuum(gcEntry), gcEntry.getRowId()); diff --git a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java index 4666c5e59e3..0f6b8a7d3a2 100644 --- a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java +++ b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java @@ -698,17 +698,18 @@ public class TestMvPartitionStorage implements MvPartitionStorage { } @Override - public synchronized @Nullable GcEntry peek(HybridTimestamp lowWatermark) { + // TODO: IGNITE-26998 Rework this. + public synchronized List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { assert THREAD_LOCAL_LOCKER.get() != null; try { VersionChain versionChain = gcQueue.first(); - if (versionChain.ts.compareTo(lowWatermark) > 0) { - return null; + if (versionChain == null || versionChain.ts.compareTo(lowWatermark) > 0) { + return List.of(); } - return versionChain; + return List.of(versionChain); } catch (NoSuchElementException e) { return null; } diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index c5b4c2a4e78..3964c85eedb 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -962,7 
+962,8 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } @Override - public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { + // TODO: IGNITE-26998 Fix the implementation. + public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { assert THREAD_LOCAL_LOCKER.get() != null; // Assertion above guarantees that we're in "runConsistently" closure. @@ -972,17 +973,17 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio // Garbage collection queue is empty. if (head == null) { - return null; + return List.of(); } HybridTimestamp rowTimestamp = head.getTimestamp(); // There are no versions in the garbage collection queue before watermark. if (rowTimestamp.compareTo(lowWatermark) > 0) { - return null; + return List.of(); } - return head; + return List.of(head); } @Override diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java index d6eeac69064..8b2624d174c 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java @@ -35,8 +35,10 @@ import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW import static org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TABLE_ID_SIZE; import java.nio.ByteBuffer; +import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.schema.BinaryRow; +import org.apache.ignite.internal.storage.MvPartitionStorage; import org.apache.ignite.internal.storage.RowId; import org.apache.ignite.internal.storage.gc.GcEntry; import org.jetbrains.annotations.Nullable; @@ -173,13 +175,12 @@ class GarbageCollector { } /** - * Polls an element for vacuum. 
See {@link org.apache.ignite.internal.storage.MvPartitionStorage#peek(HybridTimestamp)}. + * Polls elements for vacuum. See {@link MvPartitionStorage#peek}. * * @param writeBatch Current Write Batch. * @param lowWatermark Low watermark. - * @return Garbage collected element descriptor. */ - @Nullable GcEntry peek(WriteBatchWithIndex writeBatch, HybridTimestamp lowWatermark) { + List<GcEntry> peek(WriteBatchWithIndex writeBatch, HybridTimestamp lowWatermark) { // We retrieve the first element of the GC queue and seek for it in the data CF. // However, the element that we need to garbage collect is the next (older one) element. // First we check if there's anything to garbage collect. If the element is a tombstone we remove it. @@ -189,7 +190,7 @@ class GarbageCollector { if (invalid(gcIt)) { // GC queue is empty. - return null; + return List.of(); } ByteBuffer gcKeyBuffer = readGcKey(gcIt); @@ -198,16 +199,16 @@ class GarbageCollector { if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) { // No elements to garbage collect. - return null; + return List.of(); } - return gcRowVersion; + return List.of(gcRowVersion); } } /** - * Polls an element for vacuum. See {@link org.apache.ignite.internal.storage.MvPartitionStorage#vacuum(GcEntry)}. + * Polls an element for vacuum. See {@link MvPartitionStorage#vacuum(GcEntry)}. * * @param batch Write batch. * @param entry Entry, previously returned by {@link #peek}. 
diff --git a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java index 38676f31b88..2ca56d14e98 100644 --- a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java +++ b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java @@ -1290,7 +1290,8 @@ public class RocksDbMvPartitionStorage implements MvPartitionStorage { } @Override - public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { + // TODO: IGNITE-26998 Rework this. + public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { WriteBatchWithIndex batch = requireWriteBatch(); // No busy lock required, we're already in "runConsistently" closure. diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java index 5b239514e23..c5ab546b188 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java @@ -17,6 +17,7 @@ package org.apache.ignite.internal.table.distributed.gc; +import java.util.List; import org.apache.ignite.internal.hlc.HybridTimestamp; import org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage; import org.apache.ignite.internal.schema.BinaryRow; @@ -75,7 +76,7 @@ public class GcUpdateHandler { return true; } - IntHolder countHolder = new IntHolder(count); + var countHolder = new IntHolder(count); while (countHolder.get() > 0) { VacuumResult vacuumResult = internalVacuumBatch(lowWatermark, countHolder); @@ -131,12 +132,14 @@ public class GcUpdateHandler { return 
VacuumResult.SHOULD_RELEASE; } - GcEntry gcEntry = storage.peek(lowWatermark); + // TODO: IGNITE-26998 Rework this; it is a temporary stub for now. + List<GcEntry> gcEntries = storage.peek(lowWatermark, 1); - if (gcEntry == null) { + if (gcEntries.isEmpty()) { return VacuumResult.NO_GARBAGE_LEFT; } + GcEntry gcEntry = gcEntries.get(0); RowId rowId = gcEntry.getRowId(); if (useTryLock) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java index 6eaeea5b58d..507f0ea49e2 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java @@ -244,8 +244,8 @@ public class SnapshotAwarePartitionDataStorage implements PartitionDataStorage { } @Override - public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { - return partitionStorage.peek(lowWatermark); + public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { + return partitionStorage.peek(lowWatermark, count); } @Override diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java index 0f8fcd8db21..61b130b62a1 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java @@ -90,7 +90,7 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { HybridTimestamp lowWatermark = HybridTimestamp.MAX_VALUE; 
assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 1)); - verify(partitionStorage).peek(lowWatermark); + verify(partitionStorage).peek(eq(lowWatermark), eq(1)); // Let's check that StorageUpdateHandler#vacuumBatch returns true. clearInvocations(partitionStorage); @@ -102,7 +102,7 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { addWriteCommitted(partitionStorage, rowId, row, clock.now()); assertTrue(gcUpdateHandler.vacuumBatch(lowWatermark, 1)); - verify(partitionStorage).peek(lowWatermark); + verify(partitionStorage).peek(eq(lowWatermark), eq(1)); verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId), any(), isNull()); } @@ -129,7 +129,7 @@ abstract class AbstractGcUpdateHandlerTest extends BaseMvStoragesTest { assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 5)); - verify(partitionStorage, times(3)).peek(lowWatermark); + verify(partitionStorage, times(3)).peek(eq(lowWatermark), eq(5)); verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId0), any(), isNull()); verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId1), any(), isNull()); } diff --git a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java index d446e5445db..4d68eb2a087 100644 --- a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java +++ b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java @@ -17,6 +17,7 @@ package org.apache.ignite.distributed; +import java.util.List; import java.util.UUID; import java.util.concurrent.CompletableFuture; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -167,8 +168,8 @@ public class TestPartitionDataStorage implements PartitionDataStorage { } @Override - public @Nullable GcEntry peek(HybridTimestamp lowWatermark) { - return partitionStorage.peek(lowWatermark); + public 
List<GcEntry> peek(HybridTimestamp lowWatermark, int count) { + return partitionStorage.peek(lowWatermark, count); } @Override
