This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit e73225e4071fb7ffe7c4d5cbfd89983129fe3312 Author: Zhu Zhu <[email protected]> AuthorDate: Thu Jun 23 20:41:01 2022 +0800 [FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy This helps to decouple the task deployment from ExecutionVertex#getCurrentExecutionAttempt(). --- .../runtime/deployment/TaskDeploymentDescriptorFactory.java | 10 +++++----- .../org/apache/flink/runtime/executiongraph/Execution.java | 4 ++-- .../runtime/scheduler/adaptivebatch/BlockingResultInfo.java | 3 +-- .../deployment/TaskDeploymentDescriptorFactoryTest.java | 2 +- 4 files changed, 9 insertions(+), 10 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java index 6da8f4fabca..21f5ef6f069 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java @@ -253,10 +253,10 @@ public class TaskDeploymentDescriptorFactory { } } - public static TaskDeploymentDescriptorFactory fromExecutionVertex( - ExecutionVertex executionVertex) + public static TaskDeploymentDescriptorFactory fromExecution(Execution execution) throws IOException, CachedIntermediateDataSetCorruptedException { - InternalExecutionGraphAccessor internalExecutionGraphAccessor = + final ExecutionVertex executionVertex = execution.getVertex(); + final InternalExecutionGraphAccessor internalExecutionGraphAccessor = executionVertex.getExecutionGraphAccessor(); Map<IntermediateDataSetID, ShuffleDescriptor[]> clusterPartitionShuffleDescriptors; try { @@ -272,7 +272,7 @@ public class TaskDeploymentDescriptorFactory { } return new TaskDeploymentDescriptorFactory( - executionVertex.getCurrentExecutionAttempt().getAttemptId(), + execution.getAttemptId(), getSerializedJobInformation(internalExecutionGraphAccessor), getSerializedTaskInformation( executionVertex.getJobVertex().getTaskInformationOrBlobKey()), @@ -338,7 +338,7 @@ public class TaskDeploymentDescriptorFactory { public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor( IntermediateResultPartition consumedPartition, PartitionLocationConstraint partitionDeploymentConstraint) { - Execution producer = consumedPartition.getProducer().getCurrentExecutionAttempt(); + Execution producer = consumedPartition.getProducer().getPartitionProducer(); ExecutionState producerState = producer.getState(); Optional<ResultPartitionDeploymentDescriptor> consumedPartitionDescriptor = diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 459d9861980..96c0d3c119a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -564,13 +564,13 @@ public class Execution "Deploying {} (attempt #{}) with attempt id {} and vertex id {} to {} with allocation id {}", vertex.getTaskNameWithSubtaskIndex(), getAttemptNumber(), - vertex.getCurrentExecutionAttempt().getAttemptId(), + attemptId, vertex.getID(), getAssignedResourceLocation(), slot.getAllocationId()); final TaskDeploymentDescriptor deployment = - TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex) + TaskDeploymentDescriptorFactory.fromExecution(this) .createDeploymentDescriptor( slot.getAllocationId(), taskRestore, diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java index 11a45a6d7a8..302980ab216 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java @@ -68,8 +68,7 @@ public class BlockingResultInfo { for (IntermediateResultPartition partition : intermediateResult.getPartitions()) { checkState(partition.isConsumable()); - IOMetrics ioMetrics = - partition.getProducer().getCurrentExecutionAttempt().getIOMetrics(); + IOMetrics ioMetrics = partition.getProducer().getPartitionProducer().getIOMetrics(); checkNotNull(ioMetrics, "IOMetrics should not be null."); blockingPartitionSizes.add( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java index 7dba9971c56..1074da2d340 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java @@ -164,7 +164,7 @@ public class TaskDeploymentDescriptorFactoryTest extends TestLogger { private static TaskDeploymentDescriptor createTaskDeploymentDescriptor(ExecutionVertex ev) throws IOException, CachedIntermediateDataSetCorruptedException { - return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev) + return TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt()) .createDeploymentDescriptor(new AllocationID(), null, Collections.emptyList()); }
