zhuzhurk commented on code in PR #25658:
URL: https://github.com/apache/flink/pull/25658#discussion_r1875863192
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -377,6 +377,7 @@ private void setVertexNonChainedOutputsConfig(
distributionPattern,
edge.getPartitioner().isBroadcast(),
edge.getPartitioner().getClass().equals(ForwardPartitioner.class));
+ dataSet.increaseNumJobEdgesToCreate();
Review Comment:
This method is introduced in a later commit.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/NonAdaptiveExecutionHandler.java:
##########
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.forwardgroup.JobVertexForwardGroup;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/**
+ * A {@code NonAdaptiveExecutionHandler} implements the {@link
AdaptiveExecutionHandler} interface
+ * to provide an immutable static job graph.
+ */
+public class NonAdaptiveExecutionHandler implements AdaptiveExecutionHandler {
+
+ private final Logger log = LoggerFactory.getLogger(getClass());
+ private final JobGraph jobGraph;
+ private final Map<JobVertexID, JobVertexForwardGroup>
forwardGroupsByJobVertexId;
+
+ public NonAdaptiveExecutionHandler(JobGraph jobGraph) {
+ this.jobGraph = checkNotNull(jobGraph);
+ this.forwardGroupsByJobVertexId =
+
ForwardGroupComputeUtil.computeForwardGroupsAndCheckParallelism(
+
getJobGraph().getVerticesSortedTopologicallyFromSources());
+ }
+
+ @Override
+ public JobGraph getJobGraph() {
+ return jobGraph;
+ }
+
+ @Override
+ public void handleJobEvent(JobEvent jobEvent) {
+ // do nothing
+ }
+
+ @Override
+ public void registerJobGraphUpdateListener(JobGraphUpdateListener
listener) {
+ // do nothing
+ }
+
+ @Override
+ public int getInitialParallelism(JobVertexID jobVertexId) {
+ JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
+ int vertexInitialParallelism = jobVertex.getParallelism();
+ JobVertexForwardGroup forwardGroup =
forwardGroupsByJobVertexId.get(jobVertexId);
+
+ if (jobVertex.getParallelism() == ExecutionConfig.PARALLELISM_DEFAULT
+ && forwardGroup != null
+ && forwardGroup.isParallelismDecided()) {
+ vertexInitialParallelism = forwardGroup.getParallelism();
+ log.info(
+ "Parallelism of JobVertex: {} ({}) is decided to be {}
according to forward group's parallelism.",
+ jobVertex.getName(),
+ jobVertex,
+ vertexInitialParallelism);
+ }
+
+ return vertexInitialParallelism;
+ }
+
+ @Override
+ public void notifyJobVertexParallelismDecided(JobVertexID jobVertexId, int
parallelism) {
+ JobVertexForwardGroup forwardGroup =
forwardGroupsByJobVertexId.get(jobVertexId);
+ if (forwardGroup != null && !forwardGroup.isParallelismDecided()) {
Review Comment:
Is it expected that either `!forwardGroup.isParallelismDecided()` or
`forwardGroup.getParallelism() == parallelism `?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adaptivebatch/AdaptiveExecutionHandler.java:
##########
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.scheduler.adaptivebatch;
+
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroup;
+import org.apache.flink.runtime.jobmaster.event.JobEvent;
+
+import java.util.function.BiConsumer;
+
+/**
+ * The {@code AdaptiveExecutionHandler} interface defines the operations for
handling the adaptive
Review Comment:
`@code` -> `@link` To make it easier to find the referenced classes.
This also applies to a few other comments.
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java:
##########
@@ -75,6 +80,10 @@ public List<JobEdge> getConsumers() {
return this.consumers;
}
+ public boolean isAllConsumerVerticesCreated() {
Review Comment:
-> areAllConsumerVerticesCreated
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/adapter/DefaultExecutionTopology.java:
##########
@@ -186,7 +182,14 @@ public EdgeManager getEdgeManager() {
return logicalPipelinedRegionsByJobVertexId;
}
- public void notifyExecutionGraphUpdated(
+ public void notifyExecutionGraphUpdatedWithNewlyJobVertices(
Review Comment:
notifyExecutionGraphUpdatedWithNewlyJobVertices ->
notifyExecutionGraphUpdatedWithNewJobVertices
##########
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/VertexParallelismStore.java:
##########
@@ -33,4 +35,11 @@ public interface VertexParallelismStore {
* @throws IllegalStateException if there is no parallelism information
for the given vertex
*/
VertexParallelismInformation getParallelismInfo(JobVertexID vertexId);
+
+ /**
+ * Gets a map of all vertex parallelism information.
+ *
+ * @return A map containing JobVertexID and corresponding
VertexParallelismInformation.
+ */
+ Map<JobVertexID, VertexParallelismInformation> getAllParallelismInfo();
Review Comment:
Why do we need it to be public?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java:
##########
@@ -127,6 +136,11 @@ public void configure(
}
}
+ public void increaseNumJobEdgesToCreate() {
+ waitingJobEdgesToCreate = true;
Review Comment:
When is this value reset?
##########
flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/IntermediateResultPartition.java:
##########
@@ -146,7 +146,7 @@ public int getNumberOfSubpartitions() {
}
private int computeNumberOfSubpartitionsForDynamicGraph() {
- if (totalResult.isBroadcast()) {
+ if (totalResult.isBroadcast() || totalResult.isForward()) {
// for dynamic graph and broadcast result, we only produced one
subpartition,
// and all the downstream vertices should consume this
subpartition.
Review Comment:
The comment should also be updated.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]