This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7b95f32d01730bcc75ded42e41d3668a1802a69b
Author: Chesnay Schepler <[email protected]>
AuthorDate: Thu Aug 1 14:27:28 2019 +0200

    [FLINK-13371][coordination] Prevent leaks of blocking partitions
---
 .../flink/runtime/executiongraph/Execution.java    |  71 +++--
 .../runtime/executiongraph/ExecutionGraph.java     |   2 +-
 .../runtime/executiongraph/ExecutionVertex.java    |   4 +-
 .../io/network/partition/PartitionTracker.java     |   5 +
 .../io/network/partition/PartitionTrackerImpl.java |   7 +
 .../flink/runtime/shuffle/ProducerDescriptor.java  |   4 +-
 .../ExecutionGraphDeploymentTest.java              |   2 +-
 .../ExecutionPartitionLifecycleTest.java           | 324 +++++++++++++++++++++
 .../runtime/executiongraph/ExecutionTest.java      | 127 --------
 .../io/network/partition/NoOpPartitionTracker.java |   4 +
 .../io/network/partition/PartitionTestUtils.java   |   2 +-
 .../partition/PartitionTrackerImplTest.java        |   7 +-
 .../network/partition/TestingPartitionTracker.java |  10 +
 .../TaskExecutorPartitionLifecycleTest.java        |  51 +++-
 14 files changed, 456 insertions(+), 164 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index af94188..d8a1b6f 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -56,6 +56,7 @@ import 
org.apache.flink.runtime.messages.StackTraceSampleResponse;
 import org.apache.flink.runtime.shuffle.PartitionDescriptor;
 import org.apache.flink.runtime.shuffle.ProducerDescriptor;
 import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
 import org.apache.flink.runtime.state.KeyGroupRangeAssignment;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.util.ExceptionUtils;
@@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        void markFailed(Throwable t, Map<String, Accumulator<?, ?>> 
userAccumulators, IOMetrics metrics) {
-               processFail(t, true, userAccumulators, metrics);
+               // skip release of partitions since this is only called if the 
TM actually sent the FAILED state update
+               // in this case all partitions have already been cleaned up
+               processFail(t, true, userAccumulators, metrics, false);
        }
 
        @VisibleForTesting
@@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        else if (current == CANCELING) {
                                // we sent a cancel call, and the task manager 
finished before it arrived. We
                                // will never get a CANCELED call back from the 
job manager
-                               completeCancelling(userAccumulators, metrics);
+                               completeCancelling(userAccumulators, metrics, 
true);
                                return;
                        }
                        else if (current == CANCELED || current == FAILED) {
@@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        }
 
        void completeCancelling() {
-               completeCancelling(null, null);
+               completeCancelling(null, null, true);
        }
 
