This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7b95f32d01730bcc75ded42e41d3668a1802a69b Author: Chesnay Schepler <[email protected]> AuthorDate: Thu Aug 1 14:27:28 2019 +0200 [FLINK-13371][coordination] Prevent leaks of blocking partitions --- .../flink/runtime/executiongraph/Execution.java | 71 +++-- .../runtime/executiongraph/ExecutionGraph.java | 2 +- .../runtime/executiongraph/ExecutionVertex.java | 4 +- .../io/network/partition/PartitionTracker.java | 5 + .../io/network/partition/PartitionTrackerImpl.java | 7 + .../flink/runtime/shuffle/ProducerDescriptor.java | 4 +- .../ExecutionGraphDeploymentTest.java | 2 +- .../ExecutionPartitionLifecycleTest.java | 324 +++++++++++++++++++++ .../runtime/executiongraph/ExecutionTest.java | 127 -------- .../io/network/partition/NoOpPartitionTracker.java | 4 + .../io/network/partition/PartitionTestUtils.java | 2 +- .../partition/PartitionTrackerImplTest.java | 7 +- .../network/partition/TestingPartitionTracker.java | 10 + .../TaskExecutorPartitionLifecycleTest.java | 51 +++- 14 files changed, 456 insertions(+), 164 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index af94188..d8a1b6f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -56,6 +56,7 @@ import org.apache.flink.runtime.messages.StackTraceSampleResponse; import org.apache.flink.runtime.shuffle.PartitionDescriptor; import org.apache.flink.runtime.shuffle.ProducerDescriptor; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; import org.apache.flink.runtime.state.KeyGroupRangeAssignment; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.util.ExceptionUtils; @@ -1013,7 +1014,9 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } void markFailed(Throwable t, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { - processFail(t, true, userAccumulators, metrics); + // skip release of partitions since this is only called if the TM actually sent the FAILED state update + // in this case all partitions have already been cleaned up + processFail(t, true, userAccumulators, metrics, false); } @VisibleForTesting @@ -1059,7 +1062,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution else if (current == CANCELING) { // we sent a cancel call, and the task manager finished before it arrived. We // will never get a CANCELED call back from the job manager - completeCancelling(userAccumulators, metrics); + completeCancelling(userAccumulators, metrics, true); return; } else if (current == CANCELED || current == FAILED) { @@ -1096,10 +1099,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } void completeCancelling() { - completeCancelling(null, null); + completeCancelling(null, null, true); } - void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { + void completeCancelling(Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) { // the taskmanagers can themselves cancel tasks without an external trigger, if they find that the // network stack is canceled (for example by a failing / canceling receiver or sender @@ -1117,7 +1120,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution updateAccumulatorsAndMetrics(userAccumulators, metrics); if (transitionState(current, CANCELED)) { - finishCancellation(); + finishCancellation(releasePartitions); return; } @@ -1136,11 +1139,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - private void finishCancellation() { + private void finishCancellation(boolean releasePartitions) { releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled.")); vertex.getExecutionGraph().deregisterExecution(this); - // release partitions on TM in case the Task finished while we where already CANCELING - stopTrackingAndReleasePartitions(); + handlePartitionCleanup(releasePartitions, releasePartitions); } void cachePartitionInfo(PartitionInfo partitionInfo) { @@ -1160,10 +1162,10 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution // -------------------------------------------------------------------------------------------- private boolean processFail(Throwable t, boolean isCallback) { - return processFail(t, isCallback, null, null); + return processFail(t, isCallback, null, null, true); } - private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics) { + private boolean processFail(Throwable t, boolean isCallback, Map<String, Accumulator<?, ?>> userAccumulators, IOMetrics metrics, boolean releasePartitions) { // damn, we failed. This means only that we keep our books and notify our parent JobExecutionVertex // the actual computation on the task manager is cleaned up by the TaskManager that noticed the failure @@ -1187,7 +1189,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } if (current == CANCELING) { - completeCancelling(userAccumulators, metrics); + completeCancelling(userAccumulators, metrics, true); return false; } @@ -1199,7 +1201,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution releaseAssignedResource(t); vertex.getExecutionGraph().deregisterExecution(this); - stopTrackingAndReleasePartitions(); + handlePartitionCleanup(releasePartitions, releasePartitions); if (!isCallback && (current == RUNNING || current == DEPLOYING)) { if (LOG.isDebugEnabled()) { @@ -1304,16 +1306,51 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - void stopTrackingAndReleasePartitions() { + void handlePartitionCleanup(boolean releasePipelinedPartitions, boolean releaseBlockingPartitions) { + if (releasePipelinedPartitions) { + sendReleaseIntermediateResultPartitionsRpcCall(); + } + + final Collection<ResultPartitionID> partitionIds = getPartitionIds(); + final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker(); + + if (!partitionIds.isEmpty()) { + if (releaseBlockingPartitions) { + LOG.info("Discarding the results produced by task execution {}.", attemptId); + partitionTracker.stopTrackingAndReleasePartitions(partitionIds); + } else { + partitionTracker.stopTrackingPartitions(partitionIds); + } + } + } + + private Collection<ResultPartitionID> getPartitionIds() { + return producedPartitions.values().stream() + .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + .map(ShuffleDescriptor::getResultPartitionID) + .collect(Collectors.toList()); + } + + private void sendReleaseIntermediateResultPartitionsRpcCall() { LOG.info("Discarding the results produced by task execution {}.", attemptId); - if (producedPartitions != null && producedPartitions.size() > 0) { - final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker(); - final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream() + final LogicalSlot slot = assignedResource; + + if (slot != null) { + final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); + + final ShuffleMaster<?> shuffleMaster = getVertex().getExecutionGraph().getShuffleMaster(); + + Collection<ResultPartitionID> partitionIds = producedPartitions.values().stream() + .filter(resultPartitionDeploymentDescriptor -> resultPartitionDeploymentDescriptor.getPartitionType().isPipelined()) .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + .peek(shuffleMaster::releasePartitionExternally) .map(ShuffleDescriptor::getResultPartitionID) .collect(Collectors.toList()); - partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds); + if (!partitionIds.isEmpty()) { + // TODO For some tests this could be a problem when querying too early if all resources were released + taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds); + } } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index f779fd2..0984274 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -1525,7 +1525,7 @@ public class ExecutionGraph implements AccessExecutionGraph { case CANCELED: // this deserialization is exception-free accumulators = deserializeAccumulators(state); - attempt.completeCancelling(accumulators, state.getIOMetrics()); + attempt.completeCancelling(accumulators, state.getIOMetrics(), false); return true; case FAILED: diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index 03c68f8..6d262ee 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -604,7 +604,9 @@ public class ExecutionVertex implements AccessExecutionVertex, Archiveable<Archi if (oldState.isTerminal()) { if (oldState == FINISHED) { - oldExecution.stopTrackingAndReleasePartitions(); + // pipelined partitions are released in Execution#cancel(), covering both job failures and vertex resets + // do not release pipelined partitions here to save RPC calls + oldExecution.handlePartitionCleanup(false, true); getExecutionGraph().getPartitionReleaseStrategy().vertexUnfinished(executionVertexId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java index 3697fb8..268f4f9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java @@ -46,6 +46,11 @@ public interface PartitionTracker { void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds); /** + * Stops the tracking of the given partitions. + */ + void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds); + + /** * Releases all partitions for the given task executor ID, and stop the tracking of partitions that were released. */ void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java index f772b37..2e7f421 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java @@ -106,6 +106,13 @@ public class PartitionTrackerImpl implements PartitionTracker { } @Override + public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) { + Preconditions.checkNotNull(resultPartitionIds); + + resultPartitionIds.forEach(this::internalStopTrackingPartition); + } + + @Override public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { Preconditions.checkNotNull(producingTaskExecutorId); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java index ad3fc48..ec32178 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/shuffle/ProducerDescriptor.java @@ -63,11 +63,11 @@ public class ProducerDescriptor { this.dataPort = dataPort; } - ResourceID getProducerLocation() { + public ResourceID getProducerLocation() { return producerLocation; } - ExecutionAttemptID getProducerExecutionId() { + public ExecutionAttemptID getProducerExecutionId() { return producerExecutionId; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index d0cfa00..d10fcbd 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -382,7 +382,7 @@ public class ExecutionGraphDeploymentTest extends TestLogger { Execution execution1 = executions.values().iterator().next(); execution1.cancel(); - execution1.completeCancelling(accumulators, ioMetrics); + execution1.completeCancelling(accumulators, ioMetrics, false); assertEquals(ioMetrics, execution1.getIOMetrics()); assertEquals(accumulators, execution1.getUserAccumulators()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java new file mode 100644 index 0000000..ab4048d --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionPartitionLifecycleTest.java @@ -0,0 +1,324 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.executiongraph; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; +import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.JobException; +import org.apache.flink.runtime.blob.VoidBlobWriter; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; +import org.apache.flink.runtime.client.JobExecutionException; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; +import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; +import org.apache.flink.runtime.jobgraph.JobGraph; +import org.apache.flink.runtime.jobgraph.JobVertex; +import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.LogicalSlot; +import org.apache.flink.runtime.jobmaster.SlotOwner; +import org.apache.flink.runtime.jobmaster.SlotRequestId; +import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskManagerLocation; +import org.apache.flink.runtime.testingUtils.TestingUtils; +import org.apache.flink.runtime.testtasks.NoOpInvokable; +import org.apache.flink.util.TestLogger; + +import org.junit.ClassRule; +import org.junit.Test; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; + +import java.util.Collection; +import java.util.Collections; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.function.Consumer; + +import static org.hamcrest.Matchers.equalTo; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + +/** + * Tests for the {@link Execution}. + */ +public class ExecutionPartitionLifecycleTest extends TestLogger { + + @ClassRule + public static final TestingComponentMainThreadExecutor.Resource EXECUTOR_RESOURCE = + new TestingComponentMainThreadExecutor.Resource(); + + private Execution execution; + private ResultPartitionDeploymentDescriptor descriptor; + private ResourceID taskExecutorResourceId; + private JobID jobId; + + @Test + public void testPartitionReleaseOnFinishWhileCanceling() throws Exception { + final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); + final CompletableFuture<Tuple2<JobID, Collection<ResultPartitionID>>> releasePartitionsCallFuture = new CompletableFuture<>(); + taskManagerGateway.setReleasePartitionsConsumer(((jobID, partitionIds) -> releasePartitionsCallFuture.complete(Tuple2.of(jobID, partitionIds)))); + + final TestingShuffleMaster testingShuffleMaster = new TestingShuffleMaster(); + + setupExecutionGraphAndStartRunningJob(ResultPartitionType.PIPELINED, NoOpPartitionTracker.INSTANCE, taskManagerGateway, testingShuffleMaster); + + execution.cancel(); + assertFalse(releasePartitionsCallFuture.isDone()); + + execution.markFinished(); + assertTrue(releasePartitionsCallFuture.isDone()); + + final Tuple2<JobID, Collection<ResultPartitionID>> releasePartitionsCall = releasePartitionsCallFuture.get(); + assertEquals(jobId, releasePartitionsCall.f0); + assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), releasePartitionsCall.f1); + + assertEquals(1, testingShuffleMaster.externallyReleasedPartitions.size()); + assertEquals(descriptor.getShuffleDescriptor(), testingShuffleMaster.externallyReleasedPartitions.poll()); + } + + private enum PartitionReleaseResult { + NONE, + STOP_TRACKING, + STOP_TRACKING_AND_RELEASE + } + + @Test + public void testPartitionTrackedAndNotReleasedWhenFinished() throws Exception { + testPartitionTrackingForStateTransition(Execution::markFinished, PartitionReleaseResult.NONE); + } + + @Test + public void testPartitionNotTrackedAndNotReleasedWhenCanceledByTM() throws Exception { + testPartitionTrackingForStateTransition( + execution -> { + execution.cancel(); + execution.completeCancelling(Collections.emptyMap(), new IOMetrics(0, 0, 0, 0), false); + }, + PartitionReleaseResult.STOP_TRACKING); + } + + @Test + public void testPartitionNotTrackedAndReleasedWhenCanceledByJM() throws Exception { + testPartitionTrackingForStateTransition( + execution -> { + execution.cancel(); + execution.completeCancelling(); + }, + PartitionReleaseResult.STOP_TRACKING_AND_RELEASE); + } + + @Test + public void testPartitionNotTrackedAndNotReleasedWhenFailedByTM() throws Exception { + testPartitionTrackingForStateTransition( + execution -> execution.markFailed( + new Exception("Test exception"), + Collections.emptyMap(), + new IOMetrics(0, 0, 0, 0)), + PartitionReleaseResult.STOP_TRACKING); + } + + @Test + public void testPartitionNotTrackedAndReleasedWhenFailedByJM() throws Exception { + testPartitionTrackingForStateTransition( + execution -> execution.markFailed(new Exception("Test exception")), + PartitionReleaseResult.STOP_TRACKING_AND_RELEASE); + } + + private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final PartitionReleaseResult partitionReleaseResult) throws Exception { + CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>(); + CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingFuture = new CompletableFuture<>(); + CompletableFuture<Collection<ResultPartitionID>> partitionStopTrackingAndReleaseFuture = new CompletableFuture<>(); + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + partitionTracker.setStartTrackingPartitionsConsumer( + (resourceID, resultPartitionDeploymentDescriptor) -> + partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor)) + ); + partitionTracker.setStopTrackingPartitionsConsumer(partitionStopTrackingFuture::complete); + partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionStopTrackingAndReleaseFuture::complete); + + setupExecutionGraphAndStartRunningJob(ResultPartitionType.BLOCKING, partitionTracker, new SimpleAckingTaskManagerGateway(), NettyShuffleMaster.INSTANCE); + + Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get(); + assertThat(startTrackingCall.f0, equalTo(taskExecutorResourceId)); + assertThat(startTrackingCall.f1, equalTo(descriptor)); + + stateTransition.accept(execution); + + switch (partitionReleaseResult) { + case NONE: + assertFalse(partitionStopTrackingFuture.isDone()); + assertFalse(partitionStopTrackingAndReleaseFuture.isDone()); + break; + case STOP_TRACKING: + assertTrue(partitionStopTrackingFuture.isDone()); + assertFalse(partitionStopTrackingAndReleaseFuture.isDone()); + final Collection<ResultPartitionID> stopTrackingCall = partitionStopTrackingFuture.get(); + assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingCall); + break; + case STOP_TRACKING_AND_RELEASE: + assertFalse(partitionStopTrackingFuture.isDone()); + assertTrue(partitionStopTrackingAndReleaseFuture.isDone()); + final Collection<ResultPartitionID> stopTrackingAndReleaseCall = partitionStopTrackingAndReleaseFuture.get(); + assertEquals(Collections.singletonList(descriptor.getShuffleDescriptor().getResultPartitionID()), stopTrackingAndReleaseCall); + break; + } + } + + private void setupExecutionGraphAndStartRunningJob(ResultPartitionType resultPartitionType, PartitionTracker partitionTracker, TaskManagerGateway taskManagerGateway, ShuffleMaster<?> shuffleMaster) throws JobException, JobExecutionException { + final JobVertex producerVertex = createNoOpJobVertex(); + final JobVertex consumerVertex = createNoOpJobVertex(); + consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, resultPartitionType); + + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + + final SlotProvider slotProvider = new SlotProvider() { + @Override + public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { + return CompletableFuture.completedFuture(new SimpleSlot( + new SingleSlotTestingSlotOwner(), + taskManagerLocation, + 0, + taskManagerGateway)); + } + + @Override + public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + } + }; + + final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph( + null, + new JobGraph(new JobID(), "test job", producerVertex, consumerVertex), + new Configuration(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + slotProvider, + ExecutionPartitionLifecycleTest.class.getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + Time.seconds(10), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + VoidBlobWriter.getInstance(), + Time.seconds(10), + log, + shuffleMaster, + partitionTracker); + + executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); + final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + execution = executionVertex.getCurrentExecutionAttempt(); + + execution.allocateResourcesForExecution( + executionGraph.getSlotProviderStrategy(), + LocationPreferenceConstraint.ALL, + Collections.emptySet()); + + execution.deploy(); + execution.switchToRunning(); + + final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex + .getProducedDataSets()[0] + .getPartitions()[0] + .getPartitionId(); + + descriptor = execution + .getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get(); + taskExecutorResourceId = taskManagerLocation.getResourceID(); + jobId = executionGraph.getJobID(); + } + + @Nonnull + private JobVertex createNoOpJobVertex() { + final JobVertex jobVertex = new JobVertex("Test vertex", new JobVertexID()); + jobVertex.setInvokableClass(NoOpInvokable.class); + + return jobVertex; + } + + /** + * Slot owner which records the first returned slot. + */ + private static final class SingleSlotTestingSlotOwner implements SlotOwner { + + final CompletableFuture<LogicalSlot> returnedSlot = new CompletableFuture<>(); + + @Override + public void returnLogicalSlot(LogicalSlot logicalSlot) { + returnedSlot.complete(logicalSlot); + } + } + + private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { + + final Queue<ShuffleDescriptor> externallyReleasedPartitions = new ArrayBlockingQueue<>(4); + + @Override + public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + return CompletableFuture.completedFuture(new ShuffleDescriptor() { + @Override + public ResultPartitionID getResultPartitionID() { + return new ResultPartitionID( + partitionDescriptor.getPartitionId(), + producerDescriptor.getProducerExecutionId()); + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return Optional.of(producerDescriptor.getProducerLocation()); + } + }); + } + + @Override + public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { + externallyReleasedPartitions.add(shuffleDescriptor); + } + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index b8c83fe..433e3f6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -19,33 +19,16 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; -import org.apache.flink.api.common.time.Time; -import org.apache.flink.api.java.tuple.Tuple2; -import org.apache.flink.configuration.Configuration; -import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; -import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; -import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; -import org.apache.flink.runtime.clusterframework.types.ResourceID; -import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; -import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlot; -import org.apache.flink.runtime.instance.SlotSharingGroupId; -import org.apache.flink.runtime.io.network.partition.ResultPartitionID; -import org.apache.flink.runtime.io.network.partition.ResultPartitionType; -import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker; -import org.apache.flink.runtime.jobgraph.DistributionPattern; -import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; -import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; -import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; @@ -53,10 +36,8 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; -import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; -import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; @@ -65,7 +46,6 @@ import org.junit.ClassRule; import org.junit.Test; import javax.annotation.Nonnull; -import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; @@ -74,9 +54,7 @@ import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; -import java.util.function.Consumer; -import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -526,111 +504,6 @@ public class ExecutionTest extends TestLogger { assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get()))); } - @Test - public void testPartitionRetainedWhenFinished() throws Exception { - testPartitionTrackingForStateTransition(Execution::markFinished, false); - } - - @Test - public void testPartitionReleasedWhenCanceled() throws Exception { - testPartitionTrackingForStateTransition( - execution -> { - execution.cancel(); - execution.completeCancelling(); - }, - true); - } - - @Test - public void testPartitionReleasedWhenFailed() throws Exception { - testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true); - } - - private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception { - final JobVertex producerVertex = createNoOpJobVertex(); - final JobVertex consumerVertex = createNoOpJobVertex(); - consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - - final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); - - final SlotProvider slotProvider = new SlotProvider() { - @Override - public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { - return CompletableFuture.completedFuture(new SimpleSlot( - new SingleSlotTestingSlotOwner(), - taskManagerLocation, - 0, - new SimpleAckingTaskManagerGateway())); - } - - @Override - public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { - } - }; - - CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>(); - CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = new CompletableFuture<>(); - final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); - partitionTracker.setStartTrackingPartitionsConsumer( - (resourceID, resultPartitionDeploymentDescriptor) -> - partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor)) - ); - partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete); - - final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph( - null, - new JobGraph(new JobID(), "test job", producerVertex, consumerVertex), - new Configuration(), - TestingUtils.defaultExecutor(), - TestingUtils.defaultExecutor(), - slotProvider, - ExecutionTest.class.getClassLoader(), - new StandaloneCheckpointRecoveryFactory(), - Time.seconds(10), - new NoRestartStrategy(), - new UnregisteredMetricsGroup(), - VoidBlobWriter.getInstance(), - Time.seconds(10), - log, - NettyShuffleMaster.INSTANCE, - partitionTracker); - - executionGraph.start(ComponentMainThreadExecutorServiceAdapter.forMainThread()); - - final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); - final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; - - final Execution execution = executionVertex.getCurrentExecutionAttempt(); - - execution.allocateResourcesForExecution( - executionGraph.getSlotProviderStrategy(), - LocationPreferenceConstraint.ALL, - Collections.emptySet()); - - assertThat(partitionStartTrackingFuture.isDone(), is(true)); - final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get(); - - final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex - .getProducedDataSets()[0] - .getPartitions()[0] - .getPartitionId(); - final ResultPartitionDeploymentDescriptor descriptor = execution - .getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get(); - assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID())); - assertThat(startTrackingCall.f1, equalTo(descriptor)); - - execution.deploy(); - execution.switchToRunning(); - - stateTransition.accept(execution); - - assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased)); - if (shouldPartitionBeReleased) { - final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get(); - assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID())); - } - } - /** * Tests that a slot release will atomically release the assigned {@link Execution}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java index 0d1a160..2888b31 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java @@ -43,6 +43,10 @@ public enum NoOpPartitionTracker implements PartitionTracker { } @Override + public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) { + } + + @Override public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index 72892d6..b1a58a0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -96,7 +96,7 @@ public enum PartitionTestUtils { } } - static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor( + public static ResultPartitionDeploymentDescriptor createPartitionDeploymentDescriptor( ResultPartitionType partitionType) { ShuffleDescriptor shuffleDescriptor = NettyShuffleDescriptorBuilder.newBuilder().buildLocal(); PartitionDescriptor partitionDescriptor = new PartitionDescriptor( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java index 5ca7156..63e4f44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java @@ -50,12 +50,12 @@ import static org.junit.Assert.assertEquals; public class PartitionTrackerImplTest extends TestLogger { @Test - public void testReleasedOnConsumptionPartitionIsNotTracked() { + public void testPipelinedPartitionIsNotTracked() { testReleaseOnConsumptionHandling(ResultPartitionType.PIPELINED); } @Test - public void testRetainedOnConsumptionPartitionIsTracked() { + public void testBlockingPartitionIsTracked() { testReleaseOnConsumptionHandling(ResultPartitionType.BLOCKING); } @@ -75,8 +75,7 @@ public class PartitionTrackerImplTest extends TestLogger { resultPartitionType, false)); - final boolean isTrackingExpected = resultPartitionType == ResultPartitionType.BLOCKING; - assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(isTrackingExpected)); + assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(resultPartitionType.isBlocking())); } @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java index 2d85ff6..6ba333d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java @@ -36,6 +36,7 @@ public class TestingPartitionTracker implements PartitionTracker { private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {}; private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {}; private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {}; + private Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer = ignored -> {}; public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) { this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer; @@ -61,6 +62,10 @@ public class TestingPartitionTracker implements PartitionTracker { this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer; } + public void setStopTrackingPartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingPartitionsConsumer) { + this.stopTrackingPartitionsConsumer = stopTrackingPartitionsConsumer; + } + @Override public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor); @@ -77,6 +82,11 @@ public class TestingPartitionTracker implements PartitionTracker { } @Override + public void stopTrackingPartitions(Collection<ResultPartitionID> resultPartitionIds) { + stopTrackingPartitionsConsumer.accept(resultPartitionIds); + } + + @Override public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId); } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index d87cf84..6b6b9e4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -45,6 +45,7 @@ import org.apache.flink.runtime.io.network.partition.PartitionProducerStateProvi import org.apache.flink.runtime.io.network.partition.PartitionTestUtils; import org.apache.flink.runtime.io.network.partition.ResultPartition; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.io.network.partition.consumer.SingleInputGate; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; @@ -90,6 +91,7 @@ import java.util.stream.StreamSupport; import static org.hamcrest.MatcherAssert.assertThat; import static org.hamcrest.Matchers.contains; +import static org.hamcrest.Matchers.is; import static org.junit.Assert.assertTrue; /** @@ -190,33 +192,62 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { } @Test - public void testPartitionReleaseAfterDisconnect() throws Exception { + public void testBlockingPartitionReleaseAfterDisconnect() throws Exception { testPartitionRelease( (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")), - true); + true, + ResultPartitionType.BLOCKING); } @Test - public void testPartitionReleaseAfterReleaseCall() throws Exception { + public void testPipelinedPartitionNotReleasedAfterDisconnect() throws Exception { + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")), + false, + ResultPartitionType.PIPELINED); + } + + @Test + public void testBlockingPartitionReleaseAfterReleaseCall() throws Exception { testPartitionRelease( (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)), - true); + true, + ResultPartitionType.BLOCKING); } @Test - public void testPartitionReleaseAfterShutdown() throws Exception { + public void testPipelinedPartitionReleaseAfterReleaseCall() throws Exception { + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> taskExecutorGateway.releasePartitions(jobId, Collections.singletonList(partitionId)), + true, + ResultPartitionType.PIPELINED); + } + + @Test + public void testBlockingPartitionReleaseAfterShutdown() throws Exception { // don't do any explicit release action, so that the partition must be cleaned up on shutdown testPartitionRelease( (jobId, partitionId, taskExecutorGateway) -> { }, - false); + false, + ResultPartitionType.BLOCKING); + } + + @Test + public void testPipelinedPartitionReleaseAfterShutdown() throws Exception { + // don't do any explicit release action, so that the partition must be cleaned up on shutdown + testPartitionRelease( + (jobId, partitionId, taskExecutorGateway) -> { }, + false, + ResultPartitionType.PIPELINED); } private void testPartitionRelease( TriConsumer<JobID, ResultPartitionID, TaskExecutorGateway> releaseAction, - boolean waitForRelease) throws Exception { + boolean waitForRelease, + ResultPartitionType resultPartitionType) throws Exception { final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = - PartitionTestUtils.createPartitionDeploymentDescriptor(); + PartitionTestUtils.createPartitionDeploymentDescriptor(resultPartitionType); final ExecutionAttemptID eid1 = taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID().getProducerId(); final TaskDeploymentDescriptor taskDeploymentDescriptor = @@ -343,7 +374,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { // the task is still running => the partition is in in-progress runInTaskExecutorThreadAndWait( taskExecutor, - () -> assertTrue(partitionTable.hasTrackedPartitions(jobId))); + () -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking()))); TestingInvokable.sync.releaseBlocker(); taskFinishedFuture.get(timeout.getSize(), timeout.getUnit()); @@ -351,7 +382,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { // the task is finished => the partition should be finished now runInTaskExecutorThreadAndWait( taskExecutor, - () -> assertTrue(partitionTable.hasTrackedPartitions(jobId))); + () -> assertThat(partitionTable.hasTrackedPartitions(jobId), is(resultPartitionType.isBlocking()))); final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); runInTaskExecutorThreadAndWait(
