This is an automated email from the ASF dual-hosted git repository.

dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 758ce721589 [FLINK-29852][Runtime] Fix AdaptiveScheduler add operator repeatedly in json plan.
758ce721589 is described below

commit 758ce72158922e00ed7078e00706f8502145e461
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 2 14:29:49 2023 +0800

    [FLINK-29852][Runtime] Fix AdaptiveScheduler add operator repeatedly in json plan.
---
 .../runtime/scheduler/adaptive/CreatingExecutionGraph.java | 14 ++++++--------
 .../runtime/scheduler/adaptive/AdaptiveSchedulerTest.java  | 10 +++++++++-
 2 files changed, 15 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index 457b6866a9e..0ec7437bb85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
 import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
 import org.apache.flink.runtime.jobgraph.JobType;
 import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
@@ -32,6 +33,7 @@ import 
org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
 import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
 import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
 import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.Preconditions;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -44,7 +46,6 @@ import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executor;
 import java.util.concurrent.ScheduledFuture;
-import java.util.stream.StreamSupport;
 
 /**
  * State which waits for the creation of the {@link ExecutionGraph}. If the 
creation fails, then the
@@ -119,19 +120,16 @@ public class CreatingExecutionGraph implements State {
                 operatorCoordinatorHandler.initializeOperatorCoordinators(
                         context.getMainThreadExecutor(), 
context.getMetricGroup());
                 operatorCoordinatorHandler.startAllOperatorCoordinators();
-                String updatedPlan =
+                final String updatedPlan =
                         JsonPlanGenerator.generatePlan(
                                 executionGraph.getJobID(),
                                 executionGraph.getJobName(),
                                 JobType.STREAMING, // Adaptive scheduler works 
only with STREAMING
                                 // jobs
                                 () ->
-                                        StreamSupport.stream(
-                                                        executionGraph
-                                                                
.getAllExecutionVertices()
-                                                                .spliterator(),
-                                                        false)
-                                                .map(v -> 
v.getJobVertex().getJobVertex())
+                                        IterableUtils.toStream(
+                                                        
executionGraph.getVerticesTopologically())
+                                                
.map(ExecutionJobVertex::getJobVertex)
                                                 .iterator(),
                                 
executionGraphWithVertexParallelism.getVertexParallelism());
                 executionGraph.setJsonPlan(updatedPlan);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index bedb12a20b3..e6d457a8ad7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -95,6 +95,7 @@ import 
org.apache.flink.testutils.executor.TestExecutorResource;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.IterableUtils;
 import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
 
 import org.junit.ClassRule;
 import org.junit.Test;
@@ -294,7 +295,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
                         .setJobMasterConfiguration(configuration)
                         .build(EXECUTOR_RESOURCE.getExecutor());
 
-        final int numAvailableSlots = 1;
+        final int numAvailableSlots = 2;
 
         final SubmissionBufferingTaskManagerGateway taskManagerGateway =
                 new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
@@ -321,6 +322,13 @@ public class AdaptiveSchedulerTest extends TestLogger {
 
         
assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism())
                 .isEqualTo(numAvailableSlots);
+
+        assertThat(
+                        JacksonMapperFactory.createObjectMapper()
+                                .readTree(executionGraph.getJsonPlan())
+                                .get("nodes")
+                                .size())
+                .isEqualTo(1);
     }
 
     @Test

Reply via email to