This is an automated email from the ASF dual-hosted git repository.

sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new ba9ec92e6a IGNITE-22950 Implement local events about zone replica stop 
in PartitionReplicaLifecycleManager.  (#4459)
ba9ec92e6a is described below

commit ba9ec92e6a00126690e92ed806966c46acc26ee1
Author: Kirill Gusakov <[email protected]>
AuthorDate: Mon Oct 7 16:50:20 2024 +0300

    IGNITE-22950 Implement local events about zone replica stop in 
PartitionReplicaLifecycleManager.  (#4459)
---
 .../replicator/ItReplicaLifecycleTest.java         | 105 ++++++++++++++++++++-
 .../replicator/LocalPartitionReplicaEvent.java     |   7 +-
 ...a => LocalPartitionReplicaEventParameters.java} |   4 +-
 .../PartitionReplicaLifecycleManager.java          |  53 ++++++++---
 .../replicator/ZonePartitionReplicaListener.java   |  15 +++
 .../replicator/ZonePartitionReplicaImpl.java       |   2 +
 .../internal/table/distributed/TableManager.java   |  36 ++++++-
 7 files changed, 199 insertions(+), 23 deletions(-)

diff --git 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 84f863723b..4f2200afd8 100644
--- 
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ 
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -44,8 +44,13 @@ import static org.apache.ignite.sql.ColumnType.INT64;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
 import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
 
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
@@ -79,6 +84,8 @@ import org.apache.ignite.internal.app.ThreadPoolsManager;
 import org.apache.ignite.internal.catalog.CatalogManager;
 import org.apache.ignite.internal.catalog.CatalogManagerImpl;
 import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
 import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
 import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
 import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -167,11 +174,14 @@ import 
org.apache.ignite.internal.storage.DataStorageManager;
 import org.apache.ignite.internal.storage.DataStorageModules;
 import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
 import 
org.apache.ignite.internal.storage.configurations.StorageExtensionConfigurationSchema;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
 import 
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineExtensionConfigurationSchema;
 import 
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineExtensionConfigurationSchema;
+import org.apache.ignite.internal.table.InternalTable;
 import org.apache.ignite.internal.table.StreamerReceiverRunner;
 import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
 import org.apache.ignite.internal.table.distributed.TableManager;
 import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
 import 
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -197,12 +207,14 @@ import 
org.apache.ignite.internal.tx.impl.TransactionInflights;
 import org.apache.ignite.internal.tx.impl.TxManagerImpl;
 import org.apache.ignite.internal.tx.message.TxMessageGroup;
 import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
 import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
 import org.apache.ignite.internal.vault.VaultManager;
 import org.apache.ignite.network.NetworkAddress;
 import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
 import org.apache.ignite.sql.IgniteSql;
 import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
 import org.apache.ignite.tx.IgniteTransactions;
 import org.junit.jupiter.api.AfterAll;
 import org.junit.jupiter.api.AfterEach;
@@ -551,7 +563,6 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
     }
 
     @Test
-    @Disabled("https://issues.apache.org/jira/browse/IGNITE-22858";)
     void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
 
@@ -622,6 +633,48 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
         ));
     }
 
+    @Test
+    void testTableReplicaListenersRemoveAfterRebalance(TestInfo testInfo) 
throws Exception {
+        String zoneName = "TEST_ZONE";
+        String tableName = "TEST_TABLE";
+
+        startNodes(testInfo, 3);
+
+        Assignment replicaAssignment = (Assignment) 
calculateAssignmentForPartition(
+                nodes.values().stream().map(n -> 
n.name).collect(Collectors.toList()), 0, 3).toArray()[0];
+
+        Node node = getNode(replicaAssignment.consistentId());
+
+        
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+        DistributionZonesTestUtil.createZone(node.catalogManager, zoneName, 1, 
3);
+
+        int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager, 
zoneName, node.hybridClock.nowLong());
+
+        assertTrue(waitForCondition(() -> assertTableListenersCount(node, 
zoneId, 0), 10_000L));
+
+        createTable(node, zoneName, tableName);
+
+        assertTrue(waitForCondition(
+                () -> IntStream.range(0, 3).allMatch(i -> 
getNode(i).tableManager.table(tableName) != null),
+                30_000L
+        ));
+
+        assertTrue(waitForCondition(
+                () -> IntStream.range(0, 3).allMatch(i -> 
assertTableListenersCount(getNode(i), zoneId, 1)),
+                30_000L
+        ));
+
+        nodes.values().forEach(n -> checkNoDestroyPartitionStoragesInvokes(n, 
tableName, 0));
+
+        alterZone(node.catalogManager, zoneName, 1);
+
+        nodes.values().stream().filter(n -> 
!replicaAssignment.consistentId().equals(n.name)).forEach(n -> {
+            checkDestroyPartitionStoragesInvokes(n, tableName, 0);
+        });
+
+    }
+
     @Test
     void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
         startNodes(testInfo, 3);
