[FLINK-5275] [execgraph] Give more detailed error message if InputChannel 
deployment fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4410c04a
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4410c04a
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4410c04a

Branch: refs/heads/master
Commit: 4410c04a68c7b247bb3d7113e5f40f2a9c2165af
Parents: 555a687
Author: Ufuk Celebi <[email protected]>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Dec 7 17:14:00 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java          | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4410c04a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 9b3ce5f..9bf3bd5 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -131,8 +131,21 @@ public class InputChannelDeploymentDescriptor implements 
Serializable {
                        else if (allowLazyDeployment) {
                                // The producing task might not have registered 
the partition yet
                                partitionLocation = 
ResultPartitionLocation.createUnknown();
-                       } else {
-                               throw new ExecutionGraphException("Trying to 
eagerly schedule a task whose inputs are not ready.");
+                       }
+                       else if (producerState == ExecutionState.CANCELING
+                                               || producerState == 
ExecutionState.CANCELED
+                                               || producerState == 
ExecutionState.FAILED) {
+                               String msg = "Trying to schedule a task whose 
inputs were canceled or failed. " +
+                                       "The producer is in state " + 
producerState + ".";
+                               throw new ExecutionGraphException(msg);
+                       }
+                       else {
+                               String msg = String.format("Trying to eagerly 
schedule a task whose inputs " +
+                                       "are not ready (partition consumable? 
%s, producer state: %s, producer slot: %s).",
+                                               
consumedPartition.isConsumable(),
+                                               producerState,
+                                               producerSlot);
+                               throw new ExecutionGraphException(msg);
                        }
 
                        final ResultPartitionID consumedPartitionId = new 
ResultPartitionID(

Reply via email to