-       void completeCancelling(Map<String, Accumulator<?, ?>> 
userAccumulators, IOMetrics metrics) {
+       void completeCancelling(Map<String, Accumulator<?, ?>> 
userAccumulators, IOMetrics metrics, boolean releasePartitions) {
 
                // the taskmanagers can themselves cancel tasks without an 
external trigger, if they find that the
                // network stack is canceled (for example by a failing / 
canceling receiver or sender
@@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                                updateAccumulatorsAndMetrics(userAccumulators, 
metrics);
 
                                if (transitionState(current, CANCELED)) {
-                                       finishCancellation();
+                                       finishCancellation(releasePartitions);
                                        return;
                                }
 
@@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       private void finishCancellation() {
+       private void finishCancellation(boolean releasePartitions) {
                releaseAssignedResource(new FlinkException("Execution " + this 
+ " was cancelled."));
                vertex.getExecutionGraph().deregisterExecution(this);
-               // release partitions on TM in case the Task finished while we 
where already CANCELING
-               stopTrackingAndReleasePartitions();
+               handlePartitionCleanup(releasePartitions, releasePartitions);
        }
 
        void cachePartitionInfo(PartitionInfo partitionInfo) {
@@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
        // 
--------------------------------------------------------------------------------------------
 
        private boolean processFail(Throwable t, boolean isCallback) {
-               return processFail(t, isCallback, null, null);
+               return processFail(t, isCallback, null, null, true);
        }
 
-       private boolean processFail(Throwable t, boolean isCallback, 
Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) {
+       private boolean processFail(Throwable t, boolean isCallback, 
Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean 
releasePartitions) {
                // damn, we failed. This means only that we keep our books and 
notify our parent JobExecutionVertex
                // the actual computation on the task manager is cleaned up by 
the TaskManager that noticed the failure
 
@@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                        }
 
                        if (current == CANCELING) {
-                               completeCancelling(userAccumulators, metrics);
+                               completeCancelling(userAccumulators, metrics, 
true);
                                return false;
                        }
 
@@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
 
                                releaseAssignedResource(t);
                                
vertex.getExecutionGraph().deregisterExecution(this);
-                               stopTrackingAndReleasePartitions();
+                               handlePartitionCleanup(releasePartitions, 
releasePartitions);
 
                                if (!isCallback && (current == RUNNING || 
current == DEPLOYING)) {
                                        if (LOG.isDebugEnabled()) {
@@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, 
Archiveable<ArchivedExecution
                }
        }
 
-       void stopTrackingAndReleasePartitions() {
+       void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean 
releaseBlockingPartitions) {
+               if (releasePipelinedPartitions) {
+                       sendReleaseIntermediateResultPartitionsRpcCall();
+               }
+
+               final Collection<ResultPartitionID> partitionIds = 
getPartitionIds();
+               final PartitionTracker partitionTracker = 
getVertex().getExecutionGraph().getPartitionTracker();
+
+               if (!partitionIds.isEmpty()) {
+                       if (releaseBlockingPartitions) {
+                               LOG.info("Discarding the results produced by 
task execution {}.", attemptId);
+                               
partitionTracker.stopTrackingAndReleasePartitions(partitionIds);
+                       } else {
+                               
partitionTracker.stopTrackingPartitions(partitionIds);
+                       }
+               }
+       }
+
+       private Collection<ResultPartitionID> getPartitionIds() {
+               return producedPartitions.values().stream()
+                       
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+                       .map(ShuffleDescriptor::getResultPartitionID)
+                       .collect(Collectors.toList());
+       }
+
+       private void sendReleaseIntermediateResultPartitionsRpcCall() {
                LOG.info("Discarding the results produced by task execution 
{}.", attemptId);
-               if (producedPartitions != null && producedPartitions.size() > 
0) {
-                       final PartitionTracker partitionTracker = 
getVertex().getExecutionGraph().getPartitionTracker();
-                       final List<ResultPartitionID> producedPartitionIds = 
producedPartitions.values().stream()
+               final LogicalSlot slot = assignedResource;
+
+               if (slot != null) {
+                       final TaskManagerGateway taskManagerGateway = 
slot.getTaskManagerGateway();
+
+                       final ShuffleMaster<?> shuffleMaster = 
getVertex().getExecutionGraph().getShuffleMaster();
+
+                       Collection<ResultPartitionID> partitionIds = 
producedPartitions.values().stream()
+                               .filter(resultPartitionDeploymentDescriptor -> 
resultPartitionDeploymentDescriptor.getPartitionType().isPipelined())
                                
.map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor)
+                               .peek(shuffleMaster::releasePartitionExternally)
                                .map(ShuffleDescriptor::getResultPartitionID)
                                .collect(Collectors.toList());
 
-                       
partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds);
+                       if (!partitionIds.isEmpty()) {
+                               // TODO For some tests this could be a problem 
when querying too early if all resources were released
+                               
taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds);
+                       }
                }
        }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
index f779fd2..0984274 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
@@ -1525,7 +1525,7 @@ public class ExecutionGraph implements 
AccessExecutionGraph {
                        case CANCELED:
                                // this deserialization is exception-free
                                accumulators = deserializeAccumulators(state);
-                               attempt.completeCancelling(accumulators, 
state.getIOMetrics());
+                               attempt.completeCancelling(accumulators, 
state.getIOMetrics(), false);
                                return true;
 
                        case FAILED:
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
index 03c68f8..6d262ee 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java
@@ -604,7 +604,9 @@ public class ExecutionVertex implements 
AccessExecutionVertex, Archiveable<Archi
 
                        if (oldState.isTerminal()) {
                                if (oldState == FINISHED) {
-                                       
oldExecution.stopTrackingAndReleasePartitions();
+                                       // pipelined partitions are released in 
Execution#cancel(), covering both job failures and vertex resets
+                                       // do not release pipelined partitions 
here to save RPC calls
+                                       
oldExecution.handlePartitionCleanup(false, true);
                                        
getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId);
                                }
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
index 3697fb8..268f4f9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java
@@ -46,6 +46,11 @@ public interface PartitionTracker {
        void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> 
resultPartitionIds);
 
        /**
+        * Stops the tracking of the given partitions without triggering their release.
+        */
+       void stopTrackingPartitions(Collection<ResultPartitionID> 
resultPartitionIds);
+
+       /**
         * Releases all partitions for the given task executor ID, and stop the 
tracking of partitions that were released.
         */
        void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId);
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
index f772b37..2e7f421 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java
@@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements 
PartitionTracker {
        }
 
        @Override
+       public void stopTrackingPartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+               Preconditions.checkNotNull(resultPartitionIds);
+
+               resultPartitionIds.forEach(this::internalStopTrackingPartition);
+       }
+
+       @Override
        public void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId) {
                Preconditions.checkNotNull(producingTaskExecutorId);
 
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
index ad3fc48..ec32178 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java
@@ -63,11 +63,11 @@ public class ProducerDescriptor {
                this.dataPort = dataPort;
        }
 
-       ResourceID getProducerLocation() {
+       public ResourceID getProducerLocation() {
                return producerLocation;
        }
 
-       ExecutionAttemptID getProducerExecutionId() {
+       public ExecutionAttemptID getProducerExecutionId() {
                return producerExecutionId;
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
index d0cfa00..d10fcbd 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java
@@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends 
TestLogger {
 
                Execution execution1 = executions.values().iterator().next();
                execution1.cancel();
-               execution1.completeCancelling(accumulators, ioMetrics);
+               execution1.completeCancelling(accumulators, ioMetrics, false);
 
                assertEquals(ioMetrics, execution1.getIOMetrics());
                assertEquals(accumulators, execution1.getUserAccumulators());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
new file mode 100644
index 0000000..ab4048d
--- /dev/null
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java
@@ -0,0 +1,324 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.executiongraph;
+
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.api.java.tuple.Tuple2;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
+import org.apache.flink.runtime.JobException;
+import org.apache.flink.runtime.blob.VoidBlobWriter;
+import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
+import org.apache.flink.runtime.client.JobExecutionException;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.SlotProfile;
+import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
+import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
+import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
+import org.apache.flink.runtime.instance.SimpleSlot;
+import org.apache.flink.runtime.instance.SlotSharingGroupId;
+import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker;
+import org.apache.flink.runtime.io.network.partition.PartitionTracker;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
+import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
+import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway;
+import org.apache.flink.runtime.jobmaster.LogicalSlot;
+import org.apache.flink.runtime.jobmaster.SlotOwner;
+import org.apache.flink.runtime.jobmaster.SlotRequestId;
+import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
+import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
+import org.apache.flink.runtime.shuffle.PartitionDescriptor;
+import org.apache.flink.runtime.shuffle.ProducerDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleDescriptor;
+import org.apache.flink.runtime.shuffle.ShuffleMaster;
+import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
+import org.apache.flink.runtime.testtasks.NoOpInvokable;
+import org.apache.flink.util.TestLogger;
+
+import org.junit.ClassRule;
+import org.junit.Test;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.Collections;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+import static org.hamcrest.Matchers.equalTo;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+
+/**
+ * Tests for the partition lifecycle handling (tracking and release) of the {@link Execution}.
+ */
+public class ExecutionPartitionLifecycleTest extends TestLogger {
+
+       @ClassRule
+       public static final TestingComponentMainThreadExecutor.Resource 
EXECUTOR_RESOURCE =
+               new TestingComponentMainThreadExecutor.Resource();
+
+       private Execution execution;
+       private ResultPartitionDeploymentDescriptor descriptor;
+       private ResourceID taskExecutorResourceId;
+       private JobID jobId;
+
+       @Test
+       public void testPartitionReleaseOnFinishWhileCanceling() throws 
Exception {
+               final SimpleAckingTaskManagerGateway taskManagerGateway = new 
SimpleAckingTaskManagerGateway();
+               final CompletableFuture<Tuple2<JobID, 
Collection<ResultPartitionID>>> releasePartitionsCallFuture = new 
CompletableFuture<>();
+               taskManagerGateway.setReleasePartitionsConsumer(((jobID, 
partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, 
partitionIds))));
+
+               final TestingShuffleMaster testingShuffleMaster = new 
TestingShuffleMaster();
+
+               
setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, 
NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster);
+
+               execution.cancel();
+               assertFalse(releasePartitionsCallFuture.isDone());
+
+               execution.markFinished();
+               assertTrue(releasePartitionsCallFuture.isDone());
+
+               final Tuple2<JobID, Collection<ResultPartitionID>> 
releasePartitionsCall = releasePartitionsCallFuture.get();
+               assertEquals(jobId, releasePartitionsCall.f0);
+               
assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()),
 releasePartitionsCall.f1);
+
+               assertEquals(1, 
testingShuffleMaster.externallyReleasedPartitions.size());
+               assertEquals(descriptor.getShuffleDescriptor(), 
testingShuffleMaster.externallyReleasedPartitions.poll());
+       }
+
+       private enum PartitionReleaseResult {
+               NONE,
+               STOP_TRACKING,
+               STOP_TRACKING_AND_RELEASE
+       }
+
+       @Test
+       public void testPartitionTrackedAndNotReleasedWhenFinished() throws 
Exception {
+               
testPartitionTrackingForStateTransition(Execution::markFinished, 
PartitionReleaseResult.NONE);
+       }
+
+       @Test
+       public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() 
throws Exception {
+               testPartitionTrackingForStateTransition(
+                       execution -> {
+                               execution.cancel();
+                               
execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), 
false);
+                       },
+                       PartitionReleaseResult.STOP_TRACKING);
+       }
+
+       @Test
+       public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws 
Exception {
+               testPartitionTrackingForStateTransition(
+                       execution -> {
+                               execution.cancel();
+                               execution.completeCancelling();
+                       },
+                       PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+       }
+
+       @Test
+       public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() 
throws Exception {
+               testPartitionTrackingForStateTransition(
+                       execution -> execution.markFailed(
+                               new Exception("Test exception"),
+                               Collections.emptyMap(),
+                               new IOMetrics(0, 0, 0, 0)),
+                       PartitionReleaseResult.STOP_TRACKING);
+       }
+
+       @Test
+       public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws 
Exception {
+               testPartitionTrackingForStateTransition(
+                       execution -> execution.markFailed(new Exception("Test 
exception")),
+                       PartitionReleaseResult.STOP_TRACKING_AND_RELEASE);
+       }
+
+       private void testPartitionTrackingForStateTransition(final 
Consumer<Execution> stateTransition, final PartitionReleaseResult 
partitionReleaseResult) throws Exception {
+               CompletableFuture<Tuple2<ResourceID, 
ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new 
CompletableFuture<>();
+               CompletableFuture<Collection<ResultPartitionID>> 
partitionStopTrackingFuture = new CompletableFuture<>();
+               CompletableFuture<Collection<ResultPartitionID>> 
partitionStopTrackingAndReleaseFuture = new CompletableFuture<>();
+               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
+               partitionTracker.setStartTrackingPartitionsConsumer(
+                       (resourceID, resultPartitionDeploymentDescriptor) ->
+                               
partitionStartTrackingFuture.complete(Tuple2.of(resourceID, 
resultPartitionDeploymentDescriptor))
+               );
+               
partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete);
+               
partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete);
+
+               
setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, 
partitionTracker, new SimpleAckingTaskManagerGateway(), 
NettyShuffleMaster.INSTANCE);
+
+               Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingCall = partitionStartTrackingFuture.get();
+               assertThat(startTrackingCall.f0, 
equalTo(taskExecutorResourceId));
+               assertThat(startTrackingCall.f1, equalTo(descriptor));
+
+               stateTransition.accept(execution);
+
+               switch (partitionReleaseResult) {
+                       case NONE:
+                               
assertFalse(partitionStopTrackingFuture.isDone());
+                               
assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                               break;
+                       case STOP_TRACKING:
+                               
assertTrue(partitionStopTrackingFuture.isDone());
+                               
assertFalse(partitionStopTrackingAndReleaseFuture.isDone());
+                               final Collection<ResultPartitionID> 
stopTrackingCall = partitionStopTrackingFuture.get();
+                               
assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()),
 stopTrackingCall);
+                               break;
+                       case STOP_TRACKING_AND_RELEASE:
+                               
assertFalse(partitionStopTrackingFuture.isDone());
+                               
assertTrue(partitionStopTrackingAndReleaseFuture.isDone());
+                               final Collection<ResultPartitionID> 
stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get();
+                               
assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()),
 stopTrackingAndReleaseCall);
+                               break;
+               }
+       }
+
+       private void setupExecutionGraphAndStartRunningJob(ResultPartitionType 
resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway 
taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, 
JobExecutionException {
+               final JobVertex producerVertex = createNoOpJobVertex();
+               final JobVertex consumerVertex = createNoOpJobVertex();
+               consumerVertex.connectNewDataSetAsInput(producerVertex, 
DistributionPattern.ALL_TO_ALL, resultPartitionType);
+
+               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
+
+               final SlotProvider slotProvider = new SlotProvider() {
+                       @Override
+                       public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, 
SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) 
{
+                               return CompletableFuture.completedFuture(new 
SimpleSlot(
+                                       new SingleSlotTestingSlotOwner(),
+                                       taskManagerLocation,
+                                       0,
+                                       taskManagerGateway));
+                       }
+
+                       @Override
+                       public void cancelSlotRequest(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
+                       }
+               };
+
+               final ExecutionGraph executionGraph = 
ExecutionGraphBuilder.buildGraph(
+                       null,
+                       new JobGraph(new JobID(), "test job", producerVertex, 
consumerVertex),
+                       new Configuration(),
+                       TestingUtils.defaultExecutor(),
+                       TestingUtils.defaultExecutor(),
+                       slotProvider,
+                       ExecutionPartitionLifecycleTest.class.getClassLoader(),
+                       new StandaloneCheckpointRecoveryFactory(),
+                       Time.seconds(10),
+                       new NoRestartStrategy(),
+                       new UnregisteredMetricsGroup(),
+                       VoidBlobWriter.getInstance(),
+                       Time.seconds(10),
+                       log,
+                       shuffleMaster,
+                       partitionTracker);
+
+               
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(producerVertex.getID());
+               final ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
+               execution = executionVertex.getCurrentExecutionAttempt();
+
+               execution.allocateResourcesForExecution(
+                       executionGraph.getSlotProviderStrategy(),
+                       LocationPreferenceConstraint.ALL,
+                       Collections.emptySet());
+
+               execution.deploy();
+               execution.switchToRunning();
+
+               final IntermediateResultPartitionID 
expectedIntermediateResultPartitionId = executionJobVertex
+                       .getProducedDataSets()[0]
+                       .getPartitions()[0]
+                       .getPartitionId();
+
+               descriptor = execution
+                       
.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
+               taskExecutorResourceId = taskManagerLocation.getResourceID();
+               jobId = executionGraph.getJobID();
+       }
+
+       @Nonnull
+       private JobVertex createNoOpJobVertex() {
+               final JobVertex jobVertex = new JobVertex("Test vertex", new 
JobVertexID());
+               jobVertex.setInvokableClass(NoOpInvokable.class);
+
+               return jobVertex;
+       }
+
+       /**
+        * Slot owner which records the first returned slot.
+        */
+       private static final class SingleSlotTestingSlotOwner implements 
SlotOwner {
+
+               final CompletableFuture<LogicalSlot> returnedSlot = new 
CompletableFuture<>();
+
+               @Override
+               public void returnLogicalSlot(LogicalSlot logicalSlot) {
+                       returnedSlot.complete(logicalSlot);
+               }
+       }
+
+       private static class TestingShuffleMaster implements 
ShuffleMaster<ShuffleDescriptor> {
+
+               final Queue<ShuffleDescriptor> externallyReleasedPartitions = 
new ArrayBlockingQueue<>(4);
+
+               @Override
+               public CompletableFuture<ShuffleDescriptor> 
registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, 
ProducerDescriptor producerDescriptor) {
+                       return CompletableFuture.completedFuture(new 
ShuffleDescriptor() {
+                               @Override
+                               public ResultPartitionID getResultPartitionID() 
{
+                                       return new ResultPartitionID(
+                                               
partitionDescriptor.getPartitionId(),
+                                               
producerDescriptor.getProducerExecutionId());
+                               }
+
+                               @Override
+                               public Optional<ResourceID> 
storesLocalResourcesOn() {
+                                       return 
Optional.of(producerDescriptor.getProducerLocation());
+                               }
+                       });
+               }
+
+               @Override
+               public void releasePartitionExternally(ShuffleDescriptor 
shuffleDescriptor) {
+                       externallyReleasedPartitions.add(shuffleDescriptor);
+               }
+       }
+}
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
index b8c83fe..433e3f6 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java
@@ -19,33 +19,16 @@
 package org.apache.flink.runtime.executiongraph;
 
 import org.apache.flink.api.common.JobID;
-import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.tuple.Tuple2;
-import org.apache.flink.configuration.Configuration;
-import org.apache.flink.metrics.groups.UnregisteredMetricsGroup;
-import org.apache.flink.runtime.blob.VoidBlobWriter;
 import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
-import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
-import org.apache.flink.runtime.clusterframework.types.ResourceID;
-import org.apache.flink.runtime.clusterframework.types.SlotProfile;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
-import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy;
 import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway;
 import org.apache.flink.runtime.instance.SimpleSlot;
-import org.apache.flink.runtime.instance.SlotSharingGroupId;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
-import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
-import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker;
-import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID;
-import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.JobVertexID;
 import 
org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint;
-import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit;
 import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils;
 import org.apache.flink.runtime.jobmaster.LogicalSlot;
 import org.apache.flink.runtime.jobmaster.SlotOwner;
@@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId;
 import org.apache.flink.runtime.jobmaster.TestingLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot;
 import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider;
-import org.apache.flink.runtime.shuffle.NettyShuffleMaster;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
@@ -65,7 +46,6 @@ import org.junit.ClassRule;
 import org.junit.Test;
 
 import javax.annotation.Nonnull;
-import javax.annotation.Nullable;
 
 import java.util.Arrays;
 import java.util.Collection;
@@ -74,9 +54,7 @@ import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
-import java.util.function.Consumer;
 
-import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.containsInAnyOrder;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
@@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger {
                assertThat(returnedSlotFuture.get(), 
is(equalTo(slotRequestIdFuture.get())));
        }
 
-       @Test
-       public void testPartitionRetainedWhenFinished() throws Exception {
-               
testPartitionTrackingForStateTransition(Execution::markFinished, false);
-       }
-
-       @Test
-       public void testPartitionReleasedWhenCanceled() throws Exception {
-               testPartitionTrackingForStateTransition(
-                       execution -> {
-                               execution.cancel();
-                               execution.completeCancelling();
-                       },
-                       true);
-       }
-
-       @Test
-       public void testPartitionReleasedWhenFailed() throws Exception {
-               testPartitionTrackingForStateTransition(execution -> 
execution.fail(new Exception("Test exception")), true);
-       }
-
-       private void testPartitionTrackingForStateTransition(final 
Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) 
throws Exception {
-               final JobVertex producerVertex = createNoOpJobVertex();
-               final JobVertex consumerVertex = createNoOpJobVertex();
-               consumerVertex.connectNewDataSetAsInput(producerVertex, 
DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING);
-
-               final TaskManagerLocation taskManagerLocation = new 
LocalTaskManagerLocation();
-
-               final SlotProvider slotProvider = new SlotProvider() {
-                       @Override
-                       public CompletableFuture<LogicalSlot> 
allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, 
SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) 
{
-                               return CompletableFuture.completedFuture(new 
SimpleSlot(
-                                       new SingleSlotTestingSlotOwner(),
-                                       taskManagerLocation,
-                                       0,
-                                       new SimpleAckingTaskManagerGateway()));
-                       }
-
-                       @Override
-                       public void cancelSlotRequest(SlotRequestId 
slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable 
cause) {
-                       }
-               };
-
-               CompletableFuture<Tuple2<ResourceID, 
ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new 
CompletableFuture<>();
-               CompletableFuture<Collection<ResultPartitionID>> 
partitionReleaseFuture = new CompletableFuture<>();
-               final TestingPartitionTracker partitionTracker = new 
TestingPartitionTracker();
-               partitionTracker.setStartTrackingPartitionsConsumer(
-                       (resourceID, resultPartitionDeploymentDescriptor) ->
-                               
partitionStartTrackingFuture.complete(Tuple2.of(resourceID, 
resultPartitionDeploymentDescriptor))
-               );
-               
partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete);
-
-               final ExecutionGraph executionGraph = 
ExecutionGraphBuilder.buildGraph(
-                       null,
-                       new JobGraph(new JobID(), "test job", producerVertex, 
consumerVertex),
-                       new Configuration(),
-                       TestingUtils.defaultExecutor(),
-                       TestingUtils.defaultExecutor(),
-                       slotProvider,
-                       ExecutionTest.class.getClassLoader(),
-                       new StandaloneCheckpointRecoveryFactory(),
-                       Time.seconds(10),
-                       new NoRestartStrategy(),
-                       new UnregisteredMetricsGroup(),
-                       VoidBlobWriter.getInstance(),
-                       Time.seconds(10),
-                       log,
-                       NettyShuffleMaster.INSTANCE,
-                       partitionTracker);
-
-               
executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread());
-
-               final ExecutionJobVertex executionJobVertex = 
executionGraph.getJobVertex(producerVertex.getID());
-               final ExecutionVertex executionVertex = 
executionJobVertex.getTaskVertices()[0];
-
-               final Execution execution = 
executionVertex.getCurrentExecutionAttempt();
-
-               execution.allocateResourcesForExecution(
-                       executionGraph.getSlotProviderStrategy(),
-                       LocationPreferenceConstraint.ALL,
-                       Collections.emptySet());
-
-               assertThat(partitionStartTrackingFuture.isDone(), is(true));
-               final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingCall = partitionStartTrackingFuture.get();
-
-               final IntermediateResultPartitionID 
expectedIntermediateResultPartitionId = executionJobVertex
-                       .getProducedDataSets()[0]
-                       .getPartitions()[0]
-                       .getPartitionId();
-               final ResultPartitionDeploymentDescriptor descriptor = execution
-                       
.getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get();
-               assertThat(startTrackingCall.f0, 
equalTo(taskManagerLocation.getResourceID()));
-               assertThat(startTrackingCall.f1, equalTo(descriptor));
-
-               execution.deploy();
-               execution.switchToRunning();
-
-               stateTransition.accept(execution);
-
-               assertThat(partitionReleaseFuture.isDone(), 
is(shouldPartitionBeReleased));
-               if (shouldPartitionBeReleased) {
-                       final Collection<ResultPartitionID> 
partitionReleaseCall = partitionReleaseFuture.get();
-                       assertThat(partitionReleaseCall, 
contains(descriptor.getShuffleDescriptor().getResultPartitionID()));
-               }
-       }
-
        /**
         * Tests that a slot release will atomically release the assigned 
{@link Execution}.
         */
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
index 0d1a160..2888b31 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java
@@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker 
{
        }
 
        @Override
+       public void stopTrackingPartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+       }
+
+       @Override
        public void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId) {
        }
 
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
index 72892d6..b1a58a0 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java
@@ -96,7 +96,7 @@ public enum PartitionTestUtils {
                }
        }
 
-       static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(
+       public static ResultPartitionDeploymentDescriptor 
createPartitionDeploymentDescriptor(
                ResultPartitionType partitionType) {
                ShuffleDescriptor shuffleDescriptor = 
NettyShuffleDescriptorBuilder.newBuilder().buildLocal();
                PartitionDescriptor partitionDescriptor = new 
PartitionDescriptor(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
index 5ca7156..63e4f44 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java
@@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals;
 public class PartitionTrackerImplTest extends TestLogger {
 
        @Test
-       public void testReleasedOnConsumptionPartitionIsNotTracked() {
+       public void testPipelinedPartitionIsNotTracked() {
                testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED);
        }
 
        @Test
-       public void testRetainedOnConsumptionPartitionIsTracked() {
+       public void testBlockingPartitionIsTracked() {
                testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING);
        }
 
@@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger {
                                resultPartitionType,
                                false));
 
-               final boolean isTrackingExpected = resultPartitionType == 
ResultPartitionType.BLOCKING;
-               
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), 
is(isTrackingExpected));
+               
assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), 
is(resultPartitionType.isBlocking()));
        }
 
        @Test
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
index 2d85ff6..6ba333d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java
@@ -36,6 +36,7 @@ public class TestingPartitionTracker implements 
PartitionTracker {
        private Consumer<ResourceID> 
stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {};
        private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> 
startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {};
        private Consumer<Collection<ResultPartitionID>> 
stopTrackingAndReleasePartitionsConsumer = ignored -> {};
+       private Consumer<Collection<ResultPartitionID>> 
stopTrackingPartitionsConsumer = ignored -> {};
 
        public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, 
ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) {
                this.startTrackingPartitionsConsumer = 
startTrackingPartitionsConsumer;
@@ -61,6 +62,10 @@ public class TestingPartitionTracker implements 
PartitionTracker {
                this.stopTrackingAndReleasePartitionsConsumer = 
stopTrackingAndReleasePartitionsConsumer;
        }
 
+       public void 
setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> 
stopTrackingPartitionsConsumer) {
+               this.stopTrackingPartitionsConsumer = 
stopTrackingPartitionsConsumer;
+       }
+
        @Override
        public void startTrackingPartition(ResourceID producingTaskExecutorId, 
ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) {
                
this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, 
resultPartitionDeploymentDescriptor);
@@ -77,6 +82,11 @@ public class TestingPartitionTracker implements 
PartitionTracker {
        }
 
        @Override
+       public void stopTrackingPartitions(Collection<ResultPartitionID> 
resultPartitionIds) {
+               stopTrackingPartitionsConsumer.accept(resultPartitionIds);
+       }
+
+       @Override
        public void stopTrackingAndReleasePartitionsFor(ResourceID 
producingTaskExecutorId) {
                
stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId);
        }
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index d87cf84..6b6b9e4 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -45,6 +45,7 @@ import 
org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi
 import org.apache.flink.runtime.io.network.partition.PartitionTestUtils;
 import org.apache.flink.runtime.io.network.partition.ResultPartition;
 import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
 import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate;
 import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
 import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess;
@@ -90,6 +91,7 @@ import java.util.stream.StreamSupport;
 
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
 import static org.junit.Assert.assertTrue;
 
 /**
@@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
        }
 
        @Test
-       public void testPartitionReleaseAfterDisconnect() throws Exception {
+       public void testBlockingPartitionReleaseAfterDisconnect() throws 
Exception {
                testPartitionRelease(
                        (jobId, partitionId, taskExecutorGateway) -> 
taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
-                       true);
+                       true,
+                       ResultPartitionType.BLOCKING);
        }
 
        @Test
-       public void testPartitionReleaseAfterReleaseCall() throws Exception {
+       public void testPipelinedPartitionNotReleasedAfterDisconnect() throws 
Exception {
+               testPartitionRelease(
+                       (jobId, partitionId, taskExecutorGateway) -> 
taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")),
+                       false,
+                       ResultPartitionType.PIPELINED);
+       }
+
+       @Test
+       public void testBlockingPartitionReleaseAfterReleaseCall() throws 
Exception {
                testPartitionRelease(
                        (jobId, partitionId, taskExecutorGateway) -> 
taskExecutorGateway.releasePartitions(jobId, 
Collections.singletonList(partitionId)),
-                       true);
+                       true,
+                       ResultPartitionType.BLOCKING);
        }
 
        @Test
-       public void testPartitionReleaseAfterShutdown() throws Exception {
+       public void testPipelinedPartitionReleaseAfterReleaseCall() throws 
Exception {
+               testPartitionRelease(
+                       (jobId, partitionId, taskExecutorGateway) -> 
taskExecutorGateway.releasePartitions(jobId, 
Collections.singletonList(partitionId)),
+                       true,
+                       ResultPartitionType.PIPELINED);
+       }
+
+       @Test
+       public void testBlockingPartitionReleaseAfterShutdown() throws 
Exception {
                // don't do any explicit release action, so that the partition 
must be cleaned up on shutdown
                testPartitionRelease(
                        (jobId, partitionId, taskExecutorGateway) -> { },
-                       false);
+                       false,
+                       ResultPartitionType.BLOCKING);
+       }
+
+       @Test
+       public void testPipelinedPartitionReleaseAfterShutdown() throws 
Exception {
+               // don't do any explicit release action, so that the partition 
must be cleaned up on shutdown
+               testPartitionRelease(
+                       (jobId, partitionId, taskExecutorGateway) -> { },
+                       false,
+                       ResultPartitionType.PIPELINED);
        }
 
        private void testPartitionRelease(
                TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> 
releaseAction,
-               boolean waitForRelease) throws Exception {
+               boolean waitForRelease,
+               ResultPartitionType resultPartitionType) throws Exception {
 
                final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
-                       
PartitionTestUtils.createPartitionDeploymentDescriptor();
+                       
PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType);
                final ExecutionAttemptID eid1 = 
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId();
 
                final TaskDeploymentDescriptor taskDeploymentDescriptor =
@@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        // the task is still running => the partition is in 
in-progress
                        runInTaskExecutorThreadAndWait(
                                taskExecutor,
-                               () -> 
assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+                               () -> 
assertThat(partitionTable.hasTrackedPartitions(jobId), 
is(resultPartitionType.isBlocking())));
 
                        TestingInvokable.sync.releaseBlocker();
                        taskFinishedFuture.get(timeout.getSize(), 
timeout.getUnit());
@@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        // the task is finished => the partition should be 
finished now
                        runInTaskExecutorThreadAndWait(
                                taskExecutor,
-                               () -> 
assertTrue(partitionTable.hasTrackedPartitions(jobId)));
+                               () -> 
assertThat(partitionTable.hasTrackedPartitions(jobId), 
is(resultPartitionType.isBlocking())));
 
                        final CompletableFuture<Collection<ResultPartitionID>> 
releasePartitionsFuture = new CompletableFuture<>();
                        runInTaskExecutorThreadAndWait(

Reply via email to