This is an automated email from the ASF dual-hosted git repository.
sanpwc pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new ba9ec92e6a IGNITE-22950 Implement local events about zone replica stop
in PartitionReplicaLifecycleManager. (#4459)
ba9ec92e6a is described below
commit ba9ec92e6a00126690e92ed806966c46acc26ee1
Author: Kirill Gusakov <[email protected]>
AuthorDate: Mon Oct 7 16:50:20 2024 +0300
IGNITE-22950 Implement local events about zone replica stop in
PartitionReplicaLifecycleManager. (#4459)
---
.../replicator/ItReplicaLifecycleTest.java | 105 ++++++++++++++++++++-
.../replicator/LocalPartitionReplicaEvent.java | 7 +-
...a => LocalPartitionReplicaEventParameters.java} | 4 +-
.../PartitionReplicaLifecycleManager.java | 53 ++++++++---
.../replicator/ZonePartitionReplicaListener.java | 15 +++
.../replicator/ZonePartitionReplicaImpl.java | 2 +
.../internal/table/distributed/TableManager.java | 36 ++++++-
7 files changed, 199 insertions(+), 23 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 84f863723b..4f2200afd8 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -44,8 +44,13 @@ import static org.apache.ignite.sql.ColumnType.INT64;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
+import static org.mockito.Mockito.spy;
+import static org.mockito.Mockito.timeout;
+import static org.mockito.Mockito.verify;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
@@ -79,6 +84,8 @@ import org.apache.ignite.internal.app.ThreadPoolsManager;
import org.apache.ignite.internal.catalog.CatalogManager;
import org.apache.ignite.internal.catalog.CatalogManagerImpl;
import org.apache.ignite.internal.catalog.commands.ColumnParams;
+import org.apache.ignite.internal.catalog.descriptors.CatalogTableDescriptor;
+import org.apache.ignite.internal.catalog.descriptors.CatalogZoneDescriptor;
import org.apache.ignite.internal.catalog.storage.UpdateLogImpl;
import org.apache.ignite.internal.cluster.management.ClusterIdHolder;
import org.apache.ignite.internal.cluster.management.ClusterInitializer;
@@ -167,11 +174,14 @@ import
org.apache.ignite.internal.storage.DataStorageManager;
import org.apache.ignite.internal.storage.DataStorageModules;
import org.apache.ignite.internal.storage.configurations.StorageConfiguration;
import
org.apache.ignite.internal.storage.configurations.StorageExtensionConfigurationSchema;
+import org.apache.ignite.internal.storage.engine.MvTableStorage;
import
org.apache.ignite.internal.storage.pagememory.PersistentPageMemoryDataStorageModule;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.PersistentPageMemoryStorageEngineExtensionConfigurationSchema;
import
org.apache.ignite.internal.storage.pagememory.configuration.schema.VolatilePageMemoryStorageEngineExtensionConfigurationSchema;
+import org.apache.ignite.internal.table.InternalTable;
import org.apache.ignite.internal.table.StreamerReceiverRunner;
import org.apache.ignite.internal.table.TableTestUtils;
+import org.apache.ignite.internal.table.TableViewInternal;
import org.apache.ignite.internal.table.distributed.TableManager;
import org.apache.ignite.internal.table.distributed.index.IndexMetaStorage;
import
org.apache.ignite.internal.table.distributed.raft.MinimumRequiredTimeCollectorService;
@@ -197,12 +207,14 @@ import
org.apache.ignite.internal.tx.impl.TransactionInflights;
import org.apache.ignite.internal.tx.impl.TxManagerImpl;
import org.apache.ignite.internal.tx.message.TxMessageGroup;
import org.apache.ignite.internal.tx.message.WriteIntentSwitchReplicaRequest;
+import org.apache.ignite.internal.tx.storage.state.TxStateTableStorage;
import org.apache.ignite.internal.tx.test.TestLocalRwTxCounter;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.network.NetworkAddress;
import org.apache.ignite.raft.jraft.rpc.impl.RaftGroupEventsClientListener;
import org.apache.ignite.sql.IgniteSql;
import org.apache.ignite.table.KeyValueView;
+import org.apache.ignite.table.Table;
import org.apache.ignite.tx.IgniteTransactions;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -551,7 +563,6 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
}
@Test
- @Disabled("https://issues.apache.org/jira/browse/IGNITE-22858")
void testAlterFilterTrigger(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
@@ -622,6 +633,48 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
));
}
+ @Test
+ void testTableReplicaListenersRemoveAfterRebalance(TestInfo testInfo)
throws Exception {
+ String zoneName = "TEST_ZONE";
+ String tableName = "TEST_TABLE";
+
+ startNodes(testInfo, 3);
+
+ Assignment replicaAssignment = (Assignment)
calculateAssignmentForPartition(
+ nodes.values().stream().map(n ->
n.name).collect(Collectors.toList()), 0, 3).toArray()[0];
+
+ Node node = getNode(replicaAssignment.consistentId());
+
+
placementDriver.setPrimary(node.clusterService.topologyService().localMember());
+
+ DistributionZonesTestUtil.createZone(node.catalogManager, zoneName, 1,
3);
+
+ int zoneId = DistributionZonesTestUtil.getZoneId(node.catalogManager,
zoneName, node.hybridClock.nowLong());
+
+ assertTrue(waitForCondition(() -> assertTableListenersCount(node,
zoneId, 0), 10_000L));
+
+ createTable(node, zoneName, tableName);
+
+ assertTrue(waitForCondition(
+ () -> IntStream.range(0, 3).allMatch(i ->
getNode(i).tableManager.table(tableName) != null),
+ 30_000L
+ ));
+
+ assertTrue(waitForCondition(
+ () -> IntStream.range(0, 3).allMatch(i ->
assertTableListenersCount(getNode(i), zoneId, 1)),
+ 30_000L
+ ));
+
+ nodes.values().forEach(n -> checkNoDestroyPartitionStoragesInvokes(n,
tableName, 0));
+
+ alterZone(node.catalogManager, zoneName, 1);
+
+ nodes.values().stream().filter(n ->
!replicaAssignment.consistentId().equals(n.name)).forEach(n -> {
+ checkDestroyPartitionStoragesInvokes(n, tableName, 0);
+ });
+
+ }
+
@Test
void testReplicaIsStartedOnNodeStart(TestInfo testInfo) throws Exception {
startNodes(testInfo, 3);
@@ -923,6 +976,10 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
private final HybridTimestampTracker observableTimestampTracker = new
HybridTimestampTracker();
+ private volatile MvTableStorage mvTableStorage;
+
+ private volatile TxStateTableStorage txStateTableStorage;
+
/**
* Constructor that simply creates a subset of components of this node.
*/
@@ -1271,7 +1328,25 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
logSyncer,
partitionReplicaLifecycleManager,
minTimeCollectorService
- );
+ ) {
+
+ @Override
+ protected MvTableStorage
createTableStorage(CatalogTableDescriptor tableDescriptor,
CatalogZoneDescriptor zoneDescriptor) {
+ mvTableStorage =
spy(super.createTableStorage(tableDescriptor, zoneDescriptor));
+
+ return mvTableStorage;
+ }
+
+ @Override
+ protected TxStateTableStorage createTxStateTableStorage(
+ CatalogTableDescriptor tableDescriptor,
+ CatalogZoneDescriptor zoneDescriptor
+ ) {
+ txStateTableStorage =
spy(super.createTxStateTableStorage(tableDescriptor, zoneDescriptor));
+
+ return txStateTableStorage;
+ }
+ };
tableManager.setStreamerReceiverRunner(mock(StreamerReceiverRunner.class));
@@ -1414,4 +1489,30 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
throw new RuntimeException(e);
}
}
+
+ private static InternalTable getInternalTable(Node node, String tableName)
{
+ Table table = node.tableManager.table(tableName);
+
+ assertNotNull(table, tableName);
+
+ return ((TableViewInternal) table).internalTable();
+ }
+
+ private static void checkNoDestroyPartitionStoragesInvokes(Node node,
String tableName, int partitionId) {
+ InternalTable internalTable = getInternalTable(node, tableName);
+
+ verify(internalTable.storage(), never())
+ .destroyPartition(partitionId);
+ verify(internalTable.txStateStorage(), never())
+ .destroyTxStateStorage(partitionId);
+ }
+
+ private static void checkDestroyPartitionStoragesInvokes(Node node, String
tableName, int partitionId) {
+ InternalTable internalTable = getInternalTable(node, tableName);
+
+ verify(internalTable.storage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
+ .destroyPartition(partitionId);
+ verify(internalTable.txStateStorage(),
timeout(AWAIT_TIMEOUT_MILLIS).atLeast(1))
+ .destroyTxStateStorage(partitionId);
+ }
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
index 933f528032..127f7a856b 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEvent.java
@@ -26,5 +26,10 @@ public enum LocalPartitionReplicaEvent implements Event {
/**
* Fired when partition replica has started.
*/
- AFTER_REPLICA_STARTED
+ AFTER_REPLICA_STARTED,
+
+ /**
+ * Fired when partition replica has stopped.
+ */
+ AFTER_REPLICA_STOPPED
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
similarity index 90%
rename from
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
rename to
modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
index 762c015b67..57889857a5 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaEventParameters.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/LocalPartitionReplicaEventParameters.java
@@ -23,7 +23,7 @@ import org.apache.ignite.internal.replicator.ZonePartitionId;
/**
* Parameters for the events about zone partition replicas produced by {@link
PartitionReplicaLifecycleManager}.
*/
-public class PartitionReplicaEventParameters implements EventParameters {
+public class LocalPartitionReplicaEventParameters implements EventParameters {
/** Zone partition id. */
private final ZonePartitionId zonePartitionId;
@@ -32,7 +32,7 @@ public class PartitionReplicaEventParameters implements
EventParameters {
*
* @param zonePartitionId Zone partition id.
*/
- public PartitionReplicaEventParameters(ZonePartitionId zonePartitionId) {
+ public LocalPartitionReplicaEventParameters(ZonePartitionId
zonePartitionId) {
this.zonePartitionId = zonePartitionId;
}
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 9457884705..f359355777 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -140,7 +140,7 @@ import org.jetbrains.annotations.Nullable;
* - Support the rebalance mechanism and start the new replication nodes when
the rebalance triggers occurred.
*/
public class PartitionReplicaLifecycleManager extends
- AbstractEventProducer<LocalPartitionReplicaEvent,
PartitionReplicaEventParameters> implements IgniteComponent {
+ AbstractEventProducer<LocalPartitionReplicaEvent,
LocalPartitionReplicaEventParameters> implements IgniteComponent {
public static final String FEATURE_FLAG_NAME =
"IGNITE_ZONE_BASED_REPLICATION";
/* Feature flag for zone based collocation track */
// TODO IGNITE-22115 remove it
@@ -170,7 +170,7 @@ public class PartitionReplicaLifecycleManager extends
/** The logger. */
private static final IgniteLogger LOG =
Loggers.forClass(PartitionReplicaLifecycleManager.class);
- private final Set<ReplicationGroupId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
+ private final Set<ZonePartitionId> replicationGroupIds =
ConcurrentHashMap.newKeySet();
/** (zoneId -> lock) map to provide concurrent access to the zone replicas
list. */
private final Map<Integer, StampedLock> zonePartitionsLocks = new
ConcurrentHashMap<>();
@@ -487,7 +487,7 @@ public class PartitionReplicaLifecycleManager extends
return fireEvent(
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
- new PartitionReplicaEventParameters(
+ new LocalPartitionReplicaEventParameters(
new
ZonePartitionId(replicaGrpId.zoneId(), replicaGrpId.partitionId())
)
);
@@ -935,7 +935,7 @@ public class PartitionReplicaLifecycleManager extends
}
}
- private CompletableFuture<?> stopAndDestroyPartition(ReplicationGroupId
zonePartitionId) {
+ private CompletableFuture<?> stopAndDestroyPartition(ZonePartitionId
zonePartitionId) {
return weakStopPartition(zonePartitionId);
}
@@ -1239,17 +1239,13 @@ public class PartitionReplicaLifecycleManager extends
return nullCompletedFuture();
}
- private static String zoneInfo(CatalogZoneDescriptor zoneDescriptor) {
- return zoneDescriptor.id() + "/" + zoneDescriptor.name();
- }
-
private @Nullable Assignments stableAssignments(ZonePartitionId
zonePartitionId, long revision) {
Entry entry =
metaStorageMgr.getLocally(stablePartAssignmentsKey(zonePartitionId), revision);
return Assignments.fromBytes(entry.value());
}
- private CompletableFuture<Void> weakStopPartition(ReplicationGroupId
zonePartitionId) {
+ private CompletableFuture<Void> weakStopPartition(ZonePartitionId
zonePartitionId) {
return replicaMgr.weakStopReplica(
zonePartitionId,
WeakReplicaStopReason.EXCLUDED_FROM_ASSIGNMENTS,
@@ -1263,14 +1259,41 @@ public class PartitionReplicaLifecycleManager extends
* @param zonePartitionId Partition ID.
* @return Future that will be completed after all resources have been
closed.
*/
- private CompletableFuture<?> stopPartition(ReplicationGroupId
zonePartitionId) {
- CompletableFuture<Boolean> stopReplicaFuture;
+ private CompletableFuture<?> stopPartition(ZonePartitionId
zonePartitionId) {
+ CompletableFuture<?> stopReplicaFuture;
+
+ AtomicReference<Long> stamp = new AtomicReference<>(null);
try {
- stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId);
+ zonePartitionsLocks.compute(zonePartitionId.zoneId(), (id, lock)
-> {
+ if (lock == null) {
+ lock = new StampedLock();
+ }
+
+ stamp.set(lock.writeLock());
+
+ return lock;
+ });
+
+ stopReplicaFuture = replicaMgr.stopReplica(zonePartitionId)
+ .thenCompose((replicaWasStopped) -> {
+ if (replicaWasStopped) {
+ replicationGroupIds.remove(zonePartitionId);
+
+ return
fireEvent(LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED, new
LocalPartitionReplicaEventParameters(
+ zonePartitionId));
+ } else {
+ return nullCompletedFuture();
+ }
+ }).whenComplete((result, th) -> {
+
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
+ });
+
} catch (NodeStoppingException e) {
// No-op.
stopReplicaFuture = falseCompletedFuture();
+
+
zonePartitionsLocks.get(zonePartitionId.zoneId()).unlockWrite(stamp.get());
}
return stopReplicaFuture;
@@ -1281,7 +1304,7 @@ public class PartitionReplicaLifecycleManager extends
*
* @param partitionIds Partitions to stop.
*/
- private void cleanUpPartitionsResources(Set<ReplicationGroupId>
partitionIds) {
+ private void cleanUpPartitionsResources(Set<ZonePartitionId> partitionIds)
{
CompletableFuture<Void> future = runAsync(() -> {
Stream.Builder<ManuallyCloseable> stopping = Stream.builder();
@@ -1290,7 +1313,7 @@ public class PartitionReplicaLifecycleManager extends
int i = 0;
- for (ReplicationGroupId partitionId : partitionIds) {
+ for (ZonePartitionId partitionId : partitionIds) {
stopReplicaFutures[i++] = stopPartition(partitionId);
}
@@ -1300,7 +1323,7 @@ public class PartitionReplicaLifecycleManager extends
try {
IgniteUtils.closeAllManually(stopping.build());
} catch (Throwable t) {
- LOG.error("Unable to stop partition");
+ LOG.error("Unable to stop partition.", t);
}
}, ioExecutor);
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
index 9d355fbee6..20b2bae499 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/ZonePartitionReplicaListener.java
@@ -132,4 +132,19 @@ public class ZonePartitionReplicaListener implements
ReplicaListener {
public Map<TablePartitionId, ReplicaListener> tableReplicaListeners() {
return replicas;
}
+
+ @Override
+ public void onShutdown() {
+ replicas.forEach((id, listener) -> {
+ try {
+ listener.onShutdown();
+ } catch (Throwable th) {
+ LOG.error("Error during table partition listener stop
for [tableId="
+ + id.tableId() + ", partitionId=" +
id.partitionId() + "].",
+ th
+ );
+ }
+ }
+ );
+ }
}
diff --git
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
index da0b848410..a052e72e89 100644
---
a/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
+++
b/modules/replicator/src/main/java/org/apache/ignite/internal/replicator/ZonePartitionReplicaImpl.java
@@ -82,6 +82,8 @@ public class ZonePartitionReplicaImpl implements Replica {
@Override
public CompletableFuture<Void> shutdown() {
+ listener.onShutdown();
+
return nullCompletedFuture();
}
diff --git
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 1eeef4c6b6..189946c6b8 100644
---
a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++
b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -138,7 +138,7 @@ import org.apache.ignite.internal.network.MessagingService;
import org.apache.ignite.internal.network.TopologyService;
import
org.apache.ignite.internal.network.serialization.MessageSerializationRegistry;
import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEvent;
-import
org.apache.ignite.internal.partition.replicator.PartitionReplicaEventParameters;
+import
org.apache.ignite.internal.partition.replicator.LocalPartitionReplicaEventParameters;
import
org.apache.ignite.internal.partition.replicator.PartitionReplicaLifecycleManager;
import org.apache.ignite.internal.partitiondistribution.Assignment;
import org.apache.ignite.internal.partitiondistribution.Assignments;
@@ -596,6 +596,12 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
LocalPartitionReplicaEvent.AFTER_REPLICA_STARTED,
this::onZoneReplicaCreated
);
+
+ partitionReplicaLifecycleManager.listen(
+ LocalPartitionReplicaEvent.AFTER_REPLICA_STOPPED,
+ this::onZoneReplicaStopped
+ );
+
}
@Override
@@ -661,7 +667,7 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
return
executorInclinedSchemaSyncService.waitForMetadataCompleteness(HybridTimestamp.hybridTimestamp(ts));
}
- private CompletableFuture<Boolean>
onZoneReplicaCreated(PartitionReplicaEventParameters parameters) {
+ private CompletableFuture<Boolean>
onZoneReplicaCreated(LocalPartitionReplicaEventParameters parameters) {
if (!PartitionReplicaLifecycleManager.ENABLED) {
return completedFuture(false);
}
@@ -694,6 +700,31 @@ public class TableManager implements IgniteTablesInternal,
IgniteComponent {
});
}
+ private CompletableFuture<Boolean>
onZoneReplicaStopped(LocalPartitionReplicaEventParameters parameters) {
+ if (!PartitionReplicaLifecycleManager.ENABLED) {
+ return completedFuture(false);
+ }
+
+ return inBusyLockAsync(busyLock, () -> supplyAsync(() -> {
+ List<CompletableFuture<?>> futs = new ArrayList<>();
+
+ Set<TableImpl> zoneTables =
zoneTables(parameters.zonePartitionId().zoneId());
+
+ zoneTables.forEach(table -> {
+ closePartitionTrackers(table.internalTable(),
parameters.zonePartitionId().partitionId());
+
+ TablePartitionId tablePartitionId = new
TablePartitionId(table.tableId(), parameters.zonePartitionId().partitionId());
+
+ mvGc.removeStorage(tablePartitionId);
+
+ futs.add(destroyPartitionStorages(tablePartitionId, table));
+ });
+
+ return allOf(futs.toArray(new CompletableFuture[]{}));
+ }, ioExecutor).thenCompose(identity())).thenApply((unused) -> false);
+ }
+
+
private CompletableFuture<Boolean>
prepareTableResourcesAndLoadToZoneReplica(CreateTableEventParameters
parameters) {
if (!PartitionReplicaLifecycleManager.ENABLED) {
return completedFuture(false);
@@ -1754,7 +1785,6 @@ public class TableManager implements
IgniteTablesInternal, IgniteComponent {
InternalTable internalTable = table.internalTable();
int partitions = internalTable.partitions();
- // TODO https://issues.apache.org/jira/browse/IGNITE-22950 Move
assigment manipulations to Distribution zones.
Set<ByteArray> assignmentKeys = IntStream.range(0, partitions)
.mapToObj(p -> stablePartAssignmentsKey(new
TablePartitionId(tableId, p)))
.collect(toSet());