Repository: flink
Updated Branches:
  refs/heads/release-1.1 75b48edd1 -> 4526005d2
[FLINK-5275] [execgraph] Give more detailed error message if InputChannel deployment fails

Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/4526005d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/4526005d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/4526005d

Branch: refs/heads/release-1.1
Commit: 4526005d29b697446a6d3a87b22fb0c33912713d
Parents: 1b472d2
Author: Ufuk Celebi <[email protected]>
Authored: Wed Dec 7 13:48:25 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Dec 7 17:13:01 2016 +0100

----------------------------------------------------------------------
 .../InputChannelDeploymentDescriptor.java | 17 +++++++++++++++--
 1 file changed, 15 insertions(+), 2 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/flink/blob/4526005d/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
index 24b95ea..dc97bf3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputChannelDeploymentDescriptor.java
@@ -123,8 +123,21 @@ public class InputChannelDeploymentDescriptor implements Serializable {
             else if (allowLazyDeployment) {
                 // The producing task might not have registered the partition yet
                 partitionLocation = ResultPartitionLocation.createUnknown();
-            } else {
-                throw new IllegalStateException("Trying to eagerly schedule a task whose inputs are not ready.");
+            }
+            else if (producerState == ExecutionState.CANCELING
+                    || producerState == ExecutionState.CANCELED
+                    || producerState == ExecutionState.FAILED) {
+                String msg = "Trying to schedule a task whose inputs were canceled or failed. " +
+                    "The producer is in state " + producerState + ".";
+                throw new IllegalStateException(msg);
+            }
+            else {
+                String msg = String.format("Trying to eagerly schedule a task whose inputs " +
+                    "are not ready (partition consumable? %s, producer state: %s, producer slot: %s).",
+                    consumedPartition.isConsumable(),
+                    producerState,
+                    producerSlot);
+                throw new IllegalStateException(msg);
             }
 
             final ResultPartitionID consumedPartitionId = new ResultPartitionID(
