This is an automated email from the ASF dual-hosted git repository.

amashenkov pushed a commit to branch ignite-20680
in repository https://gitbox.apache.org/repos/asf/ignite-3.git

commit a465989787ad2dda0769d17498c6ed15d89c346d
Author: amashenkov <andrey.mashen...@gmail.com>
AuthorDate: Tue Jan 30 15:04:01 2024 +0300

    wip
---
 .../ignite/client/fakes/FakeInternalTable.java     | 10 +++++++
 .../ignite/internal/table/InternalTable.java       |  4 +++
 .../internal/table/distributed/LowWatermark.java   | 11 ++++----
 .../internal/table/distributed/TableManager.java   | 33 ++++++++++++++--------
 .../distributed/storage/InternalTableImpl.java     | 12 ++++++++
 .../table/distributed/LowWatermarkTest.java        |  3 +-
 6 files changed, 55 insertions(+), 18 deletions(-)

diff --git a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
index ff0804d97f..4f9a790188 100644
--- a/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
+++ b/modules/client/src/test/java/org/apache/ignite/client/fakes/FakeInternalTable.java
@@ -475,4 +475,14 @@ public class FakeInternalTable implements InternalTable {
     public @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId) {
         return null;
     }
+
+    @Override
+    public long readOnlySince() {
+        return Long.MAX_VALUE;
+    }
+
+    @Override
+    public void markReadOnly(long time) {
+        // noop.
+    }
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
index eae46725ee..7219e3472e 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/InternalTable.java
@@ -477,4 +477,8 @@ public interface InternalTable extends ManuallyCloseable {
      * @param partitionId Partition ID.
      */
     @Nullable PendingComparableValuesTracker<Long, Void> getPartitionStorageIndexTracker(int partitionId);
+
+    long readOnlySince();
+
+    void markReadOnly(long time);
 }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
index f6322251e5..4e43f46e4b 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/LowWatermark.java
@@ -20,12 +20,14 @@ package org.apache.ignite.internal.table.distributed;
 import static org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
 import static org.apache.ignite.internal.util.IgniteUtils.inBusyLock;
 
+import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
 import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.function.Consumer;
 import org.apache.ignite.internal.close.ManuallyCloseable;
 import org.apache.ignite.internal.hlc.HybridClock;
 import org.apache.ignite.internal.hlc.HybridTimestamp;
@@ -34,7 +36,6 @@ import org.apache.ignite.internal.lang.NodeStoppingException;
 import org.apache.ignite.internal.logger.IgniteLogger;
 import org.apache.ignite.internal.logger.Loggers;
 import org.apache.ignite.internal.schema.configuration.LowWatermarkConfiguration;
-import org.apache.ignite.internal.table.distributed.gc.MvGc;
 import org.apache.ignite.internal.thread.NamedThreadFactory;
 import org.apache.ignite.internal.tx.TxManager;
 import org.apache.ignite.internal.util.ByteUtils;
