This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 765c88d7566 IGNITE-25260 Fix an issue on dropping a table when the colocation is enabled (#5774)
765c88d7566 is described below

commit 765c88d756637fcda9f235427ba120c92bd51d9a
Author: Slava Koptilin <[email protected]>
AuthorDate: Mon May 12 14:45:28 2025 +0300

    IGNITE-25260 Fix an issue on dropping a table when the colocation is enabled (#5774)
---
 .../PartitionReplicaLifecycleManager.java          |   1 +
 .../internal/table/distributed/TableManager.java   | 238 ++++++++++++++-------
 2 files changed, 167 insertions(+), 72 deletions(-)

diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 4137e53d020..c02fa7af934 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -200,6 +200,7 @@ public class PartitionReplicaLifecycleManager extends
 
     private final Set<ZonePartitionId> replicationGroupIds = 
ConcurrentHashMap.newKeySet();
 
+    // TODO https://issues.apache.org/jira/browse/IGNITE-25347
     /** (zoneId -> lock) map to provide concurrent access to the zone replicas 
list. */
     private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks = 
new ConcurrentHashMap<>();
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7406cba00ec..99190c42f25 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -68,6 +68,8 @@ import static 
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
 import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -85,6 +87,7 @@ import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.StampedLock;
 import java.util.function.BiFunction;
 import java.util.function.Consumer;
 import java.util.function.Function;
@@ -428,7 +431,11 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
 
     private final CompletableFuture<Void> readyToProcessReplicaStarts = new 
CompletableFuture<>();
 
-    private final Map<Integer, Set<TableImpl>> tablesPerZone = new 
ConcurrentHashMap<>();
+    // TODO https://issues.apache.org/jira/browse/IGNITE-25347
+    /** Mapping zone identifier to a collection of tables related to the zone. 
*/
+    private final Map<Integer, Set<TableImpl>> tablesPerZone = new HashMap<>();
+    /** Locks to synchronize an access to the {@link #tablesPerZone}. */
+    private final Map<Integer, StampedLock> tablesPerZoneLocks = new 
ConcurrentHashMap<>();
 
     /** Configuration of rebalance retries delay. */
     private final SystemDistributedConfigurationPropertyHolder<Integer> 
rebalanceRetryDelayConfiguration;
@@ -689,33 +696,48 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                 .thenCompose(v -> {
                     ZonePartitionId zonePartitionId = 
parameters.zonePartitionId();
 
-                    Set<TableImpl> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
-
-                    int partitionIndex = zonePartitionId.partitionId();
-                    PartitionSet singlePartitionIdSet = 
PartitionSet.of(partitionIndex);
-
-                    CompletableFuture<?>[] futures = zoneTables.stream()
-                            .map(tbl -> inBusyLockAsync(busyLock, () -> {
-                                return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
-                                        .thenRun(() -> {
-                                            localPartsByTableId.compute(
-                                                    tbl.tableId(),
-                                                    (tableId, oldPartitionSet) 
-> extendPartitionSet(oldPartitionSet, partitionIndex)
-                                            );
-                                        })
-                                        .thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
-                                            
lowWatermark.getLowWatermarkSafe(lwm ->
-                                                    
registerIndexesToTable(tbl, catalogService, singlePartitionIdSet, 
tbl.schemaView(), lwm)
-                                            );
-
-                                            
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, false);
-                                        }), ioExecutor)
-                                        // If the table is already closed, 
it's not a problem (probably the node is stopping).
-                                        
.exceptionally(ignoreTableClosedException());
-                            }))
-                            .toArray(CompletableFuture[]::new);
+                    StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
+                    long stamp = zoneLock.readLock();
+
+                    try {
+                        Set<TableImpl> zoneTables = 
zoneTablesRawSet(zonePartitionId.zoneId());
+
+                        int partitionIndex = zonePartitionId.partitionId();
+                        PartitionSet singlePartitionIdSet = 
PartitionSet.of(partitionIndex);
+
+                        CompletableFuture<?>[] futures = zoneTables.stream()
+                                .map(tbl -> inBusyLockAsync(busyLock, () -> {
+                                    return getOrCreatePartitionStorages(tbl, 
singlePartitionIdSet)
+                                            .thenRun(() -> {
+                                                localPartsByTableId.compute(
+                                                        tbl.tableId(),
+                                                        (tableId, 
oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
+                                                );
+                                            })
+                                            .thenRunAsync(() -> 
inBusyLock(busyLock, () -> {
+                                                
lowWatermark.getLowWatermarkSafe(lwm ->
+                                                        registerIndexesToTable(
+                                                                tbl,
+                                                                catalogService,
+                                                                
singlePartitionIdSet,
+                                                                
tbl.schemaView(),
+                                                                lwm
+                                                        )
+                                                );
+
+                                                
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, false);
+                                            }), ioExecutor)
+                                            // If the table is already closed, 
it's not a problem (probably the node is stopping).
+                                            
.exceptionally(ignoreTableClosedException());
+                                }))
+                                .toArray(CompletableFuture[]::new);
+
+                        return allOf(futures).thenAccept(unused -> 
zoneLock.unlockRead(stamp));
+                    } catch (Throwable t) {
+                        zoneLock.unlockRead(stamp);
 
-                    return allOf(futures);
+                        return failedFuture(t);
+                    }
                 })
                 .thenApply(unused -> false)
         );
