Repository: flink
Updated Branches:
  refs/heads/master fd324ea72 -> cf3ae88b7


[FLINK-3369] [runtime] Make RemoteTransportException a subclass of
CancelTaskException

Problem: RemoteTransportException (RTE) is thrown on data transfer failures
when the remote data producer fails. Because RTE is a subclass of IOException,
the RTE can end up being reported as the root cause of the job failure.

Solution: Make RTE a subclass of CancelTaskException, so that the task is
cancelled rather than failed.

Squashes the following commit:

[pr-comments] Add remote address to RemoteTransportException

This closes #1621.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/cf3ae88b
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/cf3ae88b
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/cf3ae88b

Branch: refs/heads/master
Commit: cf3ae88b73e30a2d69ac1cc6009a8304ea3f53cc
Parents: fd324ea
Author: Ufuk Celebi <[email protected]>
Authored: Wed Feb 10 19:51:20 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Thu Feb 11 14:39:40 2016 +0100

----------------------------------------------------------------------
 .../runtime/execution/CancelTaskException.java  |  4 ++
 .../network/netty/PartitionRequestClient.java   |  6 +--
 .../netty/PartitionRequestClientFactory.java    |  4 +-
 .../netty/PartitionRequestClientHandler.java    | 17 +++-----
 .../exception/LocalTransportException.java      | 22 +++++++---
 .../exception/RemoteTransportException.java     | 36 +++++++++++++---
 .../netty/exception/TransportException.java     | 43 --------------------
 7 files changed, 62 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
index 3bcbe2e..ebf58ec 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/execution/CancelTaskException.java
@@ -34,6 +34,10 @@ public class CancelTaskException extends RuntimeException {
                super(msg);
        }
 
+       public CancelTaskException(String msg, Throwable cause) {
+               super(msg, cause);
+       }
+
        public CancelTaskException() {
                super();
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
index f6120d4..fb24a8e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClient.java
@@ -114,7 +114,7 @@ public class PartitionRequestClient {
                                        inputChannel.onError(
                                                        new 
LocalTransportException(
                                                                        
"Sending the partition request failed.",
-                                                                       
future.channel().localAddress(), future.cause()
+                                                                       
future.cause()
                                                        ));
                                }
                        }
@@ -158,7 +158,7 @@ public class PartitionRequestClient {
                                                                if 
(!future.isSuccess()) {
                                                                        
inputChannel.onError(new LocalTransportException(
                                                                                
        "Sending the task event failed.",
-                                                                               
        future.channel().localAddress(), future.cause()
+                                                                               
        future.cause()
                                                                        ));
                                                                }
                                                        }
@@ -185,7 +185,7 @@ public class PartitionRequestClient {
 
        private void checkNotClosed() throws IOException {
                if (closeReferenceCounter.isDisposed()) {
-                       throw new LocalTransportException("Channel closed.", 
tcpChannel.localAddress());
+                       throw new LocalTransportException("Channel " + 
tcpChannel.localAddress() + "closed.");
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
index 040a8ef..8eae035 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientFactory.java
@@ -216,12 +216,12 @@ class PartitionRequestClientFactory {
                                                "Connecting to remote task 
manager + '" + connectionId.getAddress() +
                                                                "' has failed. 
This might indicate that the remote task " +
                                                                "manager has 
been lost.",
-                                               connectionId.getAddress(), 
future.cause()));
+                                               future.cause(), 
connectionId.getAddress()));
                        }
                        else {
                                notifyOfError(new LocalTransportException(
                                                "Connecting to remote task 
manager + '" + connectionId.getAddress() +
-                                                               "' has been 
cancelled.", null));
+                                                               "' has been 
cancelled."));
                        }
                }
        }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
index ee015c2..afcd881 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionRequestClientHandler.java
@@ -29,7 +29,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import 
org.apache.flink.runtime.io.network.netty.exception.LocalTransportException;
 import 
org.apache.flink.runtime.io.network.netty.exception.RemoteTransportException;
-import org.apache.flink.runtime.io.network.netty.exception.TransportException;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannelID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
@@ -133,27 +132,23 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
        @Override
        public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) 
