This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 78327de  [FLINK-21952] Make all the "Connection reset by peer" 
exception wrapped as RemoteTransportException
78327de is described below

commit 78327def57451da7781bed98d434fce1033aa7d1
Author: Yun Gao <[email protected]>
AuthorDate: Fri Jun 18 15:35:24 2021 +0800

    [FLINK-21952] Make all the "Connection reset by peer" exception wrapped as 
RemoteTransportException
---
 .../CreditBasedPartitionRequestClientHandler.java  |  4 +-
 ...editBasedPartitionRequestClientHandlerTest.java | 51 ++++++++++++++++++++++
 2 files changed, 53 insertions(+), 2 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
index a7ba2cc..3ea0cd9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandler.java
@@ -184,8 +184,8 @@ class CreditBasedPartitionRequestClientHandler extends 
ChannelInboundHandlerAdap
             final TransportException tex;
 
             // Improve on the connection reset by peer error message
-            if (cause instanceof IOException
-                    && cause.getMessage().equals("Connection reset by peer")) {
+            if (cause.getMessage() != null
+                    && cause.getMessage().contains("Connection reset by 
peer")) {
                 tex =
                         new RemoteTransportException(
                                 "Lost connection to task manager '"
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
index 9c10af5..13b76d5 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/netty/CreditBasedPartitionRequestClientHandlerTest.java
@@ -35,6 +35,9 @@ import 
org.apache.flink.runtime.io.network.netty.NettyMessage.BufferResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.CloseRequest;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.ErrorResponse;
 import org.apache.flink.runtime.io.network.netty.NettyMessage.PartitionRequest;
+import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
+import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
+import org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannelBuilder;
@@ -48,8 +51,12 @@ import 
org.apache.flink.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.flink.shaded.netty4.io.netty.buffer.UnpooledByteBufAllocator;
 import org.apache.flink.shaded.netty4.io.netty.channel.Channel;
 import org.apache.flink.shaded.netty4.io.netty.channel.ChannelHandlerContext;
+import 
org.apache.flink.shaded.netty4.io.netty.channel.ChannelInboundHandlerAdapter;
 import 
org.apache.flink.shaded.netty4.io.netty.channel.embedded.EmbeddedChannel;
+import org.apache.flink.shaded.netty4.io.netty.channel.epoll.Epoll;
+import org.apache.flink.shaded.netty4.io.netty.channel.unix.Errors;
 
+import org.junit.Assume;
 import org.junit.Test;
 
 import java.io.IOException;
@@ -65,6 +72,7 @@ import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertSame;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.Matchers.any;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.never;
@@ -578,6 +586,49 @@ public class CreditBasedPartitionRequestClientHandlerTest {
         }
     }
 
+    @Test
+    public void testExceptionWrap() {
+        testExceptionWrap(LocalTransportException.class, new Exception());
+        testExceptionWrap(LocalTransportException.class, new Exception("some 
error"));
+        testExceptionWrap(
+                RemoteTransportException.class, new IOException("Connection 
reset by peer"));
+
+        // Only when Epoll is available the following exception could be 
initiated normally
+        // since it relies on the native strerror method.
+        Assume.assumeTrue(Epoll.isAvailable());
+        testExceptionWrap(
+                RemoteTransportException.class,
+                new Errors.NativeIoException("readAddress", 
Errors.ERRNO_ECONNRESET_NEGATIVE));
+    }
+
+    private void testExceptionWrap(
+            Class<? extends TransportException> expectedClass, Exception 
cause) {
+        CreditBasedPartitionRequestClientHandler handler =
+                new CreditBasedPartitionRequestClientHandler();
+        EmbeddedChannel embeddedChannel =
+                new EmbeddedChannel(
+                        // A test handler to trigger the exception.
+                        new ChannelInboundHandlerAdapter() {
+                            @Override
+                            public void channelRead(ChannelHandlerContext ctx, 
Object msg)
+                                    throws Exception {
+                                throw cause;
+                            }
+                        },
+                        handler);
+
+        embeddedChannel.writeInbound(1);
+        try {
+            handler.checkError();
+            fail(
+                    String.format(
+                            "The handler should wrap the exception %s as %s, 
but it does not.",
+                            cause, expectedClass));
+        } catch (IOException e) {
+            assertThat(e, instanceOf(expectedClass));
+        }
+    }
+
     private void testReadBufferResponseWithReleasingOrRemovingChannel(
             boolean isRemoved, boolean readBeforeReleasingOrRemoving) throws 
Exception {
 

Reply via email to