This is an automated email from the ASF dual-hosted git repository.
dmvk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 758ce721589 [FLINK-29852][Runtime] Fix AdaptiveScheduler add operator repeatedly in json plan.
758ce721589 is described below
commit 758ce72158922e00ed7078e00706f8502145e461
Author: Weihua Hu <[email protected]>
AuthorDate: Thu Mar 2 14:29:49 2023 +0800
[FLINK-29852][Runtime] Fix AdaptiveScheduler add operator repeatedly in json plan.
---
.../runtime/scheduler/adaptive/CreatingExecutionGraph.java | 14 ++++++--------
.../runtime/scheduler/adaptive/AdaptiveSchedulerTest.java | 10 +++++++++-
2 files changed, 15 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
index 457b6866a9e..0ec7437bb85 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptive/CreatingExecutionGraph.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import org.apache.flink.runtime.execution.ExecutionState;
import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
import org.apache.flink.runtime.executiongraph.ExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionJobVertex;
import org.apache.flink.runtime.executiongraph.ExecutionVertex;
import org.apache.flink.runtime.jobgraph.JobType;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
@@ -32,6 +33,7 @@ import org.apache.flink.runtime.scheduler.ExecutionGraphHandler;
import org.apache.flink.runtime.scheduler.GlobalFailureHandler;
import org.apache.flink.runtime.scheduler.OperatorCoordinatorHandler;
import org.apache.flink.runtime.scheduler.adaptive.allocator.VertexParallelism;
+import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.concurrent.FutureUtils;
@@ -44,7 +46,6 @@ import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ScheduledFuture;
-import java.util.stream.StreamSupport;
/**
* State which waits for the creation of the {@link ExecutionGraph}. If the creation fails, then the
@@ -119,19 +120,16 @@ public class CreatingExecutionGraph implements State {
operatorCoordinatorHandler.initializeOperatorCoordinators(
context.getMainThreadExecutor(),
context.getMetricGroup());
operatorCoordinatorHandler.startAllOperatorCoordinators();
- String updatedPlan =
+ final String updatedPlan =
JsonPlanGenerator.generatePlan(
executionGraph.getJobID(),
executionGraph.getJobName(),
JobType.STREAMING, // Adaptive scheduler works only with STREAMING
// jobs
() ->
- StreamSupport.stream(
- executionGraph
-
.getAllExecutionVertices()
- .spliterator(),
- false)
- .map(v ->
v.getJobVertex().getJobVertex())
+ IterableUtils.toStream(
+
executionGraph.getVerticesTopologically())
+
.map(ExecutionJobVertex::getJobVertex)
.iterator(),
executionGraphWithVertexParallelism.getVertexParallelism());
executionGraph.setJsonPlan(updatedPlan);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
index bedb12a20b3..e6d457a8ad7 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/adaptive/AdaptiveSchedulerTest.java
@@ -95,6 +95,7 @@ import org.apache.flink.testutils.executor.TestExecutorResource;
import org.apache.flink.util.FlinkException;
import org.apache.flink.util.IterableUtils;
import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.jackson.JacksonMapperFactory;
import org.junit.ClassRule;
import org.junit.Test;
@@ -294,7 +295,7 @@ public class AdaptiveSchedulerTest extends TestLogger {
.setJobMasterConfiguration(configuration)
.build(EXECUTOR_RESOURCE.getExecutor());
- final int numAvailableSlots = 1;
+ final int numAvailableSlots = 2;
final SubmissionBufferingTaskManagerGateway taskManagerGateway =
new SubmissionBufferingTaskManagerGateway(numAvailableSlots);
@@ -321,6 +322,13 @@ public class AdaptiveSchedulerTest extends TestLogger {
assertThat(executionGraph.getJobVertex(JOB_VERTEX.getID()).getParallelism())
.isEqualTo(numAvailableSlots);
+
+ assertThat(
+ JacksonMapperFactory.createObjectMapper()
+ .readTree(executionGraph.getJsonPlan())
+ .get("nodes")
+ .size())
+ .isEqualTo(1);
}
@Test