zhuzhurk commented on a change in pull request #18023:
URL: https://github.com/apache/flink/pull/18023#discussion_r775428822
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java
##########
@@ -210,4 +210,6 @@ void enableCheckpointing(
@Nonnull
ComponentMainThreadExecutor getJobMasterMainThreadExecutor();
+
+    void initializeJobVertex(ExecutionJobVertex ejv, long createTimestamp) throws JobException;
Review comment:
Better to add some documentation for the new method.
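One possible shape for that documentation (the wording is only a suggestion, and the stub types below stand in for Flink's `ExecutionJobVertex` and `JobException` so the sketch is self-contained):

```java
// Illustrative stubs: stand-ins for the real Flink types.
class ExecutionJobVertexStub {}

class JobExceptionStub extends Exception {
    JobExceptionStub(String message) {
        super(message);
    }
}

interface ExecutionGraphDocSketch {
    /**
     * Initializes the given execution job vertex, i.e. creates its execution
     * vertices and intermediate results against its decided parallelism.
     *
     * @param ejv the execution job vertex to initialize
     * @param createTimestamp the timestamp to record as the creation time of
     *     the execution vertices
     * @throws JobExceptionStub if the vertex cannot be initialized
     */
    void initializeJobVertex(ExecutionJobVertexStub ejv, long createTimestamp) throws JobExceptionStub;
}
```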
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/DefaultVertexParallelismInfoTest.java
##########
@@ -41,15 +41,15 @@ public void parallelismInvalid() {
assertThrows(
"parallelism is not in valid bounds",
IllegalArgumentException.class,
- () -> new DefaultVertexParallelismInfo(-1, 1, ALWAYS_VALID));
+ () -> new DefaultVertexParallelismInfo(-2, 1, ALWAYS_VALID));
}
@Test
public void maxParallelismInvalid() {
assertThrows(
"max parallelism is not in valid bounds",
IllegalArgumentException.class,
- () -> new DefaultVertexParallelismInfo(1, -1, ALWAYS_VALID));
+ () -> new DefaultVertexParallelismInfo(1, -2, ALWAYS_VALID));
}
@Test
Review comment:
Can we have some tests for the modification of
`DefaultVertexParallelismInfo`? e.g. `testSetParallelism`,
`testCreateWithUnconfiguredParallelism`?
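The `-1` → `-2` changes in the hunk above suggest that `-1` now means "parallelism not yet configured". A toy stand-in showing what such tests could exercise (class name, constant, and validation behavior are all illustrative, not Flink's actual `DefaultVertexParallelismInfo` API):

```java
// Toy model: parallelism may start unconfigured (-1) and be set exactly once,
// and must then land within the max parallelism.
class VertexParallelismInfoSketch {
    static final int UNCONFIGURED = -1;
    private int parallelism;
    private final int maxParallelism;

    VertexParallelismInfoSketch(int parallelism, int maxParallelism) {
        if (parallelism != UNCONFIGURED && parallelism < 1) {
            throw new IllegalArgumentException("parallelism is not in valid bounds");
        }
        this.parallelism = parallelism;
        this.maxParallelism = maxParallelism;
    }

    void setParallelism(int newParallelism) {
        if (parallelism != UNCONFIGURED) {
            throw new IllegalStateException("parallelism is already set");
        }
        if (newParallelism < 1 || newParallelism > maxParallelism) {
            throw new IllegalArgumentException("parallelism is not in valid bounds");
        }
        parallelism = newParallelism;
    }

    int getParallelism() {
        return parallelism;
    }
}
```

A `testSetParallelism` would then assert the set-once behavior, and a `testCreateWithUnconfiguredParallelism` that `-1` is accepted at construction.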
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphConstructionTest.java
##########
@@ -422,4 +429,25 @@ public void testRegisterConsumedPartitionGroupToEdgeManager() throws Exception {
                partitionIds,
                containsInAnyOrder(partition1.getPartitionId(), partition2.getPartitionId()));
}
+
+ @Test
+ public void testAttachDynamicGraph() throws Exception {
Review comment:
`testAttachDynamicGraph` -> `testAttachToDynamicGraph`
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/DefaultExecutionGraphFactory.java
##########
@@ -104,6 +104,7 @@ public ExecutionGraph createAndRestoreExecutionGraph(
long initializationTimestamp,
VertexAttemptNumberStore vertexAttemptNumberStore,
VertexParallelismStore vertexParallelismStore,
+ boolean isDynamicGraph,
Review comment:
Maybe make it a field of `DefaultExecutionGraphFactory` which is set when constructed. (And add a new constructor for it; the old ctor always sets `isDynamic` to false.)
In this way, we can avoid modifying many usages of it.
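The suggested constructor-delegation pattern could look roughly like this (class and member names are illustrative stand-ins, not Flink's actual factory):

```java
// The flag becomes a constructor-set field; the old constructor delegates
// with isDynamicGraph = false so existing call sites keep compiling unchanged.
class GraphFactorySketch {
    private final boolean isDynamicGraph;

    // Existing constructor: keeps the old (non-dynamic) behavior.
    GraphFactorySketch() {
        this(false);
    }

    // New constructor: callers that need a dynamic graph opt in explicitly.
    GraphFactorySketch(boolean isDynamicGraph) {
        this.isDynamicGraph = isDynamicGraph;
    }

    boolean isDynamicGraph() {
        return isDynamicGraph;
    }
}
```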
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
##########
@@ -115,19 +116,15 @@
    private Either<SerializedValue<TaskInformation>, PermanentBlobKey> taskInformationOrBlobKey = null;
-    private final Collection<OperatorCoordinatorHolder> operatorCoordinators;
+    @Nullable private Collection<OperatorCoordinatorHolder> operatorCoordinators;
- private InputSplitAssigner splitAssigner;
+ @Nullable private InputSplitAssigner splitAssigner;
@VisibleForTesting
- public ExecutionJobVertex(
+ protected ExecutionJobVertex(
Review comment:
can be package private, or maybe leave it as public?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java
##########
@@ -167,7 +167,7 @@ public static DefaultExecutionTopology fromExecutionGraph(
IndexedPipelinedRegions indexedPipelinedRegions =
computePipelinedRegions(
- logicalPipelinedRegions,
Review comment:
What's this change for? Let's remove it from this PR if it's an
unrelated change.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex vertex) {
                normalizeParallelism(vertex.getParallelism()));
}
+ // TODO: move to Adaptive Batch Scheduler.
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
+            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+        // for dynamic graph, there is no need to normalize vertex parallelism. if the max
+        // parallelism is not configured and the parallelism is a positive value, max
+        // parallelism can be computed against the parallelism, otherwise it need to use the
+        // global default max parallelism.
+        return computeVertexParallelismStore(
+                vertices,
+                v -> {
+                    if (v.getParallelism() > 0) {
+                        return getDefaultMaxParallelism(v);
+                    } else {
+                        return defaultMaxParallelism;
+                    }
+                },
+                Function.identity());
+    }
+
    /**
     * Compute the {@link VertexParallelismStore} for all given vertices, which will set defaults
     * and ensure that the returned store contains valid parallelisms, with a custom function for
-     * default max parallelism calculation.
+     * default max parallelism calculation and a custom function for normalizing vertex parallelism.
     *
     * @param vertices the vertices to compute parallelism for
     * @param defaultMaxParallelismFunc a function for computing a default max parallelism if none
     *     is specified on a given vertex
+     * @param normalizeParallelismFunc a function for normalizing vertex parallelism
     * @return the computed parallelism store
     */
    public static VertexParallelismStore computeVertexParallelismStore(
Review comment:
Maybe add a new method instead of changing the existing one, so that we do not need to modify `AdaptiveScheduler` or make `normalizeParallelism()` public.
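The overload-instead-of-modify idea, sketched with hypothetical names: the existing method keeps its signature and delegates to a new variant, passing the current normalization as the default, so `normalizeParallelism()` can stay private.

```java
import java.util.function.IntUnaryOperator;

class ParallelismStoreSketch {
    // Existing entry point keeps its signature, so callers such as
    // AdaptiveScheduler need no change.
    static int computeParallelism(int configured) {
        return computeParallelism(configured, ParallelismStoreSketch::normalize);
    }

    // New overload for the dynamic-graph path, which can skip normalization
    // by passing IntUnaryOperator.identity().
    static int computeParallelism(int configured, IntUnaryOperator normalizeFunc) {
        return normalizeFunc.applyAsInt(configured);
    }

    // Stays private: only the delegating overload refers to it.
    private static int normalize(int parallelism) {
        // Toy rule: a non-positive (unset) parallelism is normalized to 1.
        return parallelism > 0 ? parallelism : 1;
    }
}
```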
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/TestingDefaultExecutionGraphBuilder.java
##########
@@ -156,6 +157,11 @@ public TestingDefaultExecutionGraphBuilder setVertexParallelismStore(
        return this;
    }
+    public TestingDefaultExecutionGraphBuilder setDynamicGraph(boolean dynamicGraph) {
Review comment:
NIT: Maybe add `buildDynamicGraph()` as a separate build method?
I just feel `setDynamicGraph` sounds a bit weird.
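A minimal sketch of the suggested builder shape (names and the string result are illustrative): each build variant is its own method instead of a boolean setter.

```java
class ExecutionGraphBuilderSketch {
    private boolean dynamic;

    // Regular build: the existing, non-dynamic behavior.
    String build() {
        dynamic = false;
        return doBuild();
    }

    // Separate build method instead of a setDynamicGraph(boolean) setter.
    String buildDynamicGraph() {
        dynamic = true;
        return doBuild();
    }

    private String doBuild() {
        return dynamic ? "dynamic graph" : "static graph";
    }
}
```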
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraph.java
##########
@@ -762,18 +762,38 @@ public void setInternalTaskFailuresListener(
@Override
    public void attachJobGraph(List<JobVertex> topologicallySorted) throws JobException {
+ if (isDynamic) {
Review comment:
Maybe move the changes of this method to the commit "Enable to create a dynamic graph" and make that commit the last one.
In this way, the general purpose of this PR is to support dynamic graphs.
I think this better summarizes the overall changes.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex vertex) {
normalizeParallelism(vertex.getParallelism()));
}
+ // TODO: move to Adaptive Batch Scheduler.
+    /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
+            Iterable<JobVertex> vertices, int defaultMaxParallelism) {
+        // for dynamic graph, there is no need to normalize vertex parallelism. if the max
+        // parallelism is not configured and the parallelism is a positive value, max
+        // parallelism can be computed against the parallelism, otherwise it need to use the
Review comment:
need -> needs
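For reference, the "max parallelism computed against the parallelism" fallback quoted above can be sketched as below. The clamping bounds (128 and 32768) and the 1.5x round-up-to-power-of-two rule follow my reading of Flink's `KeyGroupRangeAssignment.computeDefaultMaxParallelism`; all names here are illustrative, not the PR's actual code.

```java
class DynamicMaxParallelismSketch {
    static final int LOWER_BOUND = 1 << 7;   // 128
    static final int UPPER_BOUND = 1 << 15;  // 32768

    // Roughly 1.5 * parallelism, rounded up to a power of two,
    // clamped to [128, 32768].
    static int defaultMaxParallelism(int parallelism) {
        int target = parallelism + (parallelism / 2);
        return Math.min(Math.max(roundUpToPowerOfTwo(target), LOWER_BOUND), UPPER_BOUND);
    }

    // Dynamic graph: derive the max parallelism from the parallelism when the
    // latter is already decided, otherwise fall back to the global default.
    static int maxParallelismFor(int parallelism, int globalDefaultMax) {
        return parallelism > 0 ? defaultMaxParallelism(parallelism) : globalDefaultMax;
    }

    static int roundUpToPowerOfTwo(int x) {
        return x <= 1 ? 1 : Integer.highestOneBit(x - 1) << 1;
    }
}
```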
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertexTest.java
##########
@@ -41,4 +50,56 @@ public void testParallelismGreaterThanMaxParallelism() {
JobException.class,
                () -> ExecutionGraphTestUtils.getExecutionJobVertex(jobVertex));
}
+
+ @Test
+ public void testLazyInitialization() throws Exception {
+ final JobVertex jobVertex = new JobVertex("testVertex");
+ jobVertex.setInvokableClass(AbstractInvokable.class);
+ jobVertex.createAndAddResultDataSet(ResultPartitionType.BLOCKING);
+
+        final DefaultExecutionGraph eg = TestingDefaultExecutionGraphBuilder.newBuilder().build();
+        final DefaultVertexParallelismInfo vertexParallelismInfo =
+                new DefaultVertexParallelismInfo(3, 8, max -> Optional.empty());
+        final ExecutionJobVertex ejv = new ExecutionJobVertex(eg, jobVertex, vertexParallelismInfo);
+
+ assertThat(ejv.isInitialized(), is(false));
+
+ assertThat(ejv.getTaskVertices().length, is(0));
+
+ try {
+ ejv.getInputs();
+ Assert.fail("failure is expected");
+ } catch (IllegalStateException e) {
+ // ignore
+ }
+
+ try {
+ ejv.getProducedDataSets();
+ Assert.fail("failure is expected");
+ } catch (IllegalStateException e) {
+ // ignore
+ }
+
+ try {
+ ejv.getOperatorCoordinators();
+ Assert.fail("failure is expected");
+ } catch (IllegalStateException e) {
+ // ignore
+ }
+
Review comment:
Maybe add checks for all the methods that are guarded by `checkState(isInitialized())`?
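The three repeated try/fail blocks in the quoted test could also be collapsed with a small helper (or JUnit's `assertThrows`). A self-contained sketch, with a hypothetical accessor standing in for the guarded `ExecutionJobVertex` methods:

```java
class UninitializedAccessSketch {
    // Hypothetical stand-in for an accessor guarded by checkState(isInitialized()).
    static Object guardedAccessor(boolean initialized) {
        if (!initialized) {
            throw new IllegalStateException("job vertex is not initialized");
        }
        return new Object();
    }

    // Small helper mirroring the intent of JUnit's assertThrows for
    // IllegalStateException.
    static void assertIllegalState(Runnable access) {
        try {
            access.run();
        } catch (IllegalStateException expected) {
            return; // the guard fired as expected
        }
        throw new AssertionError("expected IllegalStateException");
    }
}
```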
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -263,22 +265,54 @@ public static int getDefaultMaxParallelism(JobVertex
vertex) {
normalizeParallelism(vertex.getParallelism()));
}
+ // TODO: move to Adaptive Batch Scheduler.
+ /**
+     * Compute the {@link VertexParallelismStore} for all given vertices in a dynamic graph, which
+     * will set defaults and ensure that the returned store contains valid parallelisms, with the
+     * configured default max parallelism.
+     *
+     * @param vertices the vertices to compute parallelism for
+     * @param defaultMaxParallelism the global default max parallelism
+     * @return the computed parallelism store
+     */
+    @VisibleForTesting
+    public static VertexParallelismStore computeVertexParallelismStoreForDynamicGraph(
Review comment:
I would suggest adding this method to `ExecutionJobVertexTest` for now and later moving it to `AdaptiveBatchScheduler`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionJobVertex.java
##########
@@ -307,14 +323,24 @@ public JobVertexID getJobVertexId() {
@Override
public ExecutionVertex[] getTaskVertices() {
+        if (taskVertices == null) {
+            // The REST/web may try to get execution vertices of an uninitialized job vertex. Using
+            // DEBUG log level to avoid flooding logs.
+            LOG.debug("Trying to get execution vertices of an uninitialized job vertex " + getJobVertexId());
+            return new ExecutionVertex[0];
+        }
        return taskVertices;
}
public IntermediateResult[] getProducedDataSets() {
+ checkState(isInitialized());
Review comment:
Maybe also add this check to all the methods that we expect to be invoked only after the job vertex is initialized? Those I noticed are `connectToPredecessors()`, `executionVertexFinished()`, `executionVertexUnFinished()`.
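The guard pattern being suggested is the Guava-style `checkState` precondition (Flink ships its own `Preconditions.checkState`). A minimal self-contained version of the initialize-then-guard lifecycle, with hypothetical method names:

```java
class InitGuardSketch {
    private boolean initialized;

    void initialize() {
        initialized = true;
    }

    // Methods meant to run only after initialization open with the guard,
    // like connectToPredecessors() and executionVertexFinished() would.
    int guardedOperation() {
        checkState(initialized, "vertex is not initialized");
        return 42;
    }

    // Minimal stand-in for Flink's Preconditions.checkState.
    private static void checkState(boolean condition, String message) {
        if (!condition) {
            throw new IllegalStateException(message);
        }
    }
}
```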
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]