This is an automated email from the ASF dual-hosted git repository.
apolovtsev pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 3a65f242670 IGNITE-27828 Fix flaky nodeCannotCommunicateAfterLeavingPhysicalTopology (#7582)
3a65f242670 is described below
commit 3a65f242670c71c6c40259fb527f330b593997e5
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Feb 12 15:55:26 2026 +0200
IGNITE-27828 Fix flaky nodeCannotCommunicateAfterLeavingPhysicalTopology (#7582)
---
.../scalecube/ItScaleCubeNetworkMessagingTest.java | 119 ++++++++-------------
1 file changed, 46 insertions(+), 73 deletions(-)
diff --git
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
index 9446dcb5ddf..d108a7ea914 100644
---
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
+++
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/scalecube/ItScaleCubeNetworkMessagingTest.java
@@ -30,6 +30,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static
org.apache.ignite.internal.util.CompletableFutures.nullCompletedFuture;
import static org.apache.ignite.internal.util.IgniteUtils.startAsync;
import static org.apache.ignite.internal.util.IgniteUtils.stopAsync;
+import static org.awaitility.Awaitility.await;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.either;
import static org.hamcrest.Matchers.equalTo;
@@ -96,6 +97,7 @@ import
org.apache.ignite.internal.network.recovery.RecoveryInitiatorHandshakeMan
import
org.apache.ignite.internal.network.recovery.message.HandshakeFinishMessage;
import org.apache.ignite.internal.network.utils.ClusterServiceTestUtils;
import org.apache.ignite.internal.properties.IgniteProductVersion;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
import org.apache.ignite.internal.testframework.log4j2.LogInspector;
import org.apache.ignite.internal.util.CompletableFutures;
import org.apache.ignite.internal.version.DefaultIgniteProductVersionSource;
@@ -115,7 +117,7 @@ import reactor.core.publisher.Mono;
/**
* Integration tests for messaging based on ScaleCube.
*/
-class ItScaleCubeNetworkMessagingTest {
+class ItScaleCubeNetworkMessagingTest extends BaseIgniteAbstractTest {
/** Message sent to establish a connection. */
private static final String TRAILBLAZER = "trailblazer";
@@ -142,7 +144,7 @@ class ItScaleCubeNetworkMessagingTest {
}
@AfterEach
- void tearDown() throws Exception {
+ void tearDown() {
testCluster.shutdown();
logInspectors.forEach(LogInspector::stop);
logInspectors.clear();
@@ -329,8 +331,7 @@ class ItScaleCubeNetworkMessagingTest {
}
/**
- * Tests that if the network component is stopped while making an "invoke"
call, the corresponding future completes
- * exceptionally.
+ * Tests that if the network component is stopped while making an "invoke"
call, the corresponding future completes exceptionally.
*/
@Test
public void testInvokeDuringStop() throws InterruptedException {
@@ -524,18 +525,17 @@ class ItScaleCubeNetworkMessagingTest {
* @throws Exception in case of errors.
*/
@SuppressWarnings("ConstantConditions")
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void nodeCannotReuseOldId(boolean keepPreExistingConnections)
throws Exception {
+ @Test
+ public void nodeCannotReuseOldId() throws Exception {
testCluster = new Cluster(3, testInfo);
testCluster.startAwait();
- String outcastName =
testCluster.members.get(testCluster.members.size() - 1).nodeName();
+ ClusterService outcast =
testCluster.members.get(testCluster.members.size() - 1);
- knockOutNode(outcastName, !keepPreExistingConnections);
+ knockOutNode(outcast);
- IgniteBiTuple<CountDownLatch, AtomicBoolean> pair =
reanimateNode(outcastName);
+ IgniteBiTuple<CountDownLatch, AtomicBoolean> pair =
reanimateNode(outcast);
CountDownLatch ready = pair.get1();
AtomicBoolean reappeared = pair.get2();
@@ -544,9 +544,8 @@ class ItScaleCubeNetworkMessagingTest {
assertThat(reappeared.get(), is(false));
}
- @ParameterizedTest
- @ValueSource(booleans = {false, true})
- public void nodeCannotCommunicateAfterLeavingPhysicalTopology(boolean
keepPreExistingConnections) throws Exception {
+ @Test
+ public void nodeCannotCommunicateAfterLeavingPhysicalTopology() throws
Exception {
testCluster = new Cluster(3, testInfo);
testCluster.startAwait();
@@ -559,14 +558,12 @@ class ItScaleCubeNetworkMessagingTest {
assertNotNull(outcastNode);
assertNotNull(notOutcastNode);
- if (keepPreExistingConnections) {
- assertThat(notOutcast.messagingService().send(outcastNode,
messageFactory.testMessage().build()), willCompleteSuccessfully());
- assertThat(outcast.messagingService().send(notOutcastNode,
messageFactory.testMessage().build()), willCompleteSuccessfully());
- }
+ assertThat(notOutcast.messagingService().send(outcastNode,
messageFactory.testMessage().build()), willCompleteSuccessfully());
+ assertThat(outcast.messagingService().send(notOutcastNode,
messageFactory.testMessage().build()), willCompleteSuccessfully());
- knockOutNode(outcast.nodeName(), !keepPreExistingConnections);
+ knockOutNode(outcast);
- stopDroppingMessagesTo(outcast.nodeName());
+ stopDroppingMessagesTo(outcast);
CompletableFuture<Void> sendFromOutcast =
outcast.messagingService().send(notOutcastNode,
messageFactory.testMessage().build());
assertThat(sendFromOutcast,
either(willThrow(HandshakeException.class)).or(willThrow(RecipientLeftException.class)));
@@ -600,7 +597,7 @@ class ItScaleCubeNetworkMessagingTest {
establishConnection(sender, receiver);
- closeAllChannels(sender.messagingService());
+ closeAllChannels(sender);
receiver.messagingService().addMessageHandler(
TestMessageTypes.class,
@@ -691,7 +688,7 @@ class ItScaleCubeNetworkMessagingTest {
return nettySenderWarp[0];
}
- private void collectReceivedPayloads(ClusterService sender, ClusterService
receiver, List<String> receivedPayloads) {
+ private static void collectReceivedPayloads(ClusterService sender,
ClusterService receiver, List<String> receivedPayloads) {
receiver.messagingService().addMessageHandler(
TestMessageTypes.class,
(message, senderParam, correlationId) -> {
@@ -952,7 +949,7 @@ class ItScaleCubeNetworkMessagingTest {
assertTrue(blockingStarted.await(10, SECONDS));
- knockOutNode(sender.nodeName(), false);
+ knockOutNode(sender);
canProceed.countDown();
@@ -964,10 +961,8 @@ class ItScaleCubeNetworkMessagingTest {
assertThat(messagesDelivered.get(), is(1));
}
- private static void closeAllChannels(MessagingService messagingService) {
- ConnectionManager connectionManager = ((DefaultMessagingService)
messagingService).connectionManager();
-
- for (NettySender sender : connectionManager.channels().values()) {
+ private static void closeAllChannels(ClusterService clusterService) {
+ for (NettySender sender :
connectionManager(clusterService).channels().values()) {
assertThat(sender.closeAsync(), willCompleteSuccessfully());
}
}
@@ -1031,11 +1026,7 @@ class ItScaleCubeNetworkMessagingTest {
}
private static CompletableFuture<Void> ackFuture(ClusterService
clusterService, NetworkMessage msg) {
- DefaultMessagingService messagingService = (DefaultMessagingService)
clusterService.messagingService();
-
- ConnectionManager connectionManager =
messagingService.connectionManager();
-
- for (NettySender sender : connectionManager.channels().values()) {
+ for (NettySender sender :
connectionManager(clusterService).channels().values()) {
for (OutNetworkObject outMsgWrapper :
sender.recoveryDescriptor().unacknowledgedMessages()) {
if (outMsgWrapper.networkMessage() == msg) {
return outMsgWrapper.acknowledgedFuture();
@@ -1054,9 +1045,8 @@ class ItScaleCubeNetworkMessagingTest {
private static OutgoingAcknowledgementSilencer
dropAcksWhenDefaultChannelOpens(ClusterService clusterService)
throws InterruptedException {
- DefaultMessagingService messagingService = (DefaultMessagingService)
clusterService.messagingService();
+ ConnectionManager connectionManager =
connectionManager(clusterService);
- ConnectionManager connectionManager =
messagingService.connectionManager();
assertTrue(
waitForCondition(
() ->
connectionManager.channels().keySet().stream().anyMatch(key -> key.type() ==
ChannelType.DEFAULT),
@@ -1099,7 +1089,7 @@ class ItScaleCubeNetworkMessagingTest {
CompletableFuture<Void> ackSendFuture = ackFuture(notOutcast, msg);
- knockOutNode(outcast.nodeName(), false);
+ knockOutNode(outcast);
assertThat(ackSendFuture, willThrow(RecipientLeftException.class));
}
@@ -1209,53 +1199,36 @@ class ItScaleCubeNetworkMessagingTest {
);
}
- private void knockOutNode(String outcastName, boolean
closeConnectionsForcibly) throws InterruptedException {
- CountDownLatch disappeared = new
CountDownLatch(testCluster.members.size() - 1);
-
- TopologyEventHandler disappearListener = new TopologyEventHandler() {
- @Override
- public void onDisappeared(InternalClusterNode member) {
- if (Objects.equals(member.name(), outcastName)) {
- disappeared.countDown();
- }
- }
- };
-
+ private void knockOutNode(ClusterService outcast) {
List<ClusterService> notOutcasts = testCluster.members.stream()
- .filter(service -> !outcastName.equals(service.nodeName()))
+ .filter(service -> outcast != service)
.collect(toList());
- notOutcasts.forEach(clusterService -> {
-
clusterService.topologyService().addEventHandler(disappearListener);
- });
-
notOutcasts.forEach(service -> {
DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
- messagingService.dropMessages((recipientConsistentId, message) ->
outcastName.equals(recipientConsistentId));
+ messagingService.dropMessages((recipientConsistentId, message) ->
outcast.nodeName().equals(recipientConsistentId));
});
- // Wait until all nodes see disappearance of the outcast.
- assertTrue(disappeared.await(10, SECONDS), "Node did not disappear in
time");
+ UUID outcastId = outcast.topologyService().localMember().id();
- if (closeConnectionsForcibly) {
- MessagingService messagingService = testCluster.members.stream()
- .filter(service -> outcastName.equals(service.nodeName()))
- .findFirst().orElseThrow()
- .messagingService();
-
- // Forcefully close channels, so that nodes will create new
channels on reanimation of the outcast.
- closeAllChannels(messagingService);
- }
+ // Wait for the remote nodes to close their channels automatically.
+ await().until(() ->
+ testCluster.members.stream()
+ .flatMap(clusterService ->
connectionManager(clusterService).channels().values().stream())
+ .filter(sender -> sender.launchId().equals(outcastId))
+ .findAny()
+ .isEmpty()
+ );
}
- private IgniteBiTuple<CountDownLatch, AtomicBoolean> reanimateNode(String
outcastName) {
+ private IgniteBiTuple<CountDownLatch, AtomicBoolean>
reanimateNode(ClusterService outcast) {
CountDownLatch ready = new CountDownLatch(1);
AtomicBoolean reappeared = new AtomicBoolean(false);
testCluster.members.get(0).topologyService().addEventHandler(new
TopologyEventHandler() {
@Override
public void onAppeared(InternalClusterNode member) {
- if (Objects.equals(member.name(), outcastName)) {
+ if (Objects.equals(member.name(), outcast.nodeName())) {
reappeared.compareAndSet(false, true);
ready.countDown();
@@ -1277,14 +1250,14 @@ class ItScaleCubeNetworkMessagingTest {
logInspectors.forEach(LogInspector::start);
- stopDroppingMessagesTo(outcastName);
+ stopDroppingMessagesTo(outcast);
return new IgniteBiTuple<>(ready, reappeared);
}
- private void stopDroppingMessagesTo(String outcastName) {
+ private void stopDroppingMessagesTo(ClusterService outcast) {
testCluster.members.stream()
- .filter(service -> !outcastName.equals(service.nodeName()))
+ .filter(service -> outcast != service)
.forEach(service -> {
DefaultMessagingService messagingService =
(DefaultMessagingService) service.messagingService();
messagingService.stopDroppingMessages();
@@ -1372,7 +1345,7 @@ class ItScaleCubeNetworkMessagingTest {
* Creates a test cluster with the given amount of members.
*
* @param numOfNodes Amount of cluster members.
- * @param testInfo Test info.
+ * @param testInfo Test info.
*/
Cluster(int numOfNodes, TestInfo testInfo) {
this(numOfNodes, testInfo, normalClusterIdSupplierFactory);
@@ -1382,7 +1355,7 @@ class ItScaleCubeNetworkMessagingTest {
* Creates a test cluster with the given amount of members.
*
* @param numOfNodes Amount of cluster members.
- * @param testInfo Test info.
+ * @param testInfo Test info.
* @param clusterIdSupplierFactory Allows to obtain a Supplier for
cluster ID by node address.
*/
Cluster(int numOfNodes, TestInfo testInfo, Function<NetworkAddress,
ClusterIdSupplier> clusterIdSupplierFactory) {
@@ -1393,7 +1366,7 @@ class ItScaleCubeNetworkMessagingTest {
* Creates a test cluster with the given amount of members.
*
* @param numOfNodes Amount of cluster members.
- * @param testInfo Test info.
+ * @param testInfo Test info.
* @param clusterIdSupplierFactory Allows to obtain a Supplier for
cluster ID by node address.
*/
Cluster(
@@ -1445,7 +1418,7 @@ class ItScaleCubeNetworkMessagingTest {
* Starts and waits for the cluster to come up.
*
* @throws InterruptedException If failed.
- * @throws AssertionError If the cluster was unable to start in
3 seconds.
+ * @throws AssertionError If the cluster was unable to start in 3
seconds.
*/
void startAwait() throws InterruptedException {
assertThat(startAsync(new ComponentContext(), members),
willCompleteSuccessfully());
@@ -1475,7 +1448,7 @@ class ItScaleCubeNetworkMessagingTest {
}
}
- private enum SendOperation {
+ enum SendOperation {
SEND {
@Override
CompletableFuture<Void> send(MessagingService messagingService,
NetworkMessage message, InternalClusterNode recipient) {