zhuzhurk commented on code in PR #25414:
URL: https://github.com/apache/flink/pull/25414#discussion_r1847688952
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/IntermediateDataSet.java:
##########
@@ -43,6 +44,8 @@ public class IntermediateDataSet implements java.io.Serializable {
// All consumers must have the same partitioner and parallelism
private final List<JobEdge> consumers = new ArrayList<>();
+ private final List<StreamEdge> outputStreamEdges = new ArrayList<>();
Review Comment:
`IntermediateDataSet` is a `JobGraph`-level component, so I prefer not to bind it to `StreamEdge`s.
How about maintaining the relationship between an `IntermediateDataSet` and its output `StreamEdge`s in `AdaptiveGraphManager`, and refactoring the method `addOutputStreamEdge()` into `configure(distributionPattern, broadcast)`?
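A minimal sketch of that shape, assuming the only properties the data set really needs from its output `StreamEdge`s are the distribution pattern and the broadcast flag (field and method names below are illustrative, not a final API):
```
// In IntermediateDataSet: keep only JobGraph-level properties.
private DistributionPattern distributionPattern;
private boolean isBroadcast;

public void configure(DistributionPattern distributionPattern, boolean isBroadcast) {
    this.distributionPattern = distributionPattern;
    this.isBroadcast = isBroadcast;
}

// In AdaptiveGraphManager: keep the StreamGraph-level relationship there instead
// (hypothetical field name).
private final Map<IntermediateDataSetID, List<StreamEdge>> outputStreamEdgesByDataSet =
        new HashMap<>();
```
That way the `JobGraph` side stays free of `StreamGraph` types, and the adaptive generator remains the only place that needs the edge mapping.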
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamGraphHasherV2.java:
##########
@@ -148,6 +148,18 @@ private static byte[] generateUserSpecifiedHash(String operatorUid, Hasher hashe
return hasher.hash().asBytes();
}
+ @Override
+ public boolean generateHashesByStreamNodeId(
+ int streamNodeId, StreamGraph streamGraph, Map<Integer, byte[]> hashes) {
+ StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
+ return generateNodeHash(
+ streamNode,
+ Hashing.murmur3_128(0),
Review Comment:
-> getHashFunction()
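If such a helper does not already exist in this class, a minimal sketch could be as small as the following, reusing the Guava `Hashing`/`HashFunction` types this class already imports:
```
private static HashFunction getHashFunction() {
    // Single place that defines the hash function used for node hashes,
    // instead of inlining Hashing.murmur3_128(0) at each call site.
    return Hashing.murmur3_128(0);
}
```
so that `generateNodeHash(...)` receives `getHashFunction()` rather than the inlined constant.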
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1655,6 +1703,21 @@ && arePartitionerAndExchangeModeChainable(
return true;
}
+ public static boolean isSourceChainable(StreamNode streamNode, StreamGraph streamGraph) {
+ if (!streamNode.getInEdges().isEmpty()
Review Comment:
Why does this require the inputs to be empty?
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -673,18 +601,32 @@ private void setChaining() {
info.getStartNodeId(),
1, // operators start at position 1 because 0 is for chained source inputs
info,
- chainEntryPoints);
+ chainEntryPoints,
+ true,
+ serializationExecutor,
+ jobVertexBuildContext,
+ null);
}
}
- private List<StreamEdge> createChain(
+ public static List<StreamEdge> createChain(
final Integer currentNodeId,
final int chainIndex,
final OperatorChainInfo chainInfo,
- final Map<Integer, OperatorChainInfo> chainEntryPoints) {
+ final Map<Integer, OperatorChainInfo> chainEntryPoints,
+ final boolean canCreateNewChain,
+ final Executor serializationExecutor,
+ final JobVertexBuildContext jobVertexBuildContext,
+ final @Nullable Consumer<Integer> hashGenerator) {
Integer startNodeId = chainInfo.getStartNodeId();
- if (!builtVertices.contains(startNodeId)) {
+ if (!jobVertexBuildContext.getJobVerticesInOrder().containsKey(startNodeId)) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
+ // Progressive hash generation is required in adaptive graph generator.
+ if (hashGenerator != null) {
+ hashGenerator.accept(currentNodeId);
Review Comment:
From the perspective of this method, it is not a generator because it
generates nothing.
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -731,13 +677,20 @@ private List<StreamEdge> createChain(
for (StreamEdge nonChainable : nonChainableOutputs) {
transitiveOutEdges.add(nonChainable);
- createChain(
- nonChainable.getTargetId(),
- 1, // operators start at position 1 because 0 is for chained source inputs
- chainEntryPoints.computeIfAbsent(
- nonChainable.getTargetId(),
- (k) -> chainInfo.newChain(nonChainable.getTargetId())),
- chainEntryPoints);
+ if (canCreateNewChain) {
Review Comment:
Comments are needed to explain what it's for.
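For instance, something along these lines right above the check; the wording is only a guess at the intent of the flag and should be adjusted to the actual behavior:
```
// `canCreateNewChain` controls whether this call may recurse into the
// non-chainable downstream nodes and start building their chains as well.
// The adaptive generator builds job vertices incrementally, so it passes
// `false` here and creates the downstream chains later on its own.
if (canCreateNewChain) {
```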
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1969,8 +2032,12 @@ private static void setManagedMemoryFractionForSlotSharingGroup(
final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs =
jobVertexBuildContext.getChainedConfigs();
- final Set<Integer> groupOperatorIds =
+ final Set<JobVertexID> jobVertexIds =
slotSharingGroup.getJobVertexIds().stream()
+ .filter(vertexOperators::containsKey)
Review Comment:
In which case will the `slotSharingGroup` contain a job vertex that is not included in `vertexOperators`?
Could you add some comments to explain it, as it may not be obvious to other developers?
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1655,6 +1703,21 @@ && arePartitionerAndExchangeModeChainable(
return true;
}
+ public static boolean isSourceChainable(StreamNode streamNode, StreamGraph streamGraph) {
Review Comment:
-> isChainableSource
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/AdaptiveGraphManager.java:
##########
@@ -0,0 +1,697 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.ExecutionConfig;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
+import org.apache.flink.runtime.jobgraph.JobGraph;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.jobgraph.forwardgroup.ForwardGroupComputeUtil;
+import org.apache.flink.runtime.jobgraph.forwardgroup.StreamNodeForwardGroup;
+import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
+import org.apache.flink.util.Preconditions;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedHashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.Collectors;
+
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.addVertexIndexPrefixInVertexName;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.connect;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createAndInitializeJobGraph;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createChain;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.createSourceChainInfo;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isChainable;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.isSourceChainable;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.markSupportingConcurrentExecutionAttempts;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.preValidate;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.serializeOperatorCoordinatorsAndStreamConfig;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setAllOperatorNonChainedOutputsConfigs;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setManagedMemoryFraction;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setPhysicalEdges;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setSlotSharingAndCoLocation;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.setVertexDescription;
+import static org.apache.flink.streaming.api.graph.StreamingJobGraphGenerator.validateHybridShuffleExecuteInBatchMode;
+
+/** Default implementation for {@link AdaptiveGraphGenerator}. */
+@Internal
+public class AdaptiveGraphManager implements AdaptiveGraphGenerator {
+
+ private final StreamGraph streamGraph;
+
+ private final JobGraph jobGraph;
+
+ private final StreamGraphHasher defaultStreamGraphHasher;
+
+ private final List<StreamGraphHasher> legacyStreamGraphHasher;
+
+ private final Executor serializationExecutor;
+
+ private final AtomicInteger vertexIndexId;
+
+ private final StreamGraphContext streamGraphContext;
+
+ private final Map<Integer, byte[]> hashes;
+
+ private final List<Map<Integer, byte[]>> legacyHashes;
+
+ // Records the id of stream node which job vertex is created.
Review Comment:
-> Records the ids of the stream nodes whose job vertices have already been created
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -777,18 +730,28 @@ private List<StreamEdge> createChain(
.addOutputFormat(currentOperatorId, currentNode.getOutputFormat());
}
OperatorInfo operatorInfo =
- jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId);
+ chainInfo.createAndGetOperatorInfo(currentNodeId, currentOperatorId);
- StreamConfig config =
- currentNodeId.equals(startNodeId)
- ? createJobVertex(startNodeId, chainInfo)
- : new StreamConfig(new Configuration());
+ StreamConfig config;
+ if (currentNodeId.equals(startNodeId)) {
+ JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
+ config =
+ jobVertex != null
+ ? new StreamConfig(jobVertex.getConfiguration())
+ : new StreamConfig(
+ createJobVertex(
+ chainInfo,
+ serializationExecutor,
+ jobVertexBuildContext)
+ .getConfiguration());
Review Comment:
Maybe
```
JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId);
if (jobVertex == null) {
    jobVertex = createJobVertex(chainInfo, serializationExecutor, jobVertexBuildContext);
}
config = new StreamConfig(jobVertex.getConfiguration());
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]