This is an automated email from the ASF dual-hosted git repository. tkalkirill pushed a commit to branch ignite-26998 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit ba4127bc9634c4e79d57ae7698b384ef7df9a98d Author: Kirill Tkalenko <[email protected]> AuthorDate: Fri Nov 28 12:06:48 2025 +0300 IGNITE-26998 wip --- .../mv/AbstractPageMemoryMvPartitionStorage.java | 6 +- .../table/distributed/gc/GcUpdateHandler.java | 70 ++++++++++------------ 2 files changed, 36 insertions(+), 40 deletions(-) diff --git a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java index d2e9539901a..d0aeae565ac 100644 --- a/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java +++ b/modules/storage-page-memory/src/main/java/org/apache/ignite/internal/storage/pagememory/mv/AbstractPageMemoryMvPartitionStorage.java @@ -1094,7 +1094,11 @@ public abstract class AbstractPageMemoryMvPartitionStorage implements MvPartitio } catch (IgniteInternalCheckedException e) { throwStorageExceptionIfItCause(e); - throw new StorageException("Row version lookup failed: [rowId={}, {}]", e, lowWatermark, createStorageInfo()); + throw new StorageException( + "Peek GC entries failed: [lowWatermark={}, count={}, {}]", + e, + lowWatermark, count, createStorageInfo() + ); } return res; diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java index c5ab546b188..341ddb84d90 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/gc/GcUpdateHandler.java @@ -101,10 +101,16 @@ public class GcUpdateHandler { } private VacuumResult internalVacuumBatch(HybridTimestamp lowWatermark, IntHolder countHolder) { - return storage.runConsistently(locker -> { - int count = countHolder.get(); + int count = countHolder.get(); + + List<GcEntry> peekEntries = storage.peek(lowWatermark, count); - for (int i = 0; i < count; i++) { + if (peekEntries.isEmpty()) { + return VacuumResult.NO_GARBAGE_LEFT; + } + + return storage.runConsistently(locker -> { + for (int i = 0; i < peekEntries.size(); i++) { // Check if the storage engine needs resources before continuing. if (locker.shouldRelease()) { return VacuumResult.SHOULD_RELEASE; @@ -112,9 +118,11 @@ public class GcUpdateHandler { // It is safe for the first iteration to use a lock instead of tryLock, since there will be no deadlock for the first RowId // and a deadlock may happen with subsequent ones. - VacuumResult vacuumResult = internalVacuum(lowWatermark, locker, i > 0); + VacuumResult vacuumResult = internalVacuum(peekEntries.get(i), locker, i > 0); - if (vacuumResult != VacuumResult.SUCCESS) { + if (vacuumResult == VacuumResult.REMOVED_BY_ANOTHER_THREAD) { + continue; + } else if (vacuumResult != VacuumResult.SUCCESS) { return vacuumResult; } @@ -125,48 +133,32 @@ public class GcUpdateHandler { }); } - private VacuumResult internalVacuum(HybridTimestamp lowWatermark, Locker locker, boolean useTryLock) { - while (true) { - // Check if the storage engine needs resources before continuing. - if (locker.shouldRelease()) { - return VacuumResult.SHOULD_RELEASE; - } - - // TODO: IGNITE-26998 Переделать а пока заглушка - List<GcEntry> gcEntries = storage.peek(lowWatermark, 1); + private VacuumResult internalVacuum(GcEntry gcEntry, Locker locker, boolean useTryLock) { + RowId rowId = gcEntry.getRowId(); - if (gcEntries.isEmpty()) { - return VacuumResult.NO_GARBAGE_LEFT; + if (useTryLock) { + if (!locker.tryLock(rowId)) { + return VacuumResult.FAILED_ACQUIRE_LOCK; } + } else { + locker.lock(rowId); + } - GcEntry gcEntry = gcEntries.get(0); - RowId rowId = gcEntry.getRowId(); - - if (useTryLock) { - if (!locker.tryLock(rowId)) { - return VacuumResult.FAILED_ACQUIRE_LOCK; - } - } else { - locker.lock(rowId); - } - - BinaryRow binaryRow = storage.vacuum(gcEntry); - - if (binaryRow == null) { - // Removed by another thread, let's try to take another. - continue; - } + BinaryRow binaryRow = storage.vacuum(gcEntry); - try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { - // TODO: IGNITE-21005 We need to choose only those indexes that are not available for transactions - indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId, cursor, null); - } + if (binaryRow == null) { + return VacuumResult.REMOVED_BY_ANOTHER_THREAD; + } - return VacuumResult.SUCCESS; + try (Cursor<ReadResult> cursor = storage.scanVersions(rowId)) { + // TODO: IGNITE-21005 We need to choose only those indexes that are not available for transactions + indexUpdateHandler.tryRemoveFromIndexes(binaryRow, rowId, cursor, null); } + + return VacuumResult.SUCCESS; } private enum VacuumResult { - SUCCESS, NO_GARBAGE_LEFT, FAILED_ACQUIRE_LOCK, SHOULD_RELEASE + SUCCESS, NO_GARBAGE_LEFT, FAILED_ACQUIRE_LOCK, SHOULD_RELEASE, REMOVED_BY_ANOTHER_THREAD } }
