This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 20bf34da4fe IGNITE-26998 Warmup gc entries before `runConsistently`
(#7109)
20bf34da4fe is described below
commit 20bf34da4fe2ae5a435775a5f39d3ce1d5d2d2b3
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Wed Dec 3 14:18:58 2025 +0300
IGNITE-26998 Warmup gc entries before `runConsistently` (#7109)
---
.../raft/snapshot/PartitionDataStorage.java | 7 +-
.../internal/storage/MvPartitionStorage.java | 9 +-
.../storage/ThreadAssertingMvPartitionStorage.java | 4 +-
.../storage/AbstractMvPartitionStorageTest.java | 92 ++++++++++++++++++++
.../storage/BaseMvPartitionStorageTest.java | 7 +-
.../storage/impl/TestMvPartitionStorage.java | 24 ++++--
.../mv/AbstractPageMemoryMvPartitionStorage.java | 82 +++++++++++++-----
.../mv/PersistentPageMemoryMvPartitionStorage.java | 32 +++++++
.../mv/PreloadingForGcInvokeClosure.java | 99 ++++++++++++++++++++++
.../internal/storage/rocksdb/GarbageCollector.java | 47 +++++-----
.../storage/rocksdb/RocksDbMvPartitionStorage.java | 11 ++-
.../table/distributed/gc/GcUpdateHandler.java | 90 ++++++++++----------
.../ignite/internal/table/distributed/gc/MvGc.java | 4 +-
.../SnapshotAwarePartitionDataStorage.java | 4 +-
.../internal/table/distributed/IndexGcTest.java | 22 ++---
.../gc/AbstractGcUpdateHandlerTest.java | 38 ++++-----
.../internal/table/distributed/gc/MvGcTest.java | 12 ++-
.../distributed/TestPartitionDataStorage.java | 5 +-
18 files changed, 431 insertions(+), 158 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
index 7ce61504bf1..4f183843a17 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/raft/snapshot/PartitionDataStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.partition.replicator.raft.snapshot;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.close.ManuallyCloseable;
@@ -246,11 +247,11 @@ public interface PartitionDataStorage extends
ManuallyCloseable {
PartitionTimestampCursor scan(HybridTimestamp timestamp) throws
StorageException;
/**
- * Returns the head of GC queue.
+ * Returns up to {@code count} entries from the GC queue, starting from the head.
*
- * @see MvPartitionStorage#peek(HybridTimestamp)
+ * @see MvPartitionStorage#peek
*/
- @Nullable GcEntry peek(HybridTimestamp lowWatermark);
+ List<GcEntry> peek(HybridTimestamp lowWatermark, int count);
/**
* Delete GC entry from the GC queue and corresponding version chain.
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
index cdbc31fe921..da4cf7dc8ba 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/MvPartitionStorage.java
@@ -302,17 +302,18 @@ public interface MvPartitionStorage extends
ManuallyCloseable {
List<RowMeta> rowsStartingWith(RowId lowerBoundInclusive, RowId
upperBoundInclusive, int limit) throws StorageException;
/**
- * Returns the head of GC queue.
+ * Returns up to {@code count} entries from the GC queue, starting from the head.
*
* @param lowWatermark Upper bound for commit timestamp of GC entry,
inclusive.
- * @return Queue head or {@code null} if there are no entries below passed
low watermark.
+ * @param count Requested count of entries.
+ * @return Up to {@code count} entries from the head of the GC queue whose commit timestamps are less than or equal to the passed low watermark, or an empty list if there are none.
*/
- @Nullable GcEntry peek(HybridTimestamp lowWatermark);
+ List<GcEntry> peek(HybridTimestamp lowWatermark, int count);
/**
* Delete GC entry from the GC queue and corresponding version chain. Row
ID of the entry must be locked to call this method.
*
- * @param entry Entry, previously returned by {@link
#peek(HybridTimestamp)}.
+ * @param entry Entry, previously returned by {@link #peek}.
* @return Polled binary row, or {@code null} if the entry has already
been deleted by another thread.
*
* @see Locker#lock(RowId)
diff --git
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
index 93b58b9b577..9d83df999d9 100644
---
a/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
+++
b/modules/storage-api/src/main/java/org/apache/ignite/internal/storage/ThreadAssertingMvPartitionStorage.java
@@ -166,10 +166,10 @@ public class ThreadAssertingMvPartitionStorage implements
MvPartitionStorage, Wr
}
@Override
- public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
+ public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
assertThreadAllowsToRead();
- return partitionStorage.peek(lowWatermark);
+ return partitionStorage.peek(lowWatermark, count);
}
@Override
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
index ec0eaf8a3dc..ae4e1e5922a 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/AbstractMvPartitionStorageTest.java
@@ -25,6 +25,7 @@ import static
org.apache.ignite.internal.storage.AddWriteResultMatcher.equalsToA
import static
org.apache.ignite.internal.storage.CommitResultMatcher.equalsToCommitResult;
import static
org.apache.ignite.internal.testframework.matchers.CompletableFutureMatcher.willCompleteSuccessfully;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
import static org.hamcrest.Matchers.containsInAnyOrder;
import static org.hamcrest.Matchers.empty;
import static org.hamcrest.Matchers.equalTo;
@@ -52,9 +53,15 @@ import org.apache.ignite.internal.hlc.HybridClock;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteBiTuple;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.gc.GcEntry;
import org.apache.ignite.internal.storage.lease.LeaseInfo;
import org.apache.ignite.internal.testframework.IgniteTestUtils;
+import org.apache.ignite.internal.tostring.IgniteToStringInclude;
+import org.apache.ignite.internal.tostring.S;
import org.apache.ignite.internal.util.Cursor;
+import org.hamcrest.Description;
+import org.hamcrest.Matcher;
+import org.hamcrest.TypeSafeMatcher;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
@@ -2074,6 +2081,48 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
assertThat(storage.estimatedSize(), is(1L));
}
+ @Test
+ void testPeekEmptyStorage() {
+ assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 1), empty());
+ }
+
+ @Test
+ void testPeek() {
+ var commitTimestamp = new HybridTimestamp(10, 0);
+
+ addWriteCommitted(ROW_ID, binaryRow, commitTimestamp);
+ addWriteCommitted(ROW_ID, binaryRow,
commitTimestamp.addPhysicalTime(10));
+ addWriteCommitted(ROW_ID, binaryRow,
commitTimestamp.addPhysicalTime(20));
+
+ Matcher<GcEntry> expGcEntry0 = eqGcEntry(new TestGcEntry(ROW_ID,
commitTimestamp.addPhysicalTime(10)));
+ Matcher<GcEntry> expGcEntry1 = eqGcEntry(new TestGcEntry(ROW_ID,
commitTimestamp.addPhysicalTime(20)));
+
+ assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 0), empty());
+ assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 1),
contains(expGcEntry0));
+ assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 2),
contains(expGcEntry0, expGcEntry1));
+ assertThat(storage.peek(HybridTimestamp.MAX_VALUE, 3),
contains(expGcEntry0, expGcEntry1));
+
+ assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 0), empty());
+ assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 1), empty());
+ assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 2), empty());
+ assertThat(storage.peek(HybridTimestamp.MIN_VALUE, 3), empty());
+
+ assertThat(storage.peek(commitTimestamp, 0), empty());
+ assertThat(storage.peek(commitTimestamp, 1), empty());
+ assertThat(storage.peek(commitTimestamp, 2), empty());
+ assertThat(storage.peek(commitTimestamp, 3), empty());
+
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 0),
empty());
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 1),
contains(expGcEntry0));
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 2),
contains(expGcEntry0));
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(10), 3),
contains(expGcEntry0));
+
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 0),
empty());
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 1),
contains(expGcEntry0));
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 2),
contains(expGcEntry0, expGcEntry1));
+ assertThat(storage.peek(commitTimestamp.addPhysicalTime(20), 3),
contains(expGcEntry0, expGcEntry1));
+ }
+
/**
* Returns row id that is lexicographically smaller (by the value of one)
than the argument.
*
@@ -2109,4 +2158,47 @@ public abstract class AbstractMvPartitionStorageTest
extends BaseMvPartitionStor
abstract HybridTimestamp scanTimestamp(HybridClock clock);
}
+
+ protected static Matcher<GcEntry> eqGcEntry(GcEntry gcEntry) {
+ return new TypeSafeMatcher<>() {
+ @Override
+ protected boolean matchesSafely(GcEntry item) {
+ return gcEntry.getRowId().equals(item.getRowId()) &&
gcEntry.getTimestamp().equals(item.getTimestamp());
+ }
+
+ @Override
+ public void describeTo(Description description) {
+ description.appendValue(gcEntry);
+ }
+ };
+ }
+
+ /** Implementation for tests. */
+ protected static class TestGcEntry implements GcEntry {
+ @IgniteToStringInclude
+ private final RowId rowId;
+
+ @IgniteToStringInclude
+ private final HybridTimestamp timestamp;
+
+ protected TestGcEntry(RowId rowId, HybridTimestamp timestamp) {
+ this.rowId = rowId;
+ this.timestamp = timestamp;
+ }
+
+ @Override
+ public RowId getRowId() {
+ return rowId;
+ }
+
+ @Override
+ public HybridTimestamp getTimestamp() {
+ return timestamp;
+ }
+
+ @Override
+ public String toString() {
+ return S.toString(this);
+ }
+ }
}
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
index edb30f06745..19e966a3f46 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/BaseMvPartitionStorageTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.storage;
+import java.util.List;
import java.util.UUID;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -159,12 +160,14 @@ public abstract class BaseMvPartitionStorageTest extends
BaseMvStoragesTest {
@Nullable BinaryRowAndRowId pollForVacuum(HybridTimestamp lowWatermark) {
while (true) {
BinaryRowAndRowId binaryRowAndRowId =
storage.runConsistently(locker -> {
- GcEntry gcEntry = storage.peek(lowWatermark);
+ List<GcEntry> gcEntries = storage.peek(lowWatermark, 1);
- if (gcEntry == null) {
+ if (gcEntries.isEmpty()) {
return null;
}
+ GcEntry gcEntry = gcEntries.get(0);
+
locker.lock(gcEntry.getRowId());
return new BinaryRowAndRowId(storage.vacuum(gcEntry),
gcEntry.getRowId());
diff --git
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
index 4666c5e59e3..14bae494659 100644
---
a/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
+++
b/modules/storage-api/src/testFixtures/java/org/apache/ignite/internal/storage/impl/TestMvPartitionStorage.java
@@ -698,20 +698,26 @@ public class TestMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public synchronized @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- assert THREAD_LOCAL_LOCKER.get() != null;
+ public synchronized List<GcEntry> peek(HybridTimestamp lowWatermark, int
count) {
+ if (count <= 0) {
+ return List.of();
+ }
- try {
- VersionChain versionChain = gcQueue.first();
+ var res = new ArrayList<GcEntry>(count);
- if (versionChain.ts.compareTo(lowWatermark) > 0) {
- return null;
+ Iterator<VersionChain> it = gcQueue.iterator();
+
+ for (int i = 0; i < count && it.hasNext(); i++) {
+ VersionChain next = it.next();
+
+ if (next.ts.compareTo(lowWatermark) > 0) {
+ break;
}
- return versionChain;
- } catch (NoSuchElementException e) {
- return null;
+ res.add(next);
}
+
+ return res;
}
@Override
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
index 6f2fe8a34a6..54cdb460ed8 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java
@@ -962,27 +962,19 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
}
@Override
- public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- assert THREAD_LOCAL_LOCKER.get() != null;
-
- // Assertion above guarantees that we're in "runConsistently" closure.
- throwExceptionIfStorageNotInRunnableState();
-
- GcRowVersion head = renewableState.gcQueue().getFirst();
-
- // Garbage collection queue is empty.
- if (head == null) {
- return null;
- }
-
- HybridTimestamp rowTimestamp = head.getTimestamp();
+ public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+ return busy(() -> {
+ throwExceptionIfStorageNotInRunnableState();
- // There are no versions in the garbage collection queue before
watermark.
- if (rowTimestamp.compareTo(lowWatermark) > 0) {
- return null;
- }
+ if (count <= 0) {
+ return List.of();
+ } else if (count == 1) {
+ // Use a cheaper single-entry lookup to avoid creating a cursor that can load multiple elements from the page at once.
+ return peekSingleGcEntryBusy(lowWatermark);
+ }
- return head;
+ return peekGcEntriesBusy(lowWatermark, count);
+ });
}
@Override
@@ -1068,4 +1060,56 @@ public abstract class
AbstractPageMemoryMvPartitionStorage implements MvPartitio
* @see MvPartitionStorage#estimatedSize
*/
public abstract void decrementEstimatedSize();
+
+ private List<GcEntry> peekSingleGcEntryBusy(HybridTimestamp lowWatermark) {
+ GcRowVersion head = renewableState.gcQueue().getFirst();
+
+ // Garbage collection queue is empty.
+ if (head == null) {
+ return List.of();
+ }
+
+ HybridTimestamp rowTimestamp = head.getTimestamp();
+
+ // There are no versions in the garbage collection queue before
watermark.
+ if (rowTimestamp.compareTo(lowWatermark) > 0) {
+ return List.of();
+ }
+
+ preloadingForGcIfNeededBusy(head);
+
+ return List.of(head);
+ }
+
+ private List<GcEntry> peekGcEntriesBusy(HybridTimestamp lowWatermark, int
count) {
+ var res = new ArrayList<GcEntry>(count);
+
+ try (Cursor<GcRowVersion> cursor = renewableState.gcQueue().find(null,
null)) {
+ while (res.size() < count && cursor.hasNext()) {
+ GcRowVersion next = cursor.next();
+
+ if (next.getTimestamp().compareTo(lowWatermark) > 0) {
+ break;
+ }
+
+ res.add(next);
+
+ preloadingForGcIfNeededBusy(next);
+ }
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
+
+ throw new StorageException(
+ "Peek GC entries failed: [lowWatermark={}, count={}, {}]",
+ e,
+ lowWatermark, count, createStorageInfo()
+ );
+ }
+
+ return res;
+ }
+
+ protected void preloadingForGcIfNeededBusy(GcRowVersion gcRowVersion) {
+ // No-op.
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
index 7098dd5b040..c21ca8aa7ad 100644
---
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PersistentPageMemoryMvPartitionStorage.java
@@ -20,6 +20,7 @@ package org.apache.ignite.internal.storage.pagememory.mv;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInCleanupOrRebalancedState;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInProgressOfRebalance;
import static
org.apache.ignite.internal.storage.util.StorageUtils.throwExceptionIfStorageNotInRunnableOrRebalanceState;
+import static
org.apache.ignite.internal.storage.util.StorageUtils.throwStorageExceptionIfItCause;
import static org.apache.ignite.internal.util.ByteUtils.stringToBytes;
import java.util.List;
@@ -30,6 +31,7 @@ import java.util.concurrent.ExecutorService;
import java.util.concurrent.locks.ReadWriteLock;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import org.apache.ignite.internal.failure.FailureProcessor;
+import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
import org.apache.ignite.internal.pagememory.DataRegion;
import org.apache.ignite.internal.pagememory.freelist.FreeListImpl;
@@ -42,6 +44,7 @@ import
org.apache.ignite.internal.pagememory.persistence.checkpoint.CheckpointTi
import org.apache.ignite.internal.pagememory.tree.BplusTree;
import org.apache.ignite.internal.pagememory.util.GradualTaskExecutor;
import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.StorageException;
import org.apache.ignite.internal.storage.index.StorageHashIndexDescriptor;
import org.apache.ignite.internal.storage.index.StorageSortedIndexDescriptor;
@@ -51,6 +54,7 @@ import
org.apache.ignite.internal.storage.pagememory.StoragePartitionMeta;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineView;
import org.apache.ignite.internal.storage.pagememory.index.meta.IndexMetaTree;
import org.apache.ignite.internal.storage.pagememory.mv.gc.GcQueue;
+import org.apache.ignite.internal.storage.pagememory.mv.gc.GcRowVersion;
import org.apache.ignite.internal.storage.util.LocalLocker;
import org.apache.ignite.internal.util.ByteUtils;
import org.jetbrains.annotations.Nullable;
@@ -586,4 +590,32 @@ public class PersistentPageMemoryMvPartitionStorage
extends AbstractPageMemoryMv
return checkpointTimeoutLock.shouldReleaseReadLock();
}
}
+
+ /**
+ * This optimization reduces the number of IO operations performed by {@link #vacuum}, which is currently executed in a loop inside
+ * {@link #runConsistently}. This allows the checkpoint to acquire the write lock more quickly.
+ */
+ @Override
+ protected void preloadingForGcIfNeededBusy(GcRowVersion gcRowVersion) {
+ RowId rowId = gcRowVersion.getRowId();
+ HybridTimestamp timestamp = gcRowVersion.getTimestamp();
+
+ var preloadingForGc = new PreloadingForGcInvokeClosure(rowId,
timestamp, gcRowVersion.getLink(), this);
+
+ lockByRowId.lock(rowId);
+
+ try {
+ renewableState.versionChainTree().invoke(new
VersionChainKey(rowId), null, preloadingForGc);
+ } catch (IgniteInternalCheckedException e) {
+ throwStorageExceptionIfItCause(e);
+
+ throw new StorageException(
+ "Error preloading row versions for garbage collection:
[rowId={}, rowTimestamp={}, {}]",
+ e,
+ rowId, timestamp, createStorageInfo()
+ );
+ } finally {
+ lockByRowId.unlockAll(rowId);
+ }
+ }
}
diff --git
a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PreloadingForGcInvokeClosure.java
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PreloadingForGcInvokeClosure.java
new file mode 100644
index 00000000000..9dda4a67c66
--- /dev/null
+++
b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/PreloadingForGcInvokeClosure.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.storage.pagememory.mv;
+
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.ALWAYS_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.AbstractPageMemoryMvPartitionStorage.DONT_LOAD_VALUE;
+import static
org.apache.ignite.internal.storage.pagememory.mv.FindRowVersion.RowVersionFilter.equalsByNextLink;
+
+import org.apache.ignite.internal.hlc.HybridTimestamp;
+import org.apache.ignite.internal.lang.IgniteInternalCheckedException;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.InvokeClosure;
+import org.apache.ignite.internal.pagememory.tree.IgniteTree.OperationType;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
+import org.apache.ignite.internal.storage.RowId;
+import org.apache.ignite.internal.storage.StorageException;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Implementation of {@link InvokeClosure} to preload row versions before
calling {@link MvPartitionStorage#vacuum} to reduce the number of
+ * IO operations that may occur when executed in the loop in {@link
MvPartitionStorage#runConsistently}. This will ultimately reduce the
+ * time to acquire a write lock at a checkpoint.
+ *
+ * @see RemoveWriteOnGcInvokeClosure
+ */
+class PreloadingForGcInvokeClosure implements InvokeClosure<VersionChain> {
+ private final RowId rowId;
+
+ private final HybridTimestamp timestamp;
+
+ private final long link;
+
+ private final AbstractPageMemoryMvPartitionStorage storage;
+
+ PreloadingForGcInvokeClosure(RowId rowId, HybridTimestamp timestamp, long
link, AbstractPageMemoryMvPartitionStorage storage) {
+ this.rowId = rowId;
+ this.timestamp = timestamp;
+ this.link = link;
+ this.storage = storage;
+ }
+
+ @Override
+ public void call(@Nullable VersionChain oldRow) throws
IgniteInternalCheckedException {
+ if (oldRow == null || !oldRow.hasNextLink()) {
+ return;
+ }
+
+ RowVersion rowVersion = storage.readRowVersion(link, DONT_LOAD_VALUE);
+
+ if (rowVersion == null || !rowVersion.hasNextLink()) {
+ return;
+ }
+
+ storage.readRowVersion(rowVersion.nextLink(), ALWAYS_LOAD_VALUE);
+
+ if (rowVersion.isTombstone()) {
+ if (oldRow.nextLink() == link) {
+ storage.readRowVersion(oldRow.headLink(), DONT_LOAD_VALUE);
+ } else if (oldRow.headLink() != link) {
+ storage.findRowVersion(oldRow, equalsByNextLink(link), false);
+ }
+ }
+ }
+
+ @Override
+ public OperationType operationType() {
+ return OperationType.NOOP;
+ }
+
+ @Override
+ public @Nullable VersionChain newRow() {
+ throw new StorageException(
+ "No updates should occur: [rowId={}, timestamp={}, link={},
storage={}]",
+ rowId, timestamp, link, storage.createStorageInfo()
+ );
+ }
+
+ @Override
+ public void onUpdate() {
+ throw new StorageException(
+ "No updates should occur: [rowId={}, timestamp={}, link={},
storage={}]",
+ rowId, timestamp, link, storage.createStorageInfo()
+ );
+ }
+}
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
index d6eeac69064..dff8da44c0b 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/GarbageCollector.java
@@ -35,8 +35,11 @@ import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.ROW
import static
org.apache.ignite.internal.storage.rocksdb.RocksDbStorageUtils.TABLE_ID_SIZE;
import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.List;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import org.apache.ignite.internal.schema.BinaryRow;
+import org.apache.ignite.internal.storage.MvPartitionStorage;
import org.apache.ignite.internal.storage.RowId;
import org.apache.ignite.internal.storage.gc.GcEntry;
import org.jetbrains.annotations.Nullable;
@@ -173,41 +176,41 @@ class GarbageCollector {
}
/**
- * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#peek(HybridTimestamp)}.
+ * Peeks entries for vacuum, starting from the head of the GC queue. See {@link MvPartitionStorage#peek}.
*
- * @param writeBatch Current Write Batch.
* @param lowWatermark Low watermark.
- * @return Garbage collected element descriptor.
+ * @param count Requested count of entries.
*/
- @Nullable GcEntry peek(WriteBatchWithIndex writeBatch, HybridTimestamp
lowWatermark) {
- // We retrieve the first element of the GC queue and seek for it in
the data CF.
- // However, the element that we need to garbage collect is the next
(older one) element.
- // First we check if there's anything to garbage collect. If the
element is a tombstone we remove it.
- // If the next element exists, that should be the element that we want
to garbage collect.
- try (RocksIterator gcIt = newWrappedIterator(writeBatch, gcQueueCf,
helper.upperBoundReadOpts)) {
+ List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+ if (count <= 0) {
+ return List.of();
+ }
+
+ var res = new ArrayList<GcEntry>(count);
+
+ try (RocksIterator gcIt = db.newIterator(gcQueueCf,
helper.upperBoundReadOpts)) {
gcIt.seek(helper.partitionStartPrefix());
- if (invalid(gcIt)) {
- // GC queue is empty.
- return null;
- }
+ while (res.size() < count && !invalid(gcIt)) {
+ ByteBuffer gcKeyBuffer = readGcKey(gcIt);
- ByteBuffer gcKeyBuffer = readGcKey(gcIt);
+ GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
- GcRowVersion gcRowVersion = toGcRowVersion(gcKeyBuffer);
+ if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
+ break;
+ }
- if (gcRowVersion.getTimestamp().compareTo(lowWatermark) > 0) {
- // No elements to garbage collect.
- return null;
- }
+ res.add(gcRowVersion);
- return gcRowVersion;
+ gcIt.next();
+ }
}
- }
+ return res;
+ }
/**
- * Polls an element for vacuum. See {@link
org.apache.ignite.internal.storage.MvPartitionStorage#vacuum(GcEntry)}.
+ * Polls an element for vacuum. See {@link
MvPartitionStorage#vacuum(GcEntry)}.
*
* @param batch Write batch.
* @param entry Entry, previously returned by {@link #peek}.
diff --git
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
index 38676f31b88..af1319f5f1b 100644
---
a/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
+++
b/modules/storage-rocksdb/src/main/java/org/apache/ignite/internal/storage/rocksdb/RocksDbMvPartitionStorage.java
@@ -1290,13 +1290,12 @@ public class RocksDbMvPartitionStorage implements
MvPartitionStorage {
}
@Override
- public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- WriteBatchWithIndex batch = requireWriteBatch();
-
- // No busy lock required, we're already in "runConsistently" closure.
- throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
+ public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+ return busy(() -> {
+ throwExceptionIfStorageInProgressOfRebalance(state.get(),
this::createStorageInfo);
- return gc.peek(batch, lowWatermark);
+ return gc.peek(lowWatermark, count);
+ });
}
@Override
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
index 4fb0358565e..08659b28ae3 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.table.distributed.gc;
+import java.util.List;
import org.apache.ignite.internal.hlc.HybridTimestamp;
import
org.apache.ignite.internal.partition.replicator.raft.snapshot.PartitionDataStorage;
import org.apache.ignite.internal.schema.BinaryRow;
@@ -68,16 +69,14 @@ public class GcUpdateHandler {
*
* @param lowWatermark Low watermark for the vacuum.
* @param count Count of entries to GC.
- * @param strict {@code true} if needed to remove the strictly passed
{@code count} oldest stale entries, {@code false} if a premature
- * exit is allowed when it is not possible to acquire a lock for the
{@link RowId}.
* @return {@code False} if there is no garbage left in the storage.
*/
- public boolean vacuumBatch(HybridTimestamp lowWatermark, int count,
boolean strict) {
+ public boolean vacuumBatch(HybridTimestamp lowWatermark, int count) {
if (count <= 0) {
return true;
}
- IntHolder countHolder = new IntHolder(count);
+ var countHolder = new IntHolder(count);
while (countHolder.get() > 0) {
VacuumResult vacuumResult = internalVacuumBatch(lowWatermark,
countHolder);
@@ -88,11 +87,8 @@ public class GcUpdateHandler {
case SUCCESS:
return true;
case FAILED_ACQUIRE_LOCK:
- if (strict) {
- continue;
- }
-
- return true;
+ case REMOVED_BY_ANOTHER_THREAD:
+ continue;
case SHOULD_RELEASE:
// Storage engine needs resources (e.g., checkpoint needs
write lock).
// Exit the closure to allow the engine to proceed.
@@ -106,10 +102,18 @@ public class GcUpdateHandler {
}
private VacuumResult internalVacuumBatch(HybridTimestamp lowWatermark,
IntHolder countHolder) {
+ int count = countHolder.get();
+
+ List<GcEntry> peekEntries = storage.peek(lowWatermark, count);
+
+ if (peekEntries.isEmpty()) {
+ return VacuumResult.NO_GARBAGE_LEFT;
+ }
+
return storage.runConsistently(locker -> {
- int count = countHolder.get();
+ boolean someRowRemovedByAnotherThread = false;
- for (int i = 0; i < count; i++) {
+ for (int i = 0; i < peekEntries.size(); i++) {
// Check if the storage engine needs resources before
continuing.
if (locker.shouldRelease()) {
return VacuumResult.SHOULD_RELEASE;
@@ -117,59 +121,53 @@ public class GcUpdateHandler {
// It is safe for the first iteration to use a lock instead of
tryLock, since there will be no deadlock for the first RowId
// and a deadlock may happen with subsequent ones.
- VacuumResult vacuumResult = internalVacuum(lowWatermark,
locker, i > 0);
+ VacuumResult vacuumResult = internalVacuum(peekEntries.get(i),
locker, i > 0);
- if (vacuumResult != VacuumResult.SUCCESS) {
+ if (vacuumResult == VacuumResult.REMOVED_BY_ANOTHER_THREAD) {
+ someRowRemovedByAnotherThread = true;
+
+ continue;
+ } else if (vacuumResult != VacuumResult.SUCCESS) {
return vacuumResult;
}
countHolder.getAndDecrement();
}
- return VacuumResult.SUCCESS;
- });
- }
-
- private VacuumResult internalVacuum(HybridTimestamp lowWatermark, Locker
locker, boolean useTryLock) {
- while (true) {
- // Check if the storage engine needs resources before continuing.
- if (locker.shouldRelease()) {
- return VacuumResult.SHOULD_RELEASE;
+ if (someRowRemovedByAnotherThread) {
+ return VacuumResult.REMOVED_BY_ANOTHER_THREAD;
}
- GcEntry gcEntry = storage.peek(lowWatermark);
-
- if (gcEntry == null) {
- return VacuumResult.NO_GARBAGE_LEFT;
- }
+ return countHolder.get() == 0 ? VacuumResult.SUCCESS :
VacuumResult.NO_GARBAGE_LEFT;
+ });
+ }
- RowId rowId = gcEntry.getRowId();
+ private VacuumResult internalVacuum(GcEntry gcEntry, Locker locker,
boolean useTryLock) {
+ RowId rowId = gcEntry.getRowId();
- if (useTryLock) {
- if (!locker.tryLock(rowId)) {
- return VacuumResult.FAILED_ACQUIRE_LOCK;
- }
- } else {
- locker.lock(rowId);
+ if (useTryLock) {
+ if (!locker.tryLock(rowId)) {
+ return VacuumResult.FAILED_ACQUIRE_LOCK;
}
+ } else {
+ locker.lock(rowId);
+ }
- BinaryRow binaryRow = storage.vacuum(gcEntry);
-
- if (binaryRow == null) {
- // Removed by another thread, let's try to take another.
- continue;
- }
+ BinaryRow binaryRow = storage.vacuum(gcEntry);
- try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
- // TODO: IGNITE-21005 We need to choose only those indexes
that are not available for transactions
- indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId,
cursor, null);
- }
+ if (binaryRow == null) {
+ return VacuumResult.REMOVED_BY_ANOTHER_THREAD;
+ }
- return VacuumResult.SUCCESS;
+ try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) {
+ // TODO: IGNITE-21005 We need to choose only those indexes that
are not available for transactions
+ indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId, cursor,
null);
}
+
+ return VacuumResult.SUCCESS;
}
private enum VacuumResult {
- SUCCESS, NO_GARBAGE_LEFT, FAILED_ACQUIRE_LOCK, SHOULD_RELEASE
+ SUCCESS, NO_GARBAGE_LEFT, FAILED_ACQUIRE_LOCK, SHOULD_RELEASE,
REMOVED_BY_ANOTHER_THREAD
}
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
index c8bd08da27d..b3857f92e14 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/MvGc.java
@@ -54,7 +54,7 @@ import org.jetbrains.annotations.TestOnly;
/**
* Garbage collector for multi-versioned storages and their indexes in the
background.
*
- * @see GcUpdateHandler#vacuumBatch(HybridTimestamp, int, boolean)
+ * @see GcUpdateHandler#vacuumBatch
*/
public class MvGc implements ManuallyCloseable {
private static final IgniteLogger LOG = Loggers.forClass(MvGc.class);
@@ -252,7 +252,7 @@ public class MvGc implements ManuallyCloseable {
});
currentAwaitSafeTimeFuture
- .thenApplyAsync(unused ->
gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize(), true),
executor)
+ .thenApplyAsync(unused ->
gcUpdateHandler.vacuumBatch(lowWatermark, gcConfig.value().batchSize()),
executor)
.whenComplete((isGarbageLeft, throwable) -> {
if (throwable != null) {
if (hasCause(throwable,
TrackerClosedException.class, StorageRemovedException.class)) {
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
index 6eaeea5b58d..507f0ea49e2 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/raft/snapshot/SnapshotAwarePartitionDataStorage.java
@@ -244,8 +244,8 @@ public class SnapshotAwarePartitionDataStorage implements
PartitionDataStorage {
}
@Override
- public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- return partitionStorage.peek(lowWatermark);
+ public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+ return partitionStorage.peek(lowWatermark, count);
}
@Override
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
index 669b69bea8e..498f4edefe1 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/IndexGcTest.java
@@ -42,7 +42,7 @@ public class IndexGcTest extends IndexBaseTest {
addWrite(storageUpdateHandler, rowUuid, row);
commitWrite(rowId);
- assertTrue(gcUpdateHandler.vacuumBatch(now(), 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(now(), 1));
assertEquals(1, getRowVersions(rowId).size());
// Newer entry has the same index value, so it should not be removed.
@@ -70,13 +70,13 @@ public class IndexGcTest extends IndexBaseTest {
HybridTimestamp afterCommits = now();
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
// row1 should still be in the index, because second write was
identical to the first.
assertTrue(inAllIndexes(row1));
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
- assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
+ assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1));
assertEquals(1, getRowVersions(rowId).size());
// Older entries have different indexes, should be removed.
@@ -103,8 +103,8 @@ public class IndexGcTest extends IndexBaseTest {
HybridTimestamp afterCommits = now();
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
- assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
+ assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1));
assertEquals(0, getRowVersions(rowId).size());
// The last entry was a tombstone, so no indexes should be left.
@@ -129,8 +129,8 @@ public class IndexGcTest extends IndexBaseTest {
HybridTimestamp afterCommits = now();
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
- assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
+ assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1));
assertEquals(1, getRowVersions(rowId).size());
assertTrue(inAllIndexes(row));
@@ -154,9 +154,9 @@ public class IndexGcTest extends IndexBaseTest {
HybridTimestamp afterCommits = now();
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
- assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
- assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1, true));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
+ assertTrue(gcUpdateHandler.vacuumBatch(afterCommits, 1));
+ assertFalse(gcUpdateHandler.vacuumBatch(afterCommits, 1));
assertEquals(0, getRowVersions(rowId).size());
assertTrue(notInAnyIndex(row));
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
index c7bb5da96c7..86d94a9e6a9 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/AbstractGcUpdateHandlerTest.java
@@ -56,8 +56,6 @@ import
org.apache.ignite.internal.util.PendingComparableValuesTracker;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.RepeatedTest;
import org.junit.jupiter.api.Test;
-import org.junit.jupiter.params.ParameterizedTest;
-import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
/**
@@ -82,9 +80,8 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
this.tableStorage = tableStorage;
}
- @ParameterizedTest(name = "strict : {0}")
- @ValueSource(booleans = {true, false})
- void testVacuum(boolean strict) {
+ @Test
+ void testVacuum() {
TestPartitionDataStorage partitionStorage =
spy(createPartitionDataStorage());
IndexUpdateHandler indexUpdateHandler =
spy(createIndexUpdateHandler());
@@ -92,8 +89,8 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
HybridTimestamp lowWatermark = HybridTimestamp.MAX_VALUE;
- assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 1, strict));
- verify(partitionStorage).peek(lowWatermark);
+ assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 1));
+ verify(partitionStorage).peek(eq(lowWatermark), eq(1));
// Let's check that StorageUpdateHandler#vacuumBatch returns true.
clearInvocations(partitionStorage);
@@ -104,14 +101,13 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
addWriteCommitted(partitionStorage, rowId, row, clock.now());
addWriteCommitted(partitionStorage, rowId, row, clock.now());
- assertTrue(gcUpdateHandler.vacuumBatch(lowWatermark, 1, strict));
- verify(partitionStorage).peek(lowWatermark);
+ assertTrue(gcUpdateHandler.vacuumBatch(lowWatermark, 1));
+ verify(partitionStorage).peek(eq(lowWatermark), eq(1));
verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId),
any(), isNull());
}
- @ParameterizedTest(name = "strict : {0}")
- @ValueSource(booleans = {true, false})
- void testVacuumBatch(boolean strict) {
+ @Test
+ void testVacuumBatch() {
TestPartitionDataStorage partitionStorage =
spy(createPartitionDataStorage());
IndexUpdateHandler indexUpdateHandler =
spy(createIndexUpdateHandler());
@@ -131,9 +127,9 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
addWriteCommitted(partitionStorage, rowId1, row1, clock.now());
addWriteCommitted(partitionStorage, rowId1, row1, clock.now());
- assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 5, strict));
+ assertFalse(gcUpdateHandler.vacuumBatch(lowWatermark, 5));
- verify(partitionStorage, times(3)).peek(lowWatermark);
+ verify(partitionStorage, times(1)).peek(eq(lowWatermark), eq(5));
verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId0),
any(), isNull());
verify(indexUpdateHandler).tryRemoveFromIndexes(any(), eq(rowId1),
any(), isNull());
}
@@ -162,8 +158,8 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
addWriteCommitted(partitionStorage, rowId1, null, clock.now());
runRace(
- () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2, true),
- () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2, true)
+ () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2),
+ () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 2)
);
assertNull(partitionStorage.getStorage().closestRowId(RowId.lowestRowId(PARTITION_ID)));
@@ -199,8 +195,8 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
addWriteCommitted(partitionStorage, rowId3, null, clock.now());
runRace(
- () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 4, false),
- () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 4, false)
+ () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 4),
+ () ->
gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, 4)
);
assertNull(partitionStorage.getStorage().closestRowId(RowId.lowestRowId(PARTITION_ID)));
@@ -243,7 +239,7 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
}
for (GcUpdateHandler gcUpdateHandler : gcUpdateHandlers) {
- gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE,
Integer.MAX_VALUE, true);
+ gcUpdateHandler.vacuumBatch(HybridTimestamp.MAX_VALUE, numRows);
}
for (int i = 0; i < numPartitions; i++) {
@@ -284,7 +280,7 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
// When: Vacuum runs with shouldRelease() returning true (simulating
lock contention)
when(shouldReleaseSupplier.getAsBoolean()).thenReturn(true);
- boolean hasGarbageLeft = gcUpdateHandler.vacuumBatch(lowWatermark, 10,
true);
+ boolean hasGarbageLeft = gcUpdateHandler.vacuumBatch(lowWatermark, 10);
// Then: Vacuum exits early and reports garbage remaining
assertTrue(hasGarbageLeft, "Expected garbage to remain after early
exit");
@@ -292,7 +288,7 @@ abstract class AbstractGcUpdateHandlerTest extends
BaseMvStoragesTest {
// When: Vacuum runs again with shouldRelease() returning false
when(shouldReleaseSupplier.getAsBoolean()).thenReturn(false);
- hasGarbageLeft = gcUpdateHandler.vacuumBatch(lowWatermark, 100, true);
+ hasGarbageLeft = gcUpdateHandler.vacuumBatch(lowWatermark, 100);
// Then: All remaining garbage is processed
assertFalse(hasGarbageLeft, "Expected no garbage left after completing
vacuum");
diff --git
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
index ce4c665081d..3ed0edd984c 100644
---
a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
+++
b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/gc/MvGcTest.java
@@ -29,9 +29,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
-import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
-import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.spy;
@@ -389,7 +387,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
assertThat(startAwaitSafeTimeFuture, willCompleteSuccessfully());
assertThat(gc.removeStorage(tablePartitionId),
willCompleteSuccessfully());
- verify(gcUpdateHandler, never()).vacuumBatch(any(), anyInt(),
anyBoolean());
+ verify(gcUpdateHandler, never()).vacuumBatch(any(), anyInt());
}
private TablePartitionId createTablePartitionId() {
@@ -409,7 +407,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
CompletableFuture<Void> future,
@Nullable HybridTimestamp exp
) {
- when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
+ when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class),
anyInt())).then(invocation -> {
if (exp != null) {
try {
assertEquals(exp, invocation.getArgument(0));
@@ -429,7 +427,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
private static GcUpdateHandler createWithCountDownOnVacuum(CountDownLatch
latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
- when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
+ when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class),
anyInt())).then(invocation -> {
latch.countDown();
return latch.getCount() > 0;
@@ -441,7 +439,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
private static GcUpdateHandler
createWithWaitFinishVacuum(CompletableFuture<Void> startFuture,
CompletableFuture<Void> finishFuture) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
- when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
+ when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class),
anyInt())).then(invocation -> {
startFuture.complete(null);
finishFuture.get(1, TimeUnit.SECONDS);
@@ -461,7 +459,7 @@ public class MvGcTest extends BaseIgniteAbstractTest {
private static GcUpdateHandler
createWithCountDownOnVacuumWithoutNextBatch(CountDownLatch latch) {
GcUpdateHandler gcUpdateHandler = createGcUpdateHandler();
- when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class), anyInt(),
eq(true))).then(invocation -> {
+ when(gcUpdateHandler.vacuumBatch(any(HybridTimestamp.class),
anyInt())).then(invocation -> {
latch.countDown();
// So that there is no processing of the next batch.
diff --git
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
index d446e5445db..4d68eb2a087 100644
---
a/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
+++
b/modules/table/src/testFixtures/java/org/apache/ignite/distributed/TestPartitionDataStorage.java
@@ -17,6 +17,7 @@
package org.apache.ignite.distributed;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -167,8 +168,8 @@ public class TestPartitionDataStorage implements
PartitionDataStorage {
}
@Override
- public @Nullable GcEntry peek(HybridTimestamp lowWatermark) {
- return partitionStorage.peek(lowWatermark);
+ public List<GcEntry> peek(HybridTimestamp lowWatermark, int count) {
+ return partitionStorage.peek(lowWatermark, count);
}
@Override