This is an automated email from the ASF dual-hosted git repository.
pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 78327de [FLINK-21952] Make all the "Connection reset by peer" exception wrapped as RemoteTransportException
78327de is described below
commit 78327def57451da7781bed98d434fce1033aa7d1
Author: Yun Gao <[email protected]>
AuthorDate: Fri Jun 18 15:35:24 2021 +0800
[FLINK-21952] Make all the "Connection reset by peer" exception wrapped as RemoteTransportException
---
.../CreditBasedPartitionRequestClientHandler.java | 4 +-
...editBasedPartitionRequestClientHandlerTest.java | 51 ++++++++++++++++++++++
2 files changed, 53 insertions(+), 2 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index a7ba2cc..3ea0cd9 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -184,8 +184,8 @@ class CreditBasedPartitionRequestClientHandler extends ChannelInboundHandlerAdap
final TransportException tex;
// Improve on the connection reset by peer error message
- if (cause instanceof IOException
- && cause.getMessage().equals("Connection reset by peer")) {
+ if (cause.getMessage() != null
+ && cause.getMessage().contains("Connection reset by peer")) {
tex =
new RemoteTransportException(
"Lost connection to task manager '"
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 9c10af5..13b76d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -35,6 +35,9 @@ import org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
+import org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -48,8 +51,12 @@ import org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
import org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors;
+import org.junit.Assume;
import org.junit.Test;
import java.io.IOException;
@@ -65,6 +72,7 @@ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertSame;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -578,6 +586,49 @@ public class CreditBasedPartitionRequestClientHandlerTest {
}
}
+ @Test
+ public void testExceptionWrap() {
+ testExceptionWrap(LocalTransportException.class, new Exception());
+ testExceptionWrap(LocalTransportException.class, new Exception("some error"));
+ testExceptionWrap(
+ RemoteTransportException.class, new IOException("Connection reset by peer"));
+
+ // Only when Epoll is available the following exception could be initiated normally
+ // since it relies on the native strerror method.
+ Assume.assumeTrue(Epoll.isAvailable());
+ testExceptionWrap(
+ RemoteTransportException.class,
+ new Errors.NativeIoException("readAddress", Errors.ERRNO_ECONNRESET_NEGATIVE));
+ }
+
+ private void testExceptionWrap(
+ Class<? extends TransportException> expectedClass, Exception cause) {
+ CreditBasedPartitionRequestClientHandler handler =
+ new CreditBasedPartitionRequestClientHandler();
+ EmbeddedChannel embeddedChannel =
+ new EmbeddedChannel(
+ // A test handler to trigger the exception.
+ new ChannelInboundHandlerAdapter() {
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg)
+ throws Exception {
+ throw cause;
+ }
+ },
+ handler);
+
+ embeddedChannel.writeInbound(1);
+ try {
+ handler.checkError();
+ fail(
+ String.format(
+ "The handler should wrap the exception %s as %s, but it does not.",
+ cause, expectedClass));
+ } catch (IOException e) {
+ assertThat(e, instanceOf(expectedClass));
+ }
+ }
+
private void testReadBufferResponseWithReleasingOrRemovingChannel(
boolean isRemoved, boolean readBeforeReleasingOrRemoving) throws Exception {