@@ -923,6 +976,10 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
 
         private final HybridTimestampTracker observableTimestampTracker = new 
HybridTimestampTracker();
 
+        private volatile MvTableStorage mvTableStorage;
+
+        private volatile TxStateTableStorage txStateTableStorage;
+
         /**
          * Constructor that simply creates a subset of components of this node.
          */
@@ -1271,7 +1328,25 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     logSyncer,
                     partitionReplicaLifecycleManager,
                     minTimeCollectorService
-            );
+            ) {
+
+                @Override
+                protected MvTableStorage 
createTableStorage(CatalogTableDescriptor tableDescriptor, 
CatalogZoneDescriptor zoneDescriptor) {
+                    mvTableStorage = 
spy(super.createTableStorage(tableDescriptor, zoneDescriptor));
+
+                    return mvTableStorage;
+                }
+
+                @Override
+                protected TxStateTableStorage createTxStateTableStorage(
+                        CatalogTableDescriptor tableDescriptor,
+                        CatalogZoneDescriptor zoneDescriptor
+                ) {
+                    txStateTableStorage = 
spy(super.createTxStateTableStorage(tableDescriptor, zoneDescriptor));
+
+                    return txStateTableStorage;
+                }
+            };
 
             
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
 
@@ -1414,4 +1489,30 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
             throw new RuntimeException(e);
         }
     }