throws Exception {
 
-               if (cause instanceof TransportException) {
+               if (cause instanceof LocalTransportException || cause 
instanceof RemoteTransportException) {
                        notifyAllChannelsOfErrorAndClose(cause);
                }
                else {
                        final SocketAddress remoteAddr = 
ctx.channel().remoteAddress();
 
-                       final TransportException tex;
-
                        // Improve on the connection reset by peer error message
                        if (cause instanceof IOException
                                        && 
cause.getMessage().equals("Connection reset by peer")) {
 
-                               tex = new RemoteTransportException(
+                               notifyAllChannelsOfErrorAndClose(new 
RemoteTransportException(
                                                "Lost connection to task 
manager '" + remoteAddr + "'. This indicates "
-                                                               + "that the 
remote task manager was lost.", remoteAddr, cause);
+                                                               + "that the 
remote task manager was lost.", cause, remoteAddr));
                        }
                        else {
-                               tex = new 
LocalTransportException(cause.getMessage(), ctx.channel().localAddress(), 
cause);
+                               notifyAllChannelsOfErrorAndClose(new 
LocalTransportException(cause.getMessage(), cause));
                        }
-
-                       notifyAllChannelsOfErrorAndClose(tex);
                }
        }
 
@@ -228,7 +223,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
                        if (error.isFatalError()) {
                                notifyAllChannelsOfErrorAndClose(new 
RemoteTransportException(
                                                "Fatal error at remote task 
manager '" + remoteAddr + "'.",
-                                               remoteAddr, error.cause));
+                                               error.cause, remoteAddr));
                        }
                        else {
                                RemoteInputChannel inputChannel = 
inputChannels.get(error.receiverId);
@@ -240,7 +235,7 @@ class PartitionRequestClientHandler extends 
ChannelInboundHandlerAdapter {
                                        else {
                                                inputChannel.onError(new 
RemoteTransportException(
                                                                "Error at 
remote task manager '" + remoteAddr + "'.",
-                                                                               
remoteAddr, error.cause));
+                                                                               
error.cause, remoteAddr));
                                        }
                                }
                        }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
index 37f6e53..7851953 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/LocalTransportException.java
@@ -18,17 +18,27 @@
 
 package org.apache.flink.runtime.io.network.netty.exception;
 
-import java.net.SocketAddress;
+import java.io.IOException;
 
-public class LocalTransportException extends TransportException {
+/**
+ * Exception thrown on local transport failures.
+ *
+ * <p>If you get this type of exception at task manager T, it means that
+ * something went wrong in the local network stack of task manager T.
+ */
+public class LocalTransportException extends IOException {
 
        private static final long serialVersionUID = 2366708881288640674L;
 
-       public LocalTransportException(String message, SocketAddress address) {
-               super(message, address);
+       public LocalTransportException() {
+               super();
+       }
+
+       public LocalTransportException(String message) {
+               super(message);
        }
 
-       public LocalTransportException(String message, SocketAddress address, 
Throwable cause) {
-               super(message, address, cause);
+       public LocalTransportException(String message, Throwable cause) {
+               super(message, cause);
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
index 5f81883..1a83a91 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/RemoteTransportException.java
@@ -18,17 +18,43 @@
 
 package org.apache.flink.runtime.io.network.netty.exception;
 
+import org.apache.flink.runtime.execution.CancelTaskException;
+
 import java.net.SocketAddress;
 
-public class RemoteTransportException extends TransportException {
+/**
+ * Exception thrown on remote transport failures.
+ *
+ * <p>If you get this type of exception at task manager T, it means that
+ * something went wrong at the network stack of another task manager (not T).
+ * It is not an issue at the task, which throws the Exception.
+ */
+public class RemoteTransportException extends CancelTaskException {
 
        private static final long serialVersionUID = 4373615529545893089L;
 
-       public RemoteTransportException(String message, SocketAddress address) {
-               super(message, address);
+       /** Address of the remote task manager that caused this Exception. */
+       private final SocketAddress remoteAddress;
+
+       public RemoteTransportException() {
+               this(null, null, null);
+       }
+
+       public RemoteTransportException(String msg, SocketAddress 
remoteAddress) {
+               this(msg, null, remoteAddress);
+       }
+
+       public RemoteTransportException(String msg, Throwable cause, 
SocketAddress remoteAddress) {
+               super(msg, cause);
+               this.remoteAddress = remoteAddress;
        }
 
-       public RemoteTransportException(String message, SocketAddress address, 
Throwable cause) {
-               super(message, address, cause);
+       /**
+        * Returns the address of the task manager causing this Exception.
+        *
+        * @return Address of the remote task manager causing this Exception
+        */
+       public SocketAddress getRemoteAddress() {
+               return remoteAddress;
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/cf3ae88b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
deleted file mode 100644
index 0438688..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/exception/TransportException.java
+++ /dev/null
@@ -1,43 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty.exception;
-
-import java.io.IOException;
-import java.net.SocketAddress;
-
-public abstract class TransportException extends IOException {
-
-       private static final long serialVersionUID = 3637820720589866570L;
-
-       private final SocketAddress address;
-
-       public TransportException(String message, SocketAddress address) {
-               this(message, address, null);
-       }
-
-       public TransportException(String message, SocketAddress address, 
Throwable cause) {
-               super(message, cause);
-
-               this.address = address;
-       }
-
-       public SocketAddress getAddress() {
-               return address;
-       }
-}

Reply via email to