This is an automated email from the ASF dual-hosted git repository.

ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new c0f92249c2 IGNITE-21470 Wait for sender to appear before applying an 
ack silencer (#3163)
c0f92249c2 is described below

commit c0f92249c2d5ac235710520b13bfbb2412c27928
Author: Roman Puchkovskiy <roman.puchkovs...@gmail.com>
AuthorDate: Tue Feb 6 19:44:29 2024 +0400

    IGNITE-21470 Wait for sender to appear before applying an ack silencer 
(#3163)
---
 .../internal/network/netty/ItConnectionManagerTest.java | 17 +++++++++++++++++
 .../network/netty/OutgoingAcknowledgementSilencer.java  |  5 +++++
 .../ignite/internal/network/netty/NettySender.java      |  8 ++++++++
 3 files changed, 30 insertions(+)

diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
index 65122b0481..a921dadfd0 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/ItConnectionManagerTest.java
@@ -388,6 +388,7 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
                 ConnectionManagerWrapper manager2 = startManager(4001)
         ) {
             NettySender sender = 
manager1.openChannelTo(manager2).toCompletableFuture().get(10, 
TimeUnit.SECONDS);
+            waitTillChannelAppearsInMapOnAcceptor(sender, manager1, manager2);
 
             OutgoingAcknowledgementSilencer ackSilencer = 
dropAcksFrom(manager2);
 
@@ -402,6 +403,22 @@ public class ItConnectionManagerTest extends 
BaseIgniteAbstractTest {
         }
     }
 
+    private static void waitTillChannelAppearsInMapOnAcceptor(
+            NettySender senderFromOpener,
+            ConnectionManagerWrapper opener,
+            ConnectionManagerWrapper acceptor
+    ) throws InterruptedException {
+        assertTrue(
+                waitForCondition(
+                        () -> 
acceptor.channels().values().stream().anyMatch(acceptorSender
+                                -> 
acceptorSender.consistentId().equals(opener.connectionManager.consistentId())
+                                        && acceptorSender.channelId() == 
senderFromOpener.channelId()),
+                        TimeUnit.SECONDS.toMillis(10)
+                ),
+                "Did not observe the sender appearing in the acceptor's sender 
map in time"
+        );
+    }
+
     @Test
     public void sendFuturesCompleteInSendOrder() throws Exception {
         try (
diff --git 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
index 160a16263e..e18f8458ee 100644
--- 
a/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
+++ 
b/modules/network/src/integrationTest/java/org/apache/ignite/internal/network/netty/OutgoingAcknowledgementSilencer.java
@@ -17,6 +17,9 @@
 
 package org.apache.ignite.internal.network.netty;
 
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.empty;
+import static org.hamcrest.Matchers.not;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
 import io.netty.channel.ChannelHandler.Sharable;
@@ -50,6 +53,8 @@ public class OutgoingAcknowledgementSilencer extends 
ChannelOutboundHandlerAdapt
      */
     public static OutgoingAcknowledgementSilencer 
installOn(Collection<NettySender> senders)
             throws InterruptedException {
+        assertThat(senders, not(empty()));
+
         OutgoingAcknowledgementSilencer ackSilencer = new 
OutgoingAcknowledgementSilencer(senders.size());
 
         for (NettySender sender : senders) {
diff --git 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
index 82c570d8d3..7bff673b6e 100644
--- 
a/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
+++ 
b/modules/network/src/main/java/org/apache/ignite/internal/network/netty/NettySender.java
@@ -30,6 +30,8 @@ import 
org.apache.ignite.internal.network.NettyBootstrapFactory;
 import org.apache.ignite.internal.network.OutNetworkObject;
 import org.apache.ignite.internal.network.direct.DirectMessageWriter;
 import org.apache.ignite.internal.network.recovery.RecoveryDescriptor;
+import org.apache.ignite.internal.tostring.IgniteToStringExclude;
+import org.apache.ignite.internal.tostring.S;
 import org.jetbrains.annotations.TestOnly;
 
 /**
@@ -49,6 +51,7 @@ public class NettySender {
 
     private final short channelId;
 
+    @IgniteToStringExclude
     private final RecoveryDescriptor recoveryDescriptor;
 
     /**
@@ -234,4 +237,9 @@ public class NettySender {
     public RecoveryDescriptor recoveryDescriptor() {
         return recoveryDescriptor;
     }
+
+    @Override
+    public String toString() {
+        return S.toString(this);
+    }
 }

Reply via email to