+
+    private static InternalTable getInternalTable(Node node, String tableName) 
{
+        Table table = node.tableManager.table(tableName);
+
+        assertNotNull(table, tableName);
+
+        return ((TableViewInternal) table).internalTable();
+    }
+
+    private static void checkNoDestroyPartitionStoragesInvokes(Node node, 
String tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        verify(internalTable.storage(), never())
+                .destroyPartition(partitionId);
+        verify(internalTable.txStateStorage(), never())
+                .destroyTxStateStorage(partitionId);
+    }
+
+    private static void checkDestroyPartitionStoragesInvokes(Node node, String 
tableName, int partitionId) {
+        InternalTable internalTable = getInternalTable(node, tableName);
+
+        verify(internalTable.storage(), 
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
+                .destroyPartition(partitionId);
+        verify(internalTable.txStateStorage(), 
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
+                .destroyTxStateStorage(partitionId);
+    }
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
index 933f528032..127f7a856b 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
@@ -26,5 +26,10 @@ public enum LocalPartitionReplicaEvent implements Event {
     /**
      * Fired when partition replica has started.
      */
-    AFTER_REPLICA_STARTED
+    AFTER_REPLICA_STARTED,
+
+    /**
+     * Fired when partition replica has stopped.
+     */
+    AFTER_REPLICA_STOPPED
 }
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
similarity index 90%
rename from 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
rename to 
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
index 762c015b67..57889857a5 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
 /**
  * Parameters for the events about zone partition replicas produced by {@link 
PartitionReplicaLifecycleManager}.
  */
-public class PartitionReplicaEventParameters implements EventParameters {
+public class LocalPartitionReplicaEventParameters implements EventParameters {
     /** Zone partition id. */
     private final ZonePartitionId zonePartitionId;
 
@@ -32,7 +32,7 @@ public class PartitionReplicaEventParameters implements 
EventParameters {
      *
      * @param zonePartitionId Zone partition id.
      */
-    public PartitionReplicaEventParameters(ZonePartitionId zonePartitionId) {
+    public LocalPartitionReplicaEventParameters(ZonePartitionId 
zonePartitionId) {
         this.zonePartitionId = zonePartitionId;
     }
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 9457884705..f359355777 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -140,7 +140,7 @@ import org.jetbrains.annotations.Nullable;
  * - Support the rebalance mechanism and start the new replication nodes when 
the rebalance triggers occurred.
  */
 public class PartitionReplicaLifecycleManager  extends
-        AbstractEventProducer<LocalPartitionReplicaEvent, 
PartitionReplicaEventParameters> implements IgniteComponent {
+        AbstractEventProducer<LocalPartitionReplicaEvent, 
LocalPartitionReplicaEventParameters> implements IgniteComponent {
     public static final String FEATURE_FLAG_NAME = 
"IGNITE_ZONE_BASED_REPLICATION";
     /* Feature flag for zone based collocation track */
     // TODO IGNITE-22115 remove it
@@ -170,7 +170,7 @@ public class PartitionReplicaLifecycleManager  extends
     /** The logger. */
     private static final IgniteLogger LOG = 
Loggers.forClass(PartitionReplicaLifecycleManager.class);
 
-    private final Set<ReplicationGroupId> replicationGroupIds = 
ConcurrentHashMap.newKeySet();
+    private final Set<ZonePartitionId> replicationGroupIds = 
ConcurrentHashMap.newKeySet();
 
     /** (zoneId -> lock) map to provide concurrent access to the zone replicas 
list. */
     private final Map<Integer, StampedLock> zonePartitionsLocks = new 
ConcurrentHashMap<>();
@@ -487,7 +487,7 @@ public class PartitionReplicaLifecycleManager  extends
 
                             return fireEvent(
                                     
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
-                                    new PartitionReplicaEventParameters(
+                                    new LocalPartitionReplicaEventParameters(
                                             new 
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
                                     )
                             );
@@ -935,7 +935,7 @@ public class PartitionReplicaLifecycleManager  extends
         }
     }
 
-    private CompletableFuture<?> stopAndDestroyPartition(ReplicationGroupId 
zonePartitionId) {
+    private CompletableFuture<?> stopAndDestroyPartition(ZonePartitionId 
zonePartitionId) {
         return weakStopPartition(zonePartitionId);
     }
 
@@ -1239,17 +1239,13 @@ public class PartitionReplicaLifecycleManager  extends
         return nullCompletedFuture();
     }
 
-    private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
-        return zoneDescriptor.id() + "/" + zoneDescriptor.name();
-    }
-
     private @Nullable Assignments stableAssignments(ZonePartitionId 
zonePartitionId, long revision) {
         Entry entry = 
metaStorageMgr.getLocally(stablePartAssignmentsKey(zonePartitionId), revision);
 
         return Assignments.fromBytes(entry.value());
     }
 
-    private CompletableFuture<Void> weakStopPartition(ReplicationGroupId 
zonePartitionId) {
+    private CompletableFuture<Void> weakStopPartition(ZonePartitionId 
zonePartitionId) {
         return replicaMgr.weakStopReplica(
                 zonePartitionId,
                 WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
@@ -1263,14 +1259,41 @@ public class PartitionReplicaLifecycleManager  extends
      * @param zonePartitionId Partition ID.
      * @return Future that will be completed after all resources have been 
closed.
      */
-    private CompletableFuture<?> stopPartition(ReplicationGroupId 
zonePartitionId) {
-        CompletableFuture<Boolean> stopReplicaFuture;
+    private CompletableFuture<?> stopPartition(ZonePartitionId 
zonePartitionId) {
+        CompletableFuture<?> stopReplicaFuture;
+
+        AtomicReference<Long> stamp = new AtomicReference<>(null);
 
         try {
-            stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId);
+            zonePartitionsLocks.compute(zonePartitionId.zoneId(), (id, lock) 
-> {
+                if (lock == null) {
+                    lock = new StampedLock();
+                }
+
+                stamp.set(lock.writeLock());
+
+                return lock;
+            });
+
+            stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId)
+                    .thenCompose((replicaWasStopped) -> {
+                        if (replicaWasStopped) {
+                            replicationGroupIds.remove(zonePartitionId);
+
+                            return 
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new 
LocalPartitionReplicaEventParameters(
+                                    zonePartitionId));
+                        } else {
+                            return nullCompletedFuture();
+                        }
+                    }).whenComplete((result, th) -> {
+                        
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
+                    });
+
         } catch (NodeStoppingException e) {
             // No-op.
             stopReplicaFuture = falseCompletedFuture();
+
+            
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
         }
 
         return stopReplicaFuture;
@@ -1281,7 +1304,7 @@ public class PartitionReplicaLifecycleManager  extends
      *
      * @param partitionIds Partitions to stop.
      */
-    private void cleanUpPartitionsResources(Set<ReplicationGroupId> 
partitionIds) {
+    private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds) 
{
         CompletableFuture<Void> future = runAsync(() -> {
             Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
 
@@ -1290,7 +1313,7 @@ public class PartitionReplicaLifecycleManager  extends
 
                 int i = 0;
 
-                for (ReplicationGroupId partitionId : partitionIds) {
+                for (ZonePartitionId partitionId : partitionIds) {
                     stopReplicaFutures[i++] = stopPartition(partitionId);
                 }
 
@@ -1300,7 +1323,7 @@ public class PartitionReplicaLifecycleManager  extends
             try {
                 IgniteUtils.closeAllManually(stopping.build());
             } catch (Throwable t) {
-                LOG.error("Unable to stop partition");
+                LOG.error("Unable to stop partition.", t);
             }
         }, ioExecutor);
 
diff --git 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 9d355fbee6..20b2bae499 100644
--- 
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++ 
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -132,4 +132,19 @@ public class ZonePartitionReplicaListener implements 
ReplicaListener {
     public Map<TablePartitionId, ReplicaListener> tableReplicaListeners() {
         return replicas;
     }
+
+    @Override
+    public void onShutdown() {
+        replicas.forEach((id, listener) -> {
+                    try {
+                        listener.onShutdown();
+                    } catch (Throwable th) {
+                        LOG.error("Error during table partition listener stop 
for [tableId="
+                                        + id.tableId() + ", partitionId=" + 
id.partitionId() + "].",
+                                th
+                        );
+                    }
+                }
+        );
+    }
 }
diff --git 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index da0b848410..a052e72e89 100644
--- 
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++ 
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -82,6 +82,8 @@ public class ZonePartitionReplicaImpl implements Replica {
 
     @Override
     public CompletableFuture<Void> shutdown() {
+        listener.onShutdown();
+
         return nullCompletedFuture();
     }
 
diff --git 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1eeef4c6b6..189946c6b8 100644
--- 
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ 
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -138,7 +138,7 @@ import org.apache.ignite.internal.network.MessagingService;
 import org.apache.ignite.internal.network.TopologyService;
 import 
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
 import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
-import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaEventParameters;
+import 
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
 import 
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
 import org.apache.ignite.internal.partitiondistribution.Assignment;
 import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -596,6 +596,12 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
                 LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
                 this::onZoneReplicaCreated
         );
+
+        partitionReplicaLifecycleManager.listen(
+                LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
+                this::onZoneReplicaStopped
+        );
+
     }
 
     @Override
@@ -661,7 +667,7 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         return 
executorInclinedSchemaSyncService.waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp(ts));
     }
 
-    private CompletableFuture<Boolean> 
onZoneReplicaCreated(PartitionReplicaEventParameters parameters) {
+    private CompletableFuture<Boolean> 
onZoneReplicaCreated(LocalPartitionReplicaEventParameters parameters) {
         if (!PartitionReplicaLifecycleManager.ENABLED) {
             return completedFuture(false);
         }
@@ -694,6 +700,31 @@ public class TableManager implements IgniteTablesInternal, 
IgniteComponent {
         });
     }
 
+    private CompletableFuture<Boolean> 
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
+        if (!PartitionReplicaLifecycleManager.ENABLED) {
+            return completedFuture(false);
+        }
+
+        return inBusyLockAsync(busyLock, () -> supplyAsync(() -> {
+            List<CompletableFuture<?>> futs = new ArrayList<>();
+
+            Set<TableImpl> zoneTables = 
zoneTables(parameters.zonePartitionId().zoneId());
+
+            zoneTables.forEach(table -> {
+                closePartitionTrackers(table.internalTable(), 
parameters.zonePartitionId().partitionId());
+
+                TablePartitionId tablePartitionId = new 
TablePartitionId(table.tableId(), parameters.zonePartitionId().partitionId());
+
+                mvGc.removeStorage(tablePartitionId);
+
+                futs.add(destroyPartitionStorages(tablePartitionId, table));
+            });
+
+            return allOf(futs.toArray(new CompletableFuture[]{}));
+        }, ioExecutor).thenCompose(identity())).thenApply((unused) -> false);
+    }
+
+
     private CompletableFuture<Boolean> 
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters 
parameters) {
         if (!PartitionReplicaLifecycleManager.ENABLED) {
             return completedFuture(false);
@@ -1754,7 +1785,6 @@ public class TableManager implements 
IgniteTablesInternal, IgniteComponent {
         InternalTable internalTable = table.internalTable();
         int partitions = internalTable.partitions();
 
-        // TODO https://issues.apache.org/jira/browse/IGNITE-22950 Move 
assigment manipulations to Distribution zones.
         Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
                 .mapToObj(p -> stablePartAssignmentsKey(new 
TablePartitionId(tableId, p)))
                 .collect(toSet());

Reply via email to