@@ -65,7 +66,7 @@ public class LowWatermark implements ManuallyCloseable {
 
     private final VaultManager vaultManager;
 
-    private final MvGc mvGc;
+    private final List<Consumer<HybridTimestamp>> gcListeners;
 
     private final ScheduledExecutorService scheduledThreadPool;
 
@@ -92,13 +93,13 @@ public class LowWatermark implements ManuallyCloseable {
             HybridClock clock,
             TxManager txManager,
             VaultManager vaultManager,
-            MvGc mvGc
+            List<Consumer<HybridTimestamp>> mvGc
     ) {
         this.lowWatermarkConfig = lowWatermarkConfig;
         this.clock = clock;
         this.txManager = txManager;
         this.vaultManager = vaultManager;
-        this.mvGc = mvGc;
+        this.gcListeners = mvGc;
 
         scheduledThreadPool = Executors.newSingleThreadScheduledExecutor(
                 NamedThreadFactory.create(nodeName, "low-watermark-updater", LOG)
@@ -212,7 +213,7 @@ public class LowWatermark implements ManuallyCloseable {
     }
 
     private void runGcAndScheduleUpdateLowWatermarkBusy(HybridTimestamp lowWatermark) {
-        mvGc.updateLowWatermark(lowWatermark);
+        gcListeners.forEach(t -> t.accept(lowWatermark));
 
         scheduleUpdateLowWatermarkBusy();
     }
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index f33e3ecc92..a7001c2add 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -75,6 +75,7 @@ import java.util.function.Function;
 import java.util.function.IntSupplier;
 import java.util.function.LongFunction;
 import java.util.function.Supplier;
+import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import java.util.stream.Stream;
 import org.apache.ignite.internal.affinity.AffinityUtils;
@@ -481,7 +482,8 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
         mvGc = new MvGc(nodeName, gcConfig);
 
-        lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager, mvGc);
+        lowWatermark = new LowWatermark(nodeName, gcConfig.lowWatermark(), clock, txManager, vaultManager,
+                List.of(this::onLowWatermarkChanged, mvGc::updateLowWatermark));
 
         raftCommandsMarshaller = new ThreadLocalPartitionCommandsMarshaller(clusterService.serializationRegistry());
 
@@ -684,19 +686,30 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
     private CompletableFuture<Void> onTableDelete(DropTableEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> {
             long causalityToken = parameters.causalityToken();
-            int catalogVersion = parameters.catalogVersion();
+            long removalTimestamp = catalogService.catalog(parameters.catalogVersion()).time();
 
             int tableId = parameters.tableId();
 
-            CatalogTableDescriptor tableDescriptor = getTableDescriptor(tableId, catalogVersion - 1);
-            CatalogZoneDescriptor zoneDescriptor = getZoneDescriptor(tableDescriptor, catalogVersion - 1);
+            tablesByIdVv.update(causalityToken, (prev, throwable) -> {
+                prev.get(tableId).internalTable().markReadOnly(removalTimestamp);
 
-            dropTableLocally(causalityToken, tableDescriptor, zoneDescriptor);
+                return completedFuture(prev);
+            });
 
             return nullCompletedFuture();
         });
     }
 
+    private void onLowWatermarkChanged(HybridTimestamp lwm) {
+        inBusyLock(busyLock, () -> {
+            long causalityToken = localPartsByTableIdVv.latestCausalityToken();
+
+            tablesByIdVv.get(causalityToken).thenAccept(map -> map.values().stream()
+                    .filter(tbl -> tbl.internalTable().readOnlySince() < lwm.longValue())
+                    .forEach(tbl -> dropTableLocally(causalityToken, tbl)));
+        });
+    }
+
     private CompletableFuture<?> onTableRename(RenameTableEventParameters parameters) {
         return inBusyLockAsync(busyLock, () -> tablesByIdVv.update(
                 parameters.causalityToken(),
@@ -1328,14 +1341,10 @@ public class TableManager implements IgniteTablesInternal, IgniteComponent {
 
     /**
      * Drops local structures for a table.
-     *
-     * @param causalityToken Causality token.
-     * @param tableDescriptor Catalog table descriptor.
-     * @param zoneDescriptor Catalog distributed zone descriptor.
      */
-    private void dropTableLocally(long causalityToken, CatalogTableDescriptor tableDescriptor, CatalogZoneDescriptor zoneDescriptor) {
-        int tableId = tableDescriptor.id();
-        int partitions = zoneDescriptor.partitions();
+    private void dropTableLocally(long causalityToken, TableImpl tbl) {
+        int tableId = tbl.tableId();
+        int partitions = tbl.internalTable().partitions();
 
         localPartsByTableIdVv.update(causalityToken, (previousVal, e) -> inBusyLock(busyLock, () -> {
             if (e != null) {
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
index f1a95e24f8..0fb73dd944 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/storage/InternalTableImpl.java
@@ -176,6 +176,8 @@ public class InternalTableImpl implements InternalTable {
     /** Map update guarded by {@link #updatePartitionMapsMux}. */
     private volatile Int2ObjectMap<PendingComparableValuesTracker<Long, Void>> storageIndexTrackerByPartitionId = emptyMap();
 
+    private volatile long readOnlySince = Long.MAX_VALUE;
+
     /**
      * Constructor.
      *
@@ -248,6 +250,16 @@ public class InternalTableImpl implements InternalTable {
         this.tableName = newName;
     }
 
+    @Override
+    public long readOnlySince() {
+        return readOnlySince;
+    }
+
+    @Override
+    public void markReadOnly(long timestamp) {
+        readOnlySince = timestamp;
+    }
+
     /**
      * Enlists a single row into a transaction.
      *
diff --git a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
index a625802438..8cd55bd9b6 100644
--- a/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
+++ b/modules/table/src/test/java/org/apache/ignite/internal/table/distributed/LowWatermarkTest.java
@@ -37,6 +37,7 @@ import static org.mockito.Mockito.timeout;
 import static org.mockito.Mockito.verify;
 import static org.mockito.Mockito.when;
 
+import java.util.List;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -79,7 +80,7 @@ public class LowWatermarkTest extends BaseIgniteAbstractTest {
 
     @BeforeEach
     void setUp() {
-        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, mvGc);
+        lowWatermark = new LowWatermark("test", lowWatermarkConfig, clock, txManager, vaultManager, List.of(mvGc::updateLowWatermark));
     }
 
     @AfterEach

Reply via email to