This is an automated email from the ASF dual-hosted git repository.

zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e73225e4071fb7ffe7c4d5cbfd89983129fe3312
Author: Zhu Zhu <[email protected]>
AuthorDate: Thu Jun 23 20:41:01 2022 +0800

    [FLINK-28134][runtime] Rework TaskDeploymentDescriptorFactory to accept an execution to deploy
    
    This helps to decouple the task deployment from ExecutionVertex#getCurrentExecutionAttempt().
---
 .../runtime/deployment/TaskDeploymentDescriptorFactory.java    | 10 +++++-----
 .../org/apache/flink/runtime/executiongraph/Execution.java     |  4 ++--
 .../runtime/scheduler/adaptivebatch/BlockingResultInfo.java    |  3 +--
 .../deployment/TaskDeploymentDescriptorFactoryTest.java        |  2 +-
 4 files changed, 9 insertions(+), 10 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
index 6da8f4fabca..21f5ef6f069 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactory.java
@@ -253,10 +253,10 @@ public class TaskDeploymentDescriptorFactory {
         }
     }
 
-    public static TaskDeploymentDescriptorFactory fromExecutionVertex(
-            ExecutionVertex executionVertex)
+    public static TaskDeploymentDescriptorFactory fromExecution(Execution 
execution)
             throws IOException, CachedIntermediateDataSetCorruptedException {
-        InternalExecutionGraphAccessor internalExecutionGraphAccessor =
+        final ExecutionVertex executionVertex = execution.getVertex();
+        final InternalExecutionGraphAccessor internalExecutionGraphAccessor =
                 executionVertex.getExecutionGraphAccessor();
         Map<IntermediateDataSetID, ShuffleDescriptor[]> 
clusterPartitionShuffleDescriptors;
         try {
@@ -272,7 +272,7 @@ public class TaskDeploymentDescriptorFactory {
         }
 
         return new TaskDeploymentDescriptorFactory(
-                executionVertex.getCurrentExecutionAttempt().getAttemptId(),
+                execution.getAttemptId(),
                 getSerializedJobInformation(internalExecutionGraphAccessor),
                 getSerializedTaskInformation(
                         
executionVertex.getJobVertex().getTaskInformationOrBlobKey()),
@@ -338,7 +338,7 @@ public class TaskDeploymentDescriptorFactory {
     public static ShuffleDescriptor getConsumedPartitionShuffleDescriptor(
             IntermediateResultPartition consumedPartition,
             PartitionLocationConstraint partitionDeploymentConstraint) {
-        Execution producer = 
consumedPartition.getProducer().getCurrentExecutionAttempt();
+        Execution producer = 
consumedPartition.getProducer().getPartitionProducer();
 
         ExecutionState producerState = producer.getState();
         Optional<ResultPartitionDeploymentDescriptor> 
consumedPartitionDescriptor =
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
index 459d9861980..96c0d3c119a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java
@@ -564,13 +564,13 @@ public class Execution
                     "Deploying {} (attempt #{}) with attempt id {} and vertex 
id {} to {} with allocation id {}",
                     vertex.getTaskNameWithSubtaskIndex(),
                     getAttemptNumber(),
-                    vertex.getCurrentExecutionAttempt().getAttemptId(),
+                    attemptId,
                     vertex.getID(),
                     getAssignedResourceLocation(),
                     slot.getAllocationId());
 
             final TaskDeploymentDescriptor deployment =
-                    TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex)
+                    TaskDeploymentDescriptorFactory.fromExecution(this)
                             .createDeploymentDescriptor(
                                     slot.getAllocationId(),
                                     taskRestore,
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
index 11a45a6d7a8..302980ab216 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/BlockingResultInfo.java
@@ -68,8 +68,7 @@ public class BlockingResultInfo {
         for (IntermediateResultPartition partition : 
intermediateResult.getPartitions()) {
             checkState(partition.isConsumable());
 
-            IOMetrics ioMetrics =
-                    
partition.getProducer().getCurrentExecutionAttempt().getIOMetrics();
+            IOMetrics ioMetrics = 
partition.getProducer().getPartitionProducer().getIOMetrics();
             checkNotNull(ioMetrics, "IOMetrics should not be null.");
 
             blockingPartitionSizes.add(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
index 7dba9971c56..1074da2d340 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorFactoryTest.java
@@ -164,7 +164,7 @@ public class TaskDeploymentDescriptorFactoryTest extends 
TestLogger {
     private static TaskDeploymentDescriptor 
createTaskDeploymentDescriptor(ExecutionVertex ev)
             throws IOException, CachedIntermediateDataSetCorruptedException {
 
-        return TaskDeploymentDescriptorFactory.fromExecutionVertex(ev)
+        return 
TaskDeploymentDescriptorFactory.fromExecution(ev.getCurrentExecutionAttempt())
                 .createDeploymentDescriptor(new AllocationID(), null, 
Collections.emptyList());
     }
 

Reply via email to