This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 4cb7624  [FLINK-24133][core] Network failure test replaced by comment 
in the code due to high expense of stabilizing such low important test
4cb7624 is described below

commit 4cb7624caf8805003a2c59c3ebb9c38200a834ce
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Sep 14 17:33:46 2021 +0200

    [FLINK-24133][core] Network failure test replaced by comment in the code 
due to high expense of stabilizing such low important test
---
 .../network/netty/PartitionRequestClientFactory.java   |  3 +++
 .../netty/PartitionRequestClientFactoryTest.java       | 18 ------------------
 2 files changed, 3 insertions(+), 18 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index ff4c19c..8a91e8a 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -131,6 +131,9 @@ class PartitionRequestClientFactory {
     private NettyPartitionRequestClient connect(ConnectionID connectionId)
             throws RemoteTransportException, InterruptedException {
         try {
+            // It's important to use `sync` here because it waits for this 
future until it is
+            // done, and rethrows the cause of the failure if this future 
failed. `await` only
+            // waits for this future to be completed, without throwing the 
error.
             Channel channel = 
nettyClient.connect(connectionId.getAddress()).sync().channel();
             NetworkClientHandler clientHandler = 
channel.pipeline().get(NetworkClientHandler.class);
             return new NettyPartitionRequestClient(channel, clientHandler, 
connectionId, this);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 54def34..884e66b 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -210,24 +210,6 @@ public class PartitionRequestClientFactoryTest extends 
TestLogger {
         serverAndClient.server().shutdown();
     }
 
-    @Test(expected = RemoteTransportException.class)
-    public void testThrowsWhenNetworkFailure() throws Exception {
-        NettyTestUtil.NettyServerAndClient nettyServerAndClient = 
createNettyServerAndClient();
-        try {
-            NettyClient client = nettyServerAndClient.client();
-            PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(client, 0);
-
-            // Connect to a wrong address
-            InetSocketAddress addr =
-                    new InetSocketAddress(InetAddress.getLocalHost(), 
NetUtils.getAvailablePort());
-            ConnectionID connectionID = new ConnectionID(addr, 0);
-            factory.createPartitionRequestClient(connectionID);
-        } finally {
-            nettyServerAndClient.client().shutdown();
-            nettyServerAndClient.server().shutdown();
-        }
-    }
-
     private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() 
throws Exception {
         return NettyTestUtil.initServerAndClient(
                 new NettyProtocol(null, null) {

Reply via email to