This is an automated email from the ASF dual-hosted git repository. amashenkov pushed a commit to branch ignite-20680 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a465989787ad2dda0769d17498c6ed15d89c346d Author: amashenkov <andrey.mashen...@gmail.com> AuthorDate: Tue Jan 30 15:04:01 2024 +0300 wip --- .../ignite/client/fakes/FakeInternalTable.java | 10 +++++++ .../ignite/internal/table/InternalTable.java | 4 +++ .../internal/table/distributed/LowWatermark.java | 11 ++++---- .../internal/table/distributed/TableManager.java | 33 ++++++++++++++-------- .../distributed/storage/InternalTableImpl.java | 12 ++++++++ .../table/distributed/LowWatermarkTest.java | 3 +- 6 files changed, 55 insertions(+), 18 deletions(-) diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java index ff0804d97f..4f9a790188 100644 --- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java +++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java @@ -475,4 +475,14 @@ public class FakeInternalTable implements InternalTable { public @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) { return null; } + + @Override + public long readOnlySince() { + return Long.MAX_VALUE; + } + + @Override + public void markReadOnly(long time) { + // noop. + } } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java index eae46725ee..7219e3472e 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java @@ -477,4 +477,8 @@ public interface InternalTable extends ManuallyCloseable { * @param partitionId Partition ID. */ @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId); + + long readOnlySince(); + + void markReadOnly(long time); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java index f6322251e5..4e43f46e4b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java @@ -20,12 +20,14 @@ package org.apache.ignite.internal.table.distributed; import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture; import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock; +import java.util.List; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import org.apache.ignite.internal.close.ManuallyCloseable; import org.apache.ignite.internal.hlc.HybridClock; import org.apache.ignite.internal.hlc.HybridTimestamp; @@ -34,7 +36,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException; import org.apache.ignite.internal.logger.IgniteLogger; import org.apache.ignite.internal.logger.Loggers; import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration; -import org.apache.ignite.internal.table.distributed.gc.MvGc; import org.apache.ignite.internal.thread.NamedThreadFactory; import org.apache.ignite.internal.tx.TxManager; import org.apache.ignite.internal.util.ByteUtils; @@ -65,7 +66,7 @@ public class LowWatermark implements ManuallyCloseable { private final VaultManager vaultManager; - private final MvGc mvGc; + private final List<Consumer<HybridTimestamp>> gcListeners; private final ScheduledExecutorService scheduledThreadPool; @@ -92,13 +93,13 @@ public class LowWatermark implements ManuallyCloseable { HybridClock clock, TxManager txManager, VaultManager vaultManager, - MvGc mvGc + List<Consumer<HybridTimestamp>> mvGc ) { this.lowWatermarkConfig = lowWatermarkConfig; this.clock = clock; this.txManager = txManager; this.vaultManager = vaultManager; - this.mvGc = mvGc; + this.gcListeners = mvGc; scheduledThreadPool = Executors.newSingleThreadScheduledExecutor( NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG) @@ -212,7 +213,7 @@ public class LowWatermark implements ManuallyCloseable { } private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp lowWatermark) { - mvGc.updateLowWatermark(lowWatermark); + gcListeners.forEach(t -> t.accept(lowWatermark)); scheduleUpdateLowWatermarkBusy(); } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index f33e3ecc92..a7001c2add 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -75,6 +75,7 @@ import java.util.function.Function; import java.util.function.IntSupplier; import java.util.function.LongFunction; import java.util.function.Supplier; +import java.util.stream.Collectors; import java.util.stream.IntStream; import java.util.stream.Stream; import org.apache.ignite.internal.affinity.AffinityUtils; @@ -481,7 +482,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { mvGc = new MvGc(nodeName, gcConfig); - lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc); + lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, + List.of(this::onLowWatermarkChanged, mvGc::updateLowWatermark)); raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry()); @@ -684,19 +686,30 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { private CompletableFuture<Void> onTableDelete(DropTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> { long causalityToken = parameters.causalityToken(); - int catalogVersion = parameters.catalogVersion(); + long removalTimestamp = catalogService.catalog(parameters.catalogVersion()).time(); int tableId = parameters.tableId(); - CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, catalogVersion - 1); - CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion - 1); + tablesByIdVv.update(causalityToken, (prev, throwable) -> { + prev.get(tableId).internalTable().markReadOnly(removalTimestamp); - dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor); + return completedFuture(prev); + }); return nullCompletedFuture(); }); } + private void onLowWatermarkChanged(HybridTimestamp lwm) { + inBusyLock(busyLock, () -> { + long causalityToken = localPartsByTableIdVv.latestCausalityToken(); + + tablesByIdVv.get(causalityToken).thenAccept(map -> map.values().stream() + .filter(tbl -> tbl.internalTable().readOnlySince() < lwm.longValue()) + .forEach(tbl -> dropTableLocally(causalityToken, tbl))); + }); + } + private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) { return inBusyLockAsync(busyLock, () -> tablesByIdVv.update( parameters.causalityToken(), @@ -1328,14 +1341,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent { /** * Drops local structures for a table. - * - * @param causalityToken Causality token. - * @param tableDescriptor Catalog table descriptor. - * @param zoneDescriptor Catalog distributed zone descriptor. */ - private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) { - int tableId = tableDescriptor.id(); - int partitions = zoneDescriptor.partitions(); + private void dropTableLocally(long causalityToken, TableImpl tbl) { + int tableId = tbl.tableId(); + int partitions = tbl.internalTable().partitions(); localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> { if (e != null) { diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java index f1a95e24f8..0fb73dd944 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java @@ -176,6 +176,8 @@ public class InternalTableImpl implements InternalTable { /** Map update guarded by {@link #updatePartitionMapsMux}. */ private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> storageIndexTrackerByPartitionId = emptyMap(); + private volatile long readOnlySince = Long.MAX_VALUE; + /** * Constructor. * @@ -248,6 +250,16 @@ public class InternalTableImpl implements InternalTable { this.tableName = newName; } + @Override + public long readOnlySince() { + return readOnlySince; + } + + @Override + public void markReadOnly(long timestamp) { + readOnlySince = timestamp; + } + /** * Enlists a single row into a transaction. * diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java index a625802438..8cd55bd9b6 100644 --- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java +++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java @@ -37,6 +37,7 @@ import static org.mockito.Mockito.timeout; import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; +import java.util.List; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -79,7 +80,7 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest { @BeforeEach void setUp() { - lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, mvGc); + lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, List.of(mvGc::updateLowWatermark)); } @AfterEach