This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f46cfec [FLINK-23030][network]
PartitionRequestClientFactory#createPartitionRequestClient should throw when
network failure
f46cfec is described below
commit f46cfecf027feb025d7fad580fc76eb0b89970ba
Author: jinxing64 <[email protected]>
AuthorDate: Fri Jun 18 15:34:49 2021 +0800
[FLINK-23030][network]
PartitionRequestClientFactory#createPartitionRequestClient should throw when
network failure
---
.../netty/PartitionRequestClientFactory.java | 2 +-
.../netty/NeverCompletingChannelFuture.java | 6 +-
.../netty/PartitionRequestClientFactoryTest.java | 88 +++++-----------------
3 files changed, 23 insertions(+), 73 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 591e972..ff4c19c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -131,7 +131,7 @@ class PartitionRequestClientFactory {
private NettyPartitionRequestClient connect(ConnectionID connectionId)
throws RemoteTransportException, InterruptedException {
try {
- Channel channel = nettyClient.connect(connectionId.getAddress()).await().channel();
+ Channel channel = nettyClient.connect(connectionId.getAddress()).sync().channel();
NetworkClientHandler clientHandler =
channel.pipeline().get(NetworkClientHandler.class);
return new NettyPartitionRequestClient(channel, clientHandler,
connectionId, this);
} catch (InterruptedException e) {
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
index 1d7c81e..ad6772c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/NeverCompletingChannelFuture.java
@@ -74,8 +74,10 @@ class NeverCompletingChannelFuture implements ChannelFuture {
}
@Override
- public ChannelFuture sync() {
- throw new UnsupportedOperationException();
+ public ChannelFuture sync() throws InterruptedException {
+ while (true) {
+ Thread.sleep(50);
+ }
}
@Override
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 595b3e5..5542308 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -18,8 +18,6 @@
package org.apache.flink.runtime.io.network.netty;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.io.network.ConnectionID;
import org.apache.flink.runtime.io.network.NetworkClientHandler;
import
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
@@ -28,21 +26,15 @@ import org.apache.flink.util.NetUtils;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelException;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelFuture;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandler;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelOutboundHandlerAdapter;
-import org.apache.flink.shaded.netty4.io.netty.channel.ChannelPromise;
import org.junit.Test;
import java.io.IOException;
-import java.lang.Thread.UncaughtExceptionHandler;
import java.net.InetAddress;
import java.net.InetSocketAddress;
-import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
@@ -217,6 +209,24 @@ public class PartitionRequestClientFactoryTest {
serverAndClient.server().shutdown();
}
+ @Test(expected = RemoteTransportException.class)
+ public void testThrowsWhenNetworkFailure() throws Exception {
+ NettyTestUtil.NettyServerAndClient nettyServerAndClient = createNettyServerAndClient();
+ try {
+ NettyClient client = nettyServerAndClient.client();
+ PartitionRequestClientFactory factory = new PartitionRequestClientFactory(client, 0);
+
+ // Connect to a wrong address
+ InetSocketAddress addr =
+ new InetSocketAddress(InetAddress.getLocalHost(), NetUtils.getAvailablePort());
+ ConnectionID connectionID = new ConnectionID(addr, 0);
+ factory.createPartitionRequestClient(connectionID);
+ } finally {
+ nettyServerAndClient.client().shutdown();
+ nettyServerAndClient.server().shutdown();
+ }
+ }
+
private NettyTestUtil.NettyServerAndClient createNettyServerAndClient()
throws Exception {
return NettyTestUtil.initServerAndClient(
new NettyProtocol(null, null) {
@@ -289,66 +299,4 @@ public class PartitionRequestClientFactoryTest {
}
}
}
-
- private static class CountDownLatchOnConnectHandler extends ChannelOutboundHandlerAdapter {
-
- private final CountDownLatch syncOnConnect;
-
- public CountDownLatchOnConnectHandler(CountDownLatch syncOnConnect) {
- this.syncOnConnect = syncOnConnect;
- }
-
- @Override
- public void connect(
- ChannelHandlerContext ctx,
- SocketAddress remoteAddress,
- SocketAddress localAddress,
- ChannelPromise promise) {
- syncOnConnect.countDown();
- }
- }
-
- private static class UncaughtTestExceptionHandler implements UncaughtExceptionHandler {
-
- private final List<Throwable> errors = new ArrayList<>(1);
-
- @Override
- public void uncaughtException(Thread t, Throwable e) {
- errors.add(e);
- }
-
- private List<Throwable> getErrors() {
- return errors;
- }
- }
-
- // ------------------------------------------------------------------------
-
- private static Tuple2<NettyServer, NettyClient> createNettyServerAndClient(
- NettyProtocol protocol) throws IOException {
- final NettyConfig config =
- new NettyConfig(
- InetAddress.getLocalHost(), SERVER_PORT, 32 * 1024, 1, new Configuration());
-
- final NettyServer server = new NettyServer(config);
- final NettyClient client = new NettyClient(config);
-
- boolean success = false;
-
- try {
- NettyBufferPool bufferPool = new NettyBufferPool(1);
-
- server.init(protocol, bufferPool);
- client.init(protocol, bufferPool);
-
- success = true;
- } finally {
- if (!success) {
- server.shutdown();
- client.shutdown();
- }
- }
-
- return new Tuple2<>(server, client);
- }
}