@@ -737,11 +759,25 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
-        CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                .map(this::stopTablePartitions)
-                .toArray(CompletableFuture[]::new);
+        StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
+
+        long stamp = zoneLock.readLock();
 
-        return allOf(futures).thenApply(unused -> false);
+        try {
+            CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                    .map(this::stopTablePartitions)
+                    .toArray(CompletableFuture[]::new);
+
+            return allOf(futures).thenApply(unused -> {
+                zoneLock.unlockRead(stamp);
+
+                return false;
+            });
+        } catch (Throwable t) {
+            zoneLock.unlockRead(stamp);
+
+            return failedFuture(t);
+        }
     }
 
     private CompletableFuture<Boolean> 
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
@@ -751,21 +787,31 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         ZonePartitionId zonePartitionId = parameters.zonePartitionId();
 
-        return inBusyLockAsync(busyLock, () -> {
-            CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
-                    .map(table -> supplyAsync(
-                            () -> inBusyLockAsync(
-                                    busyLock,
-                                    () -> stopAndDestroyTablePartition(
-                                            new 
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
-                                            parameters.causalityToken()
-                                    )
-                            ),
-                            ioExecutor))
-                    .toArray(CompletableFuture[]::new);
+        StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new 
StampedLock());
 
-            return allOf(futures);
-        }).thenApply((unused) -> false);
+        long stamp = zoneLock.readLock();
+
+        try {
+            return inBusyLockAsync(busyLock, () -> {
+                CompletableFuture<?>[] futures = 
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+                        .map(table -> supplyAsync(
+                                () -> inBusyLockAsync(
+                                        busyLock,
+                                        () -> stopAndDestroyTablePartition(
+                                                new 
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
+                                                parameters.causalityToken()
+                                        )
+                                ),
+                                ioExecutor))
+                        .toArray(CompletableFuture[]::new);
+
+                return allOf(futures);
+            }).thenApply((unused) -> false);
+        } catch (Throwable t) {
+            zoneLock.unlockRead(stamp);
+
+            return failedFuture(t);
+        }
     }
 
     private CompletableFuture<Boolean> 
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters 
parameters) {
@@ -1569,7 +1615,8 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
                     onZoneReplicaDestroyedListener
             );
 
