This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new d0d4071  [FLINK-23030][network] 
PartitionRequestClientFactory#createPartitionRequestClient should throw when 
network failure
d0d4071 is described below

commit d0d40718c1eaabc7b1280104218a1197a99896b9
Author: jinxing64 <[email protected]>
AuthorDate: Fri Jun 18 15:34:49 2021 +0800

    [FLINK-23030][network] 
PartitionRequestClientFactory#createPartitionRequestClient should throw when 
network failure
---
 .../netty/PartitionRequestClientFactory.java       |  2 +-
 .../netty/NeverCompletingChannelFuture.java        |  6 +-
 .../netty/PartitionRequestClientFactoryTest.java   | 88 +++++-----------------
 3 files changed, 23 insertions(+), 73 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 591e972..ff4c19c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -131,7 +131,7 @@ class PartitionRequestClientFactory {
     private NettyPartitionRequestClient connect(ConnectionID connectionId)
             throws RemoteTransportException, InterruptedException {
         try {
-            Channel channel = 
nettyClient.connect(connectionId.getAddress()).await().channel();
+            Channel channel = 
nettyClient.connect(connectionId.getAddress()).sync().channel();
             NetworkClientHandler clientHandler = 
channel.pipeline().get(NetworkClientHandler.class);
             return new NettyPartitionRequestClient(channel, clientHandler, 
connectionId, this);
         } catch (InterruptedException e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
index 1d7c81e..ad6772c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
@@ -74,8 +74,10 @@ class NeverCompletingChannelFuture implements ChannelFuture {
     }
 
     @Override
-    public ChannelFuture sync() {
-        throw new UnsupportedOperationException();
+    public ChannelFuture sync() throws InterruptedException {
+        while (true) {
+            Thread.sleep(50);
+        }
     }
 
     @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 595b3e5..5542308 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,8 +18,6 @@
 
 package org.apache.flink.runtime.io.network.netty;
 
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.NetworkClientHandler;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -28,21 +26,15 @@ import org.apache.flink.util.NetUtils;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
 
 import org.junit.Test;
 
 import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
 import java.net.InetAddress;
 import java.net.InetSocketAddress;
-import java.net.SocketAddress;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
@@ -217,6 +209,24 @@ public class PartitionRequestClientFactoryTest {
         serverAndClient.server().shutdown();
     }
 
+    @Test(expected = RemoteTransportException.class)
+    public void testThrowsWhenNetworkFailure() throws Exception {
+        NettyTestUtil.NettyServerAndClient nettyServerAndClient = 
createNettyServerAndClient();
+        try {
+            NettyClient client = nettyServerAndClient.client();
+            PartitionRequestClientFactory factory = new 
PartitionRequestClientFactory(client, 0);
+
+            // Connect to a wrong address
+            InetSocketAddress addr =
+                    new InetSocketAddress(InetAddress.getLocalHost(), 
NetUtils.getAvailablePort());
+            ConnectionID connectionID = new ConnectionID(addr, 0);
+            factory.createPartitionRequestClient(connectionID);
+        } finally {
+            nettyServerAndClient.client().shutdown();
+            nettyServerAndClient.server().shutdown();
+        }
+    }
+
     private NettyTestUtil.NettyServerAndClient createNettyServerAndClient() 
throws Exception {
         return NettyTestUtil.initServerAndClient(
                 new NettyProtocol(null, null) {
@@ -289,66 +299,4 @@ public class PartitionRequestClientFactoryTest {
             }
         }
     }
-
-    private static class CountDownLatchOnConnectHandler extends 
ChannelOutboundHandlerAdapter {
-
-        private final CountDownLatch syncOnConnect;
-
-        public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
-            this.syncOnConnect = syncOnConnect;
-        }
-
-        @Override
-        public void connect(
-                ChannelHandlerContext ctx,
-                SocketAddress remoteAddress,
-                SocketAddress localAddress,
-                ChannelPromise promise) {
-            syncOnConnect.countDown();
-        }
-    }
-
-    private static class UncaughtTestExceptionHandler implements 
UncaughtExceptionHandler {
-
-        private final List<Throwable> errors = new ArrayList<>(1);
-
-        @Override
-        public void uncaughtException(Thread t, Throwable e) {
-            errors.add(e);
-        }
-
-        private List<Throwable> getErrors() {
-            return errors;
-        }
-    }
-
-    // ------------------------------------------------------------------------
-
-    private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(
-            NettyProtocol protocol) throws IOException {
-        final NettyConfig config =
-                new NettyConfig(
-                        InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, 
new Configuration());
-
-        final NettyServer server = new NettyServer(config);
-        final NettyClient client = new NettyClient(config);
-
-        boolean success = false;
-
-        try {
-            NettyBufferPool bufferPool = new NettyBufferPool(1);
-
-            server.init(protocol, bufferPool);
-            client.init(protocol, bufferPool);
-
-            success = true;
-        } finally {
-            if (!success) {
-                server.shutdown();
-                client.shutdown();
-            }
-        }
-
-        return new Tuple2<>(server, client);
-    }
 }

Reply via email to