Repository: flink
Updated Branches:
  refs/heads/release-1.1 75b48edd1 -> 4526005d2


[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4526005d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4526005d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4526005d

Branch: refs/heads/release-1.1
Commit: 4526005d29b697446a6d3a87b22fb0c33912713d
Parents: 1b472d2
Author: Ufuk Celebi <[email protected]>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Dec 7 17:13:01 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java          | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/4526005d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 24b95ea..dc97bf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -123,8 +123,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
                        else if (allowLazyDeployment) {
                                // The producing task might not have registered 
the partition yet
                                partitionLocation = 
ResultPartitionLocation.createUnknown();
-                       } else {
-                               throw new IllegalStateException("Trying to 
eagerly schedule a task whose inputs are not ready.");
+                       }
+                       else if (producerState == ExecutionState.CANCELING
+                                               || producerState == 
ExecutionState.CANCELED
+                                               || producerState == 
ExecutionState.FAILED) {
+                               String msg = "Trying to schedule a task whose 
inputs were canceled or failed. " +
+                                       "The producer is in state " + 
producerState + ".";
+                               throw new IllegalStateException(msg);
+                       }
+                       else {
+                               String msg = String.format("Trying to eagerly 
schedule a task whose inputs " +
+                                       "are not ready (partition consumable? 
%s, producer state: %s, producer slot: %s).",
+                                               
consumedPartition.isConsumable(),
+                                               producerState,
+                                               producerSlot);
+                               throw new IllegalStateException(msg);
                        }
 
                        final ResultPartitionID consumedPartitionId = new 
ResultPartitionID(

Reply via email to