This is an automated email from the ASF dual-hosted git repository.
sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6d47468ce1 IGNITE-23131 java.lang.AssertionError: The local node is
outside of the replication group (#4396)
6d47468ce1 is described below
commit 6d47468ce1ec1ee584d23b4e6427edcef73526b8
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Sep 23 20:41:09 2024 +0600
IGNITE-23131 java.lang.AssertionError: The local node is outside of the
replication group (#4396)
---
.../replicator/ItReplicaLifecycleTest.java | 78 ++++++++++------------
.../PartitionReplicaLifecycleManager.java | 2 -
.../rebalance/ItRebalanceDistributedTest.java | 77 ++++++++++-----------
3 files changed, 69 insertions(+), 88 deletions(-)
diff --git
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 6cdba72af1..5b9cb27114 100644
---
a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++
b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -1292,7 +1292,9 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
* Starts the created components.
*/
void start() {
- List<IgniteComponent> firstComponents = List.of(
+ ComponentContext componentContext = new ComponentContext();
+
+ deployWatchesFut = startComponentsAsync(componentContext, List.of(
threadPoolsManager,
vaultManager,
nodeCfgMgr,
@@ -1303,54 +1305,44 @@ public class ItReplicaLifecycleTest extends
BaseIgniteAbstractTest {
cmgLogStorageFactory,
raftManager,
cmgManager
- );
-
- ComponentContext componentContext = new ComponentContext();
-
- List<CompletableFuture<?>> componentFuts =
- firstComponents.stream()
- .map(component ->
component.startAsync(componentContext))
- .collect(Collectors.toList());
-
- nodeComponents.addAll(firstComponents);
-
- deployWatchesFut = CompletableFuture.supplyAsync(() -> {
- List<IgniteComponent> secondComponents = List.of(
- lowWatermark,
- metaStorageManager,
- clusterCfgMgr,
- clockWaiter,
- catalogManager,
- indexMetaStorage,
- distributionZoneManager,
- replicaManager,
- txManager,
- dataStorageMgr,
- schemaManager,
- partitionReplicaLifecycleManager,
- tableManager,
- indexManager
- );
-
- componentFuts.addAll(secondComponents.stream()
- .map(component ->
component.startAsync(componentContext)).collect(Collectors.toList()));
-
- nodeComponents.addAll(secondComponents);
-
- var configurationNotificationFut =
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
- return allOf(
-
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
- ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart()
- );
- });
+ )).thenApplyAsync(v -> startComponentsAsync(componentContext,
List.of(
+ lowWatermark,
+ metaStorageManager,
+ clusterCfgMgr,
+ clockWaiter,
+ catalogManager,
+ indexMetaStorage,
+ distributionZoneManager,
+ replicaManager,
+ txManager,
+ dataStorageMgr,
+ schemaManager,
+ partitionReplicaLifecycleManager,
+ tableManager,
+ indexManager
+ ))).thenComposeAsync(componentFuts -> {
+ CompletableFuture<Void> configurationNotificationFut =
metaStorageManager.recoveryFinishedFuture()
+ .thenCompose(rev -> allOf(
+
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+ ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart(),
+ componentFuts
+ ));
assertThat(configurationNotificationFut, willSucceedIn(1,
TimeUnit.MINUTES));
lowWatermark.scheduleUpdates();
return metaStorageManager.deployWatches();
-
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)),
(deployWatchesFut, unused) -> null);
+ });
+ }
+
+ private CompletableFuture<Void> startComponentsAsync(ComponentContext
componentContext, List<IgniteComponent> components) {
+ nodeComponents.addAll(components);
+
+ return allOf(components.stream()
+ .map(component -> component.startAsync(componentContext))
+ .toArray(CompletableFuture[]::new));
}
/**
diff --git
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 58de200b96..9457884705 100644
---
a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++
b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -268,9 +268,7 @@ public class PartitionReplicaLifecycleManager extends
.thenCompose(ignored ->
processAssignmentsOnRecovery(recoveryRevision));
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
pendingAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
stableAssignmentsRebalanceListener);
-
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
assignmentsSwitchRebalanceListener);
catalogMgr.listen(ZONE_CREATE,
diff --git
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 06b48b8085..9eccf3b699 100644
---
a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++
b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -92,7 +92,6 @@ import java.util.function.Function;
import java.util.function.LongFunction;
import java.util.function.LongSupplier;
import java.util.function.Predicate;
-import java.util.stream.Collectors;
import java.util.stream.IntStream;
import
org.apache.ignite.client.handler.configuration.ClientConnectorExtensionConfigurationSchema;
import org.apache.ignite.internal.app.ThreadPoolsManager;
@@ -1493,7 +1492,9 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
* Starts the created components.
*/
void start() {
- List<IgniteComponent> firstComponents = List.of(
+ ComponentContext componentContext = new ComponentContext();
+
+ deployWatchesFut = startComponentsAsync(componentContext, List.of(
threadPoolsManager,
vaultManager,
nodeCfgMgr,
@@ -1504,53 +1505,43 @@ public class ItRebalanceDistributedTest extends
BaseIgniteAbstractTest {
msLogStorageFactory,
raftManager,
cmgManager
- );
-
- ComponentContext componentContext = new ComponentContext();
- List<CompletableFuture<?>> componentFuts =
- firstComponents.stream()
- .map(component ->
component.startAsync(componentContext))
- .collect(Collectors.toList());
-
- nodeComponents.addAll(firstComponents);
-
- deployWatchesFut = CompletableFuture.supplyAsync(() -> {
- List<IgniteComponent> secondComponents = List.of(
- lowWatermark,
- metaStorageManager,
- clusterCfgMgr,
- clockWaiter,
- catalogManager,
- indexMetaStorage,
- distributionZoneManager,
- replicaManager,
- txManager,
- dataStorageMgr,
- schemaManager,
- tableManager,
- indexManager
- );
-
- componentFuts.addAll(secondComponents.stream()
- .map(component ->
component.startAsync(componentContext))
- .collect(Collectors.toList()));
-
- nodeComponents.addAll(secondComponents);
-
- var configurationNotificationFut =
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
- return allOf(
-
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
- ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart()
- );
- });
+ )).thenApplyAsync(v -> startComponentsAsync(componentContext,
List.of(
+ lowWatermark,
+ metaStorageManager,
+ clusterCfgMgr,
+ clockWaiter,
+ catalogManager,
+ indexMetaStorage,
+ distributionZoneManager,
+ replicaManager,
+ txManager,
+ dataStorageMgr,
+ schemaManager,
+ tableManager,
+ indexManager
+ ))).thenComposeAsync(componentFuts -> {
+ CompletableFuture<Void> configurationNotificationFut =
metaStorageManager.recoveryFinishedFuture()
+ .thenCompose(rev -> allOf(
+
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+ ((MetaStorageManagerImpl)
metaStorageManager).notifyRevisionUpdateListenerOnStart(),
+ componentFuts
+ ));
assertThat(configurationNotificationFut, willSucceedIn(1,
TimeUnit.MINUTES));
lowWatermark.scheduleUpdates();
return metaStorageManager.deployWatches();
-
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)),
(deployWatchesFut, unused) -> null);
+ });
+ }
+
+ private CompletableFuture<Void> startComponentsAsync(ComponentContext
componentContext, List<IgniteComponent> components) {
+ nodeComponents.addAll(components);
+
+ return allOf(components.stream()
+ .map(component -> component.startAsync(componentContext))
+ .toArray(CompletableFuture[]::new));
}
/**