zhuzhurk commented on a change in pull request #18023:
URL: https://github.com/apache/flink/pull/18023#discussion_r775428822



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -210,4 +210,6 @@ void enableCheckpointing(
 
     @Nonnull
     ComponentMainThreadExecutor getJobMasterMainThreadExecutor();
+
+    void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;

Review comment:
       Better to add some documentation for the new method.
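
   For example, something along these lines would do (just a sketch of possible wording, inferred from the method name and the surrounding interface, not taken from the PR):

```java
/**
 * Initializes the given execution job vertex, e.g. creates its execution vertices and
 * intermediate results. Intended for schedulers that build the graph dynamically.
 *
 * @param ejv the execution job vertex to initialize
 * @param createTimestamp the timestamp used as the creation time of the created executions
 * @throws JobException if the initialization fails
 */
void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;
```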

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java
##########
@@ -41,15 +41,15 @@ public void parallelismInvalid() {
         assertThrows(
                 "parallelism is not in valid bounds",
                 IllegalArgumentException.class,
-                () -> new DefaultVertexParallelismInfo(-1, 1, ALWAYS_VALID));
+                () -> new DefaultVertexParallelismInfo(-2, 1, ALWAYS_VALID));
     }
 
     @Test
     public void maxParallelismInvalid() {
         assertThrows(
                 "max parallelism is not in valid bounds",
                 IllegalArgumentException.class,
-                () -> new DefaultVertexParallelismInfo(1, -1, ALWAYS_VALID));
+                () -> new DefaultVertexParallelismInfo(1, -2, ALWAYS_VALID));
     }
 
     @Test

Review comment:
       Can we have some tests for the modifications to `DefaultVertexParallelismInfo`? E.g. `testSetParallelism` and `testCreateWithUnconfiguredParallelism`?
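
   For illustration, roughly this shape (a sketch meant to slot into this test class; the `-1` sentinel for an unconfigured parallelism and the `setParallelism(int)` method are assumptions based on the diff above, not code from the PR):

```java
@Test
public void testCreateWithUnconfiguredParallelism() {
    // assumption: -1 (ExecutionConfig.PARALLELISM_DEFAULT) is now accepted as "not decided yet"
    final DefaultVertexParallelismInfo info =
            new DefaultVertexParallelismInfo(-1, 256, ALWAYS_VALID);
    Assert.assertEquals(-1, info.getParallelism());
}

@Test
public void testSetParallelism() {
    // assumption: the PR adds setParallelism(int) to fix the parallelism later on
    final DefaultVertexParallelismInfo info =
            new DefaultVertexParallelismInfo(-1, 256, ALWAYS_VALID);
    info.setParallelism(10);
    Assert.assertEquals(10, info.getParallelism());
}
```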

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -422,4 +429,25 @@ public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
                 partitionIds,
                 containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()));
     }
+
+    @Test
+    public void testAttachDynamicGraph() throws Exception {

Review comment:
       `testAttachDynamicGraph` -> `testAttachToDynamicGraph`

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
##########
@@ -104,6 +104,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
             long initializationTimestamp,
             VertexAttemptNumberStore vertexAttemptNumberStore,
             VertexParallelismStore vertexParallelismStore,
+            boolean isDynamicGraph,

Review comment:
       Maybe make it a field of `DefaultExecutionGraphFactory` that is set at construction time. (Add a new constructor that takes it, and have the old constructor always set `isDynamic` to false.)
   In this way, we can avoid modifying the many usages of it.
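
   A minimal sketch of the shape I mean (not the real factory; `GraphFactorySketch` and its single argument are placeholders for the existing constructor parameters):

```java
// The existing constructor keeps its signature and defaults to a non-dynamic graph;
// only the scheduler that needs a dynamic graph uses the new constructor.
class GraphFactorySketch {

    private final Object existingDependencies; // stands in for the current constructor args
    private final boolean isDynamicGraph;

    GraphFactorySketch(Object existingDependencies) {
        this(existingDependencies, false);
    }

    GraphFactorySketch(Object existingDependencies, boolean isDynamicGraph) {
        this.existingDependencies = existingDependencies;
        this.isDynamicGraph = isDynamicGraph;
    }
}
```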

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
##########
@@ -115,19 +116,15 @@
     private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey =
             null;
 
-    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
+    @Nullable private Collection<OperatorCoordinatorHolder> operatorCoordinators;
 
-    private InputSplitAssigner splitAssigner;
+    @Nullable private InputSplitAssigner splitAssigner;
 
     @VisibleForTesting
-    public ExecutionJobVertex(
+    protected ExecutionJobVertex(

Review comment:
       This can be package-private, or maybe just leave it as public?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -167,7 +167,7 @@ public static DefaultExecutionTopology fromExecutionGraph(
 
         IndexedPipelinedRegions indexedPipelinedRegions =
                 computePipelinedRegions(
-                        logicalPipelinedRegions,

Review comment:
       What's this change for? Let's remove it from this PR if it's an 
unrelated change.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex vertex) {
                 normalizeParallelism(vertex.getParallelism()));
     }
 
+    // TODO: move to Adaptive Batch Scheduler.
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
+            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+        // for dynamic graph, there is no need to normalize vertex parallelism. if the max
+        // parallelism is not configured and the parallelism is a positive value, max
+        // parallelism can be computed against the parallelism, otherwise it need to use the
+        // global default max parallelism.
+        return computeVertexParallelismStore(
+                vertices,
+                v -> {
+                    if (v.getParallelism() > 0) {
+                        return getDefaultMaxParallelism(v);
+                    } else {
+                        return defaultMaxParallelism;
+                    }
+                },
+                Function.identity());
+    }
+
     /**
      * Compute the {@link VertexParallelismStore} for all given vertices, which will set defaults
      * and ensure that the returned store contains valid parallelisms, with a custom function for
-     * default max parallelism calculation.
+     * default max parallelism calculation and a custom function for normalizing vertex parallelism.
      *
      * @param vertices the vertices to compute parallelism for
      * @param defaultMaxParallelismFunc a function for computing a default max parallelism if none
      *     is specified on a given vertex
+     * @param normalizeParallelismFunc a function for normalizing vertex parallelism
      * @return the computed parallelism store
      */
     public static VertexParallelismStore computeVertexParallelismStore(

Review comment:
       Maybe add a new method instead of changing the existing one, so that we do not need to modify `AdaptiveScheduler` or make `normalizeParallelism()` public.
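
   I.e. roughly this (a sketch; the two-argument signature matches the current `SchedulerBase`, and it delegates to the new three-argument variant from this PR so that `normalizeParallelism()` can stay non-public):

```java
// Existing entry point, kept as-is for callers such as AdaptiveScheduler.
public static VertexParallelismStore computeVertexParallelismStore(
        Iterable<JobVertex> vertices, Function<JobVertex, Integer> defaultMaxParallelismFunc) {
    return computeVertexParallelismStore(
            vertices, defaultMaxParallelismFunc, SchedulerBase::normalizeParallelism);
}
```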

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
##########
@@ -156,6 +157,11 @@ public TestingDefaultExecutionGraphBuilder setVertexParallelismStore(
         return this;
     }
 
+    public TestingDefaultExecutionGraphBuilder setDynamicGraph(boolean dynamicGraph) {

Review comment:
       NIT: Maybe add a new `buildDynamicGraph()` method as a separate build method?
   `setDynamicGraph` just sounds a bit weird to me.
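
   Something like this, for example (a sketch only; `buildDynamicGraph()` and the internal `buildGraph(boolean)` helper are hypothetical names, not code from the PR):

```java
// Two public build methods instead of a setDynamicGraph(...) setter.
public DefaultExecutionGraph build() throws Exception {
    return buildGraph(false);
}

public DefaultExecutionGraph buildDynamicGraph() throws Exception {
    return buildGraph(true);
}

// private helper doing what build() does today, parameterized by the dynamic flag
private DefaultExecutionGraph buildGraph(boolean isDynamicGraph) throws Exception {
    // ... existing build logic, passing isDynamicGraph through to graph construction ...
    throw new UnsupportedOperationException("sketch only");
}
```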

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -762,18 +762,38 @@ public void setInternalTaskFailuresListener(
 
     @Override
     public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
+        if (isDynamic) {

Review comment:
       Maybe move the changes to this method into the commit "Enable to create a dynamic graph" and make that commit the last one.
   In this way, the general purpose of this PR becomes supporting dynamic graphs, which I think better summarizes the overall changes.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex vertex) {
                 normalizeParallelism(vertex.getParallelism()));
     }
 
+    // TODO: move to Adaptive Batch Scheduler.
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
+            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+        // for dynamic graph, there is no need to normalize vertex parallelism. if the max
+        // parallelism is not configured and the parallelism is a positive value, max
+        // parallelism can be computed against the parallelism, otherwise it need to use the

Review comment:
       need -> needs

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
##########
@@ -41,4 +50,56 @@ public void testParallelismGreaterThanMaxParallelism() {
                 JobException.class,
                 () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
     }
+
+    @Test
+    public void testLazyInitialization() throws Exception {
+        final JobVertex jobVertex = new JobVertex("testVertex");
+        jobVertex.setInvokableClass(AbstractInvokable.class);
+        jobVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);
+
+        final DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build();
+        final DefaultVertexParallelismInfo vertexParallelismInfo =
+                new DefaultVertexParallelismInfo(3, 8, max -> Optional.empty());
+        final ExecutionJobVertex ejv = new ExecutionJobVertex(eg, jobVertex, vertexParallelismInfo);
+
+        assertThat(ejv.isInitialized(), is(false));
+
+        assertThat(ejv.getTaskVertices().length, is(0));
+
+        try {
+            ejv.getInputs();
+            Assert.fail("failure is expected");
+        } catch (IllegalStateException e) {
+            // ignore
+        }
+
+        try {
+            ejv.getProducedDataSets();
+            Assert.fail("failure is expected");
+        } catch (IllegalStateException e) {
+            // ignore
+        }
+
+        try {
+            ejv.getOperatorCoordinators();
+            Assert.fail("failure is expected");
+        } catch (IllegalStateException e) {
+            // ignore
+        }
+

Review comment:
       Maybe add checks for all the methods that are guarded by `checkState(isInitialized())`?
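
   One way to cover them all without repeating the try/fail blocks (a sketch; which accessors are actually guarded depends on the final PR):

```java
private static void assertThrowsIllegalState(Runnable action) {
    try {
        action.run();
        Assert.fail("IllegalStateException is expected");
    } catch (IllegalStateException expected) {
        // expected: the vertex is not initialized yet
    }
}

// usage in testLazyInitialization():
// assertThrowsIllegalState(ejv::getInputs);
// assertThrowsIllegalState(ejv::getProducedDataSets);
// assertThrowsIllegalState(ejv::getOperatorCoordinators);
// assertThrowsIllegalState(ejv::getSplitAssigner); // if it is guarded as well
```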

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex vertex) {
                 normalizeParallelism(vertex.getParallelism()));
     }
 
+    // TODO: move to Adaptive Batch Scheduler.
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(

Review comment:
       I would suggest adding this method to `ExecutionJobVertexTest` for now and moving it to `AdaptiveBatchScheduler` later.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
##########
@@ -307,14 +323,24 @@ public JobVertexID getJobVertexId() {
 
     @Override
     public ExecutionVertex[] getTaskVertices() {
+        if (taskVertices == null) {
+            // The REST/web may try to get execution vertices of an uninitialized job vertex. Using
+            // DEBUG log level to avoid flooding logs.
+            LOG.debug(
+                    "Trying to get execution vertices of an uninitialized job vertex "
+                            + getJobVertexId());
+            return new ExecutionVertex[0];
+        }
         return taskVertices;
     }
 
     public IntermediateResult[] getProducedDataSets() {
+        checkState(isInitialized());

Review comment:
       Maybe also add this check to all the methods that we expect to be invoked only after the job vertex is initialized? The ones I noticed are `connectToPredecessors()`, `executionVertexFinished()`, and `executionVertexUnFinished()`.
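
   I.e. the same guard at the top of those methods, along these lines (sketch only; the signature is abbreviated from memory of the class and the body is elided):

```java
public void connectToPredecessors(Map<IntermediateDataSetID, IntermediateResult> intermediateDataSets)
        throws JobException {
    checkState(isInitialized());
    // ... existing wiring logic unchanged ...
}
```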




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