-            
partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
+            partitionReplicaLifecycleManager.removeListener(
+                    LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
                     onZoneReplicaStoppedListener);
 
             partitionReplicaLifecycleManager.removeListener(
@@ -2784,29 +2831,43 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
 
         int partitions = internalTable.partitions();
 
-        CompletableFuture<?>[] stopReplicaAndDestroyFutures = new 
CompletableFuture<?>[partitions];
+        StampedLock zoneLock = 
tablesPerZoneLocks.computeIfAbsent(internalTable.zoneId(), id -> new 
StampedLock());
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions 
should be stopped on the assignments change
-        //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
-        for (int partitionId = 0; partitionId < partitions; partitionId++) {
-            CompletableFuture<Void> resourcesUnloadFuture;
+        long stamp = zoneLock.writeLock();
 
-            if (enabledColocation) {
-                resourcesUnloadFuture = 
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
-                        new ZonePartitionId(internalTable.zoneId(), 
partitionId),
-                        internalTable.tableId()
-                );
-            } else {
-                resourcesUnloadFuture = nullCompletedFuture();
+        try {
+            CompletableFuture<?>[] stopReplicaAndDestroyFutures = new 
CompletableFuture<?>[partitions];
+
+            // TODO https://issues.apache.org/jira/browse/IGNITE-19170 
Partitions should be stopped on the assignments change
+            //  event triggered by zone drop or alter. Stop replica 
asynchronously, out of metastorage event pipeline.
+            for (int partitionId = 0; partitionId < partitions; partitionId++) 
{
+                CompletableFuture<Void> resourcesUnloadFuture;
+
+                if (enabledColocation) {
+                    resourcesUnloadFuture = 
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+                            new ZonePartitionId(internalTable.zoneId(), 
partitionId),
+                            internalTable.tableId()
+                    );
+                } else {
+                    resourcesUnloadFuture = nullCompletedFuture();
+                }
+
+                var tablePartitionId = new 
TablePartitionId(internalTable.tableId(), partitionId);
+
+                stopReplicaAndDestroyFutures[partitionId] = 
resourcesUnloadFuture
+                        .thenCompose(v -> 
stopAndDestroyTablePartition(tablePartitionId, table));
             }
 
-            var tablePartitionId = new 
TablePartitionId(internalTable.tableId(), partitionId);
+            return allOf(stopReplicaAndDestroyFutures).whenComplete((res, th) 
-> {
+                tablesPerZone.getOrDefault(internalTable.zoneId(), 
emptySet()).remove(table);
 
-            stopReplicaAndDestroyFutures[partitionId] = resourcesUnloadFuture
-                    .thenCompose(v -> 
stopAndDestroyTablePartition(tablePartitionId, table));
-        }
+                zoneLock.unlockWrite(stamp);
+            });
+        } catch (Throwable t) {
+            zoneLock.unlockWrite(stamp);
 
-        return allOf(stopReplicaAndDestroyFutures);
+            throw t;
+        }
     }
 
     private CompletableFuture<Void> 
stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long 
causalityToken) {
@@ -3182,25 +3243,58 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         this.streamerReceiverRunner = runner;
     }
 
+    /**
+     * Returns a copy of tables that belong to the specified zone.
+     *
+     * @param zoneId Zone identifier.
+     * @return Set of tables.
+     */
     public Set<TableImpl> zoneTables(int zoneId) {
-        return Set.copyOf(zoneTablesRawSet(zoneId));
+        StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id 
-> new StampedLock());
+
+        long stamp = zoneLock.readLock();
+
+        try {
+            return Set.copyOf(zoneTablesRawSet(zoneId));
+        } finally {
+            zoneLock.unlockRead(stamp);
+        }
     }
 
+    /**
+     * Returns a set of tables that belong to the specified zone.
+     * Note that this method call should be properly synchronized using {@link 
#tablesPerZoneLocks}.
+     *
+     * @param zoneId Zone identifier.
+     * @return Set of tables.
+     */
     private Set<TableImpl> zoneTablesRawSet(int zoneId) {
         return tablesPerZone.getOrDefault(zoneId, Set.of());
     }
 
+    /**
+     * Adds a table to the specified zone.
+     *
+     * @param zoneId Zone identifier.
+     * @param table Table to add.
+     */
     private void addTableToZone(int zoneId, TableImpl table) {
-        tablesPerZone.compute(zoneId, (id, tbls) -> {
-            if (tbls == null) {
-                // Using a concurrent set as a value as it can be read (and 
iterated over) without any synchronization by external callers.
-                tbls = ConcurrentHashMap.newKeySet();
-            }
+        StampedLock rwLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id -> 
new StampedLock());
+
+        long stamp = rwLock.writeLock();
+        try {
+            tablesPerZone.compute(zoneId, (id, tables) -> {
+                if (tables == null) {
+                    tables = new HashSet<>();
+                }
 
-            tbls.add(table);
+                tables.add(table);
 
-            return tbls;
-        });
+                return tables;
+            });
+        } finally {
+            rwLock.unlockWrite(stamp);
+        }
     }
 
     private static class TableClosedException extends IgniteInternalException {

Reply via email to