This is an automated email from the ASF dual-hosted git repository.

sk0x50 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new 6d47468ce1 IGNITE-23131 java.lang.AssertionError: The local node is outside of the replication group (#4396)
6d47468ce1 is described below

commit 6d47468ce1ec1ee584d23b4e6427edcef73526b8
Author: Mikhail Efremov <[email protected]>
AuthorDate: Mon Sep 23 20:41:09 2024 +0600

    IGNITE-23131 java.lang.AssertionError: The local node is outside of the replication group (#4396)
---
 .../replicator/ItReplicaLifecycleTest.java         | 78 ++++++++++------------
 .../PartitionReplicaLifecycleManager.java          |  2 -
 .../rebalance/ItRebalanceDistributedTest.java      | 77 ++++++++++-----------
 3 files changed, 69 insertions(+), 88 deletions(-)

diff --git a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
index 6cdba72af1..5b9cb27114 100644
--- a/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
+++ b/modules/partition-replicator/src/integrationTest/java/org/apache/ignite/internal/partition/replicator/ItReplicaLifecycleTest.java
@@ -1292,7 +1292,9 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
          * Starts the created components.
          */
         void start() {
-            List<IgniteComponent> firstComponents = List.of(
+            ComponentContext componentContext = new ComponentContext();
+
+            deployWatchesFut = startComponentsAsync(componentContext, List.of(
                     threadPoolsManager,
                     vaultManager,
                     nodeCfgMgr,
@@ -1303,54 +1305,44 @@ public class ItReplicaLifecycleTest extends 
BaseIgniteAbstractTest {
                     cmgLogStorageFactory,
                     raftManager,
                     cmgManager
-            );
-
-            ComponentContext componentContext = new ComponentContext();
-
-            List<CompletableFuture<?>> componentFuts =
-                    firstComponents.stream()
-                            .map(component -> 
component.startAsync(componentContext))
-                            .collect(Collectors.toList());
-
-            nodeComponents.addAll(firstComponents);
-
-            deployWatchesFut = CompletableFuture.supplyAsync(() -> {
-                List<IgniteComponent> secondComponents = List.of(
-                        lowWatermark,
-                        metaStorageManager,
-                        clusterCfgMgr,
-                        clockWaiter,
-                        catalogManager,
-                        indexMetaStorage,
-                        distributionZoneManager,
-                        replicaManager,
-                        txManager,
-                        dataStorageMgr,
-                        schemaManager,
-                        partitionReplicaLifecycleManager,
-                        tableManager,
-                        indexManager
-                );
-
-                componentFuts.addAll(secondComponents.stream()
-                        .map(component -> 
component.startAsync(componentContext)).collect(Collectors.toList()));
-
-                nodeComponents.addAll(secondComponents);
-
-                var configurationNotificationFut = 
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
-                    return allOf(
-                            
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                            
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                            ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart()
-                    );
-                });
+            )).thenApplyAsync(v -> startComponentsAsync(componentContext, 
List.of(
+                    lowWatermark,
+                    metaStorageManager,
+                    clusterCfgMgr,
+                    clockWaiter,
+                    catalogManager,
+                    indexMetaStorage,
+                    distributionZoneManager,
+                    replicaManager,
+                    txManager,
+                    dataStorageMgr,
+                    schemaManager,
+                    partitionReplicaLifecycleManager,
+                    tableManager,
+                    indexManager
+            ))).thenComposeAsync(componentFuts -> {
+                CompletableFuture<Void> configurationNotificationFut = 
metaStorageManager.recoveryFinishedFuture()
+                        .thenCompose(rev -> allOf(
+                                
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+                                
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+                                ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart(),
+                                componentFuts
+                        ));
 
                 assertThat(configurationNotificationFut, willSucceedIn(1, 
TimeUnit.MINUTES));
 
                 lowWatermark.scheduleUpdates();
 
                 return metaStorageManager.deployWatches();
-            
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), 
(deployWatchesFut, unused) -> null);
+            });
+        }
+
+        private CompletableFuture<Void> startComponentsAsync(ComponentContext 
componentContext, List<IgniteComponent> components) {
+            nodeComponents.addAll(components);
+
+            return allOf(components.stream()
+                    .map(component -> component.startAsync(componentContext))
+                    .toArray(CompletableFuture[]::new));
         }
 
         /**
diff --git a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
index 58de200b96..9457884705 100644
--- a/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
+++ b/modules/partition-replicator/src/main/java/org/apache/ignite/internal/partition/replicator/PartitionReplicaLifecycleManager.java
@@ -268,9 +268,7 @@ public class PartitionReplicaLifecycleManager  extends
                 .thenCompose(ignored -> 
processAssignmentsOnRecovery(recoveryRevision));
 
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(PENDING_ASSIGNMENTS_PREFIX),
 pendingAssignmentsRebalanceListener);
-
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(STABLE_ASSIGNMENTS_PREFIX),
 stableAssignmentsRebalanceListener);
-
         
metaStorageMgr.registerPrefixWatch(ByteArray.fromString(ASSIGNMENTS_SWITCH_REDUCE_PREFIX),
 assignmentsSwitchRebalanceListener);
 
         catalogMgr.listen(ZONE_CREATE,
diff --git a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
index 06b48b8085..9eccf3b699 100644
--- a/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
+++ b/modules/table/src/integrationTest/java/org/apache/ignite/internal/rebalance/ItRebalanceDistributedTest.java
@@ -92,7 +92,6 @@ import java.util.function.Function;
 import java.util.function.LongFunction;
 import java.util.function.LongSupplier;
 import java.util.function.Predicate;
-import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 import 
org.apache.ignite.client.handler.configuration.ClientConnectorExtensionConfigurationSchema;
 import org.apache.ignite.internal.app.ThreadPoolsManager;
@@ -1493,7 +1492,9 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
          * Starts the created components.
          */
         void start() {
-            List<IgniteComponent> firstComponents = List.of(
+            ComponentContext componentContext = new ComponentContext();
+
+            deployWatchesFut = startComponentsAsync(componentContext, List.of(
                     threadPoolsManager,
                     vaultManager,
                     nodeCfgMgr,
@@ -1504,53 +1505,43 @@ public class ItRebalanceDistributedTest extends 
BaseIgniteAbstractTest {
                     msLogStorageFactory,
                     raftManager,
                     cmgManager
-            );
-
-            ComponentContext componentContext = new ComponentContext();
-            List<CompletableFuture<?>> componentFuts =
-                    firstComponents.stream()
-                            .map(component -> 
component.startAsync(componentContext))
-                            .collect(Collectors.toList());
-
-            nodeComponents.addAll(firstComponents);
-
-            deployWatchesFut = CompletableFuture.supplyAsync(() -> {
-                List<IgniteComponent> secondComponents = List.of(
-                        lowWatermark,
-                        metaStorageManager,
-                        clusterCfgMgr,
-                        clockWaiter,
-                        catalogManager,
-                        indexMetaStorage,
-                        distributionZoneManager,
-                        replicaManager,
-                        txManager,
-                        dataStorageMgr,
-                        schemaManager,
-                        tableManager,
-                        indexManager
-                );
-
-                componentFuts.addAll(secondComponents.stream()
-                        .map(component -> 
component.startAsync(componentContext))
-                        .collect(Collectors.toList()));
-
-                nodeComponents.addAll(secondComponents);
-
-                var configurationNotificationFut = 
metaStorageManager.recoveryFinishedFuture().thenCompose(rev -> {
-                    return allOf(
-                            
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                            
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
-                            ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart()
-                    );
-                });
+            )).thenApplyAsync(v -> startComponentsAsync(componentContext, 
List.of(
+                    lowWatermark,
+                    metaStorageManager,
+                    clusterCfgMgr,
+                    clockWaiter,
+                    catalogManager,
+                    indexMetaStorage,
+                    distributionZoneManager,
+                    replicaManager,
+                    txManager,
+                    dataStorageMgr,
+                    schemaManager,
+                    tableManager,
+                    indexManager
+            ))).thenComposeAsync(componentFuts -> {
+                CompletableFuture<Void> configurationNotificationFut = 
metaStorageManager.recoveryFinishedFuture()
+                        .thenCompose(rev -> allOf(
+                                
nodeCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+                                
clusterCfgMgr.configurationRegistry().notifyCurrentConfigurationListeners(),
+                                ((MetaStorageManagerImpl) 
metaStorageManager).notifyRevisionUpdateListenerOnStart(),
+                                componentFuts
+                        ));
 
                 assertThat(configurationNotificationFut, willSucceedIn(1, 
TimeUnit.MINUTES));
 
                 lowWatermark.scheduleUpdates();
 
                 return metaStorageManager.deployWatches();
-            
}).thenCombine(allOf(componentFuts.toArray(CompletableFuture[]::new)), 
(deployWatchesFut, unused) -> null);
+            });
+        }
+
+        private CompletableFuture<Void> startComponentsAsync(ComponentContext 
componentContext, List<IgniteComponent> components) {
+            nodeComponents.addAll(components);
+
+            return allOf(components.stream()
+                    .map(component -> component.startAsync(componentContext))
+                    .toArray(CompletableFuture[]::new));
         }
 
         /**

Reply via email to