This is an automated email from the ASF dual-hosted git repository.
zhuzh pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new caa03223697 [FLINK-36335][runtime] Improving Method Reusability in
StreamGraphGenerator with JobVertexBuildContext
caa03223697 is described below
commit caa032236979bdd53cdbf78ff05cceb6ec2f4d93
Author: noorall <[email protected]>
AuthorDate: Thu Sep 5 17:02:22 2024 +0800
[FLINK-36335][runtime] Improving Method Reusability in StreamGraphGenerator
with JobVertexBuildContext
---
.../api/graph/StreamingJobGraphGenerator.java | 813 ++++++++++-----------
.../api/graph/util/ChainedOperatorHashInfo.java | 56 ++
.../api/graph/util/ChainedSourceInfo.java | 43 ++
.../api/graph/util/JobVertexBuildContext.java | 192 +++++
.../api/graph/util/OperatorChainInfo.java | 183 +++++
.../streaming/api/graph/util/OperatorInfo.java | 70 ++
6 files changed, 932 insertions(+), 425 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
index 6e45fbe505e..3908820db92 100644
---
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java
@@ -41,7 +41,6 @@ import
org.apache.flink.runtime.checkpoint.CheckpointRetentionPolicy;
import org.apache.flink.runtime.checkpoint.MasterTriggerRestoreHook;
import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
import org.apache.flink.runtime.jobgraph.DistributionPattern;
-import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
import org.apache.flink.runtime.jobgraph.InputOutputFormatVertex;
import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
import org.apache.flink.runtime.jobgraph.JobEdge;
@@ -69,6 +68,11 @@ import org.apache.flink.runtime.util.Hardware;
import org.apache.flink.runtime.util.config.memory.ManagedMemoryUtils;
import org.apache.flink.streaming.api.checkpoint.WithMasterCheckpointHook;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
+import org.apache.flink.streaming.api.graph.util.ChainedOperatorHashInfo;
+import org.apache.flink.streaming.api.graph.util.ChainedSourceInfo;
+import org.apache.flink.streaming.api.graph.util.JobVertexBuildContext;
+import org.apache.flink.streaming.api.graph.util.OperatorChainInfo;
+import org.apache.flink.streaming.api.graph.util.OperatorInfo;
import org.apache.flink.streaming.api.operators.ChainingStrategy;
import org.apache.flink.streaming.api.operators.InputSelectable;
import org.apache.flink.streaming.api.operators.SourceOperatorFactory;
@@ -107,7 +111,6 @@ import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.IdentityHashMap;
-import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.LinkedList;
import java.util.List;
@@ -120,6 +123,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -172,43 +176,16 @@ public class StreamingJobGraphGenerator {
private final ClassLoader userClassloader;
private final StreamGraph streamGraph;
- private final Map<Integer, JobVertex> jobVertices;
private final JobGraph jobGraph;
private final Collection<Integer> builtVertices;
- private final List<StreamEdge> physicalEdgesInOrder;
-
- private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
-
- private final Map<Integer, StreamConfig> vertexConfigs;
- private final Map<Integer, String> chainedNames;
-
- private final Map<Integer, ResourceSpec> chainedMinResources;
- private final Map<Integer, ResourceSpec> chainedPreferredResources;
-
- private final Map<Integer, InputOutputFormatContainer>
chainedInputOutputFormats;
-
private final StreamGraphHasher defaultStreamGraphHasher;
private final List<StreamGraphHasher> legacyStreamGraphHashers;
- private boolean hasHybridResultPartition = false;
-
private final Executor serializationExecutor;
- // Futures for the serialization of operator coordinators
- private final Map<
- JobVertexID,
-
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
- coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
-
- /** The {@link OperatorChainInfo}s, key is the start node id of the chain. */
- private final Map<Integer, OperatorChainInfo> chainInfos;
-
- /**
- * This is used to cache the non-chainable outputs, to set the non-chainable outputs config
- * after all job vertices are created.
- */
- private final Map<Integer, List<StreamEdge>> opNonChainableOutputsCache;
+ // We save all the context needed to create the JobVertex in this structure.
+ private final JobVertexBuildContext jobVertexBuildContext;
private StreamingJobGraphGenerator(
ClassLoader userClassloader,
@@ -220,42 +197,36 @@ public class StreamingJobGraphGenerator {
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Arrays.asList(new
StreamGraphUserHashHasher());
- this.jobVertices = new LinkedHashMap<>();
this.builtVertices = new HashSet<>();
- this.chainedConfigs = new HashMap<>();
- this.vertexConfigs = new HashMap<>();
- this.chainedNames = new HashMap<>();
- this.chainedMinResources = new HashMap<>();
- this.chainedPreferredResources = new HashMap<>();
- this.chainedInputOutputFormats = new HashMap<>();
- this.physicalEdgesInOrder = new ArrayList<>();
this.serializationExecutor =
Preconditions.checkNotNull(serializationExecutor);
- this.chainInfos = new HashMap<>();
- this.opNonChainableOutputsCache = new LinkedHashMap<>();
-
jobGraph = new JobGraph(jobID, streamGraph.getJobName());
- }
-
- private JobGraph createJobGraph() {
- preValidate();
- jobGraph.setJobType(streamGraph.getJobType());
- jobGraph.setDynamic(streamGraph.isDynamic());
-
- jobGraph.enableApproximateLocalRecovery(
-
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
// Generate deterministic hashes for the nodes in order to identify them across
// submission iff they didn't change.
- Map<Integer, byte[]> hashes =
+ final Map<Integer, byte[]> hashes =
defaultStreamGraphHasher.traverseStreamGraphAndGenerateHashes(streamGraph);
// Generate legacy version hashes for backwards compatibility
- List<Map<Integer, byte[]>> legacyHashes = new
ArrayList<>(legacyStreamGraphHashers.size());
+ final List<Map<Integer, byte[]>> legacyHashes =
+ new ArrayList<>(legacyStreamGraphHashers.size());
for (StreamGraphHasher hasher : legacyStreamGraphHashers) {
legacyHashes.add(hasher.traverseStreamGraphAndGenerateHashes(streamGraph));
}
- setChaining(hashes, legacyHashes);
+ this.jobVertexBuildContext =
+ new JobVertexBuildContext(
+ streamGraph, new AtomicBoolean(false), hashes,
legacyHashes);
+ }
+
+ private JobGraph createJobGraph() {
+ preValidate(streamGraph, userClassloader);
+ jobGraph.setJobType(streamGraph.getJobType());
+ jobGraph.setDynamic(streamGraph.isDynamic());
+
+ jobGraph.enableApproximateLocalRecovery(
+
streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled());
+
+ setChaining();
if (jobGraph.isDynamic()) {
setVertexParallelismsForDynamicGraphIfNecessary();
@@ -266,25 +237,20 @@ public class StreamingJobGraphGenerator {
// vertices and partition-reuse
final Map<Integer, Map<StreamEdge, NonChainedOutput>>
opIntermediateOutputs =
new HashMap<>();
- setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs);
+ setAllOperatorNonChainedOutputsConfigs(opIntermediateOutputs,
jobVertexBuildContext);
setAllVertexNonChainedOutputsConfigs(opIntermediateOutputs);
- setPhysicalEdges();
+ setPhysicalEdges(jobVertexBuildContext);
- markSupportingConcurrentExecutionAttempts();
+ markSupportingConcurrentExecutionAttempts(jobVertexBuildContext);
- validateHybridShuffleExecuteInBatchMode();
+ validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext);
- setSlotSharingAndCoLocation();
+ setSlotSharingAndCoLocation(jobGraph, jobVertexBuildContext);
- setManagedMemoryFraction(
- Collections.unmodifiableMap(jobVertices),
- Collections.unmodifiableMap(vertexConfigs),
- Collections.unmodifiableMap(chainedConfigs),
- id ->
streamGraph.getStreamNode(id).getManagedMemoryOperatorScopeUseCaseWeights(),
- id ->
streamGraph.getStreamNode(id).getManagedMemorySlotScopeUseCases());
+ setManagedMemoryFraction(jobVertexBuildContext);
- configureCheckpointing();
+ configureCheckpointing(streamGraph, jobGraph);
jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings());
@@ -309,31 +275,41 @@ public class StreamingJobGraphGenerator {
}
jobGraph.setJobConfiguration(streamGraph.getJobConfiguration());
- addVertexIndexPrefixInVertexName();
+ addVertexIndexPrefixInVertexName(jobVertexBuildContext, new
AtomicInteger(0), jobGraph);
- setVertexDescription();
+ setVertexDescription(jobVertexBuildContext);
// Wait for the serialization of operator coordinators and stream config.
+ serializeOperatorCoordinatorsAndStreamConfig(
+ jobGraph, serializationExecutor, jobVertexBuildContext);
+
+ if (!streamGraph.getJobStatusHooks().isEmpty()) {
+ jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
+ }
+
+ return jobGraph;
+ }
+
+ public static void serializeOperatorCoordinatorsAndStreamConfig(
+ JobGraph jobGraph,
+ Executor serializationExecutor,
+ JobVertexBuildContext jobVertexBuildContext) {
try {
FutureUtils.combineAll(
- vertexConfigs.values().stream()
+
jobVertexBuildContext.getOperatorInfos().values().stream()
.map(
- config ->
-
config.triggerSerializationAndReturnFuture(
-
serializationExecutor))
+ operatorInfo ->
+ operatorInfo
+ .getVertexConfig()
+
.triggerSerializationAndReturnFuture(
+
serializationExecutor))
.collect(Collectors.toList()))
.get();
- waitForSerializationFuturesAndUpdateJobVertices();
+ waitForSerializationFuturesAndUpdateJobVertices(jobGraph,
jobVertexBuildContext);
} catch (Exception e) {
throw new FlinkRuntimeException("Error in serialization.", e);
}
-
- if (!streamGraph.getJobStatusHooks().isEmpty()) {
- jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks());
- }
-
- return jobGraph;
}
/**
@@ -347,12 +323,16 @@ public class StreamingJobGraphGenerator {
return new
OperatorID(StreamGraphHasherV2.generateUserSpecifiedHash(operatorUid));
}
- private void waitForSerializationFuturesAndUpdateJobVertices()
+ private static void waitForSerializationFuturesAndUpdateJobVertices(
+ JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext)
throws ExecutionException, InterruptedException {
for (Map.Entry<
JobVertexID,
List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
- futuresPerJobVertex :
coordinatorSerializationFuturesPerJobVertex.entrySet()) {
+ futuresPerJobVertex :
+ jobVertexBuildContext
+
.getCoordinatorSerializationFuturesPerJobVertex()
+ .entrySet()) {
final JobVertexID jobVertexId = futuresPerJobVertex.getKey();
final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId);
@@ -366,32 +346,46 @@ public class StreamingJobGraphGenerator {
}
}
- private void addVertexIndexPrefixInVertexName() {
- if (!streamGraph.isVertexNameIncludeIndexPrefix()) {
+ public static void addVertexIndexPrefixInVertexName(
+ JobVertexBuildContext jobVertexBuildContext,
+ AtomicInteger vertexIndexId,
+ JobGraph jobGraph) {
+ if
(!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) {
return;
}
- final AtomicInteger vertexIndexId = new AtomicInteger(0);
+ Set<JobVertexID> jobVertexIds =
+ jobVertexBuildContext.getJobVerticesInOrder().values().stream()
+ .map(JobVertex::getID)
+ .collect(Collectors.toSet());
+ // JobVertexBuildContext only contains incrementally generated jobVertex instances. The
+ // setName method needs to set the names based on the topological order of all jobVertices,
+ // so it relies on the job graph to compute a difference set.
jobGraph.getVerticesSortedTopologicallyFromSources()
.forEach(
- vertex ->
+ vertex -> {
+ if (jobVertexIds.contains(vertex.getID())) {
vertex.setName(
String.format(
"[vertex-%d]%s",
-
vertexIndexId.getAndIncrement(),
- vertex.getName())));
+
vertexIndexId.getAndIncrement(), vertex.getName()));
+ }
+ });
}
- private void setVertexDescription() {
+ public static void setVertexDescription(JobVertexBuildContext
jobVertexBuildContext) {
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
+ final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
for (Map.Entry<Integer, JobVertex> headOpAndJobVertex :
jobVertices.entrySet()) {
Integer headOpId = headOpAndJobVertex.getKey();
JobVertex vertex = headOpAndJobVertex.getValue();
StringBuilder builder = new StringBuilder();
switch (streamGraph.getVertexDescriptionMode()) {
case CASCADING:
- buildCascadingDescription(builder, headOpId, headOpId);
+ buildCascadingDescription(builder, headOpId, headOpId,
jobVertexBuildContext);
break;
case TREE:
- buildTreeDescription(builder, headOpId, headOpId, "",
true);
+ buildTreeDescription(
+ builder, headOpId, headOpId, "", true,
jobVertexBuildContext);
break;
default:
throw new IllegalArgumentException(
@@ -403,11 +397,16 @@ public class StreamingJobGraphGenerator {
}
}
- private void buildCascadingDescription(StringBuilder builder, int
headOpId, int currentOpId) {
- StreamNode node = streamGraph.getStreamNode(currentOpId);
- builder.append(getDescriptionWithChainedSourcesInfo(node));
+ private static void buildCascadingDescription(
+ StringBuilder builder,
+ int headOpId,
+ int currentOpId,
+ JobVertexBuildContext jobVertexBuildContext) {
+ StreamNode node =
jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
+ builder.append(getDescriptionWithChainedSourcesInfo(node,
jobVertexBuildContext));
- LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId,
node);
+ LinkedList<Integer> chainedOutput =
+ getChainedOutputNodes(headOpId, node, jobVertexBuildContext);
if (chainedOutput.isEmpty()) {
return;
}
@@ -419,7 +418,7 @@ public class StreamingJobGraphGenerator {
}
while (true) {
Integer outputId = chainedOutput.pollFirst();
- buildCascadingDescription(builder, headOpId, outputId);
+ buildCascadingDescription(builder, headOpId, outputId,
jobVertexBuildContext);
if (chainedOutput.isEmpty()) {
break;
}
@@ -430,8 +429,11 @@ public class StreamingJobGraphGenerator {
}
}
- private LinkedList<Integer> getChainedOutputNodes(int headOpId, StreamNode
node) {
+ private static LinkedList<Integer> getChainedOutputNodes(
+ int headOpId, StreamNode node, JobVertexBuildContext
jobVertexBuildContext) {
LinkedList<Integer> chainedOutput = new LinkedList<>();
+ Map<Integer, Map<Integer, StreamConfig>> chainedConfigs =
+ jobVertexBuildContext.getChainedConfigs();
if (chainedConfigs.containsKey(headOpId)) {
for (StreamEdge edge : node.getOutEdges()) {
int targetId = edge.getTargetId();
@@ -443,8 +445,13 @@ public class StreamingJobGraphGenerator {
return chainedOutput;
}
- private void buildTreeDescription(
- StringBuilder builder, int headOpId, int currentOpId, String
prefix, boolean isLast) {
+ private static void buildTreeDescription(
+ StringBuilder builder,
+ int headOpId,
+ int currentOpId,
+ String prefix,
+ boolean isLast,
+ JobVertexBuildContext jobVertexBuildContext) {
// Replace the '-' in prefix of current node with ' ', keep ':'
// HeadNode
// :- Node1
@@ -465,21 +472,33 @@ public class StreamingJobGraphGenerator {
}
}
- StreamNode node = streamGraph.getStreamNode(currentOpId);
+ StreamNode node =
jobVertexBuildContext.getStreamGraph().getStreamNode(currentOpId);
builder.append(currentNodePrefix);
- builder.append(getDescriptionWithChainedSourcesInfo(node));
+ builder.append(getDescriptionWithChainedSourcesInfo(node,
jobVertexBuildContext));
builder.append("\n");
- LinkedList<Integer> chainedOutput = getChainedOutputNodes(headOpId,
node);
+ LinkedList<Integer> chainedOutput =
+ getChainedOutputNodes(headOpId, node, jobVertexBuildContext);
while (!chainedOutput.isEmpty()) {
Integer outputId = chainedOutput.pollFirst();
- buildTreeDescription(builder, headOpId, outputId, childPrefix,
chainedOutput.isEmpty());
+ buildTreeDescription(
+ builder,
+ headOpId,
+ outputId,
+ childPrefix,
+ chainedOutput.isEmpty(),
+ jobVertexBuildContext);
}
}
- private String getDescriptionWithChainedSourcesInfo(StreamNode node) {
+ private static String getDescriptionWithChainedSourcesInfo(
+ StreamNode node, JobVertexBuildContext jobVertexBuildContext) {
List<StreamNode> chainedSources;
+ final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs =
+ jobVertexBuildContext.getChainedConfigs();
+ final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
if (!chainedConfigs.containsKey(node.getId())) {
// node is not head operator of a vertex
chainedSources = Collections.emptyList();
@@ -507,7 +526,7 @@ public class StreamingJobGraphGenerator {
}
@SuppressWarnings("deprecation")
- private void preValidate() {
+ public static void preValidate(StreamGraph streamGraph, ClassLoader
userClassloader) {
CheckpointConfig checkpointConfig = streamGraph.getCheckpointConfig();
if (checkpointConfig.isCheckpointingEnabled()) {
@@ -522,7 +541,8 @@ public class StreamingJobGraphGenerator {
}
if (checkpointConfig.isUnalignedCheckpointsEnabled()
&& !checkpointConfig.isForceUnalignedCheckpoints()
- &&
streamGraph.getStreamNodes().stream().anyMatch(this::hasCustomPartitioner)) {
+ && streamGraph.getStreamNodes().stream()
+
.anyMatch(StreamingJobGraphGenerator::hasCustomPartitioner)) {
throw new UnsupportedOperationException(
"Unaligned checkpoints are currently not supported for
custom partitioners, "
+ "as rescaling is not guaranteed to work
correctly."
@@ -551,16 +571,16 @@ public class StreamingJobGraphGenerator {
}
}
- private boolean hasCustomPartitioner(StreamNode node) {
+ private static boolean hasCustomPartitioner(StreamNode node) {
return node.getOutEdges().stream()
.anyMatch(edge -> edge.getPartitioner() instanceof
CustomPartitionerWrapper);
}
- private void setPhysicalEdges() {
+ public static void setPhysicalEdges(JobVertexBuildContext
jobVertexBuildContext) {
Map<Integer, List<StreamEdge>> physicalInEdgesInOrder =
new HashMap<Integer, List<StreamEdge>>();
- for (StreamEdge edge : physicalEdgesInOrder) {
+ for (StreamEdge edge :
jobVertexBuildContext.getPhysicalEdgesInOrder()) {
int target = edge.getTargetId();
List<StreamEdge> inEdges =
@@ -573,12 +593,14 @@ public class StreamingJobGraphGenerator {
int vertex = inEdges.getKey();
List<StreamEdge> edgeList = inEdges.getValue();
- vertexConfigs.get(vertex).setInPhysicalEdges(edgeList);
+ jobVertexBuildContext
+ .getOperatorInfo(vertex)
+ .getVertexConfig()
+ .setInPhysicalEdges(edgeList);
}
}
- private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs(
- final Map<Integer, byte[]> hashes, final List<Map<Integer,
byte[]>> legacyHashes) {
+ private Map<Integer, OperatorChainInfo>
buildChainedInputsAndGetHeadInputs() {
final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>();
final Map<Integer, OperatorChainInfo> chainEntryPoints = new
HashMap<>();
@@ -598,14 +620,22 @@ public class StreamingJobGraphGenerator {
if (targetChainingStrategy ==
ChainingStrategy.HEAD_WITH_SOURCES
&& isChainableInput(sourceOutEdge, streamGraph)) {
- final OperatorID opId = new
OperatorID(hashes.get(sourceNodeId));
+ final OperatorInfo operatorInfo =
+
jobVertexBuildContext.createAndGetOperatorInfo(sourceNodeId);
+ final OperatorID opId =
+ new
OperatorID(jobVertexBuildContext.getHash(sourceNodeId));
final StreamConfig.SourceInputConfig inputConfig =
new StreamConfig.SourceInputConfig(sourceOutEdge);
final StreamConfig operatorConfig = new StreamConfig(new
Configuration());
- setOperatorConfig(sourceNodeId, operatorConfig,
Collections.emptyMap());
- setOperatorChainedOutputsConfig(operatorConfig,
Collections.emptyList());
+ setOperatorConfig(
+ sourceNodeId,
+ operatorConfig,
+ Collections.emptyMap(),
+ jobVertexBuildContext);
+ setOperatorChainedOutputsConfig(
+ operatorConfig, Collections.emptyList(),
jobVertexBuildContext);
// we cache the non-chainable outputs here, and set the non-chained config later
- opNonChainableOutputsCache.put(sourceNodeId,
Collections.emptyList());
+
operatorInfo.addNonChainableOutputs(Collections.emptyList());
operatorConfig.setChainIndex(0); // sources are always
first
operatorConfig.setOperatorID(opId);
@@ -624,8 +654,6 @@ public class StreamingJobGraphGenerator {
(k) ->
new OperatorChainInfo(
sourceOutEdge.getTargetId(),
- hashes,
- legacyHashes,
chainedSources,
streamGraph));
chainInfo.addCoordinatorProvider(coord);
@@ -635,9 +663,7 @@ public class StreamingJobGraphGenerator {
}
chainEntryPoints.put(
- sourceNodeId,
- new OperatorChainInfo(
- sourceNodeId, hashes, legacyHashes,
chainedSources, streamGraph));
+ sourceNodeId, new OperatorChainInfo(sourceNodeId,
chainedSources, streamGraph));
}
return chainEntryPoints;
@@ -648,11 +674,11 @@ public class StreamingJobGraphGenerator {
*
* <p>This will recursively create all {@link JobVertex} instances.
*/
- private void setChaining(Map<Integer, byte[]> hashes, List<Map<Integer,
byte[]>> legacyHashes) {
+ private void setChaining() {
// we separate out the sources that run as inputs to another operator (chained inputs)
// from the sources that needs to run as the main (head) operator.
final Map<Integer, OperatorChainInfo> chainEntryPoints =
- buildChainedInputsAndGetHeadInputs(hashes, legacyHashes);
+ buildChainedInputsAndGetHeadInputs();
final Collection<OperatorChainInfo> initialEntryPoints =
chainEntryPoints.entrySet().stream()
.sorted(Comparator.comparing(Map.Entry::getKey))
@@ -732,64 +758,76 @@ public class StreamingJobGraphGenerator {
chainEntryPoints);
}
- chainedNames.put(
+ chainInfo.addChainedName(
currentNodeId,
createChainedName(
currentNodeId,
chainableOutputs,
-
Optional.ofNullable(chainEntryPoints.get(currentNodeId))));
- chainedMinResources.put(
- currentNodeId, createChainedMinResources(currentNodeId,
chainableOutputs));
- chainedPreferredResources.put(
+
Optional.ofNullable(chainEntryPoints.get(currentNodeId)),
+ chainInfo.getChainedNames(),
+ jobVertexBuildContext));
+
+ chainInfo.addChainedMinResources(
currentNodeId,
- createChainedPreferredResources(currentNodeId,
chainableOutputs));
+ createChainedMinResources(
+ currentNodeId, chainableOutputs, chainInfo,
jobVertexBuildContext));
+
+ chainInfo.addChainedPreferredResources(
+ currentNodeId,
+ createChainedPreferredResources(
+ currentNodeId, chainableOutputs, chainInfo,
jobVertexBuildContext));
OperatorID currentOperatorId =
chainInfo.addNodeToChain(
currentNodeId,
-
streamGraph.getStreamNode(currentNodeId).getOperatorName());
+
streamGraph.getStreamNode(currentNodeId).getOperatorName(),
+ jobVertexBuildContext);
if (currentNode.getInputFormat() != null) {
- getOrCreateFormatContainer(startNodeId)
+ chainInfo
+ .getOrCreateFormatContainer()
.addInputFormat(currentOperatorId,
currentNode.getInputFormat());
}
if (currentNode.getOutputFormat() != null) {
- getOrCreateFormatContainer(startNodeId)
+ chainInfo
+ .getOrCreateFormatContainer()
.addOutputFormat(currentOperatorId,
currentNode.getOutputFormat());
}
+ OperatorInfo operatorInfo =
+
jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId);
StreamConfig config =
currentNodeId.equals(startNodeId)
? createJobVertex(startNodeId, chainInfo)
: new StreamConfig(new Configuration());
- tryConvertPartitionerForDynamicGraph(chainableOutputs,
nonChainableOutputs);
+ tryConvertPartitionerForDynamicGraph(
+ chainableOutputs, nonChainableOutputs,
jobVertexBuildContext);
config.setAttribute(currentNodeAttribute);
- setOperatorConfig(currentNodeId, config,
chainInfo.getChainedSources());
-
- setOperatorChainedOutputsConfig(config, chainableOutputs);
+ setOperatorConfig(
+ currentNodeId, config, chainInfo.getChainedSources(),
jobVertexBuildContext);
+ setOperatorChainedOutputsConfig(config, chainableOutputs,
jobVertexBuildContext);
// we cache the non-chainable outputs here, and set the non-chained config later
- opNonChainableOutputsCache.put(currentNodeId, nonChainableOutputs);
+ operatorInfo.addNonChainableOutputs(nonChainableOutputs);
if (currentNodeId.equals(startNodeId)) {
chainInfo.setTransitiveOutEdges(transitiveOutEdges);
- chainInfos.put(startNodeId, chainInfo);
+ jobVertexBuildContext.addChainInfo(startNodeId, chainInfo);
config.setChainStart();
config.setChainIndex(chainIndex);
config.setOperatorName(streamGraph.getStreamNode(currentNodeId).getOperatorName());
-
config.setTransitiveChainedTaskConfigs(chainedConfigs.get(startNodeId));
-
+ config.setTransitiveChainedTaskConfigs(
+
jobVertexBuildContext.getChainedConfigs().get(startNodeId));
} else {
- chainedConfigs.computeIfAbsent(
- startNodeId, k -> new HashMap<Integer,
StreamConfig>());
-
config.setChainIndex(chainIndex);
StreamNode node = streamGraph.getStreamNode(currentNodeId);
config.setOperatorName(node.getOperatorName());
- chainedConfigs.get(startNodeId).put(currentNodeId, config);
+ jobVertexBuildContext
+ .getOrCreateChainedConfig(startNodeId)
+ .put(currentNodeId, config);
}
config.setOperatorID(currentOperatorId);
@@ -814,6 +852,7 @@ public class StreamingJobGraphGenerator {
* parallelism at early stage if possible, to avoid invalid partition
reuse.
*/
private void setVertexParallelismsForDynamicGraphIfNecessary() {
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
// Note that the jobVertices are reverse topological order
final List<JobVertex> topologicalOrderVertices =
IterableUtils.toStream(jobVertices.values()).collect(Collectors.toList());
@@ -822,7 +861,8 @@ public class StreamingJobGraphGenerator {
// reset parallelism for job vertices whose parallelism is not
configured
jobVertices.forEach(
(startNodeId, jobVertex) -> {
- final OperatorChainInfo chainInfo =
chainInfos.get(startNodeId);
+ final OperatorChainInfo chainInfo =
+ jobVertexBuildContext.getChainInfo(startNodeId);
if (!jobVertex.isParallelismConfigured()
&& streamGraph.isAutoParallelismEnabled()) {
jobVertex.setParallelism(ExecutionConfig.PARALLELISM_DEFAULT);
@@ -840,7 +880,8 @@ public class StreamingJobGraphGenerator {
jobVertices.forEach(
(startNodeId, jobVertex) -> {
Set<JobVertex> forwardConsumers =
-
chainInfos.get(startNodeId).getTransitiveOutEdges().stream()
+
jobVertexBuildContext.getChainInfo(startNodeId).getTransitiveOutEdges()
+ .stream()
.filter(
edge ->
edge.getPartitioner()
@@ -877,8 +918,8 @@ public class StreamingJobGraphGenerator {
if (forwardGroup != null &&
forwardGroup.isParallelismDecided()) {
jobVertex.setParallelism(forwardGroup.getParallelism());
jobVertex.setParallelismConfigured(true);
- chainInfos
- .get(startNodeId)
+ jobVertexBuildContext
+ .getChainInfo(startNodeId)
.getAllChainedNodes()
.forEach(
streamNode ->
@@ -889,8 +930,8 @@ public class StreamingJobGraphGenerator {
// set max parallelism for vertices in forward group
if (forwardGroup != null &&
forwardGroup.isMaxParallelismDecided()) {
jobVertex.setMaxParallelism(forwardGroup.getMaxParallelism());
- chainInfos
- .get(startNodeId)
+ jobVertexBuildContext
+ .getChainInfo(startNodeId)
.getAllChainedNodes()
.forEach(
streamNode ->
@@ -900,7 +941,8 @@ public class StreamingJobGraphGenerator {
});
}
- private void checkAndReplaceReusableHybridPartitionType(NonChainedOutput
reusableOutput) {
+ private static void checkAndReplaceReusableHybridPartitionType(
+ NonChainedOutput reusableOutput) {
if (reusableOutput.getPartitionType() ==
ResultPartitionType.HYBRID_SELECTIVE) {
// for can be reused hybrid output, it can be optimized to always use full
// spilling strategy to significantly reduce shuffle data writing cost.
@@ -913,21 +955,19 @@ public class StreamingJobGraphGenerator {
}
}
- private InputOutputFormatContainer getOrCreateFormatContainer(Integer
startNodeId) {
- return chainedInputOutputFormats.computeIfAbsent(
- startNodeId,
- k ->
- new InputOutputFormatContainer(
-
Thread.currentThread().getContextClassLoader()));
- }
-
- private String createChainedName(
+ public static String createChainedName(
Integer vertexID,
List<StreamEdge> chainedOutputs,
- Optional<OperatorChainInfo> operatorChainInfo) {
+ Optional<OperatorChainInfo> operatorChainInfo,
+ Map<Integer, String> chainedNames,
+ JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
List<ChainedSourceInfo> chainedSourceInfos =
operatorChainInfo
- .map(chainInfo ->
getChainedSourcesByVertexId(vertexID, chainInfo))
+ .map(
+ chainInfo ->
+ getChainedSourcesByVertexId(
+ vertexID, chainInfo,
streamGraph))
.orElse(Collections.emptyList());
final String operatorName =
nameWithChainedSourcesInfo(
@@ -945,31 +985,44 @@ public class StreamingJobGraphGenerator {
}
}
- private List<ChainedSourceInfo> getChainedSourcesByVertexId(
- Integer vertexId, OperatorChainInfo chainInfo) {
+ private static List<ChainedSourceInfo> getChainedSourcesByVertexId(
+ Integer vertexId, OperatorChainInfo chainInfo, StreamGraph
streamGraph) {
return streamGraph.getStreamNode(vertexId).getInEdges().stream()
.map(inEdge ->
chainInfo.getChainedSources().get(inEdge.getSourceId()))
.filter(Objects::nonNull)
.collect(Collectors.toList());
}
- private ResourceSpec createChainedMinResources(
- Integer vertexID, List<StreamEdge> chainedOutputs) {
- ResourceSpec minResources =
streamGraph.getStreamNode(vertexID).getMinResources();
+ public static ResourceSpec createChainedMinResources(
+ Integer vertexID,
+ List<StreamEdge> chainedOutputs,
+ OperatorChainInfo operatorChainInfo,
+ JobVertexBuildContext jobVertexBuildContext) {
+ ResourceSpec minResources =
+
jobVertexBuildContext.getStreamGraph().getStreamNode(vertexID).getMinResources();
for (StreamEdge chainable : chainedOutputs) {
- minResources =
minResources.merge(chainedMinResources.get(chainable.getTargetId()));
+ minResources =
+ minResources.merge(
+
operatorChainInfo.getChainedMinResources(chainable.getTargetId()));
}
return minResources;
}
- private ResourceSpec createChainedPreferredResources(
- Integer vertexID, List<StreamEdge> chainedOutputs) {
+ public static ResourceSpec createChainedPreferredResources(
+ Integer vertexID,
+ List<StreamEdge> chainedOutputs,
+ OperatorChainInfo operatorChainInfo,
+ JobVertexBuildContext jobVertexBuildContext) {
ResourceSpec preferredResources =
- streamGraph.getStreamNode(vertexID).getPreferredResources();
+ jobVertexBuildContext
+ .getStreamGraph()
+ .getStreamNode(vertexID)
+ .getPreferredResources();
for (StreamEdge chainable : chainedOutputs) {
preferredResources =
preferredResources.merge(
-
chainedPreferredResources.get(chainable.getTargetId()));
+ operatorChainInfo.getChainedPreferredResources(
+ chainable.getTargetId()));
}
return preferredResources;
}
@@ -979,7 +1032,7 @@ public class StreamingJobGraphGenerator {
JobVertex jobVertex;
StreamNode streamNode = streamGraph.getStreamNode(streamNodeId);
- byte[] hash = chainInfo.getHash(streamNodeId);
+ byte[] hash = jobVertexBuildContext.getHash(streamNodeId);
if (hash == null) {
throw new IllegalStateException(
@@ -995,28 +1048,29 @@ public class StreamingJobGraphGenerator {
if (chainedOperators != null) {
for (ChainedOperatorHashInfo chainedOperator : chainedOperators) {
OperatorID userDefinedOperatorID =
- chainedOperator.getUserDefinedOperatorID() == null
+ chainedOperator.getUserDefinedOperatorId() == null
? null
- : new
OperatorID(chainedOperator.getUserDefinedOperatorID());
+ : new
OperatorID(chainedOperator.getUserDefinedOperatorId());
operatorIDPairs.add(
OperatorIDPair.of(
- new
OperatorID(chainedOperator.getGeneratedOperatorID()),
+ new
OperatorID(chainedOperator.getGeneratedOperatorId()),
userDefinedOperatorID,
chainedOperator.getStreamNode().getOperatorName(),
chainedOperator.getStreamNode().getTransformationUID()));
}
}
- if (chainedInputOutputFormats.containsKey(streamNodeId)) {
+ if (chainInfo.hasFormatContainer()) {
jobVertex =
new InputOutputFormatVertex(
- chainedNames.get(streamNodeId), jobVertexId,
operatorIDPairs);
-
- chainedInputOutputFormats
- .get(streamNodeId)
+ chainInfo.getChainedName(streamNodeId),
jobVertexId, operatorIDPairs);
+ chainInfo
+ .getOrCreateFormatContainer()
.write(new TaskConfig(jobVertex.getConfiguration()));
} else {
- jobVertex = new JobVertex(chainedNames.get(streamNodeId),
jobVertexId, operatorIDPairs);
+ jobVertex =
+ new JobVertex(
+ chainInfo.getChainedName(streamNodeId),
jobVertexId, operatorIDPairs);
}
if (streamNode.getConsumeClusterDatasetId() != null) {
@@ -1036,18 +1090,20 @@ public class StreamingJobGraphGenerator {
throw new FlinkRuntimeException(
String.format(
"Coordinator Provider for
node %s is not serializable.",
-
chainedNames.get(streamNodeId)),
+
chainInfo.getChainedName(streamNodeId)),
e);
}
},
serializationExecutor));
}
if (!serializationFutures.isEmpty()) {
- coordinatorSerializationFuturesPerJobVertex.put(jobVertexId,
serializationFutures);
+ jobVertexBuildContext.putCoordinatorSerializationFutures(
+ jobVertexId, serializationFutures);
}
jobVertex.setResources(
- chainedMinResources.get(streamNodeId),
chainedPreferredResources.get(streamNodeId));
+ chainInfo.getChainedMinResources(streamNodeId),
+ chainInfo.getChainedPreferredResources(streamNodeId));
jobVertex.setInvokableClass(streamNode.getJobVertexClass());
@@ -1065,7 +1121,7 @@ public class StreamingJobGraphGenerator {
LOG.debug("Parallelism set: {} for {}", parallelism, streamNodeId);
}
- jobVertices.put(streamNodeId, jobVertex);
+ jobVertexBuildContext.addJobVertex(streamNodeId, jobVertex);
builtVertices.add(streamNodeId);
jobGraph.addVertex(jobVertex);
@@ -1076,9 +1132,13 @@ public class StreamingJobGraphGenerator {
return new StreamConfig(jobVertex.getConfiguration());
}
- private void setOperatorConfig(
- Integer vertexId, StreamConfig config, Map<Integer,
ChainedSourceInfo> chainedSources) {
-
+ public static void setOperatorConfig(
+ Integer vertexId,
+ StreamConfig config,
+ Map<Integer, ChainedSourceInfo> chainedSources,
+ JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+ OperatorInfo operatorInfo =
jobVertexBuildContext.getOperatorInfo(vertexId);
StreamNode vertex = streamGraph.getStreamNode(vertexId);
config.setVertexID(vertexId);
@@ -1106,8 +1166,8 @@ public class StreamingJobGraphGenerator {
"Trying to union a chained source with another
input.");
}
inputConfigs[inputIndex] = chainedSource.getInputConfig();
- chainedConfigs
- .computeIfAbsent(vertexId, (key) -> new HashMap<>())
+ jobVertexBuildContext
+ .getOrCreateChainedConfig(vertexId)
.put(inEdge.getSourceId(),
chainedSource.getOperatorConfig());
} else {
// network input. null if we move to a new input, non-null if
this is a further edge
@@ -1172,11 +1232,13 @@ public class StreamingJobGraphGenerator {
config.setIterationWaitTime(streamGraph.getLoopTimeout(vertexId));
}
- vertexConfigs.put(vertexId, config);
+ operatorInfo.setVertexConfig(config);
}
- private void setOperatorChainedOutputsConfig(
- StreamConfig config, List<StreamEdge> chainableOutputs) {
+ public static void setOperatorChainedOutputsConfig(
+ StreamConfig config,
+ List<StreamEdge> chainableOutputs,
+ JobVertexBuildContext jobVertexBuildContext) {
// iterate edges, find sideOutput edges, then create and save serializers
for each outputTag type
for (StreamEdge edge : chainableOutputs) {
if (edge.getOutputTag() != null) {
@@ -1185,17 +1247,21 @@ public class StreamingJobGraphGenerator {
edge.getOutputTag()
.getTypeInfo()
.createSerializer(
-
streamGraph.getExecutionConfig().getSerializerConfig()));
+ jobVertexBuildContext
+ .getStreamGraph()
+ .getExecutionConfig()
+ .getSerializerConfig()));
}
}
config.setChainedOutputs(chainableOutputs);
}
- private void setOperatorNonChainedOutputsConfig(
+ private static void setOperatorNonChainedOutputsConfig(
Integer vertexId,
StreamConfig config,
List<StreamEdge> nonChainableOutputs,
- Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+ Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
+ JobVertexBuildContext jobVertexBuildContext) {
// iterate edges, find sideOutput edges, then create and save serializers
for each outputTag type
for (StreamEdge edge : nonChainableOutputs) {
if (edge.getOutputTag() != null) {
@@ -1204,12 +1270,19 @@ public class StreamingJobGraphGenerator {
edge.getOutputTag()
.getTypeInfo()
.createSerializer(
-
streamGraph.getExecutionConfig().getSerializerConfig()));
+ jobVertexBuildContext
+ .getStreamGraph()
+ .getExecutionConfig()
+ .getSerializerConfig()));
}
}
List<NonChainedOutput> deduplicatedOutputs =
- mayReuseNonChainedOutputs(vertexId, nonChainableOutputs,
outputsConsumedByEdge);
+ mayReuseNonChainedOutputs(
+ vertexId,
+ nonChainableOutputs,
+ outputsConsumedByEdge,
+ jobVertexBuildContext);
config.setNumberOfOutputs(deduplicatedOutputs.size());
config.setOperatorNonChainedOutputs(deduplicatedOutputs);
}
@@ -1224,52 +1297,68 @@ public class StreamingJobGraphGenerator {
for (StreamEdge edge : transitiveOutEdges) {
NonChainedOutput output =
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
transitiveOutputs.add(output);
- connect(startNodeId, edge, output);
+ connect(
+ startNodeId,
+ edge,
+ output,
+ jobVertexBuildContext.getJobVerticesInOrder(),
+ jobVertexBuildContext);
}
config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
}
- private void setAllOperatorNonChainedOutputsConfigs(
- final Map<Integer, Map<StreamEdge, NonChainedOutput>>
opIntermediateOutputs) {
+ public static void setAllOperatorNonChainedOutputsConfigs(
+ final Map<Integer, Map<StreamEdge, NonChainedOutput>>
opIntermediateOutputs,
+ JobVertexBuildContext jobVertexBuildContext) {
// set non chainable output config
- opNonChainableOutputsCache.forEach(
- (vertexId, nonChainableOutputs) -> {
- Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
- opIntermediateOutputs.computeIfAbsent(
- vertexId, ignored -> new HashMap<>());
- setOperatorNonChainedOutputsConfig(
- vertexId,
- vertexConfigs.get(vertexId),
- nonChainableOutputs,
- outputsConsumedByEdge);
- });
+ jobVertexBuildContext
+ .getOperatorInfos()
+ .forEach(
+ (vertexId, operatorInfo) -> {
+ Map<StreamEdge, NonChainedOutput>
outputsConsumedByEdge =
+ opIntermediateOutputs.computeIfAbsent(
+ vertexId, ignored -> new
HashMap<>());
+ setOperatorNonChainedOutputsConfig(
+ vertexId,
+ operatorInfo.getVertexConfig(),
+ operatorInfo.getNonChainableOutputs(),
+ outputsConsumedByEdge,
+ jobVertexBuildContext);
+ });
}
private void setAllVertexNonChainedOutputsConfigs(
final Map<Integer, Map<StreamEdge, NonChainedOutput>>
opIntermediateOutputs) {
- jobVertices
+ jobVertexBuildContext
+ .getJobVerticesInOrder()
.keySet()
.forEach(
startNodeId ->
setVertexNonChainedOutputsConfig(
startNodeId,
- vertexConfigs.get(startNodeId),
-
chainInfos.get(startNodeId).getTransitiveOutEdges(),
+ jobVertexBuildContext
+ .getOperatorInfo(startNodeId)
+ .getVertexConfig(),
+ jobVertexBuildContext
+ .getChainInfo(startNodeId)
+ .getTransitiveOutEdges(),
opIntermediateOutputs));
}
- private List<NonChainedOutput> mayReuseNonChainedOutputs(
+ private static List<NonChainedOutput> mayReuseNonChainedOutputs(
int vertexId,
List<StreamEdge> consumerEdges,
- Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+ Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
+ JobVertexBuildContext jobVertexBuildContext) {
if (consumerEdges.isEmpty()) {
return new ArrayList<>();
}
List<NonChainedOutput> outputs = new ArrayList<>(consumerEdges.size());
for (StreamEdge consumerEdge : consumerEdges) {
checkState(vertexId == consumerEdge.getSourceId(), "Vertex id must
be the same.");
- ResultPartitionType partitionType =
getResultPartitionType(consumerEdge);
+ ResultPartitionType partitionType =
+ getResultPartitionType(consumerEdge,
jobVertexBuildContext);
IntermediateDataSetID dataSetId = new IntermediateDataSetID();
boolean isPersistentDataSet =
@@ -1280,7 +1369,7 @@ public class StreamingJobGraphGenerator {
}
if (partitionType.isHybridResultPartition()) {
- hasHybridResultPartition = true;
+ jobVertexBuildContext.setHasHybridResultPartition(true);
if (consumerEdge.getPartitioner().isBroadcast()
&& partitionType ==
ResultPartitionType.HYBRID_SELECTIVE) {
// for broadcast result partition, it can be optimized to
always use full
@@ -1300,18 +1389,20 @@ public class StreamingJobGraphGenerator {
consumerEdge,
isPersistentDataSet,
dataSetId,
- partitionType);
+ partitionType,
+ jobVertexBuildContext.getStreamGraph());
}
return outputs;
}
- private void createOrReuseOutput(
+ private static void createOrReuseOutput(
List<NonChainedOutput> outputs,
Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge,
StreamEdge consumerEdge,
boolean isPersistentDataSet,
IntermediateDataSetID dataSetId,
- ResultPartitionType partitionType) {
+ ResultPartitionType partitionType,
+ StreamGraph streamGraph) {
int consumerParallelism =
streamGraph.getStreamNode(consumerEdge.getTargetId()).getParallelism();
int consumerMaxParallelism =
@@ -1360,23 +1451,25 @@ public class StreamingJobGraphGenerator {
}
}
- private boolean isPartitionTypeCanBeReuse(ResultPartitionType
partitionType) {
+ private static boolean isPartitionTypeCanBeReuse(ResultPartitionType
partitionType) {
// for non-hybrid partition, partition reuse only works when it's
re-consumable.
// for hybrid selective partition, it still has the opportunity to be
converted to
// hybrid full partition to support partition reuse.
return partitionType.isReconsumable() ||
partitionType.isHybridResultPartition();
}
- private boolean allHybridOrSameReconsumablePartitionType(
+ private static boolean allHybridOrSameReconsumablePartitionType(
ResultPartitionType partitionType1, ResultPartitionType
partitionType2) {
return (partitionType1.isReconsumable() && partitionType1 ==
partitionType2)
|| (partitionType1.isHybridResultPartition()
&& partitionType2.isHybridResultPartition());
}
- private void tryConvertPartitionerForDynamicGraph(
- List<StreamEdge> chainableOutputs, List<StreamEdge>
nonChainableOutputs) {
-
+ public static void tryConvertPartitionerForDynamicGraph(
+ List<StreamEdge> chainableOutputs,
+ List<StreamEdge> nonChainableOutputs,
+ JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
for (StreamEdge edge : chainableOutputs) {
StreamPartitioner<?> partitioner = edge.getPartitioner();
if (partitioner instanceof ForwardForConsecutiveHashPartitioner
@@ -1407,7 +1500,7 @@ public class StreamingJobGraphGenerator {
}
}
- private CheckpointingMode getCheckpointingMode(CheckpointConfig
checkpointConfig) {
+ private static CheckpointingMode getCheckpointingMode(CheckpointConfig
checkpointConfig) {
CheckpointingMode checkpointingMode =
checkpointConfig.getCheckpointingConsistencyMode();
checkArgument(
@@ -1425,9 +1518,14 @@ public class StreamingJobGraphGenerator {
}
}
- private void connect(Integer headOfChain, StreamEdge edge,
NonChainedOutput output) {
+ public static void connect(
+ Integer headOfChain,
+ StreamEdge edge,
+ NonChainedOutput output,
+ Map<Integer, JobVertex> jobVertices,
+ JobVertexBuildContext jobVertexBuildContext) {
- physicalEdgesInOrder.add(edge);
+ jobVertexBuildContext.addPhysicalEdgesInOrder(edge);
Integer downStreamVertexID = edge.getTargetId();
@@ -1477,13 +1575,13 @@ public class StreamingJobGraphGenerator {
}
}
- private boolean isPersistentIntermediateDataset(
+ private static boolean isPersistentIntermediateDataset(
ResultPartitionType resultPartitionType, StreamEdge edge) {
return
resultPartitionType.isBlockingOrBlockingPersistentResultPartition()
&& edge.getIntermediateDatasetIdToProduce() != null;
}
- private void checkBufferTimeout(ResultPartitionType type, StreamEdge edge)
{
+ private static void checkBufferTimeout(ResultPartitionType type,
StreamEdge edge) {
long bufferTimeout = edge.getBufferTimeout();
if (!type.canBePipelinedConsumed()
&& bufferTimeout !=
ExecutionOptions.DISABLED_NETWORK_BUFFER_TIMEOUT) {
@@ -1496,7 +1594,8 @@ public class StreamingJobGraphGenerator {
}
}
- private ResultPartitionType getResultPartitionType(StreamEdge edge) {
+ private static ResultPartitionType getResultPartitionType(
+ StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
switch (edge.getExchangeMode()) {
case PIPELINED:
return ResultPartitionType.PIPELINED_BOUNDED;
@@ -1507,14 +1606,16 @@ public class StreamingJobGraphGenerator {
case HYBRID_SELECTIVE:
return ResultPartitionType.HYBRID_SELECTIVE;
case UNDEFINED:
- return determineUndefinedResultPartitionType(edge);
+ return determineUndefinedResultPartitionType(edge,
jobVertexBuildContext);
default:
throw new UnsupportedOperationException(
"Data exchange mode " + edge.getExchangeMode() + " is
not supported yet.");
}
}
- private ResultPartitionType
determineUndefinedResultPartitionType(StreamEdge edge) {
+ public static ResultPartitionType determineUndefinedResultPartitionType(
+ StreamEdge edge, JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
Attribute sourceNodeAttribute =
streamGraph.getStreamNode(edge.getSourceId()).getAttribute();
if (sourceNodeAttribute.isNoOutputUntilEndOfInput()) {
@@ -1559,7 +1660,7 @@ public class StreamingJobGraphGenerator {
return downStreamVertex.getInEdges().size() == 1 &&
isChainableInput(edge, streamGraph);
}
- private static boolean isChainableInput(StreamEdge edge, StreamGraph
streamGraph) {
+ public static boolean isChainableInput(StreamEdge edge, StreamGraph
streamGraph) {
StreamNode upStreamVertex = streamGraph.getSourceVertex(edge);
StreamNode downStreamVertex = streamGraph.getTargetVertex(edge);
@@ -1683,13 +1784,17 @@ public class StreamingJobGraphGenerator {
return streamGraph.getHeadOperatorForNodeFromCache(upStreamVertex);
}
- private void markSupportingConcurrentExecutionAttempts() {
+ public static void markSupportingConcurrentExecutionAttempts(
+ JobVertexBuildContext jobVertexBuildContext) {
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+
for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
final JobVertex jobVertex = entry.getValue();
final Set<Integer> vertexOperators = new HashSet<>();
vertexOperators.add(entry.getKey());
final Map<Integer, StreamConfig> vertexChainedConfigs =
- chainedConfigs.get(entry.getKey());
+
jobVertexBuildContext.getChainedConfigs().get(entry.getKey());
if (vertexChainedConfigs != null) {
vertexOperators.addAll(vertexChainedConfigs.keySet());
}
@@ -1708,15 +1813,19 @@ public class StreamingJobGraphGenerator {
}
}
- private void setSlotSharingAndCoLocation() {
- setSlotSharing();
- setCoLocation();
+ public static void setSlotSharingAndCoLocation(
+ JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+ setSlotSharing(jobGraph, jobVertexBuildContext);
+ setCoLocation(jobVertexBuildContext);
}
- private void setSlotSharing() {
+ private static void setSlotSharing(
+ JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
final Map<String, SlotSharingGroup> specifiedSlotSharingGroups = new
HashMap<>();
final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups
=
- buildVertexRegionSlotSharingGroups();
+ buildVertexRegionSlotSharingGroups(jobGraph,
jobVertexBuildContext);
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
@@ -1733,7 +1842,7 @@ public class StreamingJobGraphGenerator {
checkNotNull(vertexRegionSlotSharingGroups.get(vertex.getID()));
} else {
checkState(
- !hasHybridResultPartition,
+ !jobVertexBuildContext.hasHybridResultPartition(),
"hybrid shuffle mode currently does not support
setting non-default slot sharing group.");
effectiveSlotSharingGroup =
@@ -1752,10 +1861,11 @@ public class StreamingJobGraphGenerator {
}
}
- private void validateHybridShuffleExecuteInBatchMode() {
- if (hasHybridResultPartition) {
+ public static void validateHybridShuffleExecuteInBatchMode(
+ JobVertexBuildContext jobVertexBuildContext) {
+ if (jobVertexBuildContext.hasHybridResultPartition()) {
checkState(
- jobGraph.getJobType() == JobType.BATCH,
+ jobVertexBuildContext.getStreamGraph().getJobType() ==
JobType.BATCH,
"hybrid shuffle mode only supports batch job, please set
%s to %s",
ExecutionOptions.RUNTIME_MODE.key(),
RuntimeExecutionMode.BATCH.name());
@@ -1767,7 +1877,9 @@ public class StreamingJobGraphGenerator {
* StreamGraph#isAllVerticesInSameSlotSharingGroupByDefault()} returns
true, all regions will be
* in the same slot sharing group.
*/
- private Map<JobVertexID, SlotSharingGroup>
buildVertexRegionSlotSharingGroups() {
+ private static Map<JobVertexID, SlotSharingGroup>
buildVertexRegionSlotSharingGroups(
+ JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) {
+ StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups
= new HashMap<>();
final SlotSharingGroup defaultSlotSharingGroup = new
SlotSharingGroup();
streamGraph
@@ -1799,9 +1911,11 @@ public class StreamingJobGraphGenerator {
return vertexRegionSlotSharingGroups;
}
- private void setCoLocation() {
+ private static void setCoLocation(JobVertexBuildContext
jobVertexBuildContext) {
final Map<String, Tuple2<SlotSharingGroup, CoLocationGroupImpl>>
coLocationGroups =
new HashMap<>();
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
+ final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) {
@@ -1833,14 +1947,9 @@ public class StreamingJobGraphGenerator {
}
}
- private static void setManagedMemoryFraction(
- final Map<Integer, JobVertex> jobVertices,
- final Map<Integer, StreamConfig> operatorConfigs,
- final Map<Integer, Map<Integer, StreamConfig>>
vertexChainedConfigs,
- final java.util.function.Function<Integer,
Map<ManagedMemoryUseCase, Integer>>
- operatorScopeManagedMemoryUseCaseWeightsRetriever,
- final java.util.function.Function<Integer,
Set<ManagedMemoryUseCase>>
- slotScopeManagedMemoryUseCasesRetriever) {
+ public static void setManagedMemoryFraction(JobVertexBuildContext
jobVertexBuildContext) {
+
+ final Map<Integer, JobVertex> jobVertices =
jobVertexBuildContext.getJobVerticesInOrder();
// all slot sharing groups in this job
final Set<SlotSharingGroup> slotSharingGroups =
@@ -1868,7 +1977,8 @@ public class StreamingJobGraphGenerator {
final Set<Integer> operatorIds = new HashSet<>();
operatorIds.add(headOperatorId);
operatorIds.addAll(
- vertexChainedConfigs
+ jobVertexBuildContext
+ .getChainedConfigs()
.getOrDefault(headOperatorId,
Collections.emptyMap())
.keySet());
vertexOperators.put(jobVertex.getID(), operatorIds);
@@ -1876,13 +1986,7 @@ public class StreamingJobGraphGenerator {
for (SlotSharingGroup slotSharingGroup : slotSharingGroups) {
setManagedMemoryFractionForSlotSharingGroup(
- slotSharingGroup,
- vertexHeadOperators,
- vertexOperators,
- operatorConfigs,
- vertexChainedConfigs,
- operatorScopeManagedMemoryUseCaseWeightsRetriever,
- slotScopeManagedMemoryUseCasesRetriever);
+ slotSharingGroup, vertexHeadOperators, vertexOperators,
jobVertexBuildContext);
}
}
@@ -1890,13 +1994,10 @@ public class StreamingJobGraphGenerator {
final SlotSharingGroup slotSharingGroup,
final Map<JobVertexID, Integer> vertexHeadOperators,
final Map<JobVertexID, Set<Integer>> vertexOperators,
- final Map<Integer, StreamConfig> operatorConfigs,
- final Map<Integer, Map<Integer, StreamConfig>>
vertexChainedConfigs,
- final java.util.function.Function<Integer,
Map<ManagedMemoryUseCase, Integer>>
- operatorScopeManagedMemoryUseCaseWeightsRetriever,
- final java.util.function.Function<Integer,
Set<ManagedMemoryUseCase>>
- slotScopeManagedMemoryUseCasesRetriever) {
-
+ final JobVertexBuildContext jobVertexBuildContext) {
+ final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph();
+ final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs =
+ jobVertexBuildContext.getChainedConfigs();
final Set<Integer> groupOperatorIds =
slotSharingGroup.getJobVertexIds().stream()
.flatMap((vid) -> vertexOperators.get(vid).stream())
@@ -1906,7 +2007,8 @@ public class StreamingJobGraphGenerator {
groupOperatorIds.stream()
.flatMap(
(oid) ->
-
operatorScopeManagedMemoryUseCaseWeightsRetriever.apply(oid)
+ streamGraph.getStreamNode(oid)
+
.getManagedMemoryOperatorScopeUseCaseWeights()
.entrySet().stream())
.collect(
Collectors.groupingBy(
@@ -1917,16 +2019,22 @@ public class StreamingJobGraphGenerator {
groupOperatorIds.stream()
.flatMap(
(oid) ->
-
slotScopeManagedMemoryUseCasesRetriever.apply(oid).stream())
+ streamGraph.getStreamNode(oid)
+
.getManagedMemorySlotScopeUseCases().stream())
.collect(Collectors.toSet());
for (JobVertexID jobVertexID : slotSharingGroup.getJobVertexIds()) {
for (int operatorNodeId : vertexOperators.get(jobVertexID)) {
- final StreamConfig operatorConfig =
operatorConfigs.get(operatorNodeId);
+ final StreamConfig operatorConfig =
+
jobVertexBuildContext.getOperatorInfo(operatorNodeId).getVertexConfig();
final Map<ManagedMemoryUseCase, Integer>
operatorScopeUseCaseWeights =
-
operatorScopeManagedMemoryUseCaseWeightsRetriever.apply(operatorNodeId);
+ streamGraph
+ .getStreamNode(operatorNodeId)
+ .getManagedMemoryOperatorScopeUseCaseWeights();
final Set<ManagedMemoryUseCase> slotScopeUseCases =
-
slotScopeManagedMemoryUseCasesRetriever.apply(operatorNodeId);
+ streamGraph
+ .getStreamNode(operatorNodeId)
+ .getManagedMemorySlotScopeUseCases();
setManagedMemoryFractionForOperator(
operatorScopeUseCaseWeights,
slotScopeUseCases,
@@ -1937,7 +2045,8 @@ public class StreamingJobGraphGenerator {
// need to refresh the chained task configs because they are
serialized
final int headOperatorNodeId =
vertexHeadOperators.get(jobVertexID);
- final StreamConfig vertexConfig =
operatorConfigs.get(headOperatorNodeId);
+ final StreamConfig vertexConfig =
+
jobVertexBuildContext.getOperatorInfo(headOperatorNodeId).getVertexConfig();
vertexConfig.setTransitiveChainedTaskConfigs(
vertexChainedConfigs.get(headOperatorNodeId));
}
@@ -1970,7 +2079,7 @@ public class StreamingJobGraphGenerator {
}
}
- private void configureCheckpointing() {
+ public static void configureCheckpointing(StreamGraph streamGraph,
JobGraph jobGraph) {
CheckpointConfig cfg = streamGraph.getCheckpointConfig();
long interval = cfg.getCheckpointInterval();
@@ -2108,150 +2217,4 @@ public class StreamingJobGraphGenerator {
.getOperatorName())
.collect(Collectors.joining(", ")));
}
-
- /**
- * A private class to help maintain the information of an operator chain
during the recursive
- * call in {@link #createChain(Integer, int, OperatorChainInfo, Map)}.
- */
- private static class OperatorChainInfo {
- private final Integer startNodeId;
- private final Map<Integer, byte[]> hashes;
- private final List<Map<Integer, byte[]>> legacyHashes;
- private final Map<Integer, List<ChainedOperatorHashInfo>>
chainedOperatorHashes;
- private final Map<Integer, ChainedSourceInfo> chainedSources;
- private final List<OperatorCoordinator.Provider> coordinatorProviders;
- private final StreamGraph streamGraph;
- private final List<StreamNode> chainedNodes;
- private final List<StreamEdge> transitiveOutEdges;
-
- private OperatorChainInfo(
- int startNodeId,
- Map<Integer, byte[]> hashes,
- List<Map<Integer, byte[]>> legacyHashes,
- Map<Integer, ChainedSourceInfo> chainedSources,
- StreamGraph streamGraph) {
- this.startNodeId = startNodeId;
- this.hashes = hashes;
- this.legacyHashes = legacyHashes;
- this.chainedOperatorHashes = new HashMap<>();
- this.coordinatorProviders = new ArrayList<>();
- this.chainedSources = chainedSources;
- this.streamGraph = streamGraph;
- this.chainedNodes = new ArrayList<>();
- this.transitiveOutEdges = new ArrayList<>();
- }
-
- byte[] getHash(Integer streamNodeId) {
- return hashes.get(streamNodeId);
- }
-
- private Integer getStartNodeId() {
- return startNodeId;
- }
-
- private List<ChainedOperatorHashInfo> getChainedOperatorHashes(int
startNodeId) {
- return chainedOperatorHashes.get(startNodeId);
- }
-
- void addCoordinatorProvider(OperatorCoordinator.Provider coordinator) {
- coordinatorProviders.add(coordinator);
- }
-
- private List<OperatorCoordinator.Provider> getCoordinatorProviders() {
- return coordinatorProviders;
- }
-
- Map<Integer, ChainedSourceInfo> getChainedSources() {
- return chainedSources;
- }
-
- private OperatorID addNodeToChain(int currentNodeId, String
operatorName) {
- recordChainedNode(currentNodeId);
- StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
-
- List<ChainedOperatorHashInfo> operatorHashes =
- chainedOperatorHashes.computeIfAbsent(startNodeId, k ->
new ArrayList<>());
-
- byte[] primaryHashBytes = hashes.get(currentNodeId);
-
- for (Map<Integer, byte[]> legacyHash : legacyHashes) {
- operatorHashes.add(
- new ChainedOperatorHashInfo(
- primaryHashBytes,
legacyHash.get(currentNodeId), streamNode));
- }
-
- streamNode
- .getCoordinatorProvider(operatorName, new
OperatorID(getHash(currentNodeId)))
- .map(coordinatorProviders::add);
-
- return new OperatorID(primaryHashBytes);
- }
-
- private void setTransitiveOutEdges(final List<StreamEdge>
transitiveOutEdges) {
- this.transitiveOutEdges.addAll(transitiveOutEdges);
- }
-
- private List<StreamEdge> getTransitiveOutEdges() {
- return transitiveOutEdges;
- }
-
- private void recordChainedNode(int currentNodeId) {
- StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
- chainedNodes.add(streamNode);
- }
-
- private OperatorChainInfo newChain(Integer startNodeId) {
- return new OperatorChainInfo(
- startNodeId, hashes, legacyHashes, chainedSources,
streamGraph);
- }
-
- private List<StreamNode> getAllChainedNodes() {
- return chainedNodes;
- }
- }
-
- private static final class ChainedOperatorHashInfo {
- private final byte[] generatedOperatorID;
- private final byte[] userDefinedOperatorID;
- private final StreamNode streamNode;
-
- ChainedOperatorHashInfo(
- final byte[] generatedOperatorID,
- final byte[] userDefinedOperatorID,
- final StreamNode streamNode) {
- this.generatedOperatorID = generatedOperatorID;
- this.userDefinedOperatorID = userDefinedOperatorID;
- this.streamNode = streamNode;
- }
-
- public byte[] getGeneratedOperatorID() {
- return generatedOperatorID;
- }
-
- public byte[] getUserDefinedOperatorID() {
- return userDefinedOperatorID;
- }
-
- public StreamNode getStreamNode() {
- return streamNode;
- }
- }
-
- private static final class ChainedSourceInfo {
- private final StreamConfig operatorConfig;
- private final StreamConfig.SourceInputConfig inputConfig;
-
- ChainedSourceInfo(StreamConfig operatorConfig,
StreamConfig.SourceInputConfig inputConfig) {
- this.operatorConfig = operatorConfig;
- this.inputConfig = inputConfig;
- }
-
- public StreamConfig getOperatorConfig() {
- return operatorConfig;
- }
-
- public StreamConfig.SourceInputConfig getInputConfig() {
- return inputConfig;
- }
- }
}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
new file mode 100644
index 00000000000..57848680f6a
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedOperatorHashInfo.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import javax.annotation.Nullable;
+
+import static org.apache.flink.util.Preconditions.checkNotNull;
+
+/** Helper class to help maintain the hash info of an operator chain. */
+@Internal
+public final class ChainedOperatorHashInfo {
+ private final byte[] generatedOperatorId;
+ private final byte[] userDefinedOperatorId;
+ private final StreamNode streamNode;
+
+ ChainedOperatorHashInfo(
+ final byte[] generatedOperatorId,
+ @Nullable final byte[] userDefinedOperatorId,
+ final StreamNode streamNode) {
+ this.generatedOperatorId = checkNotNull(generatedOperatorId);
+ this.userDefinedOperatorId = userDefinedOperatorId;
+ this.streamNode = checkNotNull(streamNode);
+ }
+
+ public byte[] getGeneratedOperatorId() {
+ return generatedOperatorId;
+ }
+
+ @Nullable
+ public byte[] getUserDefinedOperatorId() {
+ return userDefinedOperatorId;
+ }
+
+ public StreamNode getStreamNode() {
+ return streamNode;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
new file mode 100644
index 00000000000..af3afb22fc0
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/ChainedSourceInfo.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+
+/** Helper class to help maintain the chained source info of an operator
chain. */
+@Internal
+public final class ChainedSourceInfo {
+ private final StreamConfig operatorConfig;
+ private final StreamConfig.SourceInputConfig inputConfig;
+
+ public ChainedSourceInfo(
+ StreamConfig operatorConfig, StreamConfig.SourceInputConfig
inputConfig) {
+ this.operatorConfig = operatorConfig;
+ this.inputConfig = inputConfig;
+ }
+
+ public StreamConfig getOperatorConfig() {
+ return operatorConfig;
+ }
+
+ public StreamConfig.SourceInputConfig getInputConfig() {
+ return inputConfig;
+ }
+}
diff --git
a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
new file mode 100644
index 00000000000..d6ef01424e3
--- /dev/null
+++
b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java
@@ -0,0 +1,232 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.runtime.jobgraph.JobVertex;
+import org.apache.flink.runtime.jobgraph.JobVertexID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.util.SerializedValue;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Helper class encapsulates all necessary information and configurations required during the
+ * construction of job vertices.
+ *
+ * <p>The getters hand out the live internal collections, not copies, so changes made through
+ * them are visible to every user of this context.
+ */
+@Internal
+public class JobVertexBuildContext {
+
+    /** The {@link StreamGraph} this context was created for. */
+    private final StreamGraph streamGraph;
+
+    /**
+     * The {@link OperatorChainInfo}s, key is the start node id of the chain. It should be ordered,
+     * as the implementation of the incremental generator relies on its order to create the
+     * JobVertex.
+     */
+    private final Map<Integer, OperatorChainInfo> chainInfosInOrder;
+
+    /** The {@link OperatorInfo}s, key is the id of the stream node. */
+    private final Map<Integer, OperatorInfo> operatorInfos;
+
+    // This map's key represents the starting node id of each chain. Note that this includes not
+    // only the usual head node of the chain but also the ids of chain sources which are used by
+    // multi-input.
+    private final Map<Integer, Map<Integer, StreamConfig>> chainedConfigs;
+
+    // The created JobVertex, the key is start node id. It records the order in which the JobVertex
+    // is created, and some functions depend on it.
+    private final Map<Integer, JobVertex> jobVerticesInOrder;
+
+    // Futures for the serialization of operator coordinators.
+    private final Map<
+                    JobVertexID,
+                    List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
+            coordinatorSerializationFuturesPerJobVertex;
+
+    // The order of StreamEdge connected to other vertices should be consistent with the order in
+    // which JobEdge was created.
+    private final List<StreamEdge> physicalEdgesInOrder;
+
+    // We use AtomicBoolean to track the existence of HybridResultPartition during the incremental
+    // JobGraph generation process introduced by AdaptiveGraphManager. It is essential to globally
+    // monitor changes to this variable, thus necessitating a shared AtomicBoolean instead
+    // of a primitive boolean.
+    private final AtomicBoolean hasHybridResultPartition;
+
+    // Hash bytes per stream node, keyed by stream node id.
+    private final Map<Integer, byte[]> hashes;
+
+    // Legacy hash maps; each one is keyed by stream node id.
+    private final List<Map<Integer, byte[]>> legacyHashes;
+
+    /**
+     * Creates a context whose chain, operator, vertex, edge and coordinator-future collections
+     * start out empty.
+     *
+     * @param streamGraph the stream graph this context is created for
+     * @param hasHybridResultPartition flag holder stored by reference, so updates made through
+     *     this context are visible to whoever else holds it
+     * @param hashes hash bytes keyed by stream node id
+     * @param legacyHashes legacy hash maps, each keyed by stream node id
+     */
+    public JobVertexBuildContext(
+            StreamGraph streamGraph,
+            AtomicBoolean hasHybridResultPartition,
+            Map<Integer, byte[]> hashes,
+            List<Map<Integer, byte[]>> legacyHashes) {
+        this.streamGraph = streamGraph;
+        this.hashes = hashes;
+        this.legacyHashes = legacyHashes;
+        this.chainInfosInOrder = new LinkedHashMap<>();
+        this.jobVerticesInOrder = new LinkedHashMap<>();
+        this.physicalEdgesInOrder = new ArrayList<>();
+        this.hasHybridResultPartition = hasHybridResultPartition;
+        this.coordinatorSerializationFuturesPerJobVertex = new HashMap<>();
+        this.chainedConfigs = new HashMap<>();
+        this.operatorInfos = new HashMap<>();
+    }
+
+    /** Registers the chain info under the start node id of its chain. */
+    public void addChainInfo(Integer startNodeId, OperatorChainInfo chainInfo) {
+        chainInfosInOrder.put(startNodeId, chainInfo);
+    }
+
+    /** Returns the chain info registered for the start node id, or {@code null} if absent. */
+    public OperatorChainInfo getChainInfo(Integer startNodeId) {
+        return chainInfosInOrder.get(startNodeId);
+    }
+
+    /** Returns all chain infos by start node id, in insertion order. */
+    public Map<Integer, OperatorChainInfo> getChainInfosInOrder() {
+        return chainInfosInOrder;
+    }
+
+    /** Returns the operator info of the stream node, or {@code null} if none was created. */
+    public OperatorInfo getOperatorInfo(Integer nodeId) {
+        return operatorInfos.get(nodeId);
+    }
+
+    /** Creates a new {@link OperatorInfo} for the node, replacing any existing one. */
+    public OperatorInfo createAndGetOperatorInfo(Integer nodeId) {
+        OperatorInfo operatorInfo = new OperatorInfo();
+        operatorInfos.put(nodeId, operatorInfo);
+        return operatorInfo;
+    }
+
+    /** Returns all operator infos, keyed by stream node id. */
+    public Map<Integer, OperatorInfo> getOperatorInfos() {
+        return operatorInfos;
+    }
+
+    /** Returns the stream graph this context was created for. */
+    public StreamGraph getStreamGraph() {
+        return streamGraph;
+    }
+
+    /** Returns the current value of the shared hybrid-result-partition flag. */
+    public boolean hasHybridResultPartition() {
+        return hasHybridResultPartition.get();
+    }
+
+    /** Updates the shared hybrid-result-partition flag. */
+    public void setHasHybridResultPartition(boolean hasHybridResultPartition) {
+        this.hasHybridResultPartition.set(hasHybridResultPartition);
+    }
+
+    /** Appends an edge to the ordered list of physical edges. */
+    public void addPhysicalEdgesInOrder(StreamEdge edge) {
+        physicalEdgesInOrder.add(edge);
+    }
+
+    /** Returns the physical edges in the order they were added. */
+    public List<StreamEdge> getPhysicalEdgesInOrder() {
+        return physicalEdgesInOrder;
+    }
+
+    /** Registers the job vertex under the start node id of its chain. */
+    public void addJobVertex(Integer startNodeId, JobVertex jobVertex) {
+        jobVerticesInOrder.put(startNodeId, jobVertex);
+    }
+
+    /** Returns the job vertices by start node id, in insertion order. */
+    public Map<Integer, JobVertex> getJobVerticesInOrder() {
+        return jobVerticesInOrder;
+    }
+
+    /** Returns the job vertex registered for the start node id, or {@code null} if absent. */
+    public JobVertex getJobVertex(Integer startNodeId) {
+        return jobVerticesInOrder.get(startNodeId);
+    }
+
+    /** Stores the coordinator serialization futures of a job vertex, replacing earlier ones. */
+    public void putCoordinatorSerializationFutures(
+            JobVertexID vertexId,
+            List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>
+                    serializationFutures) {
+        coordinatorSerializationFuturesPerJobVertex.put(vertexId, serializationFutures);
+    }
+
+    /** Returns the coordinator serialization futures, keyed by job vertex id. */
+    public Map<JobVertexID, List<CompletableFuture<SerializedValue<OperatorCoordinator.Provider>>>>
+            getCoordinatorSerializationFuturesPerJobVertex() {
+        return coordinatorSerializationFuturesPerJobVertex;
+    }
+
+    /** Returns the chained configs, keyed by the starting node id of each chain. */
+    public Map<Integer, Map<Integer, StreamConfig>> getChainedConfigs() {
+        return chainedConfigs;
+    }
+
+    /** Returns the chained config map for the given id, creating an empty one if missing. */
+    public Map<Integer, StreamConfig> getOrCreateChainedConfig(Integer streamNodeId) {
+        return chainedConfigs.computeIfAbsent(streamNodeId, key -> new HashMap<>());
+    }
+
+    /** Returns the hash bytes of the stream node, or {@code null} if there are none. */
+    public byte[] getHash(Integer streamNodeId) {
+        return hashes.get(streamNodeId);
+    }
+
+    /**
+     * Collects the legacy hash of the given node from every legacy hash map, in list order.
+     *
+     * <p>A map without an entry for the node contributes a {@code null} element.
+     */
+    public List<byte[]> getLegacyHashes(Integer streamNodeId) {
+        List<byte[]> nodeLegacyHashes = new ArrayList<>(legacyHashes.size());
+        for (Map<Integer, byte[]> legacyHash : legacyHashes) {
+            nodeLegacyHashes.add(legacyHash.get(streamNodeId));
+        }
+        return nodeLegacyHashes;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
new file mode 100644
index 00000000000..8e776b746a1
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java
@@ -0,0 +1,230 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.operators.ResourceSpec;
+import org.apache.flink.runtime.jobgraph.InputOutputFormatContainer;
+import org.apache.flink.runtime.jobgraph.OperatorID;
+import org.apache.flink.runtime.operators.coordination.OperatorCoordinator;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+import org.apache.flink.streaming.api.graph.StreamGraph;
+import org.apache.flink.streaming.api.graph.StreamNode;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Helper class to help maintain the information of an operator chain.
+ *
+ * <p>The collection getters return the live internal collections rather than copies.
+ */
+@Internal
+public class OperatorChainInfo {
+    private final Integer startNodeId;
+    private final Map<Integer, List<ChainedOperatorHashInfo>> chainedOperatorHashes;
+    private final Map<Integer, ChainedSourceInfo> chainedSources;
+    private final Map<Integer, ResourceSpec> chainedMinResources;
+    private final Map<Integer, ResourceSpec> chainedPreferredResources;
+    private final Map<Integer, String> chainedNames;
+    private final List<OperatorCoordinator.Provider> coordinatorProviders;
+    private final StreamGraph streamGraph;
+    private final List<StreamNode> chainedNodes;
+    private final List<StreamEdge> transitiveOutEdges;
+    private final List<StreamEdge> transitiveInEdges;
+
+    // Created lazily by getOrCreateFormatContainer(); stays null until first requested.
+    private InputOutputFormatContainer inputOutputFormatContainer = null;
+
+    /**
+     * Creates an empty chain info.
+     *
+     * @param startNodeId id of the stream node that starts the chain
+     * @param chainedSources chained source infos by node id; stored by reference, not copied
+     * @param streamGraph graph used to look up {@link StreamNode}s by id
+     */
+    public OperatorChainInfo(
+            int startNodeId,
+            Map<Integer, ChainedSourceInfo> chainedSources,
+            StreamGraph streamGraph) {
+        this.startNodeId = startNodeId;
+        this.chainedOperatorHashes = new HashMap<>();
+        this.coordinatorProviders = new ArrayList<>();
+        this.chainedSources = chainedSources;
+        this.chainedMinResources = new HashMap<>();
+        this.chainedPreferredResources = new HashMap<>();
+        this.chainedNames = new HashMap<>();
+        this.streamGraph = streamGraph;
+        this.chainedNodes = new ArrayList<>();
+        this.transitiveOutEdges = new ArrayList<>();
+        this.transitiveInEdges = new ArrayList<>();
+    }
+
+    /** Returns the id of the node that starts this chain. */
+    public Integer getStartNodeId() {
+        return startNodeId;
+    }
+
+    /** Returns the hash infos recorded for the given start node id, or {@code null}. */
+    public List<ChainedOperatorHashInfo> getChainedOperatorHashes(int startNodeId) {
+        return chainedOperatorHashes.get(startNodeId);
+    }
+
+    /** Adds a coordinator provider to this chain. */
+    public void addCoordinatorProvider(OperatorCoordinator.Provider coordinator) {
+        coordinatorProviders.add(coordinator);
+    }
+
+    /** Returns the coordinator providers added so far. */
+    public List<OperatorCoordinator.Provider> getCoordinatorProviders() {
+        return coordinatorProviders;
+    }
+
+    /** Returns the chained source infos, keyed by source node id. */
+    public Map<Integer, ChainedSourceInfo> getChainedSources() {
+        return chainedSources;
+    }
+
+    /**
+     * Records the node as part of this chain and registers its operator hashes and, if the node
+     * provides one, its operator coordinator.
+     *
+     * @param currentNodeId id of the stream node to add
+     * @param operatorName name passed to the node when asking for its coordinator provider
+     * @param jobVertexBuildContext source of the primary and legacy hashes of the node
+     * @return the {@link OperatorID} built from the node's primary hash
+     */
+    public OperatorID addNodeToChain(
+            int currentNodeId, String operatorName, JobVertexBuildContext jobVertexBuildContext) {
+        recordChainedNode(currentNodeId);
+        StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
+
+        List<ChainedOperatorHashInfo> operatorHashes =
+                chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>());
+
+        byte[] primaryHashBytes = jobVertexBuildContext.getHash(currentNodeId);
+
+        // One entry per legacy hash, each paired with the same primary hash.
+        for (byte[] legacyHash : jobVertexBuildContext.getLegacyHashes(currentNodeId)) {
+            operatorHashes.add(
+                    new ChainedOperatorHashInfo(primaryHashBytes, legacyHash, streamNode));
+        }
+
+        // The provider is registered purely for its side effect, hence ifPresent and not map.
+        streamNode
+                .getCoordinatorProvider(operatorName, new OperatorID(primaryHashBytes))
+                .ifPresent(coordinatorProviders::add);
+
+        return new OperatorID(primaryHashBytes);
+    }
+
+    /** Appends the given edges to the transitive out edges; existing entries are kept. */
+    public void setTransitiveOutEdges(final List<StreamEdge> transitiveOutEdges) {
+        this.transitiveOutEdges.addAll(transitiveOutEdges);
+    }
+
+    /** Returns the transitive out edges collected so far. */
+    public List<StreamEdge> getTransitiveOutEdges() {
+        return transitiveOutEdges;
+    }
+
+    /** Looks up the node in the stream graph and appends it to the chained nodes. */
+    public void recordChainedNode(int currentNodeId) {
+        StreamNode streamNode = streamGraph.getStreamNode(currentNodeId);
+        chainedNodes.add(streamNode);
+    }
+
+    /**
+     * Creates a new, empty chain info for the given start node that shares this chain's chained
+     * sources map and stream graph.
+     */
+    public OperatorChainInfo newChain(Integer startNodeId) {
+        return new OperatorChainInfo(startNodeId, chainedSources, streamGraph);
+    }
+
+    /** Returns the nodes recorded for this chain, in recording order. */
+    public List<StreamNode> getAllChainedNodes() {
+        return chainedNodes;
+    }
+
+    /** Returns whether the format container has been created yet. */
+    public boolean hasFormatContainer() {
+        return inputOutputFormatContainer != null;
+    }
+
+    /** Returns the format container, creating it with the context class loader on first use. */
+    public InputOutputFormatContainer getOrCreateFormatContainer() {
+        if (inputOutputFormatContainer == null) {
+            inputOutputFormatContainer =
+                    new InputOutputFormatContainer(Thread.currentThread().getContextClassLoader());
+        }
+        return inputOutputFormatContainer;
+    }
+
+    /** Stores the chained source info under the given source node id. */
+    public void addChainedSource(Integer sourceNodeId, ChainedSourceInfo chainedSourceInfo) {
+        chainedSources.put(sourceNodeId, chainedSourceInfo);
+    }
+
+    /** Stores the min resources under the given node id. */
+    public void addChainedMinResources(Integer sourceNodeId, ResourceSpec resourceSpec) {
+        chainedMinResources.put(sourceNodeId, resourceSpec);
+    }
+
+    /** Returns the min resources stored for the node id, or {@code null} if absent. */
+    public ResourceSpec getChainedMinResources(Integer sourceNodeId) {
+        return chainedMinResources.get(sourceNodeId);
+    }
+
+    /** Stores the preferred resources under the given node id. */
+    public void addChainedPreferredResources(Integer sourceNodeId, ResourceSpec resourceSpec) {
+        chainedPreferredResources.put(sourceNodeId, resourceSpec);
+    }
+
+    /** Returns the preferred resources stored for the node id, or {@code null} if absent. */
+    public ResourceSpec getChainedPreferredResources(Integer sourceNodeId) {
+        return chainedPreferredResources.get(sourceNodeId);
+    }
+
+    /** Returns the chained name stored for the node id, or {@code null} if absent. */
+    public String getChainedName(Integer streamNodeId) {
+        return chainedNames.get(streamNodeId);
+    }
+
+    /** Returns all chained names, keyed by stream node id. */
+    public Map<Integer, String> getChainedNames() {
+        return chainedNames;
+    }
+
+    /** Stores the chained name under the given stream node id. */
+    public void addChainedName(Integer streamNodeId, String chainedName) {
+        this.chainedNames.put(streamNodeId, chainedName);
+    }
+
+    /** Appends an edge to the transitive in edges. */
+    public void addTransitiveInEdge(StreamEdge streamEdge) {
+        transitiveInEdges.add(streamEdge);
+    }
+
+    /** Returns the transitive in edges collected so far. */
+    public List<StreamEdge> getTransitiveInEdges() {
+        return transitiveInEdges;
+    }
+}
diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
new file mode 100644
index 00000000000..f79ca90a3cf
--- /dev/null
+++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.api.graph.util;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.graph.StreamConfig;
+import org.apache.flink.streaming.api.graph.StreamEdge;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Helper class to help maintain the information of an operator.
+ *
+ * <p>It caches the operator's vertex config together with its chainable and non-chainable
+ * output edges.
+ */
+@Internal
+public class OperatorInfo {
+
+    // Config of the operator; stays null until setVertexConfig is called.
+    private StreamConfig vertexConfig;
+
+    // This is used to cache the chainable outputs, to set the chainable outputs config after
+    // all job vertices are created.
+    private final List<StreamEdge> chainableOutputs;
+
+    // This is used to cache the non-chainable outputs, to set the non-chainable outputs config
+    // after all job vertices are created.
+    private final List<StreamEdge> nonChainableOutputs;
+
+    /** Creates an operator info with no vertex config and empty output lists. */
+    public OperatorInfo() {
+        this.chainableOutputs = new ArrayList<>();
+        this.nonChainableOutputs = new ArrayList<>();
+    }
+
+    /** Returns the live list of cached chainable outputs. */
+    public List<StreamEdge> getChainableOutputs() {
+        return chainableOutputs;
+    }
+
+    /** Appends the given edges to the cached chainable outputs. */
+    public void addChainableOutputs(List<StreamEdge> chainableOutputs) {
+        this.chainableOutputs.addAll(chainableOutputs);
+    }
+
+    /** Returns the live list of cached non-chainable outputs. */
+    public List<StreamEdge> getNonChainableOutputs() {
+        return nonChainableOutputs;
+    }
+
+    /** Appends the given edges to the cached non-chainable outputs. */
+    public void addNonChainableOutputs(List<StreamEdge> nonChainableOutEdges) {
+        this.nonChainableOutputs.addAll(nonChainableOutEdges);
+    }
+
+    /** Returns the vertex config, or {@code null} if none has been set. */
+    public StreamConfig getVertexConfig() {
+        return vertexConfig;
+    }
+
+    /** Sets the vertex config, replacing any previous value. */
+    public void setVertexConfig(StreamConfig vertexConfig) {
+        this.vertexConfig = vertexConfig;
+    }
+}