This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 765c88d7566 IGNITE-25260 Fix an issue on dropping a table when the
colocation is enabled (#5774)
765c88d7566 is described below
commit 765c88d756637fcda9f235427ba120c92bd51d9a
Author: Slava Koptilin <[email protected]>
AuthorDate: Mon May 12 14:45:28 2025 +0300
IGNITE-25260 Fix an issue on dropping a table when the colocation is
enabled (#5774)
---
.../PartitionReplicaLifecycleManager.java | 1 +
.../internal/table/distributed/TableManager.java | 238 ++++++++++++++-------
2 files changed, 167 insertions(+), 72 deletions(-)
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 4137e53d020..c02fa7af934 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -200,6 +200,7 @@ public class PartitionReplicaLifecycleManager extends
private final Set<ZonePartitionId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-25347
/** (zoneId -> lock) map to provide concurrent access to the zone replicas
list. */
private final Map<Integer, NaiveAsyncReadWriteLock> zonePartitionsLocks =
new ConcurrentHashMap<>();
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 7406cba00ec..99190c42f25 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -68,6 +68,8 @@ import static
org.apache.ignite.lang.ErrorGroups.Common.NODE_STOPPING_ERR;
import it.unimi.dsi.fastutil.ints.IntOpenHashSet;
import java.util.ArrayList;
import java.util.Collection;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Objects;
@@ -85,6 +87,7 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.StampedLock;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Function;
@@ -428,7 +431,11 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
private final CompletableFuture<Void> readyToProcessReplicaStarts = new
CompletableFuture<>();
- private final Map<Integer, Set<TableImpl>> tablesPerZone = new
ConcurrentHashMap<>();
+ // TODO https://issues.apache.org/jira/browse/IGNITE-25347
+ /** Mapping of a zone identifier to the collection of tables related to the zone.
*/
+ private final Map<Integer, Set<TableImpl>> tablesPerZone = new HashMap<>();
+ /** Locks to synchronize access to {@link #tablesPerZone}. */
+ private final Map<Integer, StampedLock> tablesPerZoneLocks = new
ConcurrentHashMap<>();
/** Configuration of rebalance retries delay. */
private final SystemDistributedConfigurationPropertyHolder<Integer>
rebalanceRetryDelayConfiguration;
@@ -689,33 +696,48 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
.thenCompose(v -> {
ZonePartitionId zonePartitionId =
parameters.zonePartitionId();
- Set<TableImpl> zoneTables =
zoneTablesRawSet(zonePartitionId.zoneId());
-
- int partitionIndex = zonePartitionId.partitionId();
- PartitionSet singlePartitionIdSet =
PartitionSet.of(partitionIndex);
-
- CompletableFuture<?>[] futures = zoneTables.stream()
- .map(tbl -> inBusyLockAsync(busyLock, () -> {
- return getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
- .thenRun(() -> {
- localPartsByTableId.compute(
- tbl.tableId(),
- (tableId, oldPartitionSet)
-> extendPartitionSet(oldPartitionSet, partitionIndex)
- );
- })
- .thenRunAsync(() ->
inBusyLock(busyLock, () -> {
-
lowWatermark.getLowWatermarkSafe(lwm ->
-
registerIndexesToTable(tbl, catalogService, singlePartitionIdSet,
tbl.schemaView(), lwm)
- );
-
-
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, false);
- }), ioExecutor)
- // If the table is already closed,
it's not a problem (probably the node is stopping).
-
.exceptionally(ignoreTableClosedException());
- }))
- .toArray(CompletableFuture[]::new);
+ StampedLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new
StampedLock());
+ long stamp = zoneLock.readLock();
+
+ try {
+ Set<TableImpl> zoneTables =
zoneTablesRawSet(zonePartitionId.zoneId());
+
+ int partitionIndex = zonePartitionId.partitionId();
+ PartitionSet singlePartitionIdSet =
PartitionSet.of(partitionIndex);
+
+ CompletableFuture<?>[] futures = zoneTables.stream()
+ .map(tbl -> inBusyLockAsync(busyLock, () -> {
+ return getOrCreatePartitionStorages(tbl,
singlePartitionIdSet)
+ .thenRun(() -> {
+ localPartsByTableId.compute(
+ tbl.tableId(),
+ (tableId,
oldPartitionSet) -> extendPartitionSet(oldPartitionSet, partitionIndex)
+ );
+ })
+ .thenRunAsync(() ->
inBusyLock(busyLock, () -> {
+
lowWatermark.getLowWatermarkSafe(lwm ->
+ registerIndexesToTable(
+ tbl,
+ catalogService,
+
singlePartitionIdSet,
+
tbl.schemaView(),
+ lwm
+ )
+ );
+
+
preparePartitionResourcesAndLoadToZoneReplica(tbl, zonePartitionId, false);
+ }), ioExecutor)
+ // If the table is already closed,
it's not a problem (probably the node is stopping).
+
.exceptionally(ignoreTableClosedException());
+ }))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures).thenAccept(unused ->
zoneLock.unlockRead(stamp));
+ } catch (Throwable t) {
+ zoneLock.unlockRead(stamp);
- return allOf(futures);
+ return failedFuture(t);
+ }
})
.thenApply(unused -> false)
);
@@ -737,11 +759,25 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
ZonePartitionId zonePartitionId = parameters.zonePartitionId();
- CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
- .map(this::stopTablePartitions)
- .toArray(CompletableFuture[]::new);
+ StampedLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new
StampedLock());
+
+ long stamp = zoneLock.readLock();
- return allOf(futures).thenApply(unused -> false);
+ try {
+ CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+ .map(this::stopTablePartitions)
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures).thenApply(unused -> {
+ zoneLock.unlockRead(stamp);
+
+ return false;
+ });
+ } catch (Throwable t) {
+ zoneLock.unlockRead(stamp);
+
+ return failedFuture(t);
+ }
}
private CompletableFuture<Boolean>
onZoneReplicaDestroyed(LocalPartitionReplicaEventParameters parameters) {
@@ -751,21 +787,31 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
ZonePartitionId zonePartitionId = parameters.zonePartitionId();
- return inBusyLockAsync(busyLock, () -> {
- CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
- .map(table -> supplyAsync(
- () -> inBusyLockAsync(
- busyLock,
- () -> stopAndDestroyTablePartition(
- new
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
- parameters.causalityToken()
- )
- ),
- ioExecutor))
- .toArray(CompletableFuture[]::new);
+ StampedLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(zonePartitionId.zoneId(), id -> new
StampedLock());
- return allOf(futures);
- }).thenApply((unused) -> false);
+ long stamp = zoneLock.readLock();
+
+ try {
+ return inBusyLockAsync(busyLock, () -> {
+ CompletableFuture<?>[] futures =
zoneTablesRawSet(zonePartitionId.zoneId()).stream()
+ .map(table -> supplyAsync(
+ () -> inBusyLockAsync(
+ busyLock,
+ () -> stopAndDestroyTablePartition(
+ new
TablePartitionId(table.tableId(), zonePartitionId.partitionId()),
+ parameters.causalityToken()
+ )
+ ),
+ ioExecutor))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(futures);
+ }).thenApply((unused) -> false);
+ } catch (Throwable t) {
+ zoneLock.unlockRead(stamp);
+
+ return failedFuture(t);
+ }
}
private CompletableFuture<Boolean>
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters
parameters) {
@@ -1569,7 +1615,8 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
onZoneReplicaDestroyedListener
);
-
partitionReplicaLifecycleManager.removeListener(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
+ partitionReplicaLifecycleManager.removeListener(
+ LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
onZoneReplicaStoppedListener);
partitionReplicaLifecycleManager.removeListener(
@@ -2784,29 +2831,43 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
int partitions = internalTable.partitions();
- CompletableFuture<?>[] stopReplicaAndDestroyFutures = new
CompletableFuture<?>[partitions];
+ StampedLock zoneLock =
tablesPerZoneLocks.computeIfAbsent(internalTable.zoneId(), id -> new
StampedLock());
- // TODO https://issues.apache.org/jira/browse/IGNITE-19170 Partitions
should be stopped on the assignments change
- // event triggered by zone drop or alter. Stop replica
asynchronously, out of metastorage event pipeline.
- for (int partitionId = 0; partitionId < partitions; partitionId++) {
- CompletableFuture<Void> resourcesUnloadFuture;
+ long stamp = zoneLock.writeLock();
- if (enabledColocation) {
- resourcesUnloadFuture =
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
- new ZonePartitionId(internalTable.zoneId(),
partitionId),
- internalTable.tableId()
- );
- } else {
- resourcesUnloadFuture = nullCompletedFuture();
+ try {
+ CompletableFuture<?>[] stopReplicaAndDestroyFutures = new
CompletableFuture<?>[partitions];
+
+ // TODO https://issues.apache.org/jira/browse/IGNITE-19170
Partitions should be stopped on the assignments change
+ // event triggered by zone drop or alter. Stop replica
asynchronously, out of metastorage event pipeline.
+ for (int partitionId = 0; partitionId < partitions; partitionId++)
{
+ CompletableFuture<Void> resourcesUnloadFuture;
+
+ if (enabledColocation) {
+ resourcesUnloadFuture =
partitionReplicaLifecycleManager.unloadTableResourcesFromZoneReplica(
+ new ZonePartitionId(internalTable.zoneId(),
partitionId),
+ internalTable.tableId()
+ );
+ } else {
+ resourcesUnloadFuture = nullCompletedFuture();
+ }
+
+ var tablePartitionId = new
TablePartitionId(internalTable.tableId(), partitionId);
+
+ stopReplicaAndDestroyFutures[partitionId] =
resourcesUnloadFuture
+ .thenCompose(v ->
stopAndDestroyTablePartition(tablePartitionId, table));
}
- var tablePartitionId = new
TablePartitionId(internalTable.tableId(), partitionId);
+ return allOf(stopReplicaAndDestroyFutures).whenComplete((res, th)
-> {
+ tablesPerZone.getOrDefault(internalTable.zoneId(),
emptySet()).remove(table);
- stopReplicaAndDestroyFutures[partitionId] = resourcesUnloadFuture
- .thenCompose(v ->
stopAndDestroyTablePartition(tablePartitionId, table));
- }
+ zoneLock.unlockWrite(stamp);
+ });
+ } catch (Throwable t) {
+ zoneLock.unlockWrite(stamp);
- return allOf(stopReplicaAndDestroyFutures);
+ throw t;
+ }
}
private CompletableFuture<Void>
stopAndDestroyTablePartition(TablePartitionId tablePartitionId, long
causalityToken) {
@@ -3182,25 +3243,58 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
this.streamerReceiverRunner = runner;
}
+ /**
+ * Returns a copy of tables that belong to the specified zone.
+ *
+ * @param zoneId Zone identifier.
+ * @return Set of tables.
+ */
public Set<TableImpl> zoneTables(int zoneId) {
- return Set.copyOf(zoneTablesRawSet(zoneId));
+ StampedLock zoneLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id
-> new StampedLock());
+
+ long stamp = zoneLock.readLock();
+
+ try {
+ return Set.copyOf(zoneTablesRawSet(zoneId));
+ } finally {
+ zoneLock.unlockRead(stamp);
+ }
}
+ /**
+ * Returns a set of tables that belong to the specified zone.
+ * Note that this method call should be properly synchronized using {@link
#tablesPerZoneLocks}.
+ *
+ * @param zoneId Zone identifier.
+ * @return Set of tables.
+ */
private Set<TableImpl> zoneTablesRawSet(int zoneId) {
return tablesPerZone.getOrDefault(zoneId, Set.of());
}
+ /**
+ * Adds a table to the specified zone.
+ *
+ * @param zoneId Zone identifier.
+ * @param table Table to add.
+ */
private void addTableToZone(int zoneId, TableImpl table) {
- tablesPerZone.compute(zoneId, (id, tbls) -> {
- if (tbls == null) {
- // Using a concurrent set as a value as it can be read (and
iterated over) without any synchronization by external callers.
- tbls = ConcurrentHashMap.newKeySet();
- }
+ StampedLock rwLock = tablesPerZoneLocks.computeIfAbsent(zoneId, id ->
new StampedLock());
+
+ long stamp = rwLock.writeLock();
+ try {
+ tablesPerZone.compute(zoneId, (id, tables) -> {
+ if (tables == null) {
+ tables = new HashSet<>();
+ }
- tbls.add(table);
+ tables.add(table);
- return tbls;
- });
+ return tables;
+ });
+ } finally {
+ rwLock.unlockWrite(stamp);
+ }
}
private static class TableClosedException extends IgniteInternalException {