This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 408f6b67aefaccfc76708b2d4772eb7f0a8fd984
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed Jun 12 14:27:32 2019 +0200

    [FLINK-12615][coordination] Track partitions on JM
---
 .../flink/runtime/executiongraph/Execution.java    |  43 +--
 .../runtime/executiongraph/ExecutionGraph.java     |  20 +-
 .../executiongraph/ExecutionGraphBuilder.java      |   7 +-
 .../io/network/partition/PartitionTracker.java     |  57 ++++
 .../network/partition/PartitionTrackerFactory.java |  53 ++++
 .../io/network/partition/PartitionTrackerImpl.java | 176 ++++++++++++
 .../apache/flink/runtime/jobmaster/JobMaster.java  |  20 +-
 .../factories/DefaultJobMasterServiceFactory.java  |   8 +-
 .../flink/runtime/scheduler/DefaultScheduler.java  |   7 +-
 .../runtime/scheduler/DefaultSchedulerFactory.java |   7 +-
 .../flink/runtime/scheduler/LegacyScheduler.java   |  17 +-
 .../runtime/scheduler/LegacySchedulerFactory.java  |   7 +-
 .../runtime/scheduler/SchedulerNGFactory.java      |   4 +-
 .../CheckpointSettingsSerializableTest.java        |   4 +-
 .../ExecutionGraphDeploymentTest.java              |   4 +-
 .../ExecutionGraphRescalingTest.java               |  13 +-
 .../ExecutionGraphSchedulingTest.java              |   4 +-
 .../executiongraph/ExecutionGraphTestUtils.java    |   4 +-
 .../runtime/executiongraph/ExecutionTest.java      | 200 ++++++++------
 .../ExecutionVertexLocalityTest.java               |   4 +-
 .../PipelinedFailoverRegionBuildingTest.java       |   4 +-
 .../io/network/partition/NoOpPartitionTracker.java |  53 ++++
 .../io/network/partition/PartitionTestUtils.java   |  34 +++
 .../partition/PartitionTrackerImplTest.java        | 294 +++++++++++++++++++++
 .../network/partition/TestingPartitionTracker.java |  83 ++++++
 .../flink/runtime/jobmaster/JobMasterTest.java     |  61 ++++-
 .../taskexecutor/TestingTaskExecutorGateway.java   |  20 +-
 .../TestingTaskExecutorGatewayBuilder.java         |  12 +-
 28 files changed, 1082 insertions(+), 138 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index c81ec18..c033304 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
 import org.apache.flink.runtime.clusterframework.types.AllocationID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
@@ -39,6 +40,7 @@ import 
org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
@@ -626,6 +628,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        
vertex.getExecutionGraph().getJobMasterMainThreadExecutor(),
                        producedPartitionsCache -> {
                                producedPartitions = producedPartitionsCache;
+                               
startTrackingPartitions(location.getResourceID(), 
producedPartitionsCache.values());
                                return this;
                        });
        }
@@ -811,7 +814,6 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        else if (current == FINISHED || current == FAILED) {
                                // nothing to do any more. finished/failed 
before it could be cancelled.
                                // in any case, the task is removed from the 
TaskManager already
-                               
sendReleaseIntermediateResultPartitionsRpcCall();
 
                                return;
                        }
@@ -844,8 +846,6 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                break;
                        case FINISHED:
                        case FAILED:
-                               
sendReleaseIntermediateResultPartitionsRpcCall();
-                               break;
                        case CANCELED:
                                break;
                        default:
@@ -1170,6 +1170,8 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        private void finishCancellation() {
                releaseAssignedResource(new FlinkException("Execution " + this 
+ " was cancelled."));
                vertex.getExecutionGraph().deregisterExecution(this);
+               // release partitions on TM in case the Task finished while we 
where already CANCELING
+               stopTrackingAndReleasePartitions();
        }
 
        void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1228,6 +1230,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                releaseAssignedResource(t);
                                
vertex.getExecutionGraph().deregisterExecution(this);
+                               stopTrackingAndReleasePartitions();
 
                                if (!isCallback && (current == RUNNING || 
current == DEPLOYING)) {
                                        if (LOG.isDebugEnabled()) {
@@ -1323,23 +1326,25 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       private void sendReleaseIntermediateResultPartitionsRpcCall() {
-               LOG.info("Discarding the results produced by task execution 
{}.", attemptId);
-               final LogicalSlot slot = assignedResource;
-
-               if (slot != null) {
-                       final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
-
-                       Collection<IntermediateResultPartition> partitions = 
vertex.getProducedPartitions().values();
-                       Collection<ResultPartitionID> partitionIds = new 
ArrayList<>(partitions.size());
-                       for (IntermediateResultPartition partition : 
partitions) {
-                               partitionIds.add(new 
ResultPartitionID(partition.getPartitionId(), attemptId));
-                       }
+       private void startTrackingPartitions(final ResourceID taskExecutorId, 
final Collection<ResultPartitionDeploymentDescriptor> partitions) {
+               PartitionTracker partitionTracker = 
vertex.getExecutionGraph().getPartitionTracker();
+               for (ResultPartitionDeploymentDescriptor partition : 
partitions) {
+                       partitionTracker.startTrackingPartition(
+                               taskExecutorId,
+                               partition);
+               }
+       }
 
-                       if (!partitionIds.isEmpty()) {
-                               // TODO For some tests this could be a problem 
when querying too early if all resources were released
-                               
taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
-                       }
+       private void stopTrackingAndReleasePartitions() {
+               LOG.info("Discarding the results produced by task execution 
{}.", attemptId);
+               if (producedPartitions != null && producedPartitions.size() > 
0) {
+                       final PartitionTracker partitionTracker = 
getVertex().getExecutionGraph().getPartitionTracker();
+                       final List<ResultPartitionID> producedPartitionIds = 
producedPartitions.values().stream()
+                               
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+                               .map(ShuffleDescriptor::getResultPartitionID)
+                               .collect(Collectors.toList());
+
+                       
partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index 0c20ee5..514121e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -51,6 +51,8 @@ import 
org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import 
org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartCallback;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobStatus;
@@ -96,6 +98,7 @@ import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
+import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
@@ -284,6 +287,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
         * strong reference to any user-defined classes.*/
        private volatile ErrorInfo failureInfo;
 
+       private final PartitionTracker partitionTracker;
+
        /**
         * Future for an ongoing or completed scheduling action.
         */
@@ -409,7 +414,11 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        blobWriter,
                        allocationTimeout,
                        NettyShuffleMaster.INSTANCE,
-                       true);
+                       true,
+                       new PartitionTrackerImpl(
+                               jobInformation.getJobId(),
+                               NettyShuffleMaster.INSTANCE,
+                               ignored -> Optional.empty()));
        }
 
        public ExecutionGraph(
@@ -425,7 +434,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
                        BlobWriter blobWriter,
                        Time allocationTimeout,
                        ShuffleMaster<?> shuffleMaster,
-                       boolean forcePartitionReleaseOnConsumption) throws 
IOException {
+                       boolean forcePartitionReleaseOnConsumption,
+                       PartitionTracker partitionTracker) throws IOException {
 
                checkNotNull(futureExecutor);
 
@@ -477,6 +487,8 @@ public class ExecutionGraph implements AccessExecutionGraph 
{
 
                this.forcePartitionReleaseOnConsumption = 
forcePartitionReleaseOnConsumption;
 
+               this.partitionTracker = checkNotNull(partitionTracker);
+
                LOG.info("Job recovers via failover strategy: {}", 
failoverStrategy.getStrategyName());
        }
 
@@ -1819,4 +1831,8 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
        ShuffleMaster<?> getShuffleMaster() {
                return shuffleMaster;
        }
+
+       public PartitionTracker getPartitionTracker() {
+               return partitionTracker;
+       }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
index c4eead1..f21d703 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java
@@ -43,6 +43,7 @@ import 
org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge
 import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge;
 import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
@@ -94,7 +95,8 @@ public class ExecutionGraphBuilder {
                        BlobWriter blobWriter,
                        Time allocationTimeout,
                        Logger log,
-                       ShuffleMaster<?> shuffleMaster) throws 
JobExecutionException, JobException {
+                       ShuffleMaster<?> shuffleMaster,
+                       PartitionTracker partitionTracker) throws 
JobExecutionException, JobException {
 
                checkNotNull(jobGraph, "job graph cannot be null");
 
@@ -135,7 +137,8 @@ public class ExecutionGraphBuilder {
                                        blobWriter,
                                        allocationTimeout,
                                        shuffleMaster,
-                                       forcePartitionReleaseOnConsumption);
+                                       forcePartitionReleaseOnConsumption,
+                                       partitionTracker);
                } catch (IOException e) {
                        throw new JobException("Could not create the 
ExecutionGraph.", e);
                }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
new file mode 100644
index 0000000..4c432f1
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+
+import java.util.Collection;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public interface PartitionTracker {
+
+       /**
+        * Starts the tracking of the given partition for the given task 
executor ID.
+        *
+        * @param producingTaskExecutorId ID of task executor on which the 
partition is produced
+        * @param resultPartitionDeploymentDescriptor deployment descriptor of 
the partition
+        */
+       void startTrackingPartition(ResourceID producingTaskExecutorId, 
ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor);
+
+       /**
+        * Stops the tracking of all partitions for the given task executor ID, 
without issuing any release calls.
+        */
+       void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+
+       /**
+        * Releases the given partitions and stop the tracking of partitions 
that were released.
+        */
+       void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds);
+
+       /**
+        * Releases all partitions for the given task executor ID, and stop the 
tracking of partitions that were released.
+        */
+       void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId);
+
+       /**
+        * Returns whether any partition is being tracked for the given task 
executor ID.
+        */
+       boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId);
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
new file mode 100644
index 0000000..a1c3416
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+
+import java.util.Optional;
+
+/**
+ * Factory for {@link PartitionTracker}.
+ */
+@FunctionalInterface
+public interface PartitionTrackerFactory {
+
+       /**
+        * Creates a new PartitionTracker.
+        *
+        * @param taskExecutorGatewayLookup lookup function to access task 
executor gateways
+        * @return created PartitionTracker
+        */
+       PartitionTracker create(TaskExecutorGatewayLookup 
taskExecutorGatewayLookup);
+
+       /**
+        * Lookup function for {@link TaskExecutorGateway}.
+        */
+       @FunctionalInterface
+       interface TaskExecutorGatewayLookup {
+
+               /**
+                * Returns a {@link TaskExecutorGateway} corresponding to the 
given ResourceID.
+                *
+                * @param taskExecutorId id of the task executor to look up.
+                * @return optional task executor gateway
+                */
+               Optional<TaskExecutorGateway> lookup(ResourceID taskExecutorId);
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
new file mode 100644
index 0000000..53e7d3f
--- /dev/null
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -0,0 +1,176 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.partition.PartitionTable;
+import org.apache.flink.util.Preconditions;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.stream.Collectors;
+
+import static java.util.stream.Collectors.toList;
+
+/**
+ * Utility for tracking partitions and issuing release calls to task executors 
and shuffle masters.
+ */
+public class PartitionTrackerImpl implements PartitionTracker {
+
+       private final JobID jobId;
+
+       private final PartitionTable<ResourceID> partitionTable = new 
PartitionTable<>();
+       private final Map<ResultPartitionID, PartitionInfo> partitionInfos = 
new HashMap<>();
+
+       private final ShuffleMaster<?> shuffleMaster;
+
+       private final PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup;
+
+       public PartitionTrackerImpl(
+               JobID jobId,
+               ShuffleMaster<?> shuffleMaster,
+               PartitionTrackerFactory.TaskExecutorGatewayLookup 
taskExecutorGatewayLookup) {
+
+               this.jobId = Preconditions.checkNotNull(jobId);
+               this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster);
+               this.taskExecutorGatewayLookup = taskExecutorGatewayLookup;
+       }
+
+       @Override
+       public void startTrackingPartition(ResourceID producingTaskExecutorId, 
ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
+               Preconditions.checkNotNull(producingTaskExecutorId);
+               Preconditions.checkNotNull(resultPartitionDeploymentDescriptor);
+
+               // if it is released on consumption we do not need to issue any 
release calls
+               if 
(resultPartitionDeploymentDescriptor.isReleasedOnConsumption()) {
+                       return;
+               }
+
+               final ResultPartitionID resultPartitionId = 
resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
+
+               partitionInfos.put(resultPartitionId, new 
PartitionInfo(producingTaskExecutorId, resultPartitionDeploymentDescriptor));
+               partitionTable.startTrackingPartitions(producingTaskExecutorId, 
Collections.singletonList(resultPartitionId));
+       }
+
+       @Override
+       public void stopTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
+               Preconditions.checkNotNull(producingTaskExecutorId);
+
+               // this is a bit icky since we make 2 calls to 
pT#stopTrackingPartitions
+               final Collection<ResultPartitionID> resultPartitionIds = 
partitionTable.stopTrackingPartitions(producingTaskExecutorId);
+
+               for (ResultPartitionID resultPartitionId : resultPartitionIds) {
+                       internalStopTrackingPartition(resultPartitionId);
+               }
+       }
+
+       @Override
+       public void 
stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+               Preconditions.checkNotNull(resultPartitionIds);
+
+               // stop tracking partitions to be released and group them by 
task executor ID
+               Map<ResourceID, List<ResultPartitionDeploymentDescriptor>> 
partitionsToReleaseByResourceId = resultPartitionIds.stream()
+                       .map(this::internalStopTrackingPartition)
+                       .filter(Optional::isPresent)
+                       .map(Optional::get)
+                       .collect(Collectors.groupingBy(
+                               partitionMetaData -> 
partitionMetaData.producingTaskExecutorResourceId,
+                               Collectors.mapping(
+                                       partitionMetaData -> 
partitionMetaData.resultPartitionDeploymentDescriptor,
+                                       toList())));
+
+               
partitionsToReleaseByResourceId.forEach(this::internalReleasePartitions);
+       }
+
+       @Override
+       public void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId) {
+               Preconditions.checkNotNull(producingTaskExecutorId);
+
+               // this is a bit icky since we make 2 calls to 
pT#stopTrackingPartitions
+               Collection<ResultPartitionID> resultPartitionIds = 
partitionTable.stopTrackingPartitions(producingTaskExecutorId);
+
+               stopTrackingAndReleasePartitions(resultPartitionIds);
+       }
+
+       @Override
+       public boolean isTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
+               Preconditions.checkNotNull(producingTaskExecutorId);
+
+               return 
partitionTable.hasTrackedPartitions(producingTaskExecutorId);
+       }
+
+       private Optional<PartitionInfo> 
internalStopTrackingPartition(ResultPartitionID resultPartitionId) {
+               final PartitionInfo partitionInfo = 
partitionInfos.remove(resultPartitionId);
+               if (partitionInfo == null) {
+                       return Optional.empty();
+               }
+               
partitionTable.stopTrackingPartitions(partitionInfo.producingTaskExecutorResourceId,
 Collections.singletonList(resultPartitionId));
+
+               return Optional.of(partitionInfo);
+       }
+
+       private void internalReleasePartitions(
+               ResourceID potentialPartitionLocation,
+               Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
+
+               
internalReleasePartitionsOnTaskExecutor(potentialPartitionLocation, 
partitionDeploymentDescriptors);
+               
internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors);
+       }
+
+       private void internalReleasePartitionsOnTaskExecutor(
+               ResourceID potentialPartitionLocation,
+               Collection<ResultPartitionDeploymentDescriptor> 
partitionDeploymentDescriptors) {
+
+               final List<ResultPartitionID> 
partitionsRequiringRpcReleaseCalls = partitionDeploymentDescriptors.stream()
+                       
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+                       .filter(descriptor -> 
descriptor.storesLocalResourcesOn().isPresent())
+                       .map(ShuffleDescriptor::getResultPartitionID)
+                       .collect(Collectors.toList());
+
+               if (!partitionsRequiringRpcReleaseCalls.isEmpty()) {
+                       taskExecutorGatewayLookup
+                               .lookup(potentialPartitionLocation)
+                               .ifPresent(taskExecutorGateway ->
+                                       
taskExecutorGateway.releasePartitions(jobId, 
partitionsRequiringRpcReleaseCalls));
+               }
+       }
+
+       private void 
internalReleasePartitionsOnShuffleMaster(Collection<ResultPartitionDeploymentDescriptor>
 partitionDeploymentDescriptors) {
+               partitionDeploymentDescriptors.stream()
+                       
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+                       .forEach(shuffleMaster::releasePartitionExternally);
+       }
+
+       private static final class PartitionInfo {
+               public final ResourceID producingTaskExecutorResourceId;
+               public final ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor;
+
+               private PartitionInfo(ResourceID 
producingTaskExecutorResourceId, ResultPartitionDeploymentDescriptor 
resultPartitionDeploymentDescriptor) {
+                       this.producingTaskExecutorResourceId = 
producingTaskExecutorResourceId;
+                       this.resultPartitionDeploymentDescriptor = 
resultPartitionDeploymentDescriptor;
+               }
+       }
+}
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index ae903fe..2732727 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -39,6 +39,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.HeartbeatTarget;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -195,6 +197,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
        private Map<String, Object> accumulators;
 
+       private final PartitionTracker partitionTracker;
+
        // 
------------------------------------------------------------------------
 
        public JobMaster(
@@ -212,7 +216,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        FatalErrorHandler fatalErrorHandler,
                        ClassLoader userCodeLoader,
                        SchedulerNGFactory schedulerNGFactory,
-                       ShuffleMaster<?> shuffleMaster) throws Exception {
+                       ShuffleMaster<?> shuffleMaster,
+                       PartitionTrackerFactory partitionTrackerFactory) throws 
Exception {
 
                super(rpcService, 
AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME));
 
@@ -242,6 +247,15 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                this.scheduler = 
checkNotNull(schedulerFactory).createScheduler(slotPool);
 
                this.registeredTaskManagers = new HashMap<>(4);
+               this.partitionTracker = checkNotNull(partitionTrackerFactory)
+                       .create(resourceID -> {
+                               Tuple2<TaskManagerLocation, 
TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(resourceID);
+                               if (taskManagerInfo == null) {
+                                       return Optional.empty();
+                               }
+
+                               return Optional.of(taskManagerInfo.f1);
+                       });
 
                this.backPressureStatsTracker = 
checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker());
 
@@ -272,7 +286,8 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                        blobWriter,
                        jobManagerJobMetricGroup,
                        jobMasterConfiguration.getSlotRequestTimeout(),
-                       shuffleMaster);
+                       shuffleMaster,
+                       partitionTracker);
        }
 
        
//----------------------------------------------------------------------------------------------
@@ -409,6 +424,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
 
                taskManagerHeartbeatManager.unmonitorTarget(resourceID);
                slotPool.releaseTaskManager(resourceID, cause);
+               partitionTracker.stopTrackingPartitionsFor(resourceID);
 
                Tuple2<TaskManagerLocation, TaskExecutorGateway> 
taskManagerConnection = registeredTaskManagers.remove(resourceID);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
index 60260b0..eee2d7c 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java
@@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmanager.OnCompletionActions;
 import org.apache.flink.runtime.jobmaster.JobManagerSharedServices;
@@ -106,6 +107,11 @@ public class DefaultJobMasterServiceFactory implements 
JobMasterServiceFactory {
                        fatalErrorHandler,
                        userCodeClassloader,
                        schedulerNGFactory,
-                       shuffleMaster);
+                       shuffleMaster,
+                       lookup -> new PartitionTrackerImpl(
+                               jobGraph.getJobID(),
+                               shuffleMaster,
+                               lookup
+                       ));
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
index 301bd11..05429ac 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -54,7 +55,8 @@ public class DefaultScheduler extends LegacyScheduler {
                        final BlobWriter blobWriter,
                        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
                        final Time slotRequestTimeout,
-                       final ShuffleMaster<?> shuffleMaster) throws Exception {
+                       final ShuffleMaster<?> shuffleMaster,
+                       final PartitionTracker partitionTracker) throws 
Exception {
 
                super(
                        log,
@@ -71,7 +73,8 @@ public class DefaultScheduler extends LegacyScheduler {
                        blobWriter,
                        jobManagerJobMetricGroup,
                        slotRequestTimeout,
-                       shuffleMaster);
+                       shuffleMaster,
+                       partitionTracker);
        }
 
        @Override
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
index ead7b9f..5779a7e 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -54,7 +55,8 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
                        final BlobWriter blobWriter,
                        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
                        final Time slotRequestTimeout,
-                       final ShuffleMaster<?> shuffleMaster) throws Exception {
+                       final ShuffleMaster<?> shuffleMaster,
+                       final PartitionTracker partitionTracker) throws 
Exception {
 
                return new DefaultScheduler(
                        log,
@@ -70,7 +72,8 @@ public class DefaultSchedulerFactory implements 
SchedulerNGFactory {
                        blobWriter,
                        jobManagerJobMetricGroup,
                        slotRequestTimeout,
-                       shuffleMaster);
+                       shuffleMaster,
+                       partitionTracker);
        }
 
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
index 71021ab..7dec7fc 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java
@@ -50,6 +50,7 @@ import 
org.apache.flink.runtime.executiongraph.JobStatusListener;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
 import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -144,7 +145,8 @@ public class LegacyScheduler implements SchedulerNG {
                        final BlobWriter blobWriter,
                        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
                        final Time slotRequestTimeout,
-                       final ShuffleMaster<?> shuffleMaster) throws Exception {
+                       final ShuffleMaster<?> shuffleMaster,
+                       final PartitionTracker partitionTracker) throws 
Exception {
 
                this.log = checkNotNull(log);
                this.jobGraph = checkNotNull(jobGraph);
@@ -171,14 +173,15 @@ public class LegacyScheduler implements SchedulerNG {
                this.blobWriter = checkNotNull(blobWriter);
                this.slotRequestTimeout = checkNotNull(slotRequestTimeout);
 
-               this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup, 
checkNotNull(shuffleMaster));
+               this.executionGraph = 
createAndRestoreExecutionGraph(jobManagerJobMetricGroup, 
checkNotNull(shuffleMaster), checkNotNull(partitionTracker));
        }
 
        private ExecutionGraph createAndRestoreExecutionGraph(
                        JobManagerJobMetricGroup 
currentJobManagerJobMetricGroup,
-                       ShuffleMaster<?> shuffleMaster) throws Exception {
+                       ShuffleMaster<?> shuffleMaster,
+                       PartitionTracker partitionTracker) throws Exception {
 
-               ExecutionGraph newExecutionGraph = 
createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster);
+               ExecutionGraph newExecutionGraph = 
createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, 
partitionTracker);
 
                final CheckpointCoordinator checkpointCoordinator = 
newExecutionGraph.getCheckpointCoordinator();
 
@@ -199,7 +202,8 @@ public class LegacyScheduler implements SchedulerNG {
 
        private ExecutionGraph createExecutionGraph(
                        JobManagerJobMetricGroup 
currentJobManagerJobMetricGroup,
-                       ShuffleMaster<?> shuffleMaster) throws 
JobExecutionException, JobException {
+                       ShuffleMaster<?> shuffleMaster,
+                       final PartitionTracker partitionTracker) throws 
JobExecutionException, JobException {
                return ExecutionGraphBuilder.buildGraph(
                        null,
                        jobGraph,
@@ -215,7 +219,8 @@ public class LegacyScheduler implements SchedulerNG {
                        blobWriter,
                        slotRequestTimeout,
                        log,
-                       shuffleMaster);
+                       shuffleMaster,
+                       partitionTracker);
        }
 
        /**
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
index 1f22fa9..2458697 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java
@@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -63,7 +64,8 @@ public class LegacySchedulerFactory implements 
SchedulerNGFactory {
                        final BlobWriter blobWriter,
                        final JobManagerJobMetricGroup jobManagerJobMetricGroup,
                        final Time slotRequestTimeout,
-                       final ShuffleMaster<?> shuffleMaster) throws Exception {
+                       final ShuffleMaster<?> shuffleMaster,
+                       final PartitionTracker partitionTracker) throws 
Exception {
 
                return new LegacyScheduler(
                        log,
@@ -80,6 +82,7 @@ public class LegacySchedulerFactory implements 
SchedulerNGFactory {
                        blobWriter,
                        jobManagerJobMetricGroup,
                        slotRequestTimeout,
-                       shuffleMaster);
+                       shuffleMaster,
+                       partitionTracker);
        }
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
index 56bc044..3d2e096 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.runtime.blob.BlobWriter;
 import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup;
@@ -53,6 +54,7 @@ public interface SchedulerNGFactory {
                BlobWriter blobWriter,
                JobManagerJobMetricGroup jobManagerJobMetricGroup,
                Time slotRequestTimeout,
-               ShuffleMaster<?> shuffleMaster) throws Exception;
+               ShuffleMaster<?> shuffleMaster,
+               PartitionTracker partitionTracker) throws Exception;
 
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
index 74c72438..e4994b2 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.Environment;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration;
@@ -120,7 +121,8 @@ public class CheckpointSettingsSerializableTest extends 
TestLogger {
                        VoidBlobWriter.getInstance(),
                        timeout,
                        log,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
 
                assertEquals(1, 
eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks());
                
assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader)
 instanceof CustomStateBackend);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index 15643a9..7ad1dbb 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -824,7 +825,8 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
                        blobWriter,
                        timeout,
                        LoggerFactory.getLogger(getClass()),
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
        }
 
        private static final class ExecutionStageMatcher extends 
TypeSafeMatcher<List<ExecutionAttemptID>> {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
index 306a835..8e69df9 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java
@@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -79,7 +80,8 @@ public class ExecutionGraphRescalingTest extends TestLogger {
                        VoidBlobWriter.getInstance(),
                        AkkaUtils.getDefaultTimeout(),
                        TEST_LOGGER,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
 
                for (JobVertex jv : jobVertices) {
                        assertThat(jv.getParallelism(), is(initialParallelism));
@@ -109,7 +111,8 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                        VoidBlobWriter.getInstance(),
                        AkkaUtils.getDefaultTimeout(),
                        TEST_LOGGER,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
 
                for (JobVertex jv : jobVertices) {
                        assertThat(jv.getParallelism(), is(1));
@@ -139,7 +142,8 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                        VoidBlobWriter.getInstance(),
                        AkkaUtils.getDefaultTimeout(),
                        TEST_LOGGER,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
 
                for (JobVertex jv : jobVertices) {
                        assertThat(jv.getParallelism(), is(scaleUpParallelism));
@@ -182,7 +186,8 @@ public class ExecutionGraphRescalingTest extends TestLogger 
{
                                VoidBlobWriter.getInstance(),
                                AkkaUtils.getDefaultTimeout(),
                                TEST_LOGGER,
-                               NettyShuffleMaster.INSTANCE);
+                               NettyShuffleMaster.INSTANCE,
+                               NoOpPartitionTracker.INSTANCE);
 
                        fail("Building the ExecutionGraph with a parallelism 
higher than the max parallelism should fail.");
                } catch (JobException e) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
index 4b5c98f..9ceaf84 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
 import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -591,7 +592,8 @@ public class ExecutionGraphSchedulingTest extends 
TestLogger {
                        VoidBlobWriter.getInstance(),
                        timeout,
                        log,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
        }
 
        private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID 
jobId) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
index f0924fe..f614795 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java
@@ -33,6 +33,7 @@ import 
org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import org.apache.flink.runtime.executiongraph.restart.RestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobStatus;
 import org.apache.flink.runtime.jobgraph.JobVertex;
@@ -411,7 +412,8 @@ public class ExecutionGraphTestUtils {
                        VoidBlobWriter.getInstance(),
                        timeout,
                        TEST_LOGGER,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
        }
 
        public static JobVertex createNoOpVertex(int parallelism) {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index efca2d4..3a0770e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,19 +19,32 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
 import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -39,6 +52,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testingUtils.TestingUtils;
@@ -50,6 +64,7 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -60,6 +75,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
 import java.util.function.Consumer;
 
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -69,7 +85,6 @@ import static org.hamcrest.Matchers.nullValue;
 import static org.hamcrest.Matchers.sameInstance;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 
@@ -300,82 +315,6 @@ public class ExecutionTest extends TestLogger {
        }
 
        /**
-        * Tests that the partitions are released in case of an execution 
cancellation after the execution is already finished.
-        */
-       @Test
-       public void testPartitionReleaseOnCancelingAfterBeingFinished() throws 
Exception {
-               testPartitionReleaseAfterFinished(Execution::cancel);
-       }
-
-       /**
-        * Tests that the partitions are released in case of an execution 
suspension after the execution is already finished.
-        */
-       @Test
-       public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws 
Exception {
-               testPartitionReleaseAfterFinished(Execution::suspend);
-       }
-
-       private void testPartitionReleaseAfterFinished(Consumer<Execution> 
postFinishedExecutionAction) throws Exception {
-               final Tuple2<JobID, Collection<ResultPartitionID>> 
releasedPartitions = Tuple2.of(null, null);
-               final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
-               
taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields);
-
-               final JobVertex producerVertex = createNoOpJobVertex();
-               final JobVertex consumerVertex = createNoOpJobVertex();
-               consumerVertex.connectNewDataSetAsInput(producerVertex, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
-               final SimpleSlot slot = new SimpleSlot(
-                       new SingleSlotTestingSlotOwner(),
-                       new LocalTaskManagerLocation(),
-                       0,
-                       taskManagerGateway);
-
-               final ProgrammedSlotProvider slotProvider = new 
ProgrammedSlotProvider(1);
-               slotProvider.addSlot(producerVertex.getID(), 0, 
CompletableFuture.completedFuture(slot));
-
-               ExecutionGraph executionGraph = 
ExecutionGraphTestUtils.createSimpleTestGraph(
-                       new JobID(),
-                       slotProvider,
-                       new NoRestartStrategy(),
-                       producerVertex,
-                       consumerVertex);
-
-               
executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(producerVertex.getID());
-               ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
-
-               final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
-
-               execution.allocateResourcesForExecution(
-                       slotProvider,
-                       false,
-                       LocationPreferenceConstraint.ALL,
-                       Collections.emptySet(),
-                       TestingUtils.infiniteTime());
-
-               execution.deploy();
-               execution.switchToRunning();
-
-               // simulate a case where a cancel/suspend call is too slow and 
the task is already finished
-               // in this case we have to explicitly release the finished 
partition
-               // if the task were canceled properly the TM would release the 
partition automatically
-               execution.markFinished();
-               postFinishedExecutionAction.accept(execution);
-
-               assertEquals(executionGraph.getJobID(), releasedPartitions.f0);
-               assertEquals(executionVertex.getProducedPartitions().size(), 
releasedPartitions.f1.size());
-               for (ResultPartitionID partitionId : releasedPartitions.f1) {
-                       // ensure all IDs of released partitions are actually 
valid
-                       IntermediateResultPartition intermediateResultPartition 
= executionVertex
-                               .getProducedPartitions()
-                               .get(partitionId.getPartitionId());
-                       assertNotNull(intermediateResultPartition);
-                       assertEquals(execution.getAttemptId(), 
partitionId.getProducerId());
-               }
-       }
-
-       /**
         * Tests that all preferred locations are calculated.
         */
        @Test
@@ -588,6 +527,113 @@ public class ExecutionTest extends TestLogger {
                assertThat(returnedSlotFuture.get(), 
is(equalTo(slotRequestIdFuture.get())));
        }
 
+       @Test
+       public void testPartitionRetainedWhenFinished() throws Exception {
+               
testPartitionTrackingForStateTransition(Execution::markFinished, false);
+       }
+
+       @Test
+       public void testPartitionReleasedWhenCanceled() throws Exception {
+               testPartitionTrackingForStateTransition(
+                       execution -> {
+                               execution.cancel();
+                               execution.completeCancelling();
+                       },
+                       true);
+       }
+
+       @Test
+       public void testPartitionReleasedWhenFailed() throws Exception {
+               testPartitionTrackingForStateTransition(execution -> 
execution.fail(new Exception("Test exception")), true);
+       }
+
	/**
	 * Verifies how produced partitions are tracked when an execution undergoes the
	 * given terminal state transition.
	 *
	 * <p>Sets up a producer vertex with a BLOCKING result consumed by a second vertex,
	 * checks that the produced partition is registered at the {@link TestingPartitionTracker}
	 * when resources are allocated, applies {@code stateTransition} to the running
	 * execution, and finally asserts whether the partition was released or retained.
	 *
	 * @param stateTransition terminal state transition applied to the running execution
	 * @param shouldPartitionBeReleased whether the transition is expected to release the partition
	 */
	private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception {
		final JobVertex producerVertex = createNoOpJobVertex();
		final JobVertex consumerVertex = createNoOpJobVertex();
		// BLOCKING result type, so partitions are expected to outlive the producing task
		consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);

		// fixed location, so we can later assert where the partition was registered
		final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation();

		// slot provider that always hands out a slot on the fixed location above
		final SlotProvider slotProvider = new SlotProvider() {
			@Override
			public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) {
				return CompletableFuture.completedFuture(new SimpleSlot(
					new SingleSlotTestingSlotOwner(),
					taskManagerLocation,
					0,
					new SimpleAckingTaskManagerGateway()));
			}

			@Override
			public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) {
				// never called in this test
			}
		};

		// futures completed by the tracker stub so the test can observe the tracker calls
		CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>();
		CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>();
		final TestingPartitionTracker partitionTracker = new TestingPartitionTracker();
		partitionTracker.setStartTrackingPartitionsConsumer(
			(resourceID, resultPartitionDeploymentDescriptor) ->
				partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor))
		);
		partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);

		final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph(
			null,
			new JobGraph(new JobID(), "test job", producerVertex, consumerVertex),
			new Configuration(),
			TestingUtils.defaultExecutor(),
			TestingUtils.defaultExecutor(),
			slotProvider,
			ExecutionTest.class.getClassLoader(),
			new StandaloneCheckpointRecoveryFactory(),
			Time.seconds(10),
			new NoRestartStrategy(),
			new UnregisteredMetricsGroup(),
			VoidBlobWriter.getInstance(),
			Time.seconds(10),
			log,
			NettyShuffleMaster.INSTANCE,
			partitionTracker);

		// run all execution graph actions synchronously on the test thread
		executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread());

		final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID());
		final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0];

		final Execution execution = executionVertex.getCurrentExecutionAttempt();

		execution.allocateResourcesForExecution(
			slotProvider,
			false,
			LocationPreferenceConstraint.ALL,
			Collections.emptySet(),
			TestingUtils.infiniteTime());

		// resource allocation must register the produced partition at the tracker
		assertThat(partitionStartTrackingFuture.isDone(), is(true));
		final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get();

		final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex
			.getProducedDataSets()[0]
			.getPartitions()[0]
			.getPartitionId();
		final ResultPartitionDeploymentDescriptor descriptor = execution
			.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
		// the tracking call must reference the producing task executor and the exact descriptor
		assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID()));
		assertThat(startTrackingCall.f1, equalTo(descriptor));

		execution.deploy();
		execution.switchToRunning();

		// apply the terminal transition under test (finish / cancel / fail)
		stateTransition.accept(execution);

		// depending on the transition the partition must (not) have been released
		assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased));
		if (shouldPartitionBeReleased) {
			final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get();
			assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
		}
	}
+
        /**
         * Tests that a slot release will atomically release the assigned 
{@link Execution}.
         */
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
index 5b8ef04..3e95587 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java
@@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.ExecutionState;
 import 
org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy;
 import org.apache.flink.runtime.instance.SimpleSlot;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -226,7 +227,8 @@ public class ExecutionVertexLocalityTest extends TestLogger 
{
                        VoidBlobWriter.getInstance(),
                        timeout,
                        log,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
        }
 
        private void initializeLocation(ExecutionVertex vertex, 
TaskManagerLocation location) throws Exception {
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
index 92e659c..b9c1322 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java
@@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
@@ -644,6 +645,7 @@ public class PipelinedFailoverRegionBuildingTest extends 
TestLogger {
                        VoidBlobWriter.getInstance(),
                        timeout,
                        log,
-                       NettyShuffleMaster.INSTANCE);
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.INSTANCE);
        }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
new file mode 100644
index 0000000..30895b2
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+
+import java.util.Collection;
+
/**
 * No-op implementation of {@link PartitionTracker} for tests that do not care about
 * partition tracking. Never tracks anything and never issues release calls.
 */
public enum NoOpPartitionTracker implements PartitionTracker {
	INSTANCE;

	/** Factory that ignores the gateway lookup and always returns the singleton. */
	public static final PartitionTrackerFactory FACTORY = lookup -> INSTANCE;

	@Override
	public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
		// intentionally a no-op
	}

	@Override
	public void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
		// intentionally a no-op
	}

	@Override
	public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) {
		// intentionally a no-op
	}

	@Override
	public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) {
		// intentionally a no-op
	}

	@Override
	public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) {
		// nothing is ever tracked
		return false;
	}
}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 007fdc3..51eff94 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.io.network.partition;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.io.network.NettyShuffleEnvironment;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
@@ -28,6 +29,8 @@ import 
org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder;
 import org.hamcrest.Matchers;
 
 import java.io.IOException;
+import java.util.EnumSet;
+import java.util.Optional;
 
 import static org.junit.Assert.assertThat;
 import static org.junit.Assert.fail;
@@ -101,4 +104,35 @@ public class PartitionTestUtils {
                        true,
                        releaseType);
        }
+
+       public static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionId, 
ShuffleDescriptor.ReleaseType releaseType, boolean hasLocalResources) {
+               return new ResultPartitionDeploymentDescriptor(
+                       new PartitionDescriptor(
+                               new IntermediateDataSetID(),
+                               resultPartitionId.getPartitionId(),
+                               ResultPartitionType.BLOCKING,
+                               1,
+                               0),
+                       new ShuffleDescriptor() {
+                               @Override
+                               public ResultPartitionID getResultPartitionID() 
{
+                                       return resultPartitionId;
+                               }
+
+                               @Override
+                               public Optional<ResourceID> 
storesLocalResourcesOn() {
+                                       return hasLocalResources
+                                               ? 
Optional.of(ResourceID.generate())
+                                               : Optional.empty();
+                               }
+
+                               @Override
+                               public EnumSet<ReleaseType> 
getSupportedReleaseTypes() {
+                                       return EnumSet.of(releaseType);
+                               }
+                       },
+                       1,
+                       true,
+                       releaseType);
+       }
 }
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
new file mode 100644
index 0000000..07aba93
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -0,0 +1,294 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.java.tuple.Tuple3;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.Test;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.EnumSet;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
+import static org.junit.Assert.assertEquals;
+
+/**
+ * Tests for the {@link PartitionTrackerImpl}.
+ */
+public class PartitionTrackerImplTest extends TestLogger {
+
+       @Test
+       public void testReleasedOnConsumptionPartitionIsNotTracked() {
+               
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.AUTO);
+       }
+
+       @Test
+       public void testRetainedOnConsumptionPartitionIsTracked() {
+               
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.MANUAL);
+       }
+
+       private void 
testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType releaseType) {
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       new JobID(),
+                       new TestingShuffleMaster(),
+                       ignored -> Optional.empty()
+               );
+
+               final ResourceID resourceId = ResourceID.generate();
+               final ResultPartitionID resultPartitionId = new 
ResultPartitionID();
+               partitionTracker.startTrackingPartition(
+                       resourceId,
+                       createResultPartitionDeploymentDescriptor(
+                               resultPartitionId,
+                               releaseType,
+                               false));
+
+               
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), 
is(not(releaseType)));
+       }
+
+       @Test
+       public void testStartStopTracking() {
+               final Queue<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       new JobID(),
+                       new TestingShuffleMaster(),
+                       resourceId -> 
Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls))
+               );
+
+               final ResourceID executorWithTrackedPartition = new 
ResourceID("tracked");
+               final ResourceID executorWithoutTrackedPartition = new 
ResourceID("untracked");
+
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition),
 is(false));
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition),
 is(false));
+
+               partitionTracker.startTrackingPartition(
+                       executorWithTrackedPartition,
+                       createResultPartitionDeploymentDescriptor(new 
ResultPartitionID(), ShuffleDescriptor.ReleaseType.MANUAL, true));
+
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition),
 is(true));
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition),
 is(false));
+
+               
partitionTracker.stopTrackingPartitionsFor(executorWithTrackedPartition);
+
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition),
 is(false));
+               
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition),
 is(false));
+       }
+
+       @Test
+       public void testReleaseCallsWithLocalResources() {
+               final TestingShuffleMaster shuffleMaster = new 
TestingShuffleMaster();
+               final JobID jobId = new JobID();
+
+               final Queue<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       jobId,
+                       shuffleMaster,
+                       resourceId -> 
Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls))
+               );
+
+               final ResourceID taskExecutorId1 = ResourceID.generate();
+               final ResourceID taskExecutorId2 = ResourceID.generate();
+               final ResultPartitionID resultPartitionId1 = new 
ResultPartitionID();
+               final ResultPartitionID resultPartitionId2 = new 
ResultPartitionID();
+
+               partitionTracker.startTrackingPartition(
+                       taskExecutorId1,
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+               partitionTracker.startTrackingPartition(
+                       taskExecutorId2,
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+
+               {
+                       
partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
+
+                       assertEquals(1, taskExecutorReleaseCalls.size());
+
+                       Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>> taskExecutorReleaseCall = 
taskExecutorReleaseCalls.remove();
+                       assertEquals(taskExecutorId1, 
taskExecutorReleaseCall.f0);
+                       assertEquals(jobId, taskExecutorReleaseCall.f1);
+                       assertThat(taskExecutorReleaseCall.f2, 
contains(resultPartitionId1));
+
+                       assertEquals(1, 
shuffleMaster.externallyReleasedPartitions.size());
+                       assertEquals(resultPartitionId1, 
shuffleMaster.externallyReleasedPartitions.remove());
+
+                       
assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), 
is(false));
+               }
+
+               {
+                       
partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));
+
+                       assertEquals(1, taskExecutorReleaseCalls.size());
+
+                       Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>> releaseCall = taskExecutorReleaseCalls.remove();
+                       assertEquals(taskExecutorId2, releaseCall.f0);
+                       assertEquals(jobId, releaseCall.f1);
+                       assertThat(releaseCall.f2, 
contains(resultPartitionId2));
+
+                       assertEquals(1, 
shuffleMaster.externallyReleasedPartitions.size());
+                       assertEquals(resultPartitionId2, 
shuffleMaster.externallyReleasedPartitions.remove());
+
+                       
assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), 
is(false));
+               }
+       }
+
+       @Test
+       public void testReleaseCallsWithoutLocalResources() {
+               final TestingShuffleMaster shuffleMaster = new 
TestingShuffleMaster();
+
+               final Queue<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       new JobID(),
+                       shuffleMaster,
+                       resourceId -> 
Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls))
+               );
+
+               final ResourceID taskExecutorId1 = ResourceID.generate();
+               final ResourceID taskExecutorId2 = ResourceID.generate();
+               final ResultPartitionID resultPartitionId1 = new 
ResultPartitionID();
+               final ResultPartitionID resultPartitionId2 = new 
ResultPartitionID();
+
+               partitionTracker.startTrackingPartition(
+                       taskExecutorId1,
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, false));
+               partitionTracker.startTrackingPartition(
+                       taskExecutorId2,
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId2, 
ShuffleDescriptor.ReleaseType.MANUAL, false));
+
+               {
+                       
partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1);
+
+                       assertEquals(0, taskExecutorReleaseCalls.size());
+
+                       assertEquals(1, 
shuffleMaster.externallyReleasedPartitions.size());
+                       assertEquals(resultPartitionId1, 
shuffleMaster.externallyReleasedPartitions.remove());
+
+                       
assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), 
is(false));
+               }
+
+               {
+                       
partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2));
+
+                       assertEquals(0, taskExecutorReleaseCalls.size());
+
+                       assertEquals(1, 
shuffleMaster.externallyReleasedPartitions.size());
+                       assertEquals(resultPartitionId2, 
shuffleMaster.externallyReleasedPartitions.remove());
+
+                       
assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), 
is(false));
+               }
+       }
+
+       @Test
+       public void testStopTrackingIssuesNoReleaseCalls() {
+               final TestingShuffleMaster shuffleMaster = new 
TestingShuffleMaster();
+
+               final Queue<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new 
ArrayBlockingQueue<>(4);
+               final PartitionTracker partitionTracker = new 
PartitionTrackerImpl(
+                       new JobID(),
+                       new TestingShuffleMaster(),
+                       resourceId -> 
Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls))
+               );
+
+               final ResourceID taskExecutorId1 = ResourceID.generate();
+               final ResultPartitionID resultPartitionId1 = new 
ResultPartitionID();
+
+               partitionTracker.startTrackingPartition(
+                       taskExecutorId1,
+                       
createResultPartitionDeploymentDescriptor(resultPartitionId1, 
ShuffleDescriptor.ReleaseType.MANUAL, true));
+
+               partitionTracker.stopTrackingPartitionsFor(taskExecutorId1);
+
+               assertEquals(0, taskExecutorReleaseCalls.size());
+               assertEquals(0, 
shuffleMaster.externallyReleasedPartitions.size());
+       }
+
+       private static ResultPartitionDeploymentDescriptor 
createResultPartitionDeploymentDescriptor(
+               ResultPartitionID resultPartitionId,
+               ShuffleDescriptor.ReleaseType releaseType,
+               boolean hasLocalResources) {
+
+               return new ResultPartitionDeploymentDescriptor(
+                       new PartitionDescriptor(
+                               new IntermediateDataSetID(),
+                               resultPartitionId.getPartitionId(),
+                               ResultPartitionType.BLOCKING,
+                               1,
+                               0),
+                       new ShuffleDescriptor() {
+                               @Override
+                               public ResultPartitionID getResultPartitionID() 
{
+                                       return resultPartitionId;
+                               }
+
+                               @Override
+                               public Optional<ResourceID> 
storesLocalResourcesOn() {
+                                       return hasLocalResources
+                                               ? 
Optional.of(ResourceID.generate())
+                                               : Optional.empty();
+                               }
+
+                               @Override
+                               public EnumSet<ReleaseType> 
getSupportedReleaseTypes() {
+                                       return EnumSet.of(releaseType);
+                               }
+                       },
+                       1,
+                       true,
+                       releaseType);
+       }
+
+       private static TaskExecutorGateway createTaskExecutorGateway(ResourceID 
taskExecutorId, Collection<Tuple3<ResourceID, JobID, 
Collection<ResultPartitionID>>> releaseCalls) {
+               return new TestingTaskExecutorGatewayBuilder()
+                       .setReleasePartitionsConsumer((jobId, partitionIds) -> 
releaseCalls.add(Tuple3.of(taskExecutorId, jobId, partitionIds)))
+                       .createTestingTaskExecutorGateway();
+       }
+
+       private static class TestingShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
+
+               final Queue<ResultPartitionID> externallyReleasedPartitions = 
new ArrayBlockingQueue<>(4);
+
+               @Override
+               public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, 
ProducerDescriptor producerDescriptor) {
+                       return null;
+               }
+
+               @Override
+               public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
+                       
externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID());
+               }
+       }
+
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
new file mode 100644
index 0000000..2bcda7e
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.partition;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+
+import java.util.Collection;
+import java.util.function.BiConsumer;
+import java.util.function.Consumer;
+import java.util.function.Function;
+
+/**
+ * Test {@link PartitionTracker} implementation.
+ */
+public class TestingPartitionTracker implements PartitionTracker {
+
+       private Function<ResourceID, Boolean> isTrackingPartitionsForFunction = 
ignored -> false;
+       private Consumer<ResourceID> stopTrackingAllPartitionsConsumer = 
ignored -> {};
+       private Consumer<ResourceID> 
stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
+       private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
+       private Consumer<Collection<ResultPartitionID>> 
stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+
+       public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, 
ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
+               this.startTrackingPartitionsConsumer = 
startTrackingPartitionsConsumer;
+       }
+
+       public void setIsTrackingPartitionsForFunction(Function<ResourceID, 
Boolean> isTrackingPartitionsForFunction) {
+               this.isTrackingPartitionsForFunction = 
isTrackingPartitionsForFunction;
+       }
+
+       public void setStopTrackingAllPartitionsConsumer(Consumer<ResourceID> 
stopTrackingAllPartitionsConsumer) {
+               this.stopTrackingAllPartitionsConsumer = 
stopTrackingAllPartitionsConsumer;
+       }
+
+       public void 
setStopTrackingAndReleaseAllPartitionsConsumer(Consumer<ResourceID> 
stopTrackingAndReleaseAllPartitionsConsumer) {
+               this.stopTrackingAndReleaseAllPartitionsConsumer = 
stopTrackingAndReleaseAllPartitionsConsumer;
+       }
+
+       public void 
setStopTrackingAndReleasePartitionsConsumer(Consumer<Collection<ResultPartitionID>>
 stopTrackingAndReleasePartitionsConsumer) {
+               this.stopTrackingAndReleasePartitionsConsumer = 
stopTrackingAndReleasePartitionsConsumer;
+       }
+
+       @Override
+       public void startTrackingPartition(ResourceID producingTaskExecutorId, 
ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
+               
this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, 
resultPartitionDeploymentDescriptor);
+       }
+
+       @Override
+       public void stopTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
+               
stopTrackingAllPartitionsConsumer.accept(producingTaskExecutorId);
+       }
+
+       @Override
+       public void 
stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+               
stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds);
+       }
+
+       @Override
+       public void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId) {
+               
stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
+       }
+
+       @Override
+       public boolean isTrackingPartitionsFor(ResourceID 
producingTaskExecutorId) {
+               return 
isTrackingPartitionsForFunction.apply(producingTaskExecutorId);
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 6cc1a77..1291d99 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -64,8 +64,11 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
 import org.apache.flink.runtime.instance.SimpleSlotContext;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
 import org.apache.flink.runtime.jobgraph.DistributionPattern;
 import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
@@ -307,7 +310,8 @@ public class JobMasterTest extends TestLogger {
                                testingFatalErrorHandler,
                                JobMasterTest.class.getClassLoader(),
                                schedulerNGFactory,
-                               NettyShuffleMaster.INSTANCE) {
+                               NettyShuffleMaster.INSTANCE,
+                               NoOpPartitionTracker.FACTORY) {
                                @Override
                                public void declineCheckpoint(DeclineCheckpoint 
declineCheckpoint) {
                                        
declineCheckpointMessageFuture.complete(declineCheckpoint.getReason());
@@ -1623,7 +1627,8 @@ public class JobMasterTest extends TestLogger {
                        testingFatalErrorHandler,
                        JobMasterTest.class.getClassLoader(),
                        schedulerNGFactory,
-                       NettyShuffleMaster.INSTANCE) {
+                       NettyShuffleMaster.INSTANCE,
+                       NoOpPartitionTracker.FACTORY) {
 
                        @Override
                        public CompletableFuture<String> triggerSavepoint(
@@ -1788,6 +1793,48 @@ public class JobMasterTest extends TestLogger {
                return testingResourceManagerGateway;
        }
 
+       @Test
+       public void testPartitionTableCleanupOnDisconnect() throws Exception {
+               final JobManagerSharedServices jobManagerSharedServices = new 
TestingJobManagerSharedServicesBuilder().build();
+               final JobGraph jobGraph = createSingleVertexJobGraph();
+
+               final CompletableFuture<ResourceID> 
partitionCleanupTaskExecutorId = new CompletableFuture<>();
+               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+               
partitionTracker.setStopTrackingAllPartitionsConsumer(partitionCleanupTaskExecutorId::complete);
+
+               final JobMaster jobMaster = new JobMasterBuilder()
+                       .withConfiguration(configuration)
+                       .withJobGraph(jobGraph)
+                       .withHighAvailabilityServices(haServices)
+                       .withJobManagerSharedServices(jobManagerSharedServices)
+                       .withHeartbeatServices(heartbeatServices)
+                       .withOnCompletionActions(new 
TestingOnCompletionActions())
+                       .withPartitionTrackerFactory(ignored -> 
partitionTracker)
+                       .createJobMaster();
+
+               final CompletableFuture<JobID> disconnectTaskExecutorFuture = 
new CompletableFuture<>();
+               final TestingTaskExecutorGateway testingTaskExecutorGateway = 
new TestingTaskExecutorGatewayBuilder()
+                       .setDisconnectJobManagerConsumer((jobID, throwable) -> 
disconnectTaskExecutorFuture.complete(jobID))
+                       .createTestingTaskExecutorGateway();
+
+               try {
+                       jobMaster.start(jobMasterId).get();
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       // register a slot to establish a connection
+                       final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+                       registerSlotsAtJobMaster(1, jobMasterGateway, 
testingTaskExecutorGateway, taskManagerLocation);
+
+                       
jobMasterGateway.disconnectTaskManager(taskManagerLocation.getResourceID(), new 
Exception("test"));
+                       disconnectTaskExecutorFuture.get();
+
+                       assertThat(partitionCleanupTaskExecutorId.get(), 
equalTo(taskManagerLocation.getResourceID()));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
        /**
         * Tests that the job execution is failed if the TaskExecutor 
disconnects from the
         * JobMaster.
@@ -2092,6 +2139,8 @@ public class JobMasterTest extends TestLogger {
 
                private ShuffleMaster<?> shuffleMaster = 
NettyShuffleMaster.INSTANCE;
 
+               private PartitionTrackerFactory partitionTrackerFactory = 
NoOpPartitionTracker.FACTORY;
+
                private JobMasterBuilder withConfiguration(Configuration 
configuration) {
                        this.configuration = configuration;
                        return this;
@@ -2132,6 +2181,11 @@ public class JobMasterTest extends TestLogger {
                        return this;
                }
 
+               private JobMasterBuilder 
withPartitionTrackerFactory(PartitionTrackerFactory partitionTrackerFactory) {
+                       this.partitionTrackerFactory = partitionTrackerFactory;
+                       return this;
+               }
+
                private JobMaster createJobMaster() throws Exception {
                        final JobMasterConfiguration jobMasterConfiguration = 
JobMasterConfiguration.fromConfiguration(configuration);
                        final SchedulerNGFactory schedulerNGFactory = new 
LegacySchedulerFactory(
@@ -2152,7 +2206,8 @@ public class JobMasterTest extends TestLogger {
                                testingFatalErrorHandler,
                                JobMasterTest.class.getClassLoader(),
                                schedulerNGFactory,
-                               shuffleMaster);
+                               shuffleMaster,
+                               partitionTrackerFactory);
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
index 844ef0d..8c20e49 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java
@@ -74,6 +74,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
        private final Supplier<Boolean> canBeReleasedSupplier;
 
+       private final BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer;
+
        TestingTaskExecutorGateway(
                        String address,
                        String hostname,
@@ -85,7 +87,8 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
                        Consumer<ResourceID> heartbeatResourceManagerConsumer,
                        Consumer<Exception> disconnectResourceManagerConsumer,
                        Function<ExecutionAttemptID, 
CompletableFuture<Acknowledge>> cancelTaskFunction,
-                       Supplier<Boolean> canBeReleasedSupplier) {
+                       Supplier<Boolean> canBeReleasedSupplier,
+                       BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer) {
                this.address = Preconditions.checkNotNull(address);
                this.hostname = Preconditions.checkNotNull(hostname);
                this.heartbeatJobManagerConsumer = 
Preconditions.checkNotNull(heartbeatJobManagerConsumer);
@@ -97,6 +100,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
                this.disconnectResourceManagerConsumer = 
disconnectResourceManagerConsumer;
                this.cancelTaskFunction = cancelTaskFunction;
                this.canBeReleasedSupplier = canBeReleasedSupplier;
+               this.releasePartitionsConsumer = releasePartitionsConsumer;
        }
 
        @Override
@@ -106,12 +110,12 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
        @Override
        public CompletableFuture<StackTraceSampleResponse> 
requestStackTraceSample(
-                       final ExecutionAttemptID executionAttemptId,
-                       final int sampleId,
-                       final int numSamples,
-                       final Time delayBetweenSamples,
-                       final int maxStackTraceDepth,
-                       final Time timeout) {
+               final ExecutionAttemptID executionAttemptId,
+               final int sampleId,
+               final int numSamples,
+               final Time delayBetweenSamples,
+               final int maxStackTraceDepth,
+               final Time timeout) {
                throw new UnsupportedOperationException();
        }
 
@@ -127,7 +131,7 @@ public class TestingTaskExecutorGateway implements 
TaskExecutorGateway {
 
        @Override
        public void releasePartitions(JobID jobId, 
Collection<ResultPartitionID> partitionIds) {
-               // noop
+               releasePartitionsConsumer.accept(jobId, partitionIds);
        }
 
        @Override
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
index 483f285..770d28e 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java
@@ -25,11 +25,13 @@ import 
org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
 import org.apache.flink.runtime.jobmaster.AllocatedSlotReport;
 import org.apache.flink.runtime.jobmaster.JobMasterId;
 import org.apache.flink.runtime.messages.Acknowledge;
 import org.apache.flink.runtime.resourcemanager.ResourceManagerId;
 
+import java.util.Collection;
 import java.util.concurrent.CompletableFuture;
 import java.util.function.BiConsumer;
 import java.util.function.BiFunction;
@@ -50,6 +52,7 @@ public class TestingTaskExecutorGatewayBuilder {
        private static final Consumer<ResourceID> 
NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER = ignored -> {};
        private static final Consumer<Exception> 
NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER = ignored -> {};
        private static final Function<ExecutionAttemptID, 
CompletableFuture<Acknowledge>> NOOP_CANCEL_TASK_FUNCTION = ignored -> 
CompletableFuture.completedFuture(Acknowledge.get());
+       private static final BiConsumer<JobID, Collection<ResultPartitionID>> 
NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {};
 
        private String address = "foobar:1234";
        private String hostname = "foobar";
@@ -62,6 +65,7 @@ public class TestingTaskExecutorGatewayBuilder {
        private Consumer<Exception> disconnectResourceManagerConsumer = 
NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER;
        private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> 
cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION;
        private Supplier<Boolean> canBeReleasedSupplier = () -> true;
+       private BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER;
 
        public TestingTaskExecutorGatewayBuilder setAddress(String address) {
                this.address = address;
@@ -118,6 +122,11 @@ public class TestingTaskExecutorGatewayBuilder {
                return this;
        }
 
+       public TestingTaskExecutorGatewayBuilder 
setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> 
releasePartitionsConsumer) {
+               this.releasePartitionsConsumer = releasePartitionsConsumer;
+               return this;
+       }
+
        public TestingTaskExecutorGateway createTestingTaskExecutorGateway() {
                return new TestingTaskExecutorGateway(
                        address,
@@ -130,6 +139,7 @@ public class TestingTaskExecutorGatewayBuilder {
                        heartbeatResourceManagerConsumer,
                        disconnectResourceManagerConsumer,
                        cancelTaskFunction,
-                       canBeReleasedSupplier);
+                       canBeReleasedSupplier,
+                       releasePartitionsConsumer);
        }
 }

Reply via email to