This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 408f6b67aefaccfc76708b2d4772eb7f0a8fd984 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed Jun 12 14:27:32 2019 +0200 [FLINK-12615][coordination] Track partitions on JM --- .../flink/runtime/executiongraph/Execution.java | 43 +-- .../runtime/executiongraph/ExecutionGraph.java | 20 +- .../executiongraph/ExecutionGraphBuilder.java | 7 +- .../io/network/partition/PartitionTracker.java | 57 ++++ .../network/partition/PartitionTrackerFactory.java | 53 ++++ .../io/network/partition/PartitionTrackerImpl.java | 176 ++++++++++++ .../apache/flink/runtime/jobmaster/JobMaster.java | 20 +- .../factories/DefaultJobMasterServiceFactory.java | 8 +- .../flink/runtime/scheduler/DefaultScheduler.java | 7 +- .../runtime/scheduler/DefaultSchedulerFactory.java | 7 +- .../flink/runtime/scheduler/LegacyScheduler.java | 17 +- .../runtime/scheduler/LegacySchedulerFactory.java | 7 +- .../runtime/scheduler/SchedulerNGFactory.java | 4 +- .../CheckpointSettingsSerializableTest.java | 4 +- .../ExecutionGraphDeploymentTest.java | 4 +- .../ExecutionGraphRescalingTest.java | 13 +- .../ExecutionGraphSchedulingTest.java | 4 +- .../executiongraph/ExecutionGraphTestUtils.java | 4 +- .../runtime/executiongraph/ExecutionTest.java | 200 ++++++++------ .../ExecutionVertexLocalityTest.java | 4 +- .../PipelinedFailoverRegionBuildingTest.java | 4 +- .../io/network/partition/NoOpPartitionTracker.java | 53 ++++ .../io/network/partition/PartitionTestUtils.java | 34 +++ .../partition/PartitionTrackerImplTest.java | 294 +++++++++++++++++++++ .../network/partition/TestingPartitionTracker.java | 83 ++++++ .../flink/runtime/jobmaster/JobMasterTest.java | 61 ++++- .../taskexecutor/TestingTaskExecutorGateway.java | 20 +- .../TestingTaskExecutorGatewayBuilder.java | 12 +- 28 files changed, 1082 insertions(+), 138 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index c81ec18..c033304 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions; import org.apache.flink.runtime.checkpoint.CheckpointType; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotProfile; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; @@ -626,6 +628,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution vertex.getExecutionGraph().getJobMasterMainThreadExecutor(), producedPartitionsCache -> { producedPartitions = producedPartitionsCache; + startTrackingPartitions(location.getResourceID(), producedPartitionsCache.values()); return this; }); } @@ -811,7 +814,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution else if (current == FINISHED || current == FAILED) { // nothing to do any more. finished/failed before it could be cancelled. 
// in any case, the task is removed from the TaskManager already - sendReleaseIntermediateResultPartitionsRpcCall(); return; } @@ -844,8 +846,6 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution break; case FINISHED: case FAILED: - sendReleaseIntermediateResultPartitionsRpcCall(); - break; case CANCELED: break; default: @@ -1170,6 +1170,8 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution private void finishCancellation() { releaseAssignedResource(new FlinkException("Execution " + this + " was cancelled.")); vertex.getExecutionGraph().deregisterExecution(this); + // release partitions on TM in case the Task finished while we were already CANCELING + stopTrackingAndReleasePartitions(); } void cachePartitionInfo(PartitionInfo partitionInfo) { @@ -1228,6 +1230,7 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution releaseAssignedResource(t); vertex.getExecutionGraph().deregisterExecution(this); + stopTrackingAndReleasePartitions(); if (!isCallback && (current == RUNNING || current == DEPLOYING)) { if (LOG.isDebugEnabled()) { @@ -1323,23 +1326,25 @@ public class Execution implements AccessExecution, Archiveable<ArchivedExecution } } - private void sendReleaseIntermediateResultPartitionsRpcCall() { - LOG.info("Discarding the results produced by task execution {}.", attemptId); - final LogicalSlot slot = assignedResource; - - if (slot != null) { - final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway(); - - Collection<IntermediateResultPartition> partitions = vertex.getProducedPartitions().values(); - Collection<ResultPartitionID> partitionIds = new ArrayList<>(partitions.size()); - for (IntermediateResultPartition partition : partitions) { - partitionIds.add(new ResultPartitionID(partition.getPartitionId(), attemptId)); - } + private void startTrackingPartitions(final ResourceID taskExecutorId, final Collection<ResultPartitionDeploymentDescriptor> 
partitions) { + PartitionTracker partitionTracker = vertex.getExecutionGraph().getPartitionTracker(); + for (ResultPartitionDeploymentDescriptor partition : partitions) { + partitionTracker.startTrackingPartition( + taskExecutorId, + partition); + } + } - if (!partitionIds.isEmpty()) { - // TODO For some tests this could be a problem when querying too early if all resources were released - taskManagerGateway.releasePartitions(getVertex().getJobId(), partitionIds); - } + private void stopTrackingAndReleasePartitions() { + LOG.info("Discarding the results produced by task execution {}.", attemptId); + if (producedPartitions != null && producedPartitions.size() > 0) { + final PartitionTracker partitionTracker = getVertex().getExecutionGraph().getPartitionTracker(); + final List<ResultPartitionID> producedPartitionIds = producedPartitions.values().stream() + .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + .map(ShuffleDescriptor::getResultPartitionID) + .collect(Collectors.toList()); + + partitionTracker.stopTrackingAndReleasePartitions(producedPartitionIds); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index 0c20ee5..514121e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -51,6 +51,8 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.ExecutionGraphRestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartCallback; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl; import 
org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobStatus; @@ -96,6 +98,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.NoSuchElementException; +import java.util.Optional; import java.util.Set; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; @@ -284,6 +287,8 @@ public class ExecutionGraph implements AccessExecutionGraph { * strong reference to any user-defined classes.*/ private volatile ErrorInfo failureInfo; + private final PartitionTracker partitionTracker; + /** * Future for an ongoing or completed scheduling action. */ @@ -409,7 +414,11 @@ public class ExecutionGraph implements AccessExecutionGraph { blobWriter, allocationTimeout, NettyShuffleMaster.INSTANCE, - true); + true, + new PartitionTrackerImpl( + jobInformation.getJobId(), + NettyShuffleMaster.INSTANCE, + ignored -> Optional.empty())); } public ExecutionGraph( @@ -425,7 +434,8 @@ public class ExecutionGraph implements AccessExecutionGraph { BlobWriter blobWriter, Time allocationTimeout, ShuffleMaster<?> shuffleMaster, - boolean forcePartitionReleaseOnConsumption) throws IOException { + boolean forcePartitionReleaseOnConsumption, + PartitionTracker partitionTracker) throws IOException { checkNotNull(futureExecutor); @@ -477,6 +487,8 @@ public class ExecutionGraph implements AccessExecutionGraph { this.forcePartitionReleaseOnConsumption = forcePartitionReleaseOnConsumption; + this.partitionTracker = checkNotNull(partitionTracker); + LOG.info("Job recovers via failover strategy: {}", failoverStrategy.getStrategyName()); } @@ -1819,4 +1831,8 @@ public class ExecutionGraph implements AccessExecutionGraph { ShuffleMaster<?> getShuffleMaster() { return shuffleMaster; } + + public PartitionTracker getPartitionTracker() { + return partitionTracker; + } } diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java index c4eead1..f21d703 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraphBuilder.java @@ -43,6 +43,7 @@ import org.apache.flink.runtime.executiongraph.metrics.NumberOfFullRestartsGauge import org.apache.flink.runtime.executiongraph.metrics.RestartTimeGauge; import org.apache.flink.runtime.executiongraph.metrics.UpTimeGauge; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; @@ -94,7 +95,8 @@ public class ExecutionGraphBuilder { BlobWriter blobWriter, Time allocationTimeout, Logger log, - ShuffleMaster<?> shuffleMaster) throws JobExecutionException, JobException { + ShuffleMaster<?> shuffleMaster, + PartitionTracker partitionTracker) throws JobExecutionException, JobException { checkNotNull(jobGraph, "job graph cannot be null"); @@ -135,7 +137,8 @@ public class ExecutionGraphBuilder { blobWriter, allocationTimeout, shuffleMaster, - forcePartitionReleaseOnConsumption); + forcePartitionReleaseOnConsumption, + partitionTracker); } catch (IOException e) { throw new JobException("Could not create the ExecutionGraph.", e); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java new file mode 100644 index 0000000..4c432f1 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTracker.java @@ -0,0 
+1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; + +import java.util.Collection; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public interface PartitionTracker { + + /** + * Starts the tracking of the given partition for the given task executor ID. + * + * @param producingTaskExecutorId ID of task executor on which the partition is produced + * @param resultPartitionDeploymentDescriptor deployment descriptor of the partition + */ + void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor); + + /** + * Stops the tracking of all partitions for the given task executor ID, without issuing any release calls. + */ + void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId); + + /** + * Releases the given partitions and stops the tracking of partitions that were released. 
+ */ + void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds); + + /** + * Releases all partitions for the given task executor ID, and stops the tracking of partitions that were released. + */ + void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId); + + /** + * Returns whether any partition is being tracked for the given task executor ID. + */ + boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId); +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java new file mode 100644 index 0000000..a1c3416 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerFactory.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; + +import java.util.Optional; + +/** + * Factory for {@link PartitionTracker}. 
+ */ +@FunctionalInterface +public interface PartitionTrackerFactory { + + /** + * Creates a new PartitionTracker. + * + * @param taskExecutorGatewayLookup lookup function to access task executor gateways + * @return created PartitionTracker + */ + PartitionTracker create(TaskExecutorGatewayLookup taskExecutorGatewayLookup); + + /** + * Lookup function for {@link TaskExecutorGateway}. + */ + @FunctionalInterface + interface TaskExecutorGatewayLookup { + + /** + * Returns a {@link TaskExecutorGateway} corresponding to the given ResourceID. + * + * @param taskExecutorId id of the task executor to look up. + * @return optional task executor gateway + */ + Optional<TaskExecutorGateway> lookup(ResourceID taskExecutorId); + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java new file mode 100644 index 0000000..53e7d3f --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImpl.java @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.partition.PartitionTable; +import org.apache.flink.util.Preconditions; + +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.stream.Collectors; + +import static java.util.stream.Collectors.toList; + +/** + * Utility for tracking partitions and issuing release calls to task executors and shuffle masters. + */ +public class PartitionTrackerImpl implements PartitionTracker { + + private final JobID jobId; + + private final PartitionTable<ResourceID> partitionTable = new PartitionTable<>(); + private final Map<ResultPartitionID, PartitionInfo> partitionInfos = new HashMap<>(); + + private final ShuffleMaster<?> shuffleMaster; + + private final PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup; + + public PartitionTrackerImpl( + JobID jobId, + ShuffleMaster<?> shuffleMaster, + PartitionTrackerFactory.TaskExecutorGatewayLookup taskExecutorGatewayLookup) { + + this.jobId = Preconditions.checkNotNull(jobId); + this.shuffleMaster = Preconditions.checkNotNull(shuffleMaster); + this.taskExecutorGatewayLookup = taskExecutorGatewayLookup; + } + + @Override + public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { + Preconditions.checkNotNull(producingTaskExecutorId); + Preconditions.checkNotNull(resultPartitionDeploymentDescriptor); + + // if it is released on consumption we do not need to issue any release calls + if 
(resultPartitionDeploymentDescriptor.isReleasedOnConsumption()) { + return; + } + + final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(); + + partitionInfos.put(resultPartitionId, new PartitionInfo(producingTaskExecutorId, resultPartitionDeploymentDescriptor)); + partitionTable.startTrackingPartitions(producingTaskExecutorId, Collections.singletonList(resultPartitionId)); + } + + @Override + public void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + Preconditions.checkNotNull(producingTaskExecutorId); + + // this is a bit icky since we make 2 calls to pT#stopTrackingPartitions + final Collection<ResultPartitionID> resultPartitionIds = partitionTable.stopTrackingPartitions(producingTaskExecutorId); + + for (ResultPartitionID resultPartitionId : resultPartitionIds) { + internalStopTrackingPartition(resultPartitionId); + } + } + + @Override + public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) { + Preconditions.checkNotNull(resultPartitionIds); + + // stop tracking partitions to be released and group them by task executor ID + Map<ResourceID, List<ResultPartitionDeploymentDescriptor>> partitionsToReleaseByResourceId = resultPartitionIds.stream() + .map(this::internalStopTrackingPartition) + .filter(Optional::isPresent) + .map(Optional::get) + .collect(Collectors.groupingBy( + partitionMetaData -> partitionMetaData.producingTaskExecutorResourceId, + Collectors.mapping( + partitionMetaData -> partitionMetaData.resultPartitionDeploymentDescriptor, + toList()))); + + partitionsToReleaseByResourceId.forEach(this::internalReleasePartitions); + } + + @Override + public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { + Preconditions.checkNotNull(producingTaskExecutorId); + + // this is a bit icky since we make 2 calls to pT#stopTrackingPartitions + Collection<ResultPartitionID> resultPartitionIds = 
partitionTable.stopTrackingPartitions(producingTaskExecutorId); + + stopTrackingAndReleasePartitions(resultPartitionIds); + } + + @Override + public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + Preconditions.checkNotNull(producingTaskExecutorId); + + return partitionTable.hasTrackedPartitions(producingTaskExecutorId); + } + + private Optional<PartitionInfo> internalStopTrackingPartition(ResultPartitionID resultPartitionId) { + final PartitionInfo partitionInfo = partitionInfos.remove(resultPartitionId); + if (partitionInfo == null) { + return Optional.empty(); + } + partitionTable.stopTrackingPartitions(partitionInfo.producingTaskExecutorResourceId, Collections.singletonList(resultPartitionId)); + + return Optional.of(partitionInfo); + } + + private void internalReleasePartitions( + ResourceID potentialPartitionLocation, + Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { + + internalReleasePartitionsOnTaskExecutor(potentialPartitionLocation, partitionDeploymentDescriptors); + internalReleasePartitionsOnShuffleMaster(partitionDeploymentDescriptors); + } + + private void internalReleasePartitionsOnTaskExecutor( + ResourceID potentialPartitionLocation, + Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { + + final List<ResultPartitionID> partitionsRequiringRpcReleaseCalls = partitionDeploymentDescriptors.stream() + .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + .filter(descriptor -> descriptor.storesLocalResourcesOn().isPresent()) + .map(ShuffleDescriptor::getResultPartitionID) + .collect(Collectors.toList()); + + if (!partitionsRequiringRpcReleaseCalls.isEmpty()) { + taskExecutorGatewayLookup + .lookup(potentialPartitionLocation) + .ifPresent(taskExecutorGateway -> + taskExecutorGateway.releasePartitions(jobId, partitionsRequiringRpcReleaseCalls)); + } + } + + private void 
internalReleasePartitionsOnShuffleMaster(Collection<ResultPartitionDeploymentDescriptor> partitionDeploymentDescriptors) { + partitionDeploymentDescriptors.stream() + .map(ResultPartitionDeploymentDescriptor::getShuffleDescriptor) + .forEach(shuffleMaster::releasePartitionExternally); + } + + private static final class PartitionInfo { + public final ResourceID producingTaskExecutorResourceId; + public final ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor; + + private PartitionInfo(ResourceID producingTaskExecutorResourceId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { + this.producingTaskExecutorResourceId = producingTaskExecutorResourceId; + this.resultPartitionDeploymentDescriptor = resultPartitionDeploymentDescriptor; + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index ae903fe..2732727 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -39,6 +39,8 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; +import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -195,6 +197,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast private Map<String, Object> accumulators; + private final PartitionTracker partitionTracker; + // 
------------------------------------------------------------------------ public JobMaster( @@ -212,7 +216,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast FatalErrorHandler fatalErrorHandler, ClassLoader userCodeLoader, SchedulerNGFactory schedulerNGFactory, - ShuffleMaster<?> shuffleMaster) throws Exception { + ShuffleMaster<?> shuffleMaster, + PartitionTrackerFactory partitionTrackerFactory) throws Exception { super(rpcService, AkkaRpcServiceUtils.createRandomName(JOB_MANAGER_NAME)); @@ -242,6 +247,15 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast this.scheduler = checkNotNull(schedulerFactory).createScheduler(slotPool); this.registeredTaskManagers = new HashMap<>(4); + this.partitionTracker = checkNotNull(partitionTrackerFactory) + .create(resourceID -> { + Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerInfo = registeredTaskManagers.get(resourceID); + if (taskManagerInfo == null) { + return Optional.empty(); + } + + return Optional.of(taskManagerInfo.f1); + }); this.backPressureStatsTracker = checkNotNull(jobManagerSharedServices.getBackPressureStatsTracker()); @@ -272,7 +286,8 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast blobWriter, jobManagerJobMetricGroup, jobMasterConfiguration.getSlotRequestTimeout(), - shuffleMaster); + shuffleMaster, + partitionTracker); } //---------------------------------------------------------------------------------------------- @@ -409,6 +424,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast taskManagerHeartbeatManager.unmonitorTarget(resourceID); slotPool.releaseTaskManager(resourceID, cause); + partitionTracker.stopTrackingPartitionsFor(resourceID); Tuple2<TaskManagerLocation, TaskExecutorGateway> taskManagerConnection = registeredTaskManagers.remove(resourceID); diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java index 60260b0..eee2d7c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/factories/DefaultJobMasterServiceFactory.java @@ -21,6 +21,7 @@ package org.apache.flink.runtime.jobmaster.factories; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; +import org.apache.flink.runtime.io.network.partition.PartitionTrackerImpl; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.OnCompletionActions; import org.apache.flink.runtime.jobmaster.JobManagerSharedServices; @@ -106,6 +107,11 @@ public class DefaultJobMasterServiceFactory implements JobMasterServiceFactory { fatalErrorHandler, userCodeClassloader, schedulerNGFactory, - shuffleMaster); + shuffleMaster, + lookup -> new PartitionTrackerImpl( + jobGraph.getJobID(), + shuffleMaster, + lookup + )); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java index 301bd11..05429ac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultScheduler.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import org.apache.flink.runtime.executiongraph.restart.ThrowingRestartStrategy; +import 
org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; @@ -54,7 +55,8 @@ public class DefaultScheduler extends LegacyScheduler { final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, final Time slotRequestTimeout, - final ShuffleMaster<?> shuffleMaster) throws Exception { + final ShuffleMaster<?> shuffleMaster, + final PartitionTracker partitionTracker) throws Exception { super( log, @@ -71,7 +73,8 @@ public class DefaultScheduler extends LegacyScheduler { blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, - shuffleMaster); + shuffleMaster, + partitionTracker); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java index ead7b9f..5779a7e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultSchedulerFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; @@ -54,7 +55,8 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, final Time slotRequestTimeout, - final ShuffleMaster<?> shuffleMaster) throws Exception 
{ + final ShuffleMaster<?> shuffleMaster, + final PartitionTracker partitionTracker) throws Exception { return new DefaultScheduler( log, @@ -70,7 +72,8 @@ public class DefaultSchedulerFactory implements SchedulerNGFactory { blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, - shuffleMaster); + shuffleMaster, + partitionTracker); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java index 71021ab..7dec7fc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacyScheduler.java @@ -50,6 +50,7 @@ import org.apache.flink.runtime.executiongraph.JobStatusListener; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; import org.apache.flink.runtime.executiongraph.restart.RestartStrategyResolving; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -144,7 +145,8 @@ public class LegacyScheduler implements SchedulerNG { final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, final Time slotRequestTimeout, - final ShuffleMaster<?> shuffleMaster) throws Exception { + final ShuffleMaster<?> shuffleMaster, + final PartitionTracker partitionTracker) throws Exception { this.log = checkNotNull(log); this.jobGraph = checkNotNull(jobGraph); @@ -171,14 +173,15 @@ public class LegacyScheduler implements SchedulerNG { this.blobWriter = checkNotNull(blobWriter); this.slotRequestTimeout = checkNotNull(slotRequestTimeout); - this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, 
checkNotNull(shuffleMaster)); + this.executionGraph = createAndRestoreExecutionGraph(jobManagerJobMetricGroup, checkNotNull(shuffleMaster), checkNotNull(partitionTracker)); } private ExecutionGraph createAndRestoreExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, - ShuffleMaster<?> shuffleMaster) throws Exception { + ShuffleMaster<?> shuffleMaster, + PartitionTracker partitionTracker) throws Exception { - ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster); + ExecutionGraph newExecutionGraph = createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker); final CheckpointCoordinator checkpointCoordinator = newExecutionGraph.getCheckpointCoordinator(); @@ -199,7 +202,8 @@ public class LegacyScheduler implements SchedulerNG { private ExecutionGraph createExecutionGraph( JobManagerJobMetricGroup currentJobManagerJobMetricGroup, - ShuffleMaster<?> shuffleMaster) throws JobExecutionException, JobException { + ShuffleMaster<?> shuffleMaster, + final PartitionTracker partitionTracker) throws JobExecutionException, JobException { return ExecutionGraphBuilder.buildGraph( null, jobGraph, @@ -215,7 +219,8 @@ public class LegacyScheduler implements SchedulerNG { blobWriter, slotRequestTimeout, log, - shuffleMaster); + shuffleMaster, + partitionTracker); } /** diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java index 1f22fa9..2458697 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/LegacySchedulerFactory.java @@ -24,6 +24,7 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; import 
org.apache.flink.runtime.executiongraph.restart.RestartStrategyFactory; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; @@ -63,7 +64,8 @@ public class LegacySchedulerFactory implements SchedulerNGFactory { final BlobWriter blobWriter, final JobManagerJobMetricGroup jobManagerJobMetricGroup, final Time slotRequestTimeout, - final ShuffleMaster<?> shuffleMaster) throws Exception { + final ShuffleMaster<?> shuffleMaster, + final PartitionTracker partitionTracker) throws Exception { return new LegacyScheduler( log, @@ -80,6 +82,7 @@ public class LegacySchedulerFactory implements SchedulerNGFactory { blobWriter, jobManagerJobMetricGroup, slotRequestTimeout, - shuffleMaster); + shuffleMaster, + partitionTracker); } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java index 56bc044..3d2e096 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerNGFactory.java @@ -23,6 +23,7 @@ import org.apache.flink.api.common.time.Time; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.checkpoint.CheckpointRecoveryFactory; +import org.apache.flink.runtime.io.network.partition.PartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.metrics.groups.JobManagerJobMetricGroup; @@ -53,6 +54,7 @@ public interface SchedulerNGFactory { BlobWriter blobWriter, JobManagerJobMetricGroup jobManagerJobMetricGroup, Time slotRequestTimeout, - 
ShuffleMaster<?> shuffleMaster) throws Exception; + ShuffleMaster<?> shuffleMaster, + PartitionTracker partitionTracker) throws Exception; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java index 74c72438..e4994b2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointSettingsSerializableTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.Environment; import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration; @@ -120,7 +121,8 @@ public class CheckpointSettingsSerializableTest extends TestLogger { VoidBlobWriter.getInstance(), timeout, log, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); assertEquals(1, eg.getCheckpointCoordinator().getNumberOfRegisteredMasterHooks()); assertTrue(jobGraph.getCheckpointingSettings().getDefaultStateBackend().deserializeValue(classLoader) instanceof CustomStateBackend); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java index 15643a9..7ad1dbb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphDeploymentTest.java @@ -42,6 +42,7 @@ import org.apache.flink.runtime.executiongraph.failover.RestartAllStrategy; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -824,7 +825,8 @@ public class ExecutionGraphDeploymentTest extends TestLogger { blobWriter, timeout, LoggerFactory.getLogger(getClass()), - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); } private static final class ExecutionStageMatcher extends TypeSafeMatcher<List<ExecutionAttemptID>> { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java index 306a835..8e69df9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphRescalingTest.java @@ -25,6 +25,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import 
org.apache.flink.runtime.jobgraph.JobGraph; @@ -79,7 +80,8 @@ public class ExecutionGraphRescalingTest extends TestLogger { VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), TEST_LOGGER, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(initialParallelism)); @@ -109,7 +111,8 @@ public class ExecutionGraphRescalingTest extends TestLogger { VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), TEST_LOGGER, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(1)); @@ -139,7 +142,8 @@ public class ExecutionGraphRescalingTest extends TestLogger { VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), TEST_LOGGER, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); for (JobVertex jv : jobVertices) { assertThat(jv.getParallelism(), is(scaleUpParallelism)); @@ -182,7 +186,8 @@ public class ExecutionGraphRescalingTest extends TestLogger { VoidBlobWriter.getInstance(), AkkaUtils.getDefaultTimeout(), TEST_LOGGER, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); fail("Building the ExecutionGraph with a parallelism higher than the max parallelism should fail."); } catch (JobException e) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java index 4b5c98f..9ceaf84 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphSchedulingTest.java @@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGate import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlotContext; import org.apache.flink.runtime.instance.SlotSharingGroupId; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -591,7 +592,8 @@ public class ExecutionGraphSchedulingTest extends TestLogger { VoidBlobWriter.getInstance(), timeout, log, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); } private SimpleSlot createSlot(TaskManagerGateway taskManager, JobID jobId) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java index f0924fe..f614795 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionGraphTestUtils.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.restart.RestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.executiongraph.utils.SimpleSlotProvider; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobStatus; import org.apache.flink.runtime.jobgraph.JobVertex; @@ -411,7 +412,8 @@ public class ExecutionGraphTestUtils { VoidBlobWriter.getInstance(), timeout, TEST_LOGGER, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + 
NoOpPartitionTracker.INSTANCE); } public static JobVertex createNoOpVertex(int parallelism) { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java index efca2d4..3a0770e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionTest.java @@ -19,19 +19,32 @@ package org.apache.flink.runtime.executiongraph; import org.apache.flink.api.common.JobID; +import org.apache.flink.api.common.time.Time; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.configuration.Configuration; +import org.apache.flink.metrics.groups.UnregisteredMetricsGroup; +import org.apache.flink.runtime.blob.VoidBlobWriter; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; +import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.clusterframework.types.SlotProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; import org.apache.flink.runtime.executiongraph.utils.SimpleAckingTaskManagerGateway; import org.apache.flink.runtime.instance.SimpleSlot; +import org.apache.flink.runtime.instance.SlotSharingGroupId; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker; import org.apache.flink.runtime.jobgraph.DistributionPattern; +import org.apache.flink.runtime.jobgraph.IntermediateResultPartitionID; 
+import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobmanager.scheduler.LocationPreferenceConstraint; +import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.scheduler.SchedulerTestUtils; import org.apache.flink.runtime.jobmaster.LogicalSlot; import org.apache.flink.runtime.jobmaster.SlotOwner; @@ -39,6 +52,7 @@ import org.apache.flink.runtime.jobmaster.SlotRequestId; import org.apache.flink.runtime.jobmaster.TestingLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SingleLogicalSlot; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; +import org.apache.flink.runtime.shuffle.NettyShuffleMaster; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testingUtils.TestingUtils; @@ -50,6 +64,7 @@ import org.junit.ClassRule; import org.junit.Test; import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.util.Arrays; import java.util.Collection; @@ -60,6 +75,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.function.Consumer; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.containsInAnyOrder; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; @@ -69,7 +85,6 @@ import static org.hamcrest.Matchers.nullValue; import static org.hamcrest.Matchers.sameInstance; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; -import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; @@ -300,82 +315,6 @@ public class ExecutionTest extends TestLogger { } /** - * Tests that the partitions are 
released in case of an execution cancellation after the execution is already finished. - */ - @Test - public void testPartitionReleaseOnCancelingAfterBeingFinished() throws Exception { - testPartitionReleaseAfterFinished(Execution::cancel); - } - - /** - * Tests that the partitions are released in case of an execution suspension after the execution is already finished. - */ - @Test - public void testPartitionReleaseOnSuspendingAfterBeingFinished() throws Exception { - testPartitionReleaseAfterFinished(Execution::suspend); - } - - private void testPartitionReleaseAfterFinished(Consumer<Execution> postFinishedExecutionAction) throws Exception { - final Tuple2<JobID, Collection<ResultPartitionID>> releasedPartitions = Tuple2.of(null, null); - final SimpleAckingTaskManagerGateway taskManagerGateway = new SimpleAckingTaskManagerGateway(); - taskManagerGateway.setReleasePartitionsConsumer(releasedPartitions::setFields); - - final JobVertex producerVertex = createNoOpJobVertex(); - final JobVertex consumerVertex = createNoOpJobVertex(); - consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); - - final SimpleSlot slot = new SimpleSlot( - new SingleSlotTestingSlotOwner(), - new LocalTaskManagerLocation(), - 0, - taskManagerGateway); - - final ProgrammedSlotProvider slotProvider = new ProgrammedSlotProvider(1); - slotProvider.addSlot(producerVertex.getID(), 0, CompletableFuture.completedFuture(slot)); - - ExecutionGraph executionGraph = ExecutionGraphTestUtils.createSimpleTestGraph( - new JobID(), - slotProvider, - new NoRestartStrategy(), - producerVertex, - consumerVertex); - - executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); - - ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); - ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; - - final Execution execution = 
executionVertex.getCurrentExecutionAttempt(); - - execution.allocateResourcesForExecution( - slotProvider, - false, - LocationPreferenceConstraint.ALL, - Collections.emptySet(), - TestingUtils.infiniteTime()); - - execution.deploy(); - execution.switchToRunning(); - - // simulate a case where a cancel/suspend call is too slow and the task is already finished - // in this case we have to explicitly release the finished partition - // if the task were canceled properly the TM would release the partition automatically - execution.markFinished(); - postFinishedExecutionAction.accept(execution); - - assertEquals(executionGraph.getJobID(), releasedPartitions.f0); - assertEquals(executionVertex.getProducedPartitions().size(), releasedPartitions.f1.size()); - for (ResultPartitionID partitionId : releasedPartitions.f1) { - // ensure all IDs of released partitions are actually valid - IntermediateResultPartition intermediateResultPartition = executionVertex - .getProducedPartitions() - .get(partitionId.getPartitionId()); - assertNotNull(intermediateResultPartition); - assertEquals(execution.getAttemptId(), partitionId.getProducerId()); - } - } - - /** * Tests that all preferred locations are calculated. 
*/ @Test @@ -588,6 +527,113 @@ public class ExecutionTest extends TestLogger { assertThat(returnedSlotFuture.get(), is(equalTo(slotRequestIdFuture.get()))); } + @Test + public void testPartitionRetainedWhenFinished() throws Exception { + testPartitionTrackingForStateTransition(Execution::markFinished, false); + } + + @Test + public void testPartitionReleasedWhenCanceled() throws Exception { + testPartitionTrackingForStateTransition( + execution -> { + execution.cancel(); + execution.completeCancelling(); + }, + true); + } + + @Test + public void testPartitionReleasedWhenFailed() throws Exception { + testPartitionTrackingForStateTransition(execution -> execution.fail(new Exception("Test exception")), true); + } + + private void testPartitionTrackingForStateTransition(final Consumer<Execution> stateTransition, final boolean shouldPartitionBeReleased) throws Exception { + final JobVertex producerVertex = createNoOpJobVertex(); + final JobVertex consumerVertex = createNoOpJobVertex(); + consumerVertex.connectNewDataSetAsInput(producerVertex, DistributionPattern.ALL_TO_ALL, ResultPartitionType.BLOCKING); + + final TaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + + final SlotProvider slotProvider = new SlotProvider() { + @Override + public CompletableFuture<LogicalSlot> allocateSlot(SlotRequestId slotRequestId, ScheduledUnit scheduledUnit, SlotProfile slotProfile, boolean allowQueuedScheduling, Time allocationTimeout) { + return CompletableFuture.completedFuture(new SimpleSlot( + new SingleSlotTestingSlotOwner(), + taskManagerLocation, + 0, + new SimpleAckingTaskManagerGateway())); + } + + @Override + public void cancelSlotRequest(SlotRequestId slotRequestId, @Nullable SlotSharingGroupId slotSharingGroupId, Throwable cause) { + } + }; + + CompletableFuture<Tuple2<ResourceID, ResultPartitionDeploymentDescriptor>> partitionStartTrackingFuture = new CompletableFuture<>(); + CompletableFuture<Collection<ResultPartitionID>> partitionReleaseFuture = 
new CompletableFuture<>(); + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + partitionTracker.setStartTrackingPartitionsConsumer( + (resourceID, resultPartitionDeploymentDescriptor) -> + partitionStartTrackingFuture.complete(Tuple2.of(resourceID, resultPartitionDeploymentDescriptor)) + ); + partitionTracker.setStopTrackingAndReleasePartitionsConsumer(partitionReleaseFuture::complete); + + final ExecutionGraph executionGraph = ExecutionGraphBuilder.buildGraph( + null, + new JobGraph(new JobID(), "test job", producerVertex, consumerVertex), + new Configuration(), + TestingUtils.defaultExecutor(), + TestingUtils.defaultExecutor(), + slotProvider, + ExecutionTest.class.getClassLoader(), + new StandaloneCheckpointRecoveryFactory(), + Time.seconds(10), + new NoRestartStrategy(), + new UnregisteredMetricsGroup(), + VoidBlobWriter.getInstance(), + Time.seconds(10), + log, + NettyShuffleMaster.INSTANCE, + partitionTracker); + + executionGraph.start(TestingComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final ExecutionJobVertex executionJobVertex = executionGraph.getJobVertex(producerVertex.getID()); + final ExecutionVertex executionVertex = executionJobVertex.getTaskVertices()[0]; + + final Execution execution = executionVertex.getCurrentExecutionAttempt(); + + execution.allocateResourcesForExecution( + slotProvider, + false, + LocationPreferenceConstraint.ALL, + Collections.emptySet(), + TestingUtils.infiniteTime()); + + assertThat(partitionStartTrackingFuture.isDone(), is(true)); + final Tuple2<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingCall = partitionStartTrackingFuture.get(); + + final IntermediateResultPartitionID expectedIntermediateResultPartitionId = executionJobVertex + .getProducedDataSets()[0] + .getPartitions()[0] + .getPartitionId(); + final ResultPartitionDeploymentDescriptor descriptor = execution + .getResultPartitionDeploymentDescriptor(expectedIntermediateResultPartitionId).get(); + 
assertThat(startTrackingCall.f0, equalTo(taskManagerLocation.getResourceID())); + assertThat(startTrackingCall.f1, equalTo(descriptor)); + + execution.deploy(); + execution.switchToRunning(); + + stateTransition.accept(execution); + + assertThat(partitionReleaseFuture.isDone(), is(shouldPartitionBeReleased)); + if (shouldPartitionBeReleased) { + final Collection<ResultPartitionID> partitionReleaseCall = partitionReleaseFuture.get(); + assertThat(partitionReleaseCall, contains(descriptor.getShuffleDescriptor().getResultPartitionID())); + } + } + /** * Tests that a slot release will atomically release the assigned {@link Execution}. */ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java index 5b8ef04..3e95587 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionVertexLocalityTest.java @@ -31,6 +31,7 @@ import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.restart.FixedDelayRestartStrategy; import org.apache.flink.runtime.instance.SimpleSlot; import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -226,7 +227,8 @@ public class ExecutionVertexLocalityTest extends TestLogger { VoidBlobWriter.getInstance(), timeout, log, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); } private void initializeLocation(ExecutionVertex vertex, TaskManagerLocation location) throws Exception { diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java index 92e659c..b9c1322 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/failover/PipelinedFailoverRegionBuildingTest.java @@ -30,6 +30,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionGraph; import org.apache.flink.runtime.executiongraph.ExecutionGraphBuilder; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.executiongraph.restart.NoRestartStrategy; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; import org.apache.flink.runtime.jobmaster.slotpool.SlotProvider; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; import org.apache.flink.runtime.jobgraph.DistributionPattern; @@ -644,6 +645,7 @@ public class PipelinedFailoverRegionBuildingTest extends TestLogger { VoidBlobWriter.getInstance(), timeout, log, - NettyShuffleMaster.INSTANCE); + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.INSTANCE); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java new file mode 100644 index 0000000..30895b2 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/NoOpPartitionTracker.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; + +import java.util.Collection; + +/** + * No-op implementation of {@link PartitionTracker}. + */ +public enum NoOpPartitionTracker implements PartitionTracker { + INSTANCE; + + public static final PartitionTrackerFactory FACTORY = lookup -> INSTANCE; + + @Override + public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { + } + + @Override + public void stopTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + } + + @Override + public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) { + } + + @Override + public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { + } + + @Override + public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + return false; + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java index 007fdc3..51eff94 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java 
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTestUtils.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.io.network.partition; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.io.network.NettyShuffleEnvironment; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; @@ -28,6 +29,8 @@ import org.apache.flink.runtime.util.NettyShuffleDescriptorBuilder; import org.hamcrest.Matchers; import java.io.IOException; +import java.util.EnumSet; +import java.util.Optional; import static org.junit.Assert.assertThat; import static org.junit.Assert.fail; @@ -101,4 +104,35 @@ public class PartitionTestUtils { true, releaseType); } + + public static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor(ResultPartitionID resultPartitionId, ShuffleDescriptor.ReleaseType releaseType, boolean hasLocalResources) { + return new ResultPartitionDeploymentDescriptor( + new PartitionDescriptor( + new IntermediateDataSetID(), + resultPartitionId.getPartitionId(), + ResultPartitionType.BLOCKING, + 1, + 0), + new ShuffleDescriptor() { + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionId; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return hasLocalResources + ? 
Optional.of(ResourceID.generate()) + : Optional.empty(); + } + + @Override + public EnumSet<ReleaseType> getSupportedReleaseTypes() { + return EnumSet.of(releaseType); + } + }, + 1, + true, + releaseType); + } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java new file mode 100644 index 0000000..07aba93 --- /dev/null +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/PartitionTrackerImplTest.java @@ -0,0 +1,294 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.api.common.JobID; +import org.apache.flink.api.java.tuple.Tuple3; +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; +import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; +import org.apache.flink.runtime.shuffle.PartitionDescriptor; +import org.apache.flink.runtime.shuffle.ProducerDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleDescriptor; +import org.apache.flink.runtime.shuffle.ShuffleMaster; +import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; +import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.util.TestLogger; + +import org.junit.Test; + +import java.util.Collection; +import java.util.Collections; +import java.util.EnumSet; +import java.util.Optional; +import java.util.Queue; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.CompletableFuture; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.CoreMatchers.not; +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; +import static org.junit.Assert.assertEquals; + +/** + * Tests for the {@link PartitionTrackerImpl}. 
+ */ +public class PartitionTrackerImplTest extends TestLogger { + + @Test + public void testReleasedOnConsumptionPartitionIsNotTracked() { + testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.AUTO); + } + + @Test + public void testRetainedOnConsumptionPartitionIsTracked() { + testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType.MANUAL); + } + + private void testReleaseOnConsumptionHandling(ShuffleDescriptor.ReleaseType releaseType) { + final PartitionTracker partitionTracker = new PartitionTrackerImpl( + new JobID(), + new TestingShuffleMaster(), + ignored -> Optional.empty() + ); + + final ResourceID resourceId = ResourceID.generate(); + final ResultPartitionID resultPartitionId = new ResultPartitionID(); + partitionTracker.startTrackingPartition( + resourceId, + createResultPartitionDeploymentDescriptor( + resultPartitionId, + releaseType, + false)); + + assertThat(partitionTracker.isTrackingPartitionsFor(resourceId), is(not(releaseType))); + } + + @Test + public void testStartStopTracking() { + final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final PartitionTracker partitionTracker = new PartitionTrackerImpl( + new JobID(), + new TestingShuffleMaster(), + resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)) + ); + + final ResourceID executorWithTrackedPartition = new ResourceID("tracked"); + final ResourceID executorWithoutTrackedPartition = new ResourceID("untracked"); + + assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition), is(false)); + assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition), is(false)); + + partitionTracker.startTrackingPartition( + executorWithTrackedPartition, + createResultPartitionDeploymentDescriptor(new ResultPartitionID(), ShuffleDescriptor.ReleaseType.MANUAL, true)); + + 
assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition), is(true)); + assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition), is(false)); + + partitionTracker.stopTrackingPartitionsFor(executorWithTrackedPartition); + + assertThat(partitionTracker.isTrackingPartitionsFor(executorWithTrackedPartition), is(false)); + assertThat(partitionTracker.isTrackingPartitionsFor(executorWithoutTrackedPartition), is(false)); + } + + @Test + public void testReleaseCallsWithLocalResources() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + final JobID jobId = new JobID(); + + final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final PartitionTracker partitionTracker = new PartitionTrackerImpl( + jobId, + shuffleMaster, + resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)) + ); + + final ResourceID taskExecutorId1 = ResourceID.generate(); + final ResourceID taskExecutorId2 = ResourceID.generate(); + final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); + final ResultPartitionID resultPartitionId2 = new ResultPartitionID(); + + partitionTracker.startTrackingPartition( + taskExecutorId1, + createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true)); + partitionTracker.startTrackingPartition( + taskExecutorId2, + createResultPartitionDeploymentDescriptor(resultPartitionId2, ShuffleDescriptor.ReleaseType.MANUAL, true)); + + { + partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1); + + assertEquals(1, taskExecutorReleaseCalls.size()); + + Tuple3<ResourceID, JobID, Collection<ResultPartitionID>> taskExecutorReleaseCall = taskExecutorReleaseCalls.remove(); + assertEquals(taskExecutorId1, taskExecutorReleaseCall.f0); + assertEquals(jobId, taskExecutorReleaseCall.f1); + assertThat(taskExecutorReleaseCall.f2, 
contains(resultPartitionId1)); + + assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); + assertEquals(resultPartitionId1, shuffleMaster.externallyReleasedPartitions.remove()); + + assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), is(false)); + } + + { + partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2)); + + assertEquals(1, taskExecutorReleaseCalls.size()); + + Tuple3<ResourceID, JobID, Collection<ResultPartitionID>> releaseCall = taskExecutorReleaseCalls.remove(); + assertEquals(taskExecutorId2, releaseCall.f0); + assertEquals(jobId, releaseCall.f1); + assertThat(releaseCall.f2, contains(resultPartitionId2)); + + assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); + assertEquals(resultPartitionId2, shuffleMaster.externallyReleasedPartitions.remove()); + + assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), is(false)); + } + } + + @Test + public void testReleaseCallsWithoutLocalResources() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + + final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final PartitionTracker partitionTracker = new PartitionTrackerImpl( + new JobID(), + shuffleMaster, + resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)) + ); + + final ResourceID taskExecutorId1 = ResourceID.generate(); + final ResourceID taskExecutorId2 = ResourceID.generate(); + final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); + final ResultPartitionID resultPartitionId2 = new ResultPartitionID(); + + partitionTracker.startTrackingPartition( + taskExecutorId1, + createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, false)); + partitionTracker.startTrackingPartition( + taskExecutorId2, + createResultPartitionDeploymentDescriptor(resultPartitionId2, 
ShuffleDescriptor.ReleaseType.MANUAL, false)); + + { + partitionTracker.stopTrackingAndReleasePartitionsFor(taskExecutorId1); + + assertEquals(0, taskExecutorReleaseCalls.size()); + + assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); + assertEquals(resultPartitionId1, shuffleMaster.externallyReleasedPartitions.remove()); + + assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId1), is(false)); + } + + { + partitionTracker.stopTrackingAndReleasePartitions(Collections.singletonList(resultPartitionId2)); + + assertEquals(0, taskExecutorReleaseCalls.size()); + + assertEquals(1, shuffleMaster.externallyReleasedPartitions.size()); + assertEquals(resultPartitionId2, shuffleMaster.externallyReleasedPartitions.remove()); + + assertThat(partitionTracker.isTrackingPartitionsFor(taskExecutorId2), is(false)); + } + } + + @Test + public void testStopTrackingIssuesNoReleaseCalls() { + final TestingShuffleMaster shuffleMaster = new TestingShuffleMaster(); + + final Queue<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> taskExecutorReleaseCalls = new ArrayBlockingQueue<>(4); + final PartitionTracker partitionTracker = new PartitionTrackerImpl( + new JobID(), + new TestingShuffleMaster(), + resourceId -> Optional.of(createTaskExecutorGateway(resourceId, taskExecutorReleaseCalls)) + ); + + final ResourceID taskExecutorId1 = ResourceID.generate(); + final ResultPartitionID resultPartitionId1 = new ResultPartitionID(); + + partitionTracker.startTrackingPartition( + taskExecutorId1, + createResultPartitionDeploymentDescriptor(resultPartitionId1, ShuffleDescriptor.ReleaseType.MANUAL, true)); + + partitionTracker.stopTrackingPartitionsFor(taskExecutorId1); + + assertEquals(0, taskExecutorReleaseCalls.size()); + assertEquals(0, shuffleMaster.externallyReleasedPartitions.size()); + } + + private static ResultPartitionDeploymentDescriptor createResultPartitionDeploymentDescriptor( + ResultPartitionID resultPartitionId, + ShuffleDescriptor.ReleaseType 
releaseType, + boolean hasLocalResources) { + + return new ResultPartitionDeploymentDescriptor( + new PartitionDescriptor( + new IntermediateDataSetID(), + resultPartitionId.getPartitionId(), + ResultPartitionType.BLOCKING, + 1, + 0), + new ShuffleDescriptor() { + @Override + public ResultPartitionID getResultPartitionID() { + return resultPartitionId; + } + + @Override + public Optional<ResourceID> storesLocalResourcesOn() { + return hasLocalResources + ? Optional.of(ResourceID.generate()) + : Optional.empty(); + } + + @Override + public EnumSet<ReleaseType> getSupportedReleaseTypes() { + return EnumSet.of(releaseType); + } + }, + 1, + true, + releaseType); + } + + private static TaskExecutorGateway createTaskExecutorGateway(ResourceID taskExecutorId, Collection<Tuple3<ResourceID, JobID, Collection<ResultPartitionID>>> releaseCalls) { + return new TestingTaskExecutorGatewayBuilder() + .setReleasePartitionsConsumer((jobId, partitionIds) -> releaseCalls.add(Tuple3.of(taskExecutorId, jobId, partitionIds))) + .createTestingTaskExecutorGateway(); + } + + private static class TestingShuffleMaster implements ShuffleMaster<ShuffleDescriptor> { + + final Queue<ResultPartitionID> externallyReleasedPartitions = new ArrayBlockingQueue<>(4); + + @Override + public CompletableFuture<ShuffleDescriptor> registerPartitionWithProducer(PartitionDescriptor partitionDescriptor, ProducerDescriptor producerDescriptor) { + return null; + } + + @Override + public void releasePartitionExternally(ShuffleDescriptor shuffleDescriptor) { + externallyReleasedPartitions.add(shuffleDescriptor.getResultPartitionID()); + } + } + +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java new file mode 100644 index 0000000..2bcda7e --- /dev/null +++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/TestingPartitionTracker.java @@ -0,0 +1,83 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.io.network.partition; + +import org.apache.flink.runtime.clusterframework.types.ResourceID; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; + +import java.util.Collection; +import java.util.function.BiConsumer; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Test {@link PartitionTracker} implementation. 
+ */ +public class TestingPartitionTracker implements PartitionTracker { + + private Function<ResourceID, Boolean> isTrackingPartitionsForFunction = ignored -> false; + private Consumer<ResourceID> stopTrackingAllPartitionsConsumer = ignored -> {}; + private Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer = ignored -> {}; + private BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer = (ignoredA, ignoredB) -> {}; + private Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer = ignored -> {}; + + public void setStartTrackingPartitionsConsumer(BiConsumer<ResourceID, ResultPartitionDeploymentDescriptor> startTrackingPartitionsConsumer) { + this.startTrackingPartitionsConsumer = startTrackingPartitionsConsumer; + } + + public void setIsTrackingPartitionsForFunction(Function<ResourceID, Boolean> isTrackingPartitionsForFunction) { + this.isTrackingPartitionsForFunction = isTrackingPartitionsForFunction; + } + + public void setStopTrackingAllPartitionsConsumer(Consumer<ResourceID> stopTrackingAllPartitionsConsumer) { + this.stopTrackingAllPartitionsConsumer = stopTrackingAllPartitionsConsumer; + } + + public void setStopTrackingAndReleaseAllPartitionsConsumer(Consumer<ResourceID> stopTrackingAndReleaseAllPartitionsConsumer) { + this.stopTrackingAndReleaseAllPartitionsConsumer = stopTrackingAndReleaseAllPartitionsConsumer; + } + + public void setStopTrackingAndReleasePartitionsConsumer(Consumer<Collection<ResultPartitionID>> stopTrackingAndReleasePartitionsConsumer) { + this.stopTrackingAndReleasePartitionsConsumer = stopTrackingAndReleasePartitionsConsumer; + } + + @Override + public void startTrackingPartition(ResourceID producingTaskExecutorId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor) { + this.startTrackingPartitionsConsumer.accept(producingTaskExecutorId, resultPartitionDeploymentDescriptor); + } + + @Override + public void 
stopTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + stopTrackingAllPartitionsConsumer.accept(producingTaskExecutorId); + } + + @Override + public void stopTrackingAndReleasePartitions(Collection<ResultPartitionID> resultPartitionIds) { + stopTrackingAndReleasePartitionsConsumer.accept(resultPartitionIds); + } + + @Override + public void stopTrackingAndReleasePartitionsFor(ResourceID producingTaskExecutorId) { + stopTrackingAndReleaseAllPartitionsConsumer.accept(producingTaskExecutorId); + } + + @Override + public boolean isTrackingPartitionsFor(ResourceID producingTaskExecutorId) { + return isTrackingPartitionsForFunction.apply(producingTaskExecutorId); + } +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 6cc1a77..1291d99 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -64,8 +64,11 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.SimpleSlotContext; +import org.apache.flink.runtime.io.network.partition.NoOpPartitionTracker; +import org.apache.flink.runtime.io.network.partition.PartitionTrackerFactory; import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.io.network.partition.TestingPartitionTracker; import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; @@ -307,7 +310,8 @@ public class JobMasterTest extends TestLogger { 
testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, - NettyShuffleMaster.INSTANCE) { + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.FACTORY) { @Override public void declineCheckpoint(DeclineCheckpoint declineCheckpoint) { declineCheckpointMessageFuture.complete(declineCheckpoint.getReason()); @@ -1623,7 +1627,8 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, - NettyShuffleMaster.INSTANCE) { + NettyShuffleMaster.INSTANCE, + NoOpPartitionTracker.FACTORY) { @Override public CompletableFuture<String> triggerSavepoint( @@ -1788,6 +1793,48 @@ public class JobMasterTest extends TestLogger { return testingResourceManagerGateway; } + /** + * Tests that calling disconnectTaskManager on the JobMaster makes it invoke the partition tracker's stop-tracking callback with the resource ID of the disconnected task manager. + */ + @Test + public void testPartitionTableCleanupOnDisconnect() throws Exception { + final JobManagerSharedServices jobManagerSharedServices = new TestingJobManagerSharedServicesBuilder().build(); + final JobGraph jobGraph = createSingleVertexJobGraph(); + + final CompletableFuture<ResourceID> partitionCleanupTaskExecutorId = new CompletableFuture<>(); + final TestingPartitionTracker partitionTracker = new TestingPartitionTracker(); + partitionTracker.setStopTrackingAllPartitionsConsumer(partitionCleanupTaskExecutorId::complete); + + final JobMaster jobMaster = new JobMasterBuilder() + .withConfiguration(configuration) + .withJobGraph(jobGraph) + .withHighAvailabilityServices(haServices) + .withJobManagerSharedServices(jobManagerSharedServices) + .withHeartbeatServices(heartbeatServices) + .withOnCompletionActions(new TestingOnCompletionActions()) + .withPartitionTrackerFactory(ignored -> partitionTracker) + .createJobMaster(); + + final CompletableFuture<JobID> disconnectTaskExecutorFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setDisconnectJobManagerConsumer((jobID, throwable) -> disconnectTaskExecutorFuture.complete(jobID)) + 
.createTestingTaskExecutorGateway(); + + try { + jobMaster.start(jobMasterId).get(); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + // register a slot to establish a connection + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + registerSlotsAtJobMaster(1, jobMasterGateway, testingTaskExecutorGateway, taskManagerLocation); + + jobMasterGateway.disconnectTaskManager(taskManagerLocation.getResourceID(), new Exception("test")); + disconnectTaskExecutorFuture.get(); + + assertThat(partitionCleanupTaskExecutorId.get(), equalTo(taskManagerLocation.getResourceID())); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + /** * Tests that the job execution is failed if the TaskExecutor disconnects from the * JobMaster. @@ -2092,6 +2139,8 @@ public class JobMasterTest extends TestLogger { private ShuffleMaster<?> shuffleMaster = NettyShuffleMaster.INSTANCE; + private PartitionTrackerFactory partitionTrackerFactory = NoOpPartitionTracker.FACTORY; + private JobMasterBuilder withConfiguration(Configuration configuration) { this.configuration = configuration; return this; @@ -2132,6 +2181,11 @@ public class JobMasterTest extends TestLogger { return this; } + /** + * Overrides the {@link PartitionTrackerFactory} held by this builder, which defaults to {@link NoOpPartitionTracker#FACTORY}. + */ + private JobMasterBuilder withPartitionTrackerFactory(PartitionTrackerFactory partitionTrackerFactory) { + this.partitionTrackerFactory = partitionTrackerFactory; + return this; + } + private JobMaster createJobMaster() throws Exception { final JobMasterConfiguration jobMasterConfiguration = JobMasterConfiguration.fromConfiguration(configuration); final SchedulerNGFactory schedulerNGFactory = new LegacySchedulerFactory( @@ -2152,7 +2206,8 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler, JobMasterTest.class.getClassLoader(), schedulerNGFactory, - shuffleMaster); + shuffleMaster, + partitionTrackerFactory); } } diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java index 844ef0d..8c20e49 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGateway.java @@ -74,6 +74,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { private final Supplier<Boolean> canBeReleasedSupplier; + /** Receives the job ID and partition IDs of every releasePartitions call made on this gateway. */ + private final BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer; + TestingTaskExecutorGateway( String address, String hostname, @@ -85,7 +87,8 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { Consumer<ResourceID> heartbeatResourceManagerConsumer, Consumer<Exception> disconnectResourceManagerConsumer, Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction, - Supplier<Boolean> canBeReleasedSupplier) { + Supplier<Boolean> canBeReleasedSupplier, + BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) { this.address = Preconditions.checkNotNull(address); this.hostname = Preconditions.checkNotNull(hostname); this.heartbeatJobManagerConsumer = Preconditions.checkNotNull(heartbeatJobManagerConsumer); @@ -97,6 +100,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { this.disconnectResourceManagerConsumer = disconnectResourceManagerConsumer; this.cancelTaskFunction = cancelTaskFunction; this.canBeReleasedSupplier = canBeReleasedSupplier; + this.releasePartitionsConsumer = releasePartitionsConsumer; } @Override @@ -106,12 +110,12 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { @Override public CompletableFuture<StackTraceSampleResponse> requestStackTraceSample( - final ExecutionAttemptID executionAttemptId, - final int sampleId, - final int 
numSamples, - final Time delayBetweenSamples, - final int maxStackTraceDepth, - final Time timeout) { + final ExecutionAttemptID executionAttemptId, + final int sampleId, + final int numSamples, + final Time delayBetweenSamples, + final int maxStackTraceDepth, + final Time timeout) { throw new UnsupportedOperationException(); } @@ -127,7 +131,7 @@ public class TestingTaskExecutorGateway implements TaskExecutorGateway { @Override public void releasePartitions(JobID jobId, Collection<ResultPartitionID> partitionIds) { - // noop + releasePartitionsConsumer.accept(jobId, partitionIds); } @Override diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java index 483f285..770d28e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TestingTaskExecutorGatewayBuilder.java @@ -25,11 +25,13 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; import org.apache.flink.runtime.jobmaster.AllocatedSlotReport; import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; +import java.util.Collection; import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.BiFunction; @@ -50,6 +52,7 @@ public class TestingTaskExecutorGatewayBuilder { private static final Consumer<ResourceID> NOOP_HEARTBEAT_RESOURCE_MANAGER_CONSUMER = ignored -> {}; 
private static final Consumer<Exception> NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER = ignored -> {}; private static final Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> NOOP_CANCEL_TASK_FUNCTION = ignored -> CompletableFuture.completedFuture(Acknowledge.get()); + private static final BiConsumer<JobID, Collection<ResultPartitionID>> NOOP_RELEASE_PARTITIONS_CONSUMER = (ignoredA, ignoredB) -> {}; private String address = "foobar:1234"; private String hostname = "foobar"; @@ -62,6 +65,7 @@ public class TestingTaskExecutorGatewayBuilder { private Consumer<Exception> disconnectResourceManagerConsumer = NOOP_DISCONNECT_RESOURCE_MANAGER_CONSUMER; private Function<ExecutionAttemptID, CompletableFuture<Acknowledge>> cancelTaskFunction = NOOP_CANCEL_TASK_FUNCTION; private Supplier<Boolean> canBeReleasedSupplier = () -> true; + private BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer = NOOP_RELEASE_PARTITIONS_CONSUMER; public TestingTaskExecutorGatewayBuilder setAddress(String address) { this.address = address; @@ -118,6 +122,11 @@ public class TestingTaskExecutorGatewayBuilder { return this; } + /** + * Sets the consumer that is handed to the created {@link TestingTaskExecutorGateway}; it defaults to a no-op. + */ + public TestingTaskExecutorGatewayBuilder setReleasePartitionsConsumer(BiConsumer<JobID, Collection<ResultPartitionID>> releasePartitionsConsumer) { + this.releasePartitionsConsumer = releasePartitionsConsumer; + return this; + } + public TestingTaskExecutorGateway createTestingTaskExecutorGateway() { return new TestingTaskExecutorGateway( address, @@ -130,6 +139,7 @@ public class TestingTaskExecutorGatewayBuilder { heartbeatResourceManagerConsumer, disconnectResourceManagerConsumer, cancelTaskFunction, - canBeReleasedSupplier); + canBeReleasedSupplier, + releasePartitionsConsumer); } }
