This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 5165205 [FLINK-24133][core] Network failure test replaced by comment
in the code due to high expense of stabilizing such low important test
5165205 is described below
commit 5165205ab6d8917c0c0ee7bf3bb7fb05930376ec
Author: Anton Kalashnikov <[email protected]>
AuthorDate: Tue Sep 14 17:33:46 2021 +0200
[FLINK-24133][core] Network failure test replaced by comment in the code
due to high expense of stabilizing such low important test
---
.../network/netty/PartitionRequestClientFactory.java | 3 +++
.../netty/PartitionRequestClientFactoryTest.java | 18 ------------------
2 files changed, 3 insertions(+), 18 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index ff4c19c..8a91e8a 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -131,6 +131,9 @@ class PartitionRequestClientFactory {
private NettyPartitionRequestClient connect(ConnectionID connectionId)
throws RemoteTransportException, InterruptedException {
try {
+ // It's important to use `sync` here because it waits for this
future until it is
+ // done, and rethrows the cause of the failure if this future
failed. `await` only
+ // waits for this future to be completed, without throwing the
error.
Channel channel =
nettyClient.connect(connectionId.getAddress()).sync().channel();
NetworkClientHandler clientHandler =
channel.pipeline().get(NetworkClientHandler.class);
return new NettyPartitionRequestClient(channel, clientHandler,
connectionId, this);
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
index 54def34..884e66b 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactoryTest.java
@@ -210,24 +210,6 @@ public class PartitionRequestClientFactoryTest extends
TestLogger {
serverAndClient.server().shutdown();
}
- @Test(expected = RemoteTransportException.class)
- public void testThrowsWhenNetworkFailure() throws Exception {
- NettyTestUtil.NettyServerAndClient nettyServerAndClient =
createNettyServerAndClient();
- try {
- NettyClient client = nettyServerAndClient.client();
- PartitionRequestClientFactory factory = new
PartitionRequestClientFactory(client, 0);
-
- // Connect to a wrong address
- InetSocketAddress addr =
- new InetSocketAddress(InetAddress.getLocalHost(),
NetUtils.getAvailablePort());
- ConnectionID connectionID = new ConnectionID(addr, 0);
- factory.createPartitionRequestClient(connectionID);
- } finally {
- nettyServerAndClient.client().shutdown();
- nettyServerAndClient.server().shutdown();
- }
- }
-
private NettyTestUtil.NettyServerAndClient createNettyServerAndClient()
throws Exception {
return NettyTestUtil.initServerAndClient(
new NettyProtocol(null, null) {