Repository: flink
Updated Branches:
  refs/heads/release-1.1 3ae6e9e09 -> 2b612f2d8


[FLINK-5114] [network] Handle partition producer state check for unregistered executions

This closes #2975.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/2b612f2d
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/2b612f2d
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/2b612f2d

Branch: refs/heads/release-1.1
Commit: 2b612f2d8fa2493fa1d0d586bc0fe10afa9150ca
Parents: 3ae6e9e
Author: Ufuk Celebi <[email protected]>
Authored: Thu Dec 8 23:48:39 2016 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Fri Dec 9 14:20:08 2016 +0100

----------------------------------------------------------------------
 .../executiongraph/IntermediateResult.java      |  33 +++
 .../runtime/io/network/NetworkEnvironment.java  |  37 +--
 .../netty/PartitionProducerStateChecker.java    |  49 ++++
 .../io/network/netty/PartitionStateChecker.java |  34 ---
 .../partition/consumer/SingleInputGate.java     |  10 +-
 .../PartitionProducerDisposedException.java     |  36 +++
 .../flink/runtime/jobmanager/JobManager.scala   |  74 ++++--
 .../runtime/messages/JobManagerMessages.scala   |  29 ++-
 .../runtime/messages/TaskControlMessages.scala  |  25 +-
 .../flink/runtime/taskmanager/TaskManager.scala |  46 +++-
 .../partition/InputGateConcurrentTest.java      |  10 +-
 .../partition/InputGateFairnessTest.java        |  20 +-
 .../consumer/LocalInputChannelTest.java         |   6 +-
 .../partition/consumer/SingleInputGateTest.java |  12 +-
 .../partition/consumer/TestSingleInputGate.java |   4 +-
 .../partition/consumer/UnionInputGateTest.java  |   6 +-
 .../runtime/jobmanager/JobManagerTest.java      | 252 ++++++++++++++++---
 .../runtime/taskmanager/TaskManagerTest.java    |  22 +-
 18 files changed, 520 insertions(+), 185 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
index 67b1fe0..6da2cd3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResult.java
@@ -20,7 +20,9 @@ package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 
+import java.util.HashMap;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.apache.flink.util.Preconditions.checkArgument;
@@ -34,6 +36,14 @@ public class IntermediateResult {
 
        private final IntermediateResultPartition[] partitions;
 
+       /**
+        * Maps intermediate result partition IDs to a partition index. This is
+        * used for ID lookups of intermediate results. The partition connect
+        * logic in other places is tightly coupled to the partitions being held
+        * as an array, so the array is kept and this map only complements it.
+        */
+       private final HashMap<IntermediateResultPartitionID, Integer> partitionLookupHelper = new HashMap<>();
+
        private final int numParallelProducers;
 
        private final AtomicInteger numberOfRunningProducers;
@@ -80,6 +90,7 @@ public class IntermediateResult {
                }
 
                partitions[partitionNumber] = partition;
+               partitionLookupHelper.put(partition.getPartitionId(), 
partitionNumber);
                partitionsAssigned++;
        }
 
@@ -95,6 +106,28 @@ public class IntermediateResult {
                return partitions;
        }
 
+       /**
+        * Returns the partition with the given ID.
+        *
+        * @param resultPartitionId ID of the partition to look up
+        * @throws NullPointerException If the partition ID is <code>null</code>
+        * @throws IllegalArgumentException If the partition ID is unknown
+        * @return Intermediate result partition with the given ID
+        */
+       public IntermediateResultPartition getPartitionById(IntermediateResultPartitionID resultPartitionId) {
+               // Looks up the partition number via the helper map and returns the
+               // partition. Currently, this happens infrequently enough that we could
+               // consider removing the map and scanning the partitions on every lookup.
+               // The lookup currently only happens when the producer of an intermediate
+               // result cannot be found via its registered execution.
+               Integer partitionNumber = partitionLookupHelper.get(checkNotNull(resultPartitionId, "IntermediateResultPartitionID"));
+               if (partitionNumber != null) {
+                       return partitions[partitionNumber];
+               } else {
+                       throw new IllegalArgumentException("Unknown intermediate result partition ID " + resultPartitionId);
+               }
+       }
+
        public int getNumberOfAssignedPartitions() {
                return partitionsAssigned;
        }
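
For illustration, here is a minimal, self-contained Java sketch of the array-plus-lookup-map pattern that getPartitionById() relies on above. The class and names are hypothetical and not part of this commit; it only shows the technique of resolving an ID to an array index via a helper map.

import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the pattern used by IntermediateResult above.
final class IndexedPartitions<ID, P> {

	private final Object[] partitions;
	private final Map<ID, Integer> indexById = new HashMap<>();

	IndexedPartitions(int numPartitions) {
		this.partitions = new Object[numPartitions];
	}

	// Registers a partition under both its positional index and its ID.
	void set(int index, ID id, P partition) {
		partitions[index] = partition;
		indexById.put(id, index);
	}

	// Mirrors getPartitionById(): resolve the index via the map, then read the array.
	@SuppressWarnings("unchecked")
	P getById(ID id) {
		Integer index = indexById.get(id);
		if (index == null) {
			throw new IllegalArgumentException("Unknown partition ID " + id);
		}
		return (P) partitions[index];
	}
}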

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
index d3715ed..7bac93b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java
@@ -28,14 +28,14 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
 import org.apache.flink.runtime.io.network.netty.NettyConfig;
 import org.apache.flink.runtime.io.network.netty.NettyConnectionManager;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import 
org.apache.flink.runtime.io.network.partition.ResultPartitionConsumableNotifier;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.TaskMessages.FailTask;
 import org.apache.flink.runtime.taskmanager.NetworkEnvironmentConfiguration;
 import org.apache.flink.runtime.taskmanager.Task;
@@ -82,7 +82,7 @@ public class NetworkEnvironment {
 
        private ResultPartitionConsumableNotifier partitionConsumableNotifier;
 
-       private PartitionStateChecker partitionStateChecker;
+       private PartitionProducerStateChecker partitionStateChecker;
 
        private boolean isShutdown;
 
@@ -143,7 +143,7 @@ public class NetworkEnvironment {
                return partitionConsumableNotifier;
        }
 
-       public PartitionStateChecker getPartitionStateChecker() {
+       public PartitionProducerStateChecker getPartitionProducerStateChecker() 
{
                return partitionStateChecker;
        }
 
@@ -196,7 +196,7 @@ public class NetworkEnvironment {
                                        taskManagerGateway,
                                        jobManagerTimeout);
 
-                               this.partitionStateChecker = new 
JobManagerPartitionStateChecker(
+                               this.partitionStateChecker = new 
ActorGatewayPartitionProducerStateChecker(
                                                jobManagerGateway, 
taskManagerGateway);
 
                                // -----  Network connections  -----
@@ -474,28 +474,31 @@ public class NetworkEnvironment {
                }
        }
 
-       private static class JobManagerPartitionStateChecker implements 
PartitionStateChecker {
+       private static class ActorGatewayPartitionProducerStateChecker 
implements PartitionProducerStateChecker {
 
                private final ActorGateway jobManager;
 
                private final ActorGateway taskManager;
 
-               public JobManagerPartitionStateChecker(ActorGateway jobManager, 
ActorGateway taskManager) {
+               ActorGatewayPartitionProducerStateChecker(ActorGateway 
jobManager, ActorGateway taskManager) {
                        this.jobManager = jobManager;
                        this.taskManager = taskManager;
                }
 
                @Override
-               public void triggerPartitionStateCheck(
-                               JobID jobId,
-                               ExecutionAttemptID executionAttemptID,
-                               IntermediateDataSetID resultId,
-                               ResultPartitionID partitionId) {
-
-                       RequestPartitionState msg = new RequestPartitionState(
-                                       jobId, partitionId, executionAttemptID, 
resultId);
-
-                       jobManager.tell(msg, taskManager);
+               public void requestPartitionProducerState(
+                       JobID jobId,
+                       ExecutionAttemptID receiverExecutionId,
+                       IntermediateDataSetID intermediateDataSetId,
+                       ResultPartitionID resultPartitionId) {
+
+                       jobManager.tell(
+                               new RequestPartitionProducerState(
+                                       jobId,
+                                       receiverExecutionId,
+                                       intermediateDataSetId,
+                                       resultPartitionId),
+                               taskManager);
                }
        }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
new file mode 100644
index 0000000..18d92b1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionProducerStateChecker.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.netty;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+
+/**
+ * Intermediate partition state checker to query the JobManager about the state
+ * of the producer of a result partition.
+ *
+ * <p>These checks are triggered when a partition request is answered with a
+ * PartitionNotFound event. This usually happens when the producer of that
+ * partition has not yet registered itself with the network stack or has already terminated.
+ */
+public interface PartitionProducerStateChecker {
+
+       /**
+        * Requests the execution state of the execution producing a result partition.
+        *
+        * @param jobId ID of the job the partition belongs to.
+        * @param receiverExecutionId The execution attempt ID of the task that triggers the request.
+        * @param intermediateDataSetId ID of the parent intermediate data set.
+        * @param resultPartitionId ID of the result partition to check. This identifies the producing execution and partition.
+        */
+       void requestPartitionProducerState(
+               JobID jobId, ExecutionAttemptID receiverExecutionId,
+               IntermediateDataSetID intermediateDataSetId,
+               ResultPartitionID resultPartitionId);
+
+}
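
As a usage illustration only (not part of this commit), a trivial implementation of the new interface could look as follows; the class name is hypothetical, and the tests below simply use mock(PartitionProducerStateChecker.class) instead. The production implementation is the ActorGatewayPartitionProducerStateChecker added to NetworkEnvironment above.

import org.apache.flink.api.common.JobID;
import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;

// Hypothetical no-op checker, e.g. as a hand-written test stand-in.
public class NoOpPartitionProducerStateChecker implements PartitionProducerStateChecker {

	@Override
	public void requestPartitionProducerState(
			JobID jobId, ExecutionAttemptID receiverExecutionId,
			IntermediateDataSetID intermediateDataSetId,
			ResultPartitionID resultPartitionId) {
		// Intentionally does nothing. A real implementation asks the JobManager for
		// the producer's execution state, as ActorGatewayPartitionProducerStateChecker does.
	}
}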

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
deleted file mode 100644
index ecbcdaa..0000000
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/netty/PartitionStateChecker.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.flink.runtime.io.network.netty;
-
-import org.apache.flink.api.common.JobID;
-import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
-
-public interface PartitionStateChecker {
-
-       void triggerPartitionStateCheck(
-                       JobID jobId,
-                       ExecutionAttemptID executionId,
-                       IntermediateDataSetID resultId,
-                       ResultPartitionID partitionId);
-
-}

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
index 1550b0d..67bc8d2 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
@@ -33,7 +33,7 @@ import 
org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import 
org.apache.flink.runtime.io.network.partition.consumer.InputChannel.BufferAndAvailability;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -139,7 +139,7 @@ public class SingleInputGate implements InputGate {
        private final BitSet channelsWithEndOfPartitionEvents;
 
        /** The partition state checker to use for failed partition requests. */
-       private final PartitionStateChecker partitionStateChecker;
+       private final PartitionProducerStateChecker partitionStateChecker;
 
        /**
         * Buffer pool for incoming buffers. Incoming data from remote channels 
is copied to buffers
@@ -172,7 +172,7 @@ public class SingleInputGate implements InputGate {
                        IntermediateDataSetID consumedResultId,
                        int consumedSubpartitionIndex,
                        int numberOfInputChannels,
-                       PartitionStateChecker partitionStateChecker,
+                       PartitionProducerStateChecker partitionStateChecker,
                        IOMetricGroup metrics) {
 
                this.owningTaskName = checkNotNull(owningTaskName);
@@ -510,7 +510,7 @@ public class SingleInputGate implements InputGate {
        }
 
        void triggerPartitionStateCheck(ResultPartitionID partitionId) {
-               partitionStateChecker.triggerPartitionStateCheck(
+               partitionStateChecker.requestPartitionProducerState(
                                jobId,
                                executionId,
                                consumedResultId,
@@ -567,7 +567,7 @@ public class SingleInputGate implements InputGate {
 
                final SingleInputGate inputGate = new SingleInputGate(
                                owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex,
-                               icdd.length, 
networkEnvironment.getPartitionStateChecker(), metrics);
+                               icdd.length, 
networkEnvironment.getPartitionProducerStateChecker(), metrics);
 
                // Create the input channels. There is one input channel for 
each consumed partition.
                final InputChannel[] inputChannels = new 
InputChannel[icdd.length];

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
new file mode 100644
index 0000000..12f2433
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/PartitionProducerDisposedException.java
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.jobmanager;
+
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
+
+/**
+ * Exception returned to a TaskManager on {@link RequestPartitionProducerState}
+ * if the producer of a partition has been disposed.
+ */
+public class PartitionProducerDisposedException extends Exception {
+
+       public PartitionProducerDisposedException(ResultPartitionID 
resultPartitionID) {
+               super(String.format("Execution %s producing partition %s has 
already been disposed.",
+                       resultPartitionID.getProducerId(),
+                       resultPartitionID.getPartitionId()));
+       }
+
+}
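
For context, a hedged Java sketch of how a consuming task is expected to treat this exception differently from other failures, mirroring the TaskManager changes further below; the helper class is hypothetical and not part of the commit.

import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;
import org.apache.flink.runtime.taskmanager.Task;

// Hypothetical helper mirroring the failure branch of the TaskManager handler below.
final class ProducerCheckFailureHandling {

	static void onFailedProducerStateCheck(Task task, Exception failure) {
		if (failure instanceof PartitionProducerDisposedException) {
			// The producer has been disposed for good: cancel the consumer
			// instead of failing it.
			task.cancelExecution();
		} else {
			// Any other failure is unexpected and fails the task externally.
			task.failExternally(failure);
		}
	}
}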

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 2b455b7..d6d23d9 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -62,7 +62,7 @@ import org.apache.flink.runtime.messages.JobManagerMessages._
 import org.apache.flink.runtime.messages.Messages.{Acknowledge, Disconnect}
 import org.apache.flink.runtime.messages.RegistrationMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages.{Heartbeat, 
SendStackTrace}
-import org.apache.flink.runtime.messages.TaskMessages.{PartitionState, 
UpdateTaskExecutionState}
+import org.apache.flink.runtime.messages.TaskMessages.{PartitionProducerState, 
UpdateTaskExecutionState}
 import org.apache.flink.runtime.messages.accumulators._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
AcknowledgeCheckpoint, DeclineCheckpoint}
 import org.apache.flink.runtime.messages.webmonitor.{InfoMessage, _}
@@ -839,28 +839,68 @@ class JobManager(
           )
       }
 
-    case RequestPartitionState(jobId, partitionId, taskExecutionId, 
taskResultId) =>
-      val state = currentJobs.get(jobId) match {
+    case RequestPartitionProducerState(
+        jobId,
+        receiverId,
+        intermediateDataSetId,
+        resultPartitionId) =>
+
+      currentJobs.get(jobId) match {
         case Some((executionGraph, _)) =>
-          val execution = 
executionGraph.getRegisteredExecutions.get(partitionId.getProducerId)
+          try {
+            // Find the execution attempt producing the intermediate result 
partition.
+            val execution = executionGraph
+              .getRegisteredExecutions
+              .get(resultPartitionId.getProducerId)
+
+            if (execution != null) {
+              // Common case for pipelined exchanges => producing execution is
+              // still active.
+              val success = (intermediateDataSetId, resultPartitionId, 
execution.getState)
+              sender ! decorateMessage(PartitionProducerState(receiverId, 
Left(success)))
+            } else {
+              // The producing execution might have terminated and been
+              // unregistered. We now look for the producing execution via the
+              // intermediate result itself.
+              val intermediateResult = executionGraph
+                .getAllIntermediateResults.get(intermediateDataSetId)
+
+              if (intermediateResult != null) {
+                // Try to find the producing execution
+                val producerExecution = intermediateResult
+                  .getPartitionById(resultPartitionId.getPartitionId)
+                  .getProducer
+                  .getCurrentExecutionAttempt
+
+                if (producerExecution.getAttemptId() == 
resultPartitionId.getProducerId()) {
+                  val success = (
+                    intermediateDataSetId,
+                    resultPartitionId,
+                    producerExecution.getState)
+                  sender ! decorateMessage(PartitionProducerState(receiverId, 
Left(success)))
+                } else {
+                  val failure = new 
PartitionProducerDisposedException(resultPartitionId)
+                  sender ! decorateMessage(PartitionProducerState(receiverId, 
Right(failure)))
+                }
+              } else {
+                val failure = new IllegalArgumentException("Intermediate data set with ID " +
+                  s"$intermediateDataSetId not found.")
+                sender ! decorateMessage(PartitionProducerState(receiverId, Right(failure)))
+              }
+            }
+          } catch {
+            case e: Exception =>
+              val failure = new RuntimeException("Failed to look up execution 
state of " +
+                s"producer with ID ${resultPartitionId.getProducerId}.", e)
+              sender ! decorateMessage(PartitionProducerState(receiverId, 
Right(failure)))
+          }
 
-          if (execution != null) execution.getState else null
         case None =>
-          // Nothing to do. This is not an error, because the request is 
received when a sending
-          // task fails or is not yet available during a remote partition 
request.
-          log.debug(s"Cannot find execution graph for job $jobId.")
+          val failure = new IllegalArgumentException(s"Job with ID $jobId not 
found.")
+          sender ! decorateMessage(PartitionProducerState(receiverId, 
Right(failure)))
 
-          null
       }
 
-      sender ! decorateMessage(
-        PartitionState(
-          taskExecutionId,
-          taskResultId,
-          partitionId.getPartitionId,
-          state)
-      )
-
     case RequestJobStatus(jobID) =>
       currentJobs.get(jobID) match {
         case Some((executionGraph,_)) =>
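
To make the lookup order above easier to follow, here is a hedged Java rendering of the same logic: the registered execution is consulted first, then the intermediate result partition as a fallback for producers that have already been unregistered. It uses only methods visible in this diff, but the helper class itself is hypothetical and not part of the commit.

import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.Execution;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
import org.apache.flink.runtime.executiongraph.IntermediateResult;
import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException;

// Hypothetical helper mirroring the RequestPartitionProducerState handling above.
final class ProducerStateLookup {

	static ExecutionState lookupProducerState(
			ExecutionGraph executionGraph,
			IntermediateDataSetID intermediateDataSetId,
			ResultPartitionID resultPartitionId) throws Exception {

		// 1) Common case (pipelined exchanges): the producing execution is still registered.
		Execution execution = executionGraph
			.getRegisteredExecutions()
			.get(resultPartitionId.getProducerId());

		if (execution != null) {
			return execution.getState();
		}

		// 2) Fallback: the producer may have finished and been unregistered,
		//    so resolve it via the intermediate result partition instead.
		IntermediateResult intermediateResult =
			executionGraph.getAllIntermediateResults().get(intermediateDataSetId);

		if (intermediateResult == null) {
			throw new IllegalArgumentException(
				"Intermediate data set with ID " + intermediateDataSetId + " not found.");
		}

		Execution producer = intermediateResult
			.getPartitionById(resultPartitionId.getPartitionId())
			.getProducer()
			.getCurrentExecutionAttempt();

		if (producer.getAttemptId().equals(resultPartitionId.getProducerId())) {
			return producer.getState();
		} else {
			// The producing attempt the consumer knows about has been superseded and disposed.
			throw new PartitionProducerDisposedException(resultPartitionId);
		}
	}
}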

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
index 14f72b0..97006d2 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/JobManagerMessages.scala
@@ -127,21 +127,24 @@ object JobManagerMessages {
    */
   case class NextInputSplit(splitData: Array[Byte])
 
+
   /**
-   * Requests the current state of the partition.
-   *
-   * The state of a partition is currently bound to the state of the producing 
execution.
-   *
-   * @param jobId The job ID of the job, which produces the partition.
-   * @param partitionId The partition ID of the partition to request the state 
of.
-   * @param taskExecutionId The execution attempt ID of the task requesting 
the partition state.
-   * @param taskResultId The input gate ID of the task requesting the 
partition state.
-   */
-  case class RequestPartitionState(
+    * Requests the execution state of the execution producing a result 
partition.
+    *
+    * @param jobId                 ID of the job the partition belongs to.
+    * @param receiverExecutionId   Execution ID of the task that triggers the request. Required to
+    *                              return an answer to the TaskManager, which needs the ID to route
+    *                              this to the receiver task.
+    * @param intermediateDataSetId ID of the parent intermediate data set.
+    * @param resultPartitionId     ID of the result partition to check. This
+    *                              identifies the producing execution and
+    *                              partition.
+    */
+  case class RequestPartitionProducerState(
       jobId: JobID,
-      partitionId: ResultPartitionID,
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID)
+      receiverExecutionId: ExecutionAttemptID,
+      intermediateDataSetId: IntermediateDataSetID,
+      resultPartitionId: ResultPartitionID)
     extends RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
index 94762ee..e73d651 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/TaskControlMessages.scala
@@ -21,8 +21,9 @@ package org.apache.flink.runtime.messages
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, 
TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID
-import org.apache.flink.runtime.jobgraph.{IntermediateDataSetID, 
IntermediateResultPartitionID}
-import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import 
org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState
 import org.apache.flink.runtime.taskmanager.TaskExecutionState
 
 /**
@@ -92,13 +93,19 @@ object TaskMessages {
   // --------------------------------------------------------------------------
 
   /**
-   * Answer to a [[RequestPartitionState]] with the state of the respective 
partition.
-   */
-  case class PartitionState(
-      taskExecutionId: ExecutionAttemptID,
-      taskResultId: IntermediateDataSetID,
-      partitionId: IntermediateResultPartitionID,
-      state: ExecutionState)
+    * Answer to a [[RequestPartitionProducerState]] with the state of the 
partition producer.
+    *
+    * @param receiverExecutionId The execution attempt ID of the task that triggered the
+    *                            original request and should receive this update.
+    * @param result              Either a successful or failed partition 
producer state check
+    *                            result. On success, this is a 3-tuple of 
intermediate data set ID
+    *                            (to identify the input gate), the partition 
ID (to identify the
+    *                            channel) and the producer state. On failure, 
this contains the
+    *                            failure cause.
+    */
+  case class PartitionProducerState(
+      receiverExecutionId: ExecutionAttemptID,
+      result: Either[(IntermediateDataSetID, ResultPartitionID, 
ExecutionState), Exception])
     extends TaskMessage with RequiresLeaderSessionID
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
index 40ae234..a751865 100644
--- 
a/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
+++ 
b/flink-runtime/src/main/scala/org/apache/flink/runtime/taskmanager/TaskManager.scala
@@ -24,7 +24,7 @@ import java.lang.reflect.Method
 import java.net.{InetAddress, InetSocketAddress}
 import java.util
 import java.util.UUID
-import java.util.concurrent.TimeUnit
+import java.util.concurrent.{TimeUnit, TimeoutException}
 import javax.management.ObjectName
 
 import _root_.akka.actor._
@@ -42,11 +42,11 @@ import org.apache.flink.core.fs.FileSystem
 import org.apache.flink.core.memory.{HeapMemorySegment, HybridMemorySegment, 
MemorySegmentFactory, MemoryType}
 import org.apache.flink.metrics.{MetricGroup, Gauge => FlinkGauge}
 import org.apache.flink.runtime.accumulators.AccumulatorSnapshot
-import org.apache.flink.runtime.clusterframework.messages.StopCluster
-import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.akka.{AkkaUtils, DefaultQuarantineHandler, 
QuarantineMonitor}
 import org.apache.flink.runtime.blob.{BlobCache, BlobClient, BlobService}
 import org.apache.flink.runtime.broadcast.BroadcastVariableManager
+import org.apache.flink.runtime.clusterframework.messages.StopCluster
+import org.apache.flink.runtime.clusterframework.types.ResourceID
 import org.apache.flink.runtime.deployment.{InputChannelDeploymentDescriptor, 
TaskDeploymentDescriptor}
 import org.apache.flink.runtime.execution.ExecutionState
 import 
org.apache.flink.runtime.execution.librarycache.{BlobLibraryCacheManager, 
FallbackLibraryCacheManager, LibraryCacheManager}
@@ -58,16 +58,17 @@ import 
org.apache.flink.runtime.io.disk.iomanager.{IOManager, IOManagerAsync}
 import org.apache.flink.runtime.io.network.NetworkEnvironment
 import org.apache.flink.runtime.io.network.netty.NettyConfig
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID
+import org.apache.flink.runtime.jobmanager.PartitionProducerDisposedException
 import org.apache.flink.runtime.leaderretrieval.{LeaderRetrievalListener, 
LeaderRetrievalService}
 import org.apache.flink.runtime.memory.MemoryManager
 import org.apache.flink.runtime.messages.Messages._
 import org.apache.flink.runtime.messages.RegistrationMessages._
-import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.{ResponseStackTraceSampleFailure,
 ResponseStackTraceSampleSuccess, SampleTaskStackTrace, 
StackTraceSampleMessages, TriggerStackTraceSample}
+import org.apache.flink.runtime.messages.StackTraceSampleMessages._
 import org.apache.flink.runtime.messages.TaskManagerMessages._
 import org.apache.flink.runtime.messages.TaskMessages._
 import 
org.apache.flink.runtime.messages.checkpoint.{AbstractCheckpointMessage, 
NotifyCheckpointComplete, TriggerCheckpoint}
-import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup
+import org.apache.flink.runtime.metrics.{MetricRegistry => FlinkMetricRegistry}
 import org.apache.flink.runtime.process.ProcessReaper
 import org.apache.flink.runtime.security.SecurityUtils
 import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner
@@ -80,7 +81,7 @@ import scala.concurrent._
 import scala.concurrent.duration._
 import scala.concurrent.forkjoin.ForkJoinPool
 import scala.language.postfixOps
-import scala.util.{Failure, Success, Try}
+import scala.util.{Failure, Success}
 
 /**
  * The TaskManager is responsible for executing the individual tasks of a 
Flink job. It is
@@ -503,15 +504,38 @@ class TaskManager(
             )
           }
 
-        case PartitionState(taskExecutionId, taskResultId, partitionId, state) 
=>
-          Option(runningTasks.get(taskExecutionId)) match {
+        // Updates the partition producer state
+        case PartitionProducerState(receiverExecutionId, result) =>
+          Option(runningTasks.get(receiverExecutionId)) match {
             case Some(task) =>
-              task.onPartitionStateUpdate(taskResultId, partitionId, state)
+              try {
+                result match {
+                  case Left((intermediateDataSetId, resultPartitionId, 
producerState)) =>
+                    // Forward the state update to the task
+                    task.onPartitionStateUpdate(
+                      intermediateDataSetId,
+                      resultPartitionId.getPartitionId,
+                      producerState)
+
+                  case Right(failure) =>
+                    // Cancel or fail the execution
+                    if 
(failure.isInstanceOf[PartitionProducerDisposedException]) {
+                      log.info("Partition producer disposed. Cancelling " +
+                        s"execution $receiverExecutionId.", failure)
+                      task.cancelExecution()
+                    } else {
+                      task.failExternally(failure)
+                    }
+                }
+              } catch {
+                case e: Exception => task.failExternally(e)
+              }
             case None =>
-              log.debug(s"Cannot find task $taskExecutionId to respond with 
partition state.")
+              log.debug(s"Cannot find task with ID $receiverExecutionId to 
forward partition " +
+                "state respond to.")
           }
       }
-      }
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
index a5f4c7d..955f335 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateConcurrentTest.java
@@ -24,7 +24,7 @@ import org.apache.flink.runtime.io.network.ConnectionID;
 import org.apache.flink.runtime.io.network.ConnectionManager;
 import org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.RemoteInputChannel;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
@@ -65,7 +65,7 @@ public class InputGateConcurrentTest {
                                new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
@@ -102,7 +102,7 @@ public class InputGateConcurrentTest {
                                new IntermediateDataSetID(),
                                0,
                                numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
@@ -152,7 +152,7 @@ public class InputGateConcurrentTest {
                                new IntermediateDataSetID(),
                                0,
                                numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                for (int i = 0, local = 0; i < numChannels; i++) {
@@ -192,7 +192,7 @@ public class InputGateConcurrentTest {
        // 
------------------------------------------------------------------------
 
        private static abstract class Source {
-       
+
                abstract void addBuffer(Buffer buffer) throws Exception;
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
index 192b0eb..c367018 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/InputGateFairnessTest.java
@@ -26,7 +26,7 @@ import 
org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.api.EndOfPartitionEvent;
 import org.apache.flink.runtime.io.network.api.serialization.EventSerializer;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.io.network.partition.consumer.BufferOrEvent;
 import org.apache.flink.runtime.io.network.partition.consumer.InputChannel;
 import 
org.apache.flink.runtime.io.network.partition.consumer.LocalInputChannel;
@@ -92,7 +92,7 @@ public class InputGateFairnessTest {
                                new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
@@ -146,7 +146,7 @@ public class InputGateFairnessTest {
                                new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                for (int i = 0; i < numChannels; i++) {
@@ -197,7 +197,7 @@ public class InputGateFairnessTest {
                                new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
@@ -206,11 +206,11 @@ public class InputGateFairnessTest {
 
                for (int i = 0; i < numChannels; i++) {
                        RemoteInputChannel channel = new RemoteInputChannel(
-                                       gate, i, new ResultPartitionID(), 
mock(ConnectionID.class), 
+                                       gate, i, new ResultPartitionID(), 
mock(ConnectionID.class),
                                        connManager, new Tuple2<>(0, 0), new 
DummyIOMetricGroup());
 
                        channels[i] = channel;
-                       
+
                        for (int p = 0; p < buffersPerChannel; p++) {
                                channel.onBuffer(mockBuffer, p);
                        }
@@ -253,7 +253,7 @@ public class InputGateFairnessTest {
                                new ExecutionAttemptID(),
                                new IntermediateDataSetID(),
                                0, numChannels,
-                               mock(PartitionStateChecker.class),
+                               mock(PartitionProducerStateChecker.class),
                                new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final ConnectionManager connManager = 
createDummyConnectionManager();
@@ -335,7 +335,7 @@ public class InputGateFairnessTest {
                        partitions[i].onBuffer(buffer, sequenceNumbers[i]++);
                }
        }
-       
+
        // 
------------------------------------------------------------------------
 
        private static class FairnessVerifyingInputGate extends SingleInputGate 
{
@@ -352,11 +352,11 @@ public class InputGateFairnessTest {
                                IntermediateDataSetID consumedResultId,
                                int consumedSubpartitionIndex,
                                int numberOfInputChannels,
-                               PartitionStateChecker partitionStateChecker,
+                               PartitionProducerStateChecker 
partitionProducerStateChecker,
                                IOMetricGroup metrics) {
 
                        super(owningTaskName, jobId, executionId, 
consumedResultId, consumedSubpartitionIndex,
-                                       numberOfInputChannels, 
partitionStateChecker, metrics);
+                                       numberOfInputChannels, 
partitionProducerStateChecker, metrics);
 
                        try {
                                Field f = 
SingleInputGate.class.getDeclaredField("inputChannelsWithData");

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
index 5d0a106..411f344 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/LocalInputChannelTest.java
@@ -28,7 +28,7 @@ import 
org.apache.flink.runtime.io.network.TaskEventDispatcher;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.NetworkBufferPool;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import 
org.apache.flink.runtime.io.network.partition.PartitionNotFoundException;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
@@ -284,7 +284,7 @@ public class LocalInputChannelTest {
                        new IntermediateDataSetID(),
                        0,
                        1,
-                       mock(PartitionStateChecker.class),
+                       mock(PartitionProducerStateChecker.class),
                        new UnregisteredTaskMetricsGroup.DummyIOMetricGroup()
                );
 
@@ -481,7 +481,7 @@ public class LocalInputChannelTest {
                                        new IntermediateDataSetID(),
                                        subpartitionIndex,
                                        numberOfInputChannels,
-                                       mock(PartitionStateChecker.class),
+                                       
mock(PartitionProducerStateChecker.class),
                                        new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                        // Set buffer pool

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
index ec4b31d..f2fb2d9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGateTest.java
@@ -34,7 +34,7 @@ import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.BufferPool;
 import org.apache.flink.runtime.io.network.buffer.BufferProvider;
 import org.apache.flink.runtime.io.network.buffer.BufferRecycler;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import 
org.apache.flink.runtime.io.network.partition.BufferAvailabilityListener;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionManager;
@@ -74,7 +74,7 @@ public class SingleInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final SingleInputGate inputGate = new SingleInputGate(
-                       "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       "Test Task Name", new JobID(), new 
ExecutionAttemptID(), new IntermediateDataSetID(), 0, 2, 
mock(PartitionProducerStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final TestInputChannel[] inputChannels = new TestInputChannel[]{
                        new TestInputChannel(inputGate, 0),
@@ -128,7 +128,7 @@ public class SingleInputGateTest {
                // Setup reader with one local and one unknown input channel
                final IntermediateDataSetID resultId = new 
IntermediateDataSetID();
 
-               final SingleInputGate inputGate = new SingleInputGate("Test 
Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, 
mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+               final SingleInputGate inputGate = new SingleInputGate("Test 
Task Name", new JobID(), new ExecutionAttemptID(), resultId, 0, 2, 
mock(PartitionProducerStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
                final BufferPool bufferPool = mock(BufferPool.class);
                
when(bufferPool.getNumberOfRequiredMemorySegments()).thenReturn(2);
 
@@ -183,7 +183,7 @@ public class SingleInputGateTest {
                        new IntermediateDataSetID(),
                        0,
                        1,
-                       mock(PartitionStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+                       mock(PartitionProducerStateChecker.class), new 
UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                ResultPartitionManager partitionManager = 
mock(ResultPartitionManager.class);
 
@@ -223,7 +223,7 @@ public class SingleInputGateTest {
                        new IntermediateDataSetID(),
                        0,
                        1,
-                       mock(PartitionStateChecker.class),
+                       mock(PartitionProducerStateChecker.class),
                        new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                InputChannel unknown = new UnknownInputChannel(
@@ -314,7 +314,7 @@ public class SingleInputGateTest {
                NetworkEnvironment netEnv = mock(NetworkEnvironment.class);
                when(netEnv.getPartitionManager()).thenReturn(new 
ResultPartitionManager());
                when(netEnv.getTaskEventDispatcher()).thenReturn(new 
TaskEventDispatcher());
-               
when(netEnv.getPartitionStateChecker()).thenReturn(mock(PartitionStateChecker.class));
+               
when(netEnv.getPartitionProducerStateChecker()).thenReturn(mock(PartitionProducerStateChecker.class));
                
when(netEnv.getPartitionRequestInitialAndMaxBackoff()).thenReturn(new 
Tuple2<>(initialBackoff, maxBackoff));
                when(netEnv.getConnectionManager()).thenReturn(new 
LocalConnectionManager());
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
index 0749467..b1d9483 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/TestSingleInputGate.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
@@ -59,7 +59,7 @@ public class TestSingleInputGate {
                        new IntermediateDataSetID(),
                        0,
                        numberOfInputChannels,
-                       mock(PartitionStateChecker.class),
+                       mock(PartitionProducerStateChecker.class),
                        new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                this.inputGate = spy(realGate);

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
index faec77e..8cfda0f 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/consumer/UnionInputGateTest.java
@@ -20,7 +20,7 @@ package 
org.apache.flink.runtime.io.network.partition.consumer;
 
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
-import org.apache.flink.runtime.io.network.netty.PartitionStateChecker;
+import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import 
org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;
 import org.junit.Test;
@@ -43,8 +43,8 @@ public class UnionInputGateTest {
        public void testBasicGetNextLogic() throws Exception {
                // Setup
                final String testTaskName = "Test Task";
-               final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
-               final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+               final SingleInputGate ig1 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 3, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
+               final SingleInputGate ig2 = new SingleInputGate(testTaskName, new JobID(), new ExecutionAttemptID(), new IntermediateDataSetID(), 0, 5, mock(PartitionProducerStateChecker.class), new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
 
                final UnionInputGate union = new UnionInputGate(new 
SingleInputGate[]{ig1, ig2});
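
The two test diffs above only swap the mocked checker type that is passed to the SingleInputGate constructor. For reference, a minimal sketch of that wiring against the renamed interface, assuming the constructor signature visible in the hunks above; the helper class and method below are made up for illustration and are not part of this patch.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    import org.apache.flink.runtime.io.network.netty.PartitionProducerStateChecker;
    import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
    import org.apache.flink.runtime.operators.testutils.UnregisteredTaskMetricsGroup;

    import static org.mockito.Mockito.mock;

    // Hypothetical test helper (not in the patch): builds a gate with a mocked
    // PartitionProducerStateChecker, mirroring the constructor calls in the hunks above.
    public class InputGateMockingSketch {

        public static SingleInputGate newGateWithMockedChecker(String taskName, int numberOfInputChannels) {
            return new SingleInputGate(
                taskName,
                new JobID(),
                new ExecutionAttemptID(),
                new IntermediateDataSetID(),
                0,                                          // index argument, same value as in the tests above
                numberOfInputChannels,
                mock(PartitionProducerStateChecker.class),  // previously mock(PartitionStateChecker.class)
                new UnregisteredTaskMetricsGroup.DummyIOMetricGroup());
        }
    }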
 

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
index b56bf29..e41982e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/JobManagerTest.java
@@ -48,12 +48,12 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
 import org.apache.flink.runtime.leaderretrieval.StandaloneLeaderRetrievalService;
 import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
-import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
+import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
 import org.apache.flink.runtime.messages.JobManagerMessages.StopJob;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingFailure;
 import org.apache.flink.runtime.messages.JobManagerMessages.StoppingSuccess;
 import org.apache.flink.runtime.messages.JobManagerMessages.SubmitJob;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
+import org.apache.flink.runtime.messages.TaskMessages.PartitionProducerState;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.testingUtils.TestingCluster;
 import org.apache.flink.runtime.testingUtils.TestingJobManager;
@@ -89,7 +89,6 @@ import static org.apache.flink.runtime.io.network.partition.ResultPartitionType.
 import static org.apache.flink.runtime.testingUtils.TestingUtils.DEFAULT_AKKA_ASK_TIMEOUT;
 import static org.apache.flink.runtime.testingUtils.TestingUtils.startTestingCluster;
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
@@ -178,7 +177,7 @@ public class JobManagerTest extends TestLogger {
                                                // Request the execution graph 
to get the runtime info
                                                jobManagerGateway.tell(new 
RequestExecutionGraph(jid), testActorGateway);
 
-                                               final ExecutionGraph eg = 
expectMsgClass(ExecutionGraphFound.class)
+                                               final ExecutionGraph eg = 
(ExecutionGraph) expectMsgClass(ExecutionGraphFound.class)
                                                        .executionGraph();
 
                                                final ExecutionVertex vertex = 
eg.getJobVertex(sender.getID())
@@ -193,59 +192,236 @@ public class JobManagerTest extends TestLogger {
 
                                                // - The test 
----------------------------------------------------------------------
 
+                                               ExecutionAttemptID receiverId = 
new ExecutionAttemptID();
+
                                                // 1. All execution states
-                                               RequestPartitionState request = 
new RequestPartitionState(
-                                                       jid, partitionId, 
receiver, rid);
+                                               RequestPartitionProducerState 
request = new RequestPartitionProducerState(
+                                                       jid, receiverId, rid, 
partitionId);
 
                                                for (ExecutionState state : 
ExecutionState.values()) {
                                                        
ExecutionGraphTestUtils.setVertexState(vertex, state);
 
-                                                       
jobManagerGateway.tell(request, testActorGateway);
+                                                       Future<?> 
futurePartitionState = jobManagerGateway
+                                                               .ask(request, 
getRemainingTime());
 
-                                                       LeaderSessionMessage 
lsm = expectMsgClass(LeaderSessionMessage.class);
+                                                       LeaderSessionMessage 
wrappedMsg = (LeaderSessionMessage) Await.result(futurePartitionState, 
getRemainingTime());
+                                                       PartitionProducerState resp = (PartitionProducerState) wrappedMsg.message();
+                                                       
assertEquals(receiverId, resp.receiverExecutionId());
+                                                       assertTrue("Responded 
with failure: " + resp, resp.result().isLeft());
+                                                       assertEquals(state, 
resp.result().left().get()._3());
+                                               }
 
-                                                       
assertEquals(PartitionState.class, lsm.message().getClass());
+                                               // 2. Non-existing execution
+                                               request = new 
RequestPartitionProducerState(jid, receiverId, rid, new ResultPartitionID());
 
-                                                       PartitionState resp = 
(PartitionState) lsm.message();
+                                               Future<?> futurePartitionState 
= jobManagerGateway.ask(request, getRemainingTime());
+                                               LeaderSessionMessage wrappedMsg 
= (LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+                                               PartitionProducerState resp = 
(PartitionProducerState) wrappedMsg.message();
+                                               assertEquals(receiverId, 
resp.receiverExecutionId());
+                                               assertTrue("Responded with 
success: " + resp, resp.result().isRight());
+                                               
assertTrue(resp.result().right().get() instanceof RuntimeException);
+                                               
assertTrue(resp.result().right().get().getCause() instanceof 
IllegalArgumentException);
 
-                                                       
assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-                                                       
assertEquals(request.taskResultId(), resp.taskResultId());
-                                                       
assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-                                                       assertEquals(state, 
resp.state());
+                                               // 3. Non-existing job
+                                               request = new 
RequestPartitionProducerState(new JobID(), receiverId, rid, new 
ResultPartitionID());
+                                               futurePartitionState = 
jobManagerGateway.ask(request, getRemainingTime());
+                                               wrappedMsg = 
(LeaderSessionMessage) Await.result(futurePartitionState, getRemainingTime());
+                                               resp = (PartitionProducerState) 
wrappedMsg.message();
+                                               assertEquals(receiverId, 
resp.receiverExecutionId());
+                                               assertTrue("Responded with 
success: " + resp, resp.result().isRight());
+                                               
assertTrue(resp.result().right().get() instanceof IllegalArgumentException);
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       } finally {
+                                               if (cluster != null) {
+                                                       cluster.shutdown();
                                                }
+                                       }
+                               }
+                       };
+               }};
+       }
 
-                                               // 2. Non-existing execution
-                                               request = new 
RequestPartitionState(jid, new ResultPartitionID(), receiver, rid);
+       /**
+        * Tests the JobManager response when the execution is not registered 
with
+        * the ExecutionGraph.
+        */
+       @Test
+       public void testRequestPartitionStateUnregisteredExecution() throws 
Exception {
+               new JavaTestKit(system) {{
+                       new Within(duration("15 seconds")) {
+                               @Override
+                               protected void run() {
+                                       // Setup
+                                       TestingCluster cluster = null;
+
+                                       try {
+                                               cluster = 
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                                               final IntermediateDataSetID rid 
= new IntermediateDataSetID();
+
+                                               // Create a task
+                                               final JobVertex sender = new 
JobVertex("Sender");
+                                               sender.setParallelism(1);
+                                               
sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+                                               
sender.createAndAddResultDataSet(rid, PIPELINED);
 
-                                               jobManagerGateway.tell(request, 
testActorGateway);
+                                               final JobVertex sender2 = new 
JobVertex("Blocking Sender");
+                                               sender2.setParallelism(1);
+                                               
sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-                                               LeaderSessionMessage lsm = 
expectMsgClass(LeaderSessionMessage.class);
+                                               final JobGraph jobGraph = new 
JobGraph("Fast finishing producer test job", sender, sender2);
+                                               final JobID jid = 
jobGraph.getJobID();
 
-                                               
assertEquals(PartitionState.class, lsm.message().getClass());
+                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
+                                                       
TestingUtils.TESTING_DURATION());
 
-                                               PartitionState resp = 
(PartitionState) lsm.message();
+                                               // we can set the leader 
session ID to None because we don't use this gateway to send messages
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
 
-                                               
assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-                                               
assertEquals(request.taskResultId(), resp.taskResultId());
-                                               
assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-                                               assertNull(resp.state());
+                                               // Submit the job and wait for 
all vertices to be running
+                                               jobManagerGateway.tell(
+                                                       new SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT),
+                                                       testActorGateway);
+                                               
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
 
-                                               // 3. Non-existing job
-                                               request = new 
RequestPartitionState(
-                                                       new JobID(), new 
ResultPartitionID(), receiver, rid);
+                                               jobManagerGateway.tell(
+                                                       new 
WaitForAllVerticesToBeRunningOrFinished(jid),
+                                                       testActorGateway);
+
+                                               
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                                               Future<Object> egFuture = 
jobManagerGateway.ask(
+                                                       new 
RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+                                               ExecutionGraphFound egFound = 
(ExecutionGraphFound) Await.result(egFuture, remaining());
+                                               ExecutionGraph eg = 
egFound.executionGraph();
+
+                                               ExecutionVertex vertex = 
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+                                               while 
(vertex.getExecutionState() != ExecutionState.FINISHED) {
+                                                       Thread.sleep(1);
+                                               }
+
+                                               IntermediateResultPartition 
partition = vertex.getProducedPartitions()
+                                                       
.values().iterator().next();
+
+                                               ResultPartitionID partitionId = 
new ResultPartitionID(
+                                                       
partition.getPartitionId(),
+                                                       
vertex.getCurrentExecutionAttempt().getAttemptId());
+
+                                               // Producer finished, request 
state
+                                               ExecutionAttemptID receiverId = 
new ExecutionAttemptID();
+
+                                               Future<?> producerStateFuture = 
jobManagerGateway.ask(
+                                                       new 
RequestPartitionProducerState(jid, receiverId, rid, partitionId), 
getRemainingTime());
+
+                                               LeaderSessionMessage wrappedMsg 
= (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+                                               PartitionProducerState resp = 
(PartitionProducerState) wrappedMsg.message();
+                                               assertEquals(receiverId, 
resp.receiverExecutionId());
+                                               assertTrue("Responded with 
failure: " + resp, resp.result().isLeft());
+                                               
assertEquals(ExecutionState.FINISHED, resp.result().left().get()._3());
+                                       } catch (Exception e) {
+                                               e.printStackTrace();
+                                               fail(e.getMessage());
+                                       } finally {
+                                               if (cluster != null) {
+                                                       cluster.shutdown();
+                                               }
+                                       }
+                               }
+                       };
+               }};
+       }
+
+       /**
+        * Tests the JobManager response when the execution is not registered 
with
+        * the ExecutionGraph anymore and a new execution attempt is available.
+        */
+       @Test
+       public void testRequestPartitionStateMoreRecentExecutionAttempt() 
throws Exception {
+               new JavaTestKit(system) {{
+                       new Within(duration("15 seconds")) {
+                               @Override
+                               protected void run() {
+                                       // Setup
+                                       TestingCluster cluster = null;
+
+                                       try {
+                                               cluster = 
startTestingCluster(4, 1, DEFAULT_AKKA_ASK_TIMEOUT());
+
+                                               final IntermediateDataSetID rid 
= new IntermediateDataSetID();
+
+                                               // Create a task
+                                               final JobVertex sender = new 
JobVertex("Sender");
+                                               sender.setParallelism(1);
+                                               
sender.setInvokableClass(Tasks.NoOpInvokable.class); // just finish
+                                               
sender.createAndAddResultDataSet(rid, PIPELINED);
+
+                                               final JobVertex sender2 = new 
JobVertex("Blocking Sender");
+                                               sender2.setParallelism(1);
+                                               
sender2.setInvokableClass(Tasks.BlockingNoOpInvokable.class); // just block
+                                               
sender2.createAndAddResultDataSet(new IntermediateDataSetID(), PIPELINED);
 
-                                               jobManagerGateway.tell(request, 
testActorGateway);
+                                               final JobGraph jobGraph = new 
JobGraph("Fast finishing producer test job", sender, sender2);
+                                               final JobID jid = 
jobGraph.getJobID();
+
+                                               final ActorGateway 
jobManagerGateway = cluster.getLeaderGateway(
+                                                       
TestingUtils.TESTING_DURATION());
+
+                                               // we can set the leader 
session ID to None because we don't use this gateway to send messages
+                                               final ActorGateway 
testActorGateway = new AkkaActorGateway(getTestActor(), null);
+
+                                               // Submit the job and wait for 
all vertices to be running
+                                               jobManagerGateway.tell(
+                                                       new SubmitJob(
+                                                               jobGraph,
+                                                               
ListeningBehaviour.EXECUTION_RESULT),
+                                                       testActorGateway);
+                                               
expectMsgClass(JobManagerMessages.JobSubmitSuccess.class);
+
+                                               jobManagerGateway.tell(
+                                                       new 
WaitForAllVerticesToBeRunningOrFinished(jid),
+                                                       testActorGateway);
+
+                                               
expectMsgClass(TestingJobManagerMessages.AllVerticesRunning.class);
+
+                                               Future<Object> egFuture = 
jobManagerGateway.ask(
+                                                       new 
RequestExecutionGraph(jobGraph.getJobID()), remaining());
+
+                                               ExecutionGraphFound egFound = 
(ExecutionGraphFound) Await.result(egFuture, remaining());
+                                               ExecutionGraph eg = 
(ExecutionGraph) egFound.executionGraph();
+
+                                               ExecutionVertex vertex = 
eg.getJobVertex(sender.getID()).getTaskVertices()[0];
+                                               while 
(vertex.getExecutionState() != ExecutionState.FINISHED) {
+                                                       Thread.sleep(1);
+                                               }
+
+                                               IntermediateResultPartition 
partition = vertex.getProducedPartitions()
+                                                       
.values().iterator().next();
+
+                                               ResultPartitionID partitionId = 
new ResultPartitionID(
+                                                       
partition.getPartitionId(),
+                                                       
vertex.getCurrentExecutionAttempt().getAttemptId());
 
-                                               lsm = 
expectMsgClass(LeaderSessionMessage.class);
+                                               // Reset execution => new 
execution attempt
+                                               vertex.resetForNewExecution();
 
-                                               
assertEquals(PartitionState.class, lsm.message().getClass());
+                                               // Producer finished, request 
state
+                                               ExecutionAttemptID receiverId = 
new ExecutionAttemptID();
 
-                                               resp = (PartitionState) 
lsm.message();
+                                               Object request = new 
RequestPartitionProducerState(jid, receiverId, rid, partitionId);
 
-                                               
assertEquals(request.taskExecutionId(), resp.taskExecutionId());
-                                               
assertEquals(request.taskResultId(), resp.taskResultId());
-                                               
assertEquals(request.partitionId().getPartitionId(), resp.partitionId());
-                                               assertNull(resp.state());
+                                               Future<?> producerStateFuture = 
jobManagerGateway.ask(request, getRemainingTime());
+
+                                               LeaderSessionMessage wrappedMsg 
= (LeaderSessionMessage) Await.result(producerStateFuture, getRemainingTime());
+                                               PartitionProducerState resp = 
(PartitionProducerState) wrappedMsg.message();
+                                               assertEquals(receiverId, 
resp.receiverExecutionId());
+                                               assertTrue("Responded with 
success: " + resp, resp.result().isRight());
+                                               
assertTrue(resp.result().right().get() instanceof 
PartitionProducerDisposedException);
                                        } catch (Exception e) {
                                                e.printStackTrace();
                                                fail(e.getMessage());
@@ -366,11 +542,7 @@ public class JobManagerTest extends TestLogger {
        }
 
        /**
-                                       system.dispatcher(),
-                               actorSystem.dispatcher(),
-        * Tests that we can trigger a
-        *
-        * @throws Exception
+        * Tests that we can trigger a savepoint when periodic checkpointing is disabled.
         */
        @Test
        public void testSavepointWithDeactivatedPeriodicCheckpointing() throws Exception {
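
The new JobManagerTest cases above all follow the same request/response pattern: ask the JobManager with a RequestPartitionProducerState message, unwrap the LeaderSessionMessage, and inspect the Either carried by PartitionProducerState (Left holds (IntermediateDataSetID, ResultPartitionID, ExecutionState) for a known producer; Right holds the failure cause, e.g. an IllegalArgumentException or a PartitionProducerDisposedException). A sketch of that round trip, factored into a hypothetical helper that is not part of the patch; the message and gateway types are the ones used in the hunks above, and the exact generics of result() are assumed from the test code.

    import org.apache.flink.api.common.JobID;
    import org.apache.flink.runtime.execution.ExecutionState;
    import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
    import org.apache.flink.runtime.instance.ActorGateway;
    import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
    import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
    import org.apache.flink.runtime.messages.JobManagerMessages.LeaderSessionMessage;
    import org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionProducerState;
    import org.apache.flink.runtime.messages.TaskMessages.PartitionProducerState;

    import scala.Tuple3;
    import scala.concurrent.Await;
    import scala.concurrent.Future;
    import scala.concurrent.duration.FiniteDuration;
    import scala.util.Either;

    // Hypothetical helper (not in the patch): performs the round trip the tests above repeat.
    public class ProducerStateQuerySketch {

        public static Either<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception> queryProducerState(
                ActorGateway jobManagerGateway,
                JobID jobId,
                ExecutionAttemptID receiverId,
                IntermediateDataSetID dataSetId,
                ResultPartitionID partitionId,
                FiniteDuration timeout) throws Exception {

            Object request = new RequestPartitionProducerState(jobId, receiverId, dataSetId, partitionId);
            Future<Object> future = jobManagerGateway.ask(request, timeout);

            // The JobManager wraps its answer in a LeaderSessionMessage.
            LeaderSessionMessage wrapped = (LeaderSessionMessage) Await.result(future, timeout);
            PartitionProducerState response = (PartitionProducerState) wrapped.message();

            // Left: producer known -> (data set id, partition id, producer ExecutionState).
            // Right: producer unknown or disposed -> failure cause.
            return response.result();
        }
    }

Compared to the old RequestPartitionState/PartitionState pair, the receiver's execution attempt ID travels with the request and comes back in the response, which is what the assertEquals(receiverId, resp.receiverExecutionId()) checks above verify.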

http://git-wip-us.apache.org/repos/asf/flink/blob/2b612f2d/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
index 6a696a0..87fb24c 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskmanager/TaskManagerTest.java
@@ -54,6 +54,7 @@ import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmanager.Tasks;
+import org.apache.flink.runtime.messages.JobManagerMessages;
 import org.apache.flink.runtime.messages.Messages;
 import org.apache.flink.runtime.messages.RegistrationMessages;
 import 
org.apache.flink.runtime.messages.StackTraceSampleMessages.ResponseStackTraceSampleFailure;
@@ -63,7 +64,6 @@ import org.apache.flink.runtime.messages.TaskManagerMessages;
 import org.apache.flink.runtime.messages.TaskManagerMessages.FatalError;
 import org.apache.flink.runtime.messages.TaskMessages;
 import org.apache.flink.runtime.messages.TaskMessages.CancelTask;
-import org.apache.flink.runtime.messages.TaskMessages.PartitionState;
 import org.apache.flink.runtime.messages.TaskMessages.StopTask;
 import org.apache.flink.runtime.messages.TaskMessages.SubmitTask;
 import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult;
@@ -81,10 +81,12 @@ import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Option;
+import scala.Tuple3;
 import scala.concurrent.Await;
 import scala.concurrent.Future;
 import scala.concurrent.Promise;
 import scala.concurrent.duration.FiniteDuration;
+import scala.util.Left;
 
 import java.io.IOException;
 import java.net.InetSocketAddress;
@@ -100,7 +102,6 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.TimeUnit;
 
-import static org.apache.flink.runtime.messages.JobManagerMessages.RequestPartitionState;
 import static org.apache.flink.runtime.messages.JobManagerMessages.ScheduleOrUpdateConsumers;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -1586,14 +1587,15 @@ public class TaskManagerTest extends TestLogger {
 
                @Override
                public void handleMessage(Object message) throws Exception {
-                       if (message instanceof RequestPartitionState) {
-                               final RequestPartitionState msg = (RequestPartitionState) message;
-
-                               PartitionState resp = new PartitionState(
-                                               msg.taskExecutionId(),
-                                               msg.taskResultId(),
-                                               msg.partitionId().getPartitionId(),
-                                               ExecutionState.RUNNING);
+                       if (message instanceof JobManagerMessages.RequestPartitionProducerState) {
+                               JobManagerMessages.RequestPartitionProducerState msg = (JobManagerMessages.RequestPartitionProducerState) message;
+
+                               TaskMessages.PartitionProducerState resp = new TaskMessages.PartitionProducerState(
+                                       msg.receiverExecutionId(),
+                                       new Left<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception>(new Tuple3<>(
+                                               msg.intermediateDataSetId(),
+                                               msg.resultPartitionId(),
+                                               ExecutionState.RUNNING)));
 
                                getSender().tell(decorateMessage(resp), getSelf());
                        }
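
The stub JobManager in the hunk above always answers with the Left (success) side of the Either, reporting a RUNNING producer. If a test needed to drive a consumer down the failure path that JobManagerTest exercises above (result().isRight()), the same branch could answer with the Right side instead. A sketch under that assumption, meant as a drop-in variant of the handleMessage branch shown above; the RuntimeException used here is illustrative and not part of the patch:

    // Hypothetical variant of the handleMessage branch above (not in the patch):
    // reply with the Right (failure) side of the Either instead of a RUNNING state.
    if (message instanceof JobManagerMessages.RequestPartitionProducerState) {
        JobManagerMessages.RequestPartitionProducerState msg =
            (JobManagerMessages.RequestPartitionProducerState) message;

        TaskMessages.PartitionProducerState resp = new TaskMessages.PartitionProducerState(
            msg.receiverExecutionId(),
            new scala.util.Right<Tuple3<IntermediateDataSetID, ResultPartitionID, ExecutionState>, Exception>(
                new RuntimeException("Unknown producer for partition " + msg.resultPartitionId())));

        getSender().tell(decorateMessage(resp), getSelf());
    }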
