[FLINK-5114] [network] Handle partition producer state check for unregistered executions

This closes #2912.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/a078666d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/a078666d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/a078666d

Branch: refs/heads/master
Commit: a078666d42ab4dae01dedaa50d55343ce141fcb8
Parents: 47db9cb
Author: Ufuk Celebi <[email protected]>
Authored: Tue Nov 22 16:15:04 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Mon Dec 12 17:39:16 2016 +0100

----------------------------------------------------------------------
 .../executiongraph/IntermediateResult.java      |  37 ++-
 .../runtime/io/network/PartitionState.java      |  64 -----
 .../netty/PartitionProducerStateChecker.java    |  52 +++++
 .../io/network/netty/PartitionStateChecker.java |  35 ---
 .../partition/consumer/SingleInputGate.java     |  13 +-
 .../PartitionProducerDisposedException.java     |  36 +++
 ...torGatewayPartitionProducerStateChecker.java |  66 ++++++
 .../ActorGatewayPartitionStateChecker.java      |  67 ------
 .../apache/flink/runtime/taskmanager/Task.java  | 121 ++++++----
 .../flink/runtime/taskmanager/TaskActions.java  |  20 +-
 .../flink/runtime/jobmanager/JobManager.scala   |  70 ++++--
 .../runtime/messages/JobManagerMessages.scala   |  24 +-
 .../flink/runtime/taskmanager/TaskManager.scala |   6 +-
 .../partition/InputGateConcurrentTest.java      |   4 -
 .../partition/InputGateFairnessTest.java        |   8 +-
 .../consumer/LocalInputChannelTest.java         |   3 -
 .../partition/consumer/SingleInputGateTest.java |   6 +-
 .../partition/consumer/TestSingleInputGate.java |   2 -
 .../partition/consumer/UnionInputGateTest.java  |   5 +-
 .../runtime/jobmanager/JobManagerTest.java      | 234 ++++++++++++++++---
 .../runtime/taskmanager/TaskAsyncCallTest.java  |   6 +-
 .../runtime/taskmanager/TaskManagerTest.java    |  14 +-
 .../flink/runtime/taskmanager/TaskStopTest.java |   4 +-
 .../flink/runtime/taskmanager/TaskTest.java     | 148 +++++++++++-
 .../runtime/tasks/BlockingCheckpointsTest.java  |   7 +-
 .../tasks/InterruptSensitiveRestoreTest.java    |   4 +-
 .../streaming/runtime/tasks/StreamTaskTest.java |   6 +-
 27 files changed, 698 insertions(+), 364 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index c2c19d1..313272c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
 
        private final IntermediateResultPartition[] partitions;
 
+       /**
+        * Maps intermediate result partition IDs to a partition index. This is
+        * used for ID lookups of intermediate results. The partition connect
+        * logic in other places is tightly coupled to the partitions being held
+        * as an array, so the array is kept and this map only serves ID lookups.
+        */
+       private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
        private final int numParallelProducers;
 
        private final AtomicInteger numberOfRunningProducers;
@@ -54,10 +64,12 @@ public class IntermediateResult {
 
                this.id = checkNotNull(id);
                this.producer = checkNotNull(producer);
-               this.partitions = new 
IntermediateResultPartition[numParallelProducers];
+
                checkArgument(numParallelProducers >= 1);
                this.numParallelProducers = numParallelProducers;
 
+               this.partitions = new 
IntermediateResultPartition[numParallelProducers];
+
                this.numberOfRunningProducers = new 
AtomicInteger(numParallelProducers);
 
                // we do not set the intermediate result partitions here, 
because we let them be initialized by
@@ -80,6 +92,7 @@ public class IntermediateResult {
                }
 
                partitions[partitionNumber] = partition;
+               partitionLookupHelper.put(partition.getPartitionId(), 
partitionNumber);
                partitionsAssigned++;
        }
 
@@ -95,6 +108,28 @@ public class IntermediateResult {
                return partitions;
        }
 
+       /**
+        * Returns the partition with the given ID.
+        *
+        * @param resultPartitionId ID of the partition to look up
+        * @throws NullPointerException If the partition ID is <code>null</code>
+        * @throws IllegalArgumentException If the partition ID is unknown
+        * @return Intermediate result partition with the given ID
+        */
+       public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+               // Look up the partition number via the helper map and return the
+               // partition. Currently, this happens infrequently enough that we could
+               // consider removing the map and scanning the partitions on every lookup.
+               // The lookup only happens when the producer of an intermediate
+               // result cannot be found via its registered execution.
+               Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+               if (partitionNumber != null) {
+                       return partitions[partitionNumber];
+               } else {
+                       throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+               }
+       }
+
        public int getNumberOfAssignedPartitions() {
                return partitionsAssigned;
        }
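
For illustration, a minimal standalone sketch of the array-plus-lookup-map pattern that getPartitionById relies on, using placeholder types (PartitionId, Partition, PartitionHolder) invented for this sketch rather than the actual Flink classes:

    import java.util.HashMap;
    import java.util.Map;

    // Placeholder types for illustration; not the Flink classes.
    final class PartitionId {}
    final class Partition {
        final PartitionId id = new PartitionId();
    }

    final class PartitionHolder {
        private final Partition[] partitions;
        private final Map<PartitionId, Integer> lookup = new HashMap<>();

        PartitionHolder(int numPartitions) {
            this.partitions = new Partition[numPartitions];
        }

        void setPartition(int index, Partition partition) {
            // Keep the array and the lookup map in sync, as setPartition and
            // partitionLookupHelper do above.
            partitions[index] = partition;
            lookup.put(partition.id, index);
        }

        Partition getPartitionById(PartitionId id) {
            Integer index = lookup.get(id);
            if (index == null) {
                throw new IllegalArgumentException("Unknown partition ID " + id);
            }
            return partitions[index];
        }
    }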

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
deleted file mode 100644
index 59357fc..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/PartitionState.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network;
-
-import org.apache.flink.runtime.execution.ExecutionState;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.util.Preconditions;
-
-import javax.annotation.Nullable;
-import java.io.Serializable;
-
-/**
- * Contains information about the state of a result partition.
- */
-public class PartitionState implements Serializable {
-
-       private static final long serialVersionUID = -4693651272083825031L;
-
-       private final IntermediateDataSetID intermediateDataSetID;
-       private final IntermediateResultPartitionID 
intermediateResultPartitionID;
-       private final ExecutionState executionState;
-
-       public PartitionState(
-               IntermediateDataSetID intermediateDataSetID,
-               IntermediateResultPartitionID intermediateResultPartitionID,
-               @Nullable ExecutionState executionState) {
-
-               this.intermediateDataSetID = 
Preconditions.checkNotNull(intermediateDataSetID);
-               this.intermediateResultPartitionID = 
Preconditions.checkNotNull(intermediateResultPartitionID);
-               this.executionState = executionState;
-       }
-
-       public IntermediateDataSetID getIntermediateDataSetID() {
-               return intermediateDataSetID;
-       }
-
-       public IntermediateResultPartitionID getIntermediateResultPartitionID() 
{
-               return intermediateResultPartitionID;
-       }
-
-       /**
-        * Returns the execution state of the partition producer or 
<code>null</code> if it is not available.
-        */
-       public ExecutionState getExecutionState() {
-               return executionState;
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..d0b7e1e
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not yet registered itself with the network stack or has
+ * already terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+       /**
+        * Requests the execution state of the execution producing a result partition.
+        *
+        * @param jobId ID of the job the partition belongs to.
+        * @param intermediateDataSetId ID of the parent intermediate data set.
+        * @param resultPartitionId ID of the result partition to check. This
+        * identifies the producing execution and partition.
+        *
+        * @return Future holding the execution state of the producing execution.
+        */
+       Future<ExecutionState> requestPartitionProducerState(
+                       JobID jobId,
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId);
+
+}
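
To illustrate the contract of PartitionProducerStateChecker without pulling in the runtime types, here is a hedged sketch of a stub checker that always answers with a fixed producer state, e.g. for tests. It uses java.util.concurrent.CompletableFuture and a simplified ProducerState enum as stand-ins; the actual interface above is defined against Flink's own Future, JobID, IntermediateDataSetID and ResultPartitionID types:

    import java.util.concurrent.CompletableFuture;

    // Simplified stand-ins for this sketch; not the Flink runtime types.
    enum ProducerState { RUNNING, FINISHED, CANCELED, FAILED }

    interface ProducerStateChecker {
        CompletableFuture<ProducerState> requestPartitionProducerState(
                String jobId, String intermediateDataSetId, String resultPartitionId);
    }

    // Stub implementation that immediately answers with a fixed state instead
    // of asking the JobManager.
    final class FixedStateChecker implements ProducerStateChecker {

        private final ProducerState state;

        FixedStateChecker(ProducerState state) {
            this.state = state;
        }

        @Override
        public CompletableFuture<ProducerState> requestPartitionProducerState(
                String jobId, String intermediateDataSetId, String resultPartitionId) {
            return CompletableFuture.completedFuture(state);
        }
    }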

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index 949f426..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
-       Future<PartitionState> requestPartitionState(
-                       JobID jobId,
-                       ExecutionAttemptID executionId,
-                       IntermediateDataSetID resultId,
-                       ResultPartitionID partitionId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index b4d8d2c..d546559 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -108,9 +108,6 @@ public class SingleInputGate implements InputGate {
        /** The job ID of the owning task. */
        private final JobID jobId;
 
-       /** The execution attempt ID of the owning task. */
-       private final ExecutionAttemptID executionId;
-
        /**
         * The ID of the consumed intermediate result. Each input gate consumes 
partitions of the
         * intermediate result specified by this ID. This ID also identifies 
the input gate at the
@@ -168,7 +165,6 @@ public class SingleInputGate implements InputGate {
        public SingleInputGate(
                String owningTaskName,
                JobID jobId,
-               ExecutionAttemptID executionId,
                IntermediateDataSetID consumedResultId,
                int consumedSubpartitionIndex,
                int numberOfInputChannels,
@@ -177,7 +173,6 @@ public class SingleInputGate implements InputGate {
 
                this.owningTaskName = checkNotNull(owningTaskName);
                this.jobId = checkNotNull(jobId);
-               this.executionId = checkNotNull(executionId);
 
                this.consumedResultId = checkNotNull(consumedResultId);
 
@@ -530,11 +525,7 @@ public class SingleInputGate implements InputGate {
        }
 
        void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-               taskActions.triggerPartitionStateCheck(
-                       jobId,
-                       executionId,
-                       consumedResultId,
-                       partitionId);
+               taskActions.triggerPartitionProducerStateCheck(jobId, 
consumedResultId, partitionId);
        }
 
        private void queueChannel(InputChannel channel) {
@@ -587,7 +578,7 @@ public class SingleInputGate implements InputGate {
                final InputChannelDeploymentDescriptor[] icdd = 
checkNotNull(igdd.getInputChannelDeploymentDescriptors());
 
                final SingleInputGate inputGate = new SingleInputGate(
-                       owningTaskName, jobId, executionId, consumedResultId, 
consumedSubpartitionIndex,
+                       owningTaskName, jobId, consumedResultId, 
consumedSubpartitionIndex,
                        icdd.length, taskActions, metrics);
 
                // Create the input channels. There is one input channel for 
each consumed partition.

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+       public PartitionProducerDisposedException(ResultPartitionID 
resultPartitionID) {
+               super(String.format("Execution %s producing partition %s has 
already been disposed.",
+                       resultPartitionID.getProducerId(),
+                       resultPartitionID.getPartitionId()));
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
new file mode 100644
index 0000000..5c229a9
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionProducerStateChecker.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.taskmanager;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.concurrent.Future;
+import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
+import org.apache.flink.runtime.execution.ExecutionState;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.messages.JobManagerMessages;
+import org.apache.flink.util.Preconditions;
+import scala.concurrent.duration.FiniteDuration;
+import scala.reflect.ClassTag$;
+
+/**
+ * This implementation uses {@link ActorGateway} to trigger the partition
+ * producer state check at the job manager.
+ */
+public class ActorGatewayPartitionProducerStateChecker implements 
PartitionProducerStateChecker {
+
+       private final ActorGateway jobManager;
+       private final FiniteDuration timeout;
+
+       public ActorGatewayPartitionProducerStateChecker(ActorGateway 
jobManager, FiniteDuration timeout) {
+               this.jobManager = Preconditions.checkNotNull(jobManager);
+               this.timeout = Preconditions.checkNotNull(timeout);
+       }
+
+       @Override
+       public Future<ExecutionState> requestPartitionProducerState(
+                       JobID jobId,
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId) {
+
+               JobManagerMessages.RequestPartitionProducerState msg = new 
JobManagerMessages.RequestPartitionProducerState(
+                       jobId,
+                       intermediateDataSetId, resultPartitionId
+               );
+
+               scala.concurrent.Future<ExecutionState> futureResponse = 
jobManager
+                       .ask(msg, timeout)
+                       
.mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+               return new FlinkFuture<>(futureResponse);
+       }
+
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
deleted file mode 100644
index efa6ec3..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/ActorGatewayPartitionStateChecker.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.taskmanager;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.concurrent.Future;
-import org.apache.flink.runtime.concurrent.impl.FlinkFuture;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.instance.ActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.messages.JobManagerMessages;
-import org.apache.flink.util.Preconditions;
-import scala.concurrent.duration.FiniteDuration;
-import scala.reflect.ClassTag$;
-
-/**
- * This implementation uses {@link ActorGateway} to trigger the partition 
state check at the job
- * manager.
- */
-public class ActorGatewayPartitionStateChecker implements 
PartitionStateChecker {
-
-       private final ActorGateway jobManager;
-       private final FiniteDuration timeout;
-
-       public ActorGatewayPartitionStateChecker(ActorGateway jobManager, 
FiniteDuration timeout) {
-               this.jobManager = Preconditions.checkNotNull(jobManager);
-               this.timeout = Preconditions.checkNotNull(timeout);
-       }
-
-       @Override
-       public Future<PartitionState> requestPartitionState(
-                       JobID jobId,
-                       ExecutionAttemptID executionAttemptId,
-                       IntermediateDataSetID resultId,
-                       ResultPartitionID partitionId) {
-               JobManagerMessages.RequestPartitionState msg = new 
JobManagerMessages.RequestPartitionState(
-                       jobId,
-                       partitionId,
-                       executionAttemptId,
-                       resultId);
-
-               scala.concurrent.Future<PartitionState> futureResponse = 
jobManager
-                       .ask(msg, timeout)
-                       
.mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
-
-               return new FlinkFuture<>(futureResponse);
-       }
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
index 184c3b1..a1fb35e 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java
@@ -46,20 +46,19 @@ import 
org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
 import org.apache.flink.runtime.jobgraph.tasks.StatefulTask;
 import org.apache.flink.runtime.jobgraph.tasks.StoppableTask;
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
 import org.apache.flink.runtime.memory.MemoryManager;
 import org.apache.flink.runtime.metrics.groups.TaskMetricGroup;
 import org.apache.flink.runtime.query.TaskKvStateRegistry;
@@ -206,8 +205,8 @@ public class Task implements Runnable, TaskActions {
        /** Parent group for all metrics of this task */
        private final TaskMetricGroup metrics;
 
-       /** Partition state checker to request partition states from */
-       private final PartitionStateChecker partitionStateChecker;
+       /** Partition producer state checker to request partition states from */
+       private final PartitionProducerStateChecker 
partitionProducerStateChecker;
 
        /** Executor to run future callbacks */
        private final Executor executor;
@@ -271,7 +270,7 @@ public class Task implements Runnable, TaskActions {
                TaskManagerRuntimeInfo taskManagerConfig,
                TaskMetricGroup metricGroup,
                ResultPartitionConsumableNotifier 
resultPartitionConsumableNotifier,
-               PartitionStateChecker partitionStateChecker,
+               PartitionProducerStateChecker partitionProducerStateChecker,
                Executor executor) {
 
                Preconditions.checkNotNull(jobInformation);
@@ -321,7 +320,7 @@ public class Task implements Runnable, TaskActions {
                this.taskExecutionStateListeners = new CopyOnWriteArrayList<>();
                this.metrics = metricGroup;
 
-               this.partitionStateChecker = 
Preconditions.checkNotNull(partitionStateChecker);
+               this.partitionProducerStateChecker = 
Preconditions.checkNotNull(partitionProducerStateChecker);
                this.executor = Preconditions.checkNotNull(executor);
 
                // create the reader and writer structures
@@ -1036,32 +1035,37 @@ public class Task implements Runnable, TaskActions {
        // 
------------------------------------------------------------------------
 
        @Override
-       public void triggerPartitionStateCheck(
+       public void triggerPartitionProducerStateCheck(
                JobID jobId,
-               ExecutionAttemptID executionId,
-               final IntermediateDataSetID resultId,
-               final ResultPartitionID partitionId) {
-               org.apache.flink.runtime.concurrent.Future<PartitionState> 
futurePartitionState = partitionStateChecker.requestPartitionState(
-                       jobId,
-                       executionId,
-                       resultId,
-                       partitionId);
-
-               futurePartitionState.handleAsync(new BiFunction<PartitionState, 
Throwable, Void>() {
+               final IntermediateDataSetID intermediateDataSetId,
+               final ResultPartitionID resultPartitionId) {
+
+               org.apache.flink.runtime.concurrent.Future<ExecutionState> 
futurePartitionState =
+                       
partitionProducerStateChecker.requestPartitionProducerState(
+                               jobId,
+                               intermediateDataSetId,
+                               resultPartitionId);
+
+               futurePartitionState.handleAsync(new BiFunction<ExecutionState, 
Throwable, Void>() {
                        @Override
-                       public Void apply(PartitionState partitionState, 
Throwable throwable) {
+                       public Void apply(ExecutionState executionState, 
Throwable throwable) {
                                try {
-                                       if (partitionState != null) {
+                                       if (executionState != null) {
                                                onPartitionStateUpdate(
-                                                       
partitionState.getIntermediateDataSetID(),
-                                                       
partitionState.getIntermediateResultPartitionID(),
-                                                       
partitionState.getExecutionState());
+                                                       intermediateDataSetId,
+                                                       resultPartitionId,
+                                                       executionState);
                                        } else if (throwable instanceof 
TimeoutException) {
                                                // our request timed out, 
assume we're still running and try again
                                                onPartitionStateUpdate(
-                                                       resultId,
-                                                       
partitionId.getPartitionId(),
+                                                       intermediateDataSetId,
+                                                       resultPartitionId,
                                                        ExecutionState.RUNNING);
+                                       } else if (throwable instanceof 
PartitionProducerDisposedException) {
+                                               String msg = String.format("Producer %s of partition %s disposed. Cancelling execution.",
+                                                       resultPartitionId.getProducerId(), resultPartitionId.getPartitionId());
+                                               LOG.info(msg, throwable);
+                                               cancelExecution();
                                        } else {
                                                failExternally(throwable);
                                        }
@@ -1183,41 +1187,58 @@ public class Task implements Runnable, TaskActions {
        /**
         * Answer to a partition state check issued after a failed partition 
request.
         */
-       public void onPartitionStateUpdate(
-                       IntermediateDataSetID resultId,
-                       IntermediateResultPartitionID partitionId,
-                       ExecutionState partitionState) throws IOException, 
InterruptedException {
+       @VisibleForTesting
+       void onPartitionStateUpdate(
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId,
+                       ExecutionState producerState) throws IOException, 
InterruptedException {
 
                if (executionState == ExecutionState.RUNNING) {
-                       final SingleInputGate inputGate = 
inputGatesById.get(resultId);
+                       final SingleInputGate inputGate = 
inputGatesById.get(intermediateDataSetId);
 
                        if (inputGate != null) {
-                               if (partitionState == ExecutionState.RUNNING ||
-                                       partitionState == 
ExecutionState.FINISHED ||
-                                       partitionState == 
ExecutionState.SCHEDULED ||
-                                       partitionState == 
ExecutionState.DEPLOYING) {
+                               if (producerState == ExecutionState.SCHEDULED
+                                       || producerState == 
ExecutionState.DEPLOYING
+                                       || producerState == 
ExecutionState.RUNNING
+                                       || producerState == 
ExecutionState.FINISHED) {
 
                                        // Retrigger the partition request
-                                       
inputGate.retriggerPartitionRequest(partitionId);
-                               }
-                               else if (partitionState == 
ExecutionState.CANCELED
-                                               || partitionState == 
ExecutionState.CANCELING
-                                               || partitionState == 
ExecutionState.FAILED) {
+                                       
inputGate.retriggerPartitionRequest(resultPartitionId.getPartitionId());
+
+                               } else if (producerState == 
ExecutionState.CANCELING
+                                       || producerState == 
ExecutionState.CANCELED
+                                       || producerState == 
ExecutionState.FAILED) {
+
+                                       // The producing execution has been 
canceled or failed. We
+                                       // don't need to re-trigger the request 
since it cannot
+                                       // succeed.
+                                       if (LOG.isDebugEnabled()) {
+                                               LOG.debug("Cancelling task {} 
after the producer of partition {} with attempt ID {} has entered state {}.",
+                                                       taskNameWithSubtask,
+                                                       
resultPartitionId.getPartitionId(),
+                                                       
resultPartitionId.getProducerId(),
+                                                       producerState);
+                                       }
 
                                        cancelExecution();
+                               } else {
+                                       // Any other execution state is 
unexpected. Currently, only
+                                       // state CREATED is left out of the 
checked states. If we
+                                       // see a producer in this state, 
something went wrong with
+                                       // scheduling in topological order.
+                                       String msg = String.format("Producer 
with attempt ID %s of partition %s in unexpected state %s.",
+                                               
resultPartitionId.getProducerId(),
+                                               
resultPartitionId.getPartitionId(),
+                                               producerState);
+
+                                       failExternally(new 
IllegalStateException(msg));
                                }
-                               else {
-                                       failExternally(new 
IllegalStateException("Received unexpected partition state "
-                                                       + partitionState + " 
for partition request. This is a bug."));
-                               }
-                       }
-                       else {
-                               failExternally(new 
IllegalStateException("Received partition state for " +
-                                               "unknown input gate " + 
resultId + ". This is a bug."));
+                       } else {
+                               failExternally(new 
IllegalStateException("Received partition producer state for " +
+                                               "unknown input gate " + 
intermediateDataSetId + "."));
                        }
-               }
-               else {
-                       LOG.debug("Ignoring partition state notification for 
not running task.");
+               } else {
+                       LOG.debug("Task {} ignored a partition producer state 
notification, because it's not running.", taskNameWithSubtask);
                }
        }
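
The branching in onPartitionStateUpdate boils down to a small decision table over the producer's execution state. Below is a hedged, standalone sketch of that table; the ProducerState enum and Reaction type are illustrative only and not part of the Flink codebase, which works on ExecutionState and the input gates directly:

    // Illustrative producer states; mirrors the cases checked in onPartitionStateUpdate.
    enum ProducerState { CREATED, SCHEDULED, DEPLOYING, RUNNING, FINISHED, CANCELING, CANCELED, FAILED }

    enum Reaction { RETRIGGER_REQUEST, CANCEL_CONSUMER, FAIL_CONSUMER }

    final class ProducerStateDecision {

        static Reaction decide(ProducerState producerState) {
            switch (producerState) {
                // Producer is (or will be) available: ask for the partition again.
                case SCHEDULED:
                case DEPLOYING:
                case RUNNING:
                case FINISHED:
                    return Reaction.RETRIGGER_REQUEST;

                // Producer is gone for good: the request can never succeed.
                case CANCELING:
                case CANCELED:
                case FAILED:
                    return Reaction.CANCEL_CONSUMER;

                // Anything else (currently only CREATED) points to a scheduling bug.
                default:
                    return Reaction.FAIL_CONSUMER;
            }
        }
    }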
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
index 4f12691..f7650d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/TaskActions.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.taskmanager;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 
@@ -29,21 +28,20 @@ import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 public interface TaskActions {
 
        /**
-        * Check the partition state of the given partition.
+        * Check the execution state of the execution producing a result 
partition.
         *
-        * @param jobId of the partition
-        * @param executionId of the partition
-        * @param resultId of the partition
-        * @param partitionId of the partition
+        * @param jobId ID of the job the partition belongs to.
+        * @param intermediateDataSetId ID of the parent intermediate data set.
+        * @param resultPartitionId ID of the result partition to check. This
+        * identifies the producing execution and partition.
         */
-       void triggerPartitionStateCheck(
+       void triggerPartitionProducerStateCheck(
                JobID jobId,
-               ExecutionAttemptID executionId,
-               IntermediateDataSetID resultId,
-               ResultPartitionID partitionId);
+               IntermediateDataSetID intermediateDataSetId,
+               ResultPartitionID resultPartitionId);
 
        /**
-        * Fail the owning task with the given throwawble.
+        * Fail the owning task with the given throwable.
         *
         * @param cause of the failure
         */

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 1dfd3db..8c686cd 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -50,8 +50,7 @@ import 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory
 import org.apache.flink.runtime.executiongraph._
 import org.apache.flink.runtime.instance.{AkkaActorGateway, InstanceID, 
InstanceManager}
-import org.apache.flink.runtime.io.network.PartitionState
-import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus}
+import org.apache.flink.runtime.jobgraph.{JobGraph, JobStatus, JobVertexID}
 import 
org.apache.flink.runtime.jobmanager.SubmittedJobGraphStore.SubmittedJobGraphListener
 import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => 
FlinkScheduler}
 import org.apache.flink.runtime.jobmanager.slots.ActorTaskManagerGateway
@@ -78,7 +77,7 @@ import 
org.apache.flink.runtime.security.SecurityUtils.SecurityConfiguration
 import org.apache.flink.runtime.taskmanager.TaskManager
 import org.apache.flink.runtime.util._
 import org.apache.flink.runtime.webmonitor.{WebMonitor, WebMonitorUtils}
-import org.apache.flink.runtime.{FlinkActor, LeaderSessionMessageFilter, 
LogMessages}
+import org.apache.flink.runtime.{FlinkActor, JobException, 
LeaderSessionMessageFilter, LogMessages}
 import org.apache.flink.util.{ConfigurationUtil, InstantiationUtil, NetUtils}
 import org.jboss.netty.channel.ChannelException
 
@@ -935,27 +934,58 @@ class JobManager(
           )
       }
 
-    case RequestPartitionState(jobId, partitionId, taskExecutionId, 
taskResultId) =>
-      val state = currentJobs.get(jobId) match {
+    case RequestPartitionProducerState(jobId, intermediateDataSetId, 
resultPartitionId) =>
+      currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          val execution = 
executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+          try {
+            // Find the execution attempt producing the intermediate result 
partition.
+            val execution = executionGraph
+              .getRegisteredExecutions
+              .get(resultPartitionId.getProducerId)
+
+            if (execution != null) {
+              // Common case for pipelined exchanges => producing execution is
+              // still active.
+              sender ! decorateMessage(execution.getState)
+            } else {
+              // The producing execution might have terminated and been
+              // unregistered. We now look for the producing execution via the
+              // intermediate result itself.
+              val intermediateResult = executionGraph
+                .getAllIntermediateResults.get(intermediateDataSetId)
+
+              if (intermediateResult != null) {
+                // Try to find the producing execution
+                val producerExecution = intermediateResult
+                  .getPartitionById(resultPartitionId.getPartitionId)
+                  .getProducer
+                  .getCurrentExecutionAttempt
+
+                if (producerExecution.getAttemptId() == 
resultPartitionId.getProducerId()) {
+                  sender ! decorateMessage(producerExecution.getState)
+                } else {
+                  val cause = new 
PartitionProducerDisposedException(resultPartitionId)
+                  sender ! decorateMessage(Status.Failure(cause))
+                }
+              } else {
+                val cause = new IllegalArgumentException(
+                  s"Intermediate data set with ID $intermediateDataSetId not 
found.")
+                sender ! decorateMessage(Status.Failure(cause))
+              }
+            }
+          } catch {
+            case e: Exception =>
+              sender ! decorateMessage(
+                Status.Failure(new RuntimeException("Failed to look up 
execution state of " +
+                  s"producer with ID ${resultPartitionId.getProducerId}.", e)))
+          }
 
-          if (execution != null) execution.getState else null
         case None =>
-          // Nothing to do. This is not an error, because the request is 
received when a sending
-          // task fails or is not yet available during a remote partition 
request.
-          log.debug(s"Cannot find execution graph for job $jobId.")
+          sender ! decorateMessage(
+            Status.Failure(new IllegalArgumentException(s"Job with ID $jobId 
not found.")))
 
-          null
       }
 
-      sender ! decorateMessage(
-        new PartitionState(
-          taskResultId,
-          partitionId.getPartitionId,
-          state)
-      )
-
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) =>
@@ -1059,7 +1089,7 @@ class JobManager(
       taskManagerMap.get(taskManagerActorRef) match {
         case Some(instanceId) => 
handleTaskManagerTerminated(taskManagerActorRef, instanceId)
         case None =>  log.debug("Received terminated message for task manager 
" +
-                                  s"${taskManagerActorRef} which is not " +
+                                  s"$taskManagerActorRef which is not " +
                                   "connected to this job manager.")
       }
 
@@ -2092,7 +2122,7 @@ object JobManager {
     def sleepBeforeRetry() : Unit = {
       if (maxSleepBetweenRetries > 0) {
         val sleepTime = (Math.random() * 
maxSleepBetweenRetries).asInstanceOf[Long]
-        LOG.info(s"Retrying after bind exception. Sleeping for ${sleepTime} 
ms.")
+        LOG.info(s"Retrying after bind exception. Sleeping for $sleepTime ms.")
         Thread.sleep(sleepTime)
       }
     }
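
The RequestPartitionProducerState handler above looks up the producer in two steps: first among the still-registered executions, then via the intermediate result, and otherwise reports the producer as disposed. A hedged Java sketch of that lookup order over simplified map-based state follows; all names in it are invented for illustration and are not part of the Flink codebase:

    import java.util.Map;

    // Simplified stand-in for the JobManager's bookkeeping; not the Flink classes.
    final class ProducerLookup {

        /** Attempt ID -> state of still-registered (active) executions. */
        private final Map<String, String> registeredExecutions;

        /** Partition ID -> attempt ID of the current producing execution attempt. */
        private final Map<String, String> currentAttemptByPartition;

        /** Attempt ID -> state of current attempts found via the intermediate result. */
        private final Map<String, String> currentAttemptStates;

        ProducerLookup(
                Map<String, String> registeredExecutions,
                Map<String, String> currentAttemptByPartition,
                Map<String, String> currentAttemptStates) {
            this.registeredExecutions = registeredExecutions;
            this.currentAttemptByPartition = currentAttemptByPartition;
            this.currentAttemptStates = currentAttemptStates;
        }

        String producerState(String partitionId, String producerAttemptId) {
            // 1) Common case: the producing execution is still registered.
            String state = registeredExecutions.get(producerAttemptId);
            if (state != null) {
                return state;
            }

            // 2) Fall back to the intermediate result's current producer attempt.
            String currentAttemptId = currentAttemptByPartition.get(partitionId);
            if (currentAttemptId == null) {
                throw new IllegalArgumentException("Unknown partition " + partitionId);
            }

            if (currentAttemptId.equals(producerAttemptId)) {
                return currentAttemptStates.get(currentAttemptId);
            }

            // 3) The requested attempt is no longer current: the producer was disposed.
            throw new IllegalStateException("Producer " + producerAttemptId
                    + " of partition " + partitionId + " has already been disposed.");
        }
    }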

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 3d72f1a..65819f4 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -157,20 +157,18 @@ object JobManagerMessages {
   case class NextInputSplit(splitData: Array[Byte])
 
   /**
-   * Requests the current state of the partition.
-   *
-   * The state of a partition is currently bound to the state of the producing 
execution.
-   *
-   * @param jobId The job ID of the job, which produces the partition.
-   * @param partitionId The partition ID of the partition to request the state 
of.
-   * @param taskExecutionId The execution attempt ID of the task requesting 
the partition state.
-   * @param taskResultId The input gate ID of the task requesting the 
partition state.
-   */
-  case class RequestPartitionState(
+    * Requests the execution state of the execution producing a result 
partition.
+    *
+    * @param jobId                 ID of the job the partition belongs to.
+    * @param intermediateDataSetId ID of the parent intermediate data set.
+    * @param resultPartitionId     ID of the result partition to check. This
+    *                              identifies the producing execution and
+    *                              partition.
+    */
+  case class RequestPartitionProducerState(
       jobId: JobID,
-      partitionId: ResultPartitionID,
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID)
+      intermediateDataSetId: IntermediateDataSetID,
+      resultPartitionId: ResultPartitionID)
     extends RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 271578f..41d3077 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -49,7 +49,7 @@ import 
org.apache.flink.runtime.io.disk.iomanager.IOManager.IOMode
 import org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool
 import org.apache.flink.runtime.io.network.{LocalConnectionManager, 
NetworkEnvironment, TaskEventDispatcher}
-import org.apache.flink.runtime.io.network.netty.{NettyConfig, 
NettyConnectionManager, PartitionStateChecker}
+import org.apache.flink.runtime.io.network.netty.{NettyConfig, 
NettyConnectionManager, PartitionProducerStateChecker}
 import 
org.apache.flink.runtime.io.network.partition.{ResultPartitionConsumableNotifier,
 ResultPartitionManager}
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
@@ -181,7 +181,7 @@ class TaskManager(
 
   private var connectionUtils: Option[(
     CheckpointResponder,
-    PartitionStateChecker,
+    PartitionProducerStateChecker,
     ResultPartitionConsumableNotifier,
     TaskManagerConnection)] = None
 
@@ -916,7 +916,7 @@ class TaskManager(
 
     val taskManagerConnection = new 
ActorGatewayTaskManagerConnection(taskManagerGateway)
 
-    val partitionStateChecker = new ActorGatewayPartitionStateChecker(
+    val partitionStateChecker = new ActorGatewayPartitionProducerStateChecker(
       jobManagerGateway,
       config.timeout)
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index 6570679..8cae04c 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -60,7 +59,6 @@ public class InputGateConcurrentTest {
                final SingleInputGate gate = new SingleInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
@@ -96,7 +94,6 @@ public class InputGateConcurrentTest {
                final SingleInputGate gate = new SingleInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0,
                                numChannels,
@@ -146,7 +143,6 @@ public class InputGateConcurrentTest {
                final SingleInputGate gate = new SingleInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0,
                                numChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index b35612a..7e1d792 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -87,7 +86,6 @@ public class InputGateFairnessTest {
                SingleInputGate gate = new FairnessVerifyingInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
@@ -141,7 +139,6 @@ public class InputGateFairnessTest {
                SingleInputGate gate = new FairnessVerifyingInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
@@ -192,7 +189,6 @@ public class InputGateFairnessTest {
                SingleInputGate gate = new FairnessVerifyingInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
@@ -248,7 +244,6 @@ public class InputGateFairnessTest {
                SingleInputGate gate = new FairnessVerifyingInputGate(
                                "Test Task Name",
                                new JobID(),
-                               new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
                                mock(TaskActions.class),
@@ -346,14 +341,13 @@ public class InputGateFairnessTest {
                public FairnessVerifyingInputGate(
                                String owningTaskName,
                                JobID jobId,
-                               ExecutionAttemptID executionId,
                                IntermediateDataSetID consumedResultId,
                                int consumedSubpartitionIndex,
                                int numberOfInputChannels,
                                TaskActions taskActions,
                                TaskIOMetricGroup metrics) {
 
-                       super(owningTaskName, jobId, executionId, consumedResultId, consumedSubpartitionIndex,
+                       super(owningTaskName, jobId, consumedResultId, consumedSubpartitionIndex,
                                        numberOfInputChannels, taskActions, metrics);
 
                        try {

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 35ed4c3..37ec751 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -22,7 +22,6 @@ import com.google.common.collect.Lists;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.core.memory.MemoryType;
 import org.apache.flink.runtime.execution.CancelTaskException;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
@@ -283,7 +282,6 @@ public class LocalInputChannelTest {
                final SingleInputGate gate = new SingleInputGate(
                        "test task name",
                        new JobID(),
-                       new ExecutionAttemptID(),
                        new IntermediateDataSetID(),
                        0,
                        1,
@@ -481,7 +479,6 @@ public class LocalInputChannelTest {
                        this.inputGate = new SingleInputGate(
                                        "Test Name",
                                        new JobID(),
-                                       new ExecutionAttemptID(),
                                        new IntermediateDataSetID(),
                                        subpartitionIndex,
                                        numberOfInputChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index 7cae362..a25b8d5 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -73,7 +73,7 @@ public class SingleInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final SingleInputGate inputGate = new SingleInputGate(
-                       "Test Task Name", new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+                       "Test Task Name", new JobID(), new IntermediateDataSetID(), 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
                final TestInputChannel[] inputChannels = new TestInputChannel[]{
                        new TestInputChannel(inputGate, 0),
@@ -127,7 +127,7 @@ public class SingleInputGateTest {
                // Setup reader with one local and one unknown input channel
                final IntermediateDataSetID resultId = new IntermediateDataSetID();
 
-               final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+               final SingleInputGate inputGate = new SingleInputGate("Test Task Name", new JobID(), resultId, 0, 2, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
                final BufferPool bufferPool = mock(BufferPool.class);
                when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -178,7 +178,6 @@ public class SingleInputGateTest {
                SingleInputGate inputGate = new SingleInputGate(
                        "t1",
                        new JobID(),
-                       new ExecutionAttemptID(),
                        new IntermediateDataSetID(),
                        0,
                        1,
@@ -218,7 +217,6 @@ public class SingleInputGateTest {
                final SingleInputGate inputGate = new SingleInputGate(
                        "InputGate",
                        new JobID(),
-                       new ExecutionAttemptID(),
                        new IntermediateDataSetID(),
                        0,
                        1,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 126a96e..fe3b087 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -55,7 +54,6 @@ public class TestSingleInputGate {
                SingleInputGate realGate = new SingleInputGate(
                        "Test Task Name",
                        new JobID(),
-                       new ExecutionAttemptID(),
                        new IntermediateDataSetID(),
                        0,
                        numberOfInputChannels,

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index 84ec202..c05df0a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -19,7 +19,6 @@
 package org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.apache.flink.runtime.taskmanager.TaskActions;
@@ -43,8 +42,8 @@ public class UnionInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final String testTaskName = "Test Task";
-               final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
-               final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+               final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 3, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
+               final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new IntermediateDataSetID(), 0, 5, mock(TaskActions.class), new UnregisteredTaskMetricsGroup.DummyTaskIOMetricGroup());
 
                final UnionInputGate union = new UnionInputGate(new SingleInputGate[]{ig1, ig2});
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index f941c24..6d8c70b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -38,7 +38,6 @@ import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.IntermediateResultPartition;
 import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -56,7 +55,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages.CancelJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationResponse;
 import org.apache.flink.runtime.messages.JobManagerMessages.CancellationSuccess;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
@@ -116,7 +115,6 @@ import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingClu
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertNotNull;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -221,49 +219,227 @@ public class JobManagerTest {
                                                // - The test ----------------------------------------------------------------------
 
                                                // 1. All execution states
-                                               RequestPartitionState request = new RequestPartitionState(
-                                                       jid, partitionId, receiver, rid);
+                                               RequestPartitionProducerState request = new RequestPartitionProducerState(
+                                                       jid, rid, partitionId);
 
                                                for (ExecutionState state : ExecutionState.values()) {
                                                        ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-                                                       Future<PartitionState> futurePartitionState = jobManagerGateway
+                                                       Future<ExecutionState> futurePartitionState = jobManagerGateway
                                                                .ask(request, getRemainingTime())
-                                                               .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+                                                               .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
 
-                                                       PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
-
-                                                       assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-                                                       assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-                                                       assertEquals(state, resp.getExecutionState());
+                                                       ExecutionState resp = Await.result(futurePartitionState, getRemainingTime());
+                                                       assertEquals(state, resp);
                                                }
 
                                                // 2. Non-existing execution
-                                               request = new RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+                                               request = new RequestPartitionProducerState(jid, rid, new ResultPartitionID());
+
+                                               Future<?> futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+                                               try {
+                                                       Await.result(futurePartitionState, getRemainingTime());
+                                                       fail("Did not fail with expected RuntimeException");
+                                               } catch (RuntimeException e) {
+                                                       assertEquals(IllegalArgumentException.class, e.getCause().getClass());
+                                               }
 
-                                               Future<PartitionState> futurePartitionState = jobManagerGateway
-                                                       .ask(request, getRemainingTime())
-                                                       .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+                                               // 3. Non-existing job
+                                               request = new RequestPartitionProducerState(new JobID(), rid, new ResultPartitionID());
+                                               futurePartitionState = jobManagerGateway.ask(request, getRemainingTime());
+
+                                               try {
+                                                       Await.result(futurePartitionState, getRemainingTime());
+                                                       fail("Did not fail with expected IllegalArgumentException");
+                                               } catch (IllegalArgumentException ignored) {
+                                               }
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       } finally {
+                                               if (cluster != null) {
+                                                       cluster.shutdown();
+                                               }
+                                       }
+                               }
+                       };
+               }};
+       }
 
-                                               PartitionState resp = Await.result(futurePartitionState, getRemainingTime());
+       /**
+        * Tests the JobManager response when the execution is not registered with
+        * the ExecutionGraph.
+        */
+       @Test
+       public void testRequestPartitionStateUnregisteredExecution() throws Exception {
+               new JavaTestKit(system) {{
+                       new Within(duration("15 seconds")) {
+                               @Override
+                               protected void run() {
+                                       // Setup
+                                       TestingCluster cluster = null;
 
-                                               assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-                                               assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-                                               assertNull(resp.getExecutionState());
+                                       try {
+                                               cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
 
-                                               // 3. Non-existing job
-                                               request = new RequestPartitionState(
-                                                       new JobID(), new ResultPartitionID(), receiver, rid);
+                                               final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+                                               // Create a task
+                                               final JobVertex sender = new JobVertex("Sender");
+                                               sender.setParallelism(1);
+                                               sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+                                               sender.createAndAddResultDataSet(rid, PIPELINED);
+
+                                               final JobVertex sender2 = new JobVertex("Blocking Sender");
+                                               sender2.setParallelism(1);
+                                               sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
+
+                                               final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+                                               final JobID jid = jobGraph.getJobID();
+
+                                               final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+                                                       TestingUtils.TESTING_DURATION());
+
+                                               // we can set the leader session ID to None because we don't use this gateway to send messages
+                                               final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+                                               // Submit the job and wait for all vertices to be running
+                                               jobManagerGateway.tell(
+                                                       new SubmitJob(
+                                                               jobGraph,
+                                                               ListeningBehaviour.EXECUTION_RESULT),
+                                                       testActorGateway);
+                                               expectMsgClass(JobSubmitSuccess.class);
+
+                                               jobManagerGateway.tell(
+                                                       new WaitForAllVerticesToBeRunningOrFinished(jid),
+                                                       testActorGateway);
+
+                                               expectMsgClass(AllVerticesRunning.class);
+
+                                               Future<Object> egFuture = jobManagerGateway.ask(
+                                                       new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+                                               ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+                                               ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+                                               ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+                                               while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+                                                       Thread.sleep(1);
+                                               }
+
+                                               IntermediateResultPartition partition = vertex.getProducedPartitions()
+                                                       .values().iterator().next();
+
+                                               ResultPartitionID partitionId = new ResultPartitionID(
+                                                       partition.getPartitionId(),
+                                                       vertex.getCurrentExecutionAttempt().getAttemptId());
 
-                                               futurePartitionState = jobManagerGateway
+                                               // Producer finished, request state
+                                               Object request = new RequestPartitionProducerState(jid, rid, partitionId);
+
+                                               Future<ExecutionState> producerStateFuture = jobManagerGateway
                                                        .ask(request, getRemainingTime())
-                                                       .mapTo(ClassTag$.MODULE$.<PartitionState>apply(PartitionState.class));
+                                                       .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));
+
+                                               assertEquals(ExecutionState.FINISHED, Await.result(producerStateFuture, getRemainingTime()));
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       } finally {
+                                               if (cluster != null) {
+                                                       cluster.shutdown();
+                                               }
+                                       }
+                               }
+                       };
+               }};
+       }
+
+       /**
+        * Tests the JobManager response when the execution is not registered with
+        * the ExecutionGraph anymore and a new execution attempt is available.
+        */
+       @Test
+       public void testRequestPartitionStateMoreRecentExecutionAttempt() throws Exception {
+               new JavaTestKit(system) {{
+                       new Within(duration("15 seconds")) {
+                               @Override
+                               protected void run() {
+                                       // Setup
+                                       TestingCluster cluster = null;
+
+                                       try {
+                                               cluster = startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                                               final IntermediateDataSetID rid = new IntermediateDataSetID();
+
+                                               // Create a task
+                                               final JobVertex sender = new JobVertex("Sender");
+                                               sender.setParallelism(1);
+                                               sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+                                               sender.createAndAddResultDataSet(rid, PIPELINED);
 
-                                               resp = Await.result(futurePartitionState, getRemainingTime());
+                                               final JobVertex sender2 = new JobVertex("Blocking Sender");
+                                               sender2.setParallelism(1);
+                                               sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-                                               assertEquals(request.taskResultId(), resp.getIntermediateDataSetID());
-                                               assertEquals(request.partitionId().getPartitionId(), resp.getIntermediateResultPartitionID());
-                                               assertNull(resp.getExecutionState());
+                                               final JobGraph jobGraph = new JobGraph("Fast finishing producer test job", sender, sender2);
+                                               final JobID jid = jobGraph.getJobID();
+
+                                               final ActorGateway jobManagerGateway = cluster.getLeaderGateway(
+                                                       TestingUtils.TESTING_DURATION());
+
+                                               // we can set the leader session ID to None because we don't use this gateway to send messages
+                                               final ActorGateway testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+                                               // Submit the job and wait for all vertices to be running
+                                               jobManagerGateway.tell(
+                                                       new SubmitJob(
+                                                               jobGraph,
+                                                               ListeningBehaviour.EXECUTION_RESULT),
+                                                       testActorGateway);
+                                               expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                                               jobManagerGateway.tell(
+                                                       new WaitForAllVerticesToBeRunningOrFinished(jid),
+                                                       testActorGateway);
+
+                                               expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                                               Future<Object> egFuture = jobManagerGateway.ask(
+                                                       new RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+                                               ExecutionGraphFound egFound = (ExecutionGraphFound) Await.result(egFuture, remaining());
+                                               ExecutionGraph eg = (ExecutionGraph) egFound.executionGraph();
+
+                                               ExecutionVertex vertex = eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+                                               while (vertex.getExecutionState() != ExecutionState.FINISHED) {
+                                                       Thread.sleep(1);
+                                               }
+
+                                               IntermediateResultPartition partition = vertex.getProducedPartitions()
+                                                       .values().iterator().next();
+
+                                               ResultPartitionID partitionId = new ResultPartitionID(
+                                                       partition.getPartitionId(),
+                                                       vertex.getCurrentExecutionAttempt().getAttemptId());
+
+                                               // Reset execution => new execution attempt
+                                               vertex.resetForNewExecution();
+
+                                               // Producer finished, request state
+                                               Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);
+
+                                               Future<?> producerStateFuture = jobManagerGateway.ask(request, getRemainingTime());
+
+                                               try {
+                                                       Await.result(producerStateFuture, getRemainingTime());
+                                                       fail("Did not fail with expected Exception");
+                                               } catch (PartitionProducerDisposedException ignored) {
+                                               }
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                                fail(e.getMessage());
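
Condensing the request/response contract these new tests exercise, the sketch below is assembled purely from the test code above, not from a separate API; timeout stands in for the tests' getRemainingTime(), and jid, rid and partitionId are the same objects the tests build.

    // Ask the JobManager for the producer's state of a given result partition.
    Object request = new JobManagerMessages.RequestPartitionProducerState(jid, rid, partitionId);

    Future<ExecutionState> stateFuture = jobManagerGateway
            .ask(request, timeout)
            .mapTo(ClassTag$.MODULE$.<ExecutionState>apply(ExecutionState.class));

    // Producer still registered with the ExecutionGraph: the future completes with its
    // ExecutionState, e.g. FINISHED for the fast-finishing sender above.
    // Unknown partition or unknown job: the future fails with an IllegalArgumentException
    // (possibly wrapped in a RuntimeException).
    // Producer disposed because a newer execution attempt exists (resetForNewExecution()):
    // the future fails with a PartitionProducerDisposedException.
    ExecutionState producerState = Await.result(stateFuture, timeout);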

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
index e37467b..a7ffa1a 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskAsyncCallTest.java
@@ -35,7 +35,7 @@ import org.apache.flink.runtime.executiongraph.TaskInformation;
 import org.apache.flink.runtime.filecache.FileCache;
 import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import org.apache.flink.runtime.io.network.NetworkEnvironment;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -145,7 +145,7 @@ public class TaskAsyncCallTest {
                
                ResultPartitionManager partitionManager = mock(ResultPartitionManager.class);
                ResultPartitionConsumableNotifier consumableNotifier = mock(ResultPartitionConsumableNotifier.class);
-               PartitionStateChecker partitionStateChecker = mock(PartitionStateChecker.class);
+               PartitionProducerStateChecker partitionProducerStateChecker = mock(PartitionProducerStateChecker.class);
                Executor executor = mock(Executor.class);
                NetworkEnvironment networkEnvironment = mock(NetworkEnvironment.class);
                when(networkEnvironment.getResultPartitionManager()).thenReturn(partitionManager);
@@ -191,7 +191,7 @@ public class TaskAsyncCallTest {
                        new TaskManagerRuntimeInfo("localhost", new Configuration(), System.getProperty("java.io.tmpdir")),
                        mock(TaskMetricGroup.class),
                        consumableNotifier,
-                       partitionStateChecker,
+                       partitionProducerStateChecker,
                        executor);
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index fd9ff05..5ccf8a9 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -49,7 +49,6 @@ import org.apache.flink.runtime.instance.ActorGateway;
 import org.apache.flink.runtime.instance.AkkaActorGateway;
 import org.apache.flink.runtime.instance.InstanceID;
 import org.apache.flink.runtime.io.network.ConnectionID;
-import org.apache.flink.runtime.io.network.PartitionState;
 import org.apache.flink.runtime.io.network.api.writer.RecordWriter;
 import org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
@@ -102,7 +101,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -1579,15 +1578,8 @@ public class TaskManagerTest extends TestLogger {
 
                @Override
                public void handleMessage(Object message) throws Exception {
-                       if (message instanceof RequestPartitionState) {
-                               final RequestPartitionState msg = (RequestPartitionState) message;
-
-                               PartitionState resp = new PartitionState(
-                                               msg.taskResultId(),
-                                               msg.partitionId().getPartitionId(),
-                                               ExecutionState.RUNNING);
-
-                               getSender().tell(decorateMessage(resp), getSelf());
+                       if (message instanceof RequestPartitionProducerState) {
+                               getSender().tell(decorateMessage(ExecutionState.RUNNING), getSelf());
                        }
                        else if (message instanceof TaskMessages.UpdateTaskExecutionState) {
                                final TaskExecutionState msg = ((TaskMessages.UpdateTaskExecutionState) message)

http://git-wip-us.apache.org/repos/asf/flink/blob/a078666d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
index d80dab3..ae7d0b8 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskStopTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.deployment.InputGateDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.JobInformation;
 import org.apache.flink.runtime.executiongraph.TaskInformation;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider;
@@ -95,7 +95,7 @@ public class TaskStopTest {
                        tmRuntimeInfo,
                        mock(TaskMetricGroup.class),
                        mock(ResultPartitionConsumableNotifier.class),
-                       mock(PartitionStateChecker.class),
+                       mock(PartitionProducerStateChecker.class),
                        mock(Executor.class));
                Field f = task.getClass().getDeclaredField("invokable");
                f.setAccessible(true);
