This is an automated email from the ASF dual-hosted git repository. zhuzh pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5ff8d5de62e81cdeecca91f1617f0e05ecc8c6bc Author: noorall <[email protected]> AuthorDate: Wed Nov 6 13:05:57 2024 +0800 [FLINK-36066][runtime] Further Improve Method Reusability in StreamingJobGraphGenerator --- .../apache/flink/runtime/jobgraph/JobGraph.java | 3 +- .../api/graph/StreamingJobGraphGenerator.java | 456 +++++++++++++-------- .../api/graph/util/JobVertexBuildContext.java | 38 +- .../api/graph/util/OperatorChainInfo.java | 43 +- .../streaming/api/graph/util/OperatorInfo.java | 23 +- .../api/graph/StreamingJobGraphGeneratorTest.java | 16 +- 6 files changed, 349 insertions(+), 230 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java index 4d57bcd1d4f..9b862553252 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobgraph/JobGraph.java @@ -310,7 +310,8 @@ public class JobGraph implements ExecutionPlan { setSerializedExecutionConfig(new SerializedValue<>(executionConfig)); } - void setSerializedExecutionConfig(SerializedValue<ExecutionConfig> serializedExecutionConfig) { + public void setSerializedExecutionConfig( + SerializedValue<ExecutionConfig> serializedExecutionConfig) { this.serializedExecutionConfig = checkNotNull( serializedExecutionConfig, diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java index 29b0f1bd219..6476cfdfb52 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java @@ -112,7 +112,9 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; +import java.util.function.Consumer; import java.util.stream.Collectors; +import java.util.stream.Stream; import static org.apache.flink.util.Preconditions.checkNotNull; import static org.apache.flink.util.Preconditions.checkState; @@ -162,7 +164,6 @@ public class StreamingJobGraphGenerator { private final StreamGraph streamGraph; private final JobGraph jobGraph; - private final Collection<Integer> builtVertices; private final StreamGraphHasher defaultStreamGraphHasher; private final List<StreamGraphHasher> legacyStreamGraphHashers; @@ -181,10 +182,8 @@ public class StreamingJobGraphGenerator { this.streamGraph = streamGraph; this.defaultStreamGraphHasher = new StreamGraphHasherV2(); this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphUserHashHasher()); - - this.builtVertices = new HashSet<>(); this.serializationExecutor = Preconditions.checkNotNull(serializationExecutor); - jobGraph = new JobGraph(jobID, streamGraph.getJobName()); + jobGraph = createAndInitializeJobGraph(streamGraph, jobID); // Generate deterministic hashes for the nodes in order to identify them across // submission iff they didn't change. @@ -200,16 +199,16 @@ public class StreamingJobGraphGenerator { this.jobVertexBuildContext = new JobVertexBuildContext( - streamGraph, new AtomicBoolean(false), hashes, legacyHashes); + jobGraph, + streamGraph, + new AtomicBoolean(false), + hashes, + legacyHashes, + new SlotSharingGroup()); } private JobGraph createJobGraph() { preValidate(streamGraph, userClassloader); - jobGraph.setJobType(streamGraph.getJobType()); - jobGraph.setDynamic(streamGraph.isDynamic()); - - jobGraph.enableApproximateLocalRecovery( - streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()); setChaining(); @@ -231,69 +230,49 @@ public class StreamingJobGraphGenerator { validateHybridShuffleExecuteInBatchMode(jobVertexBuildContext); - setSlotSharingAndCoLocation(jobGraph, jobVertexBuildContext); + setSlotSharingAndCoLocation(jobVertexBuildContext); setManagedMemoryFraction(jobVertexBuildContext); - configureCheckpointing(streamGraph, jobGraph); - - jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings()); - - for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : - streamGraph.getUserArtifacts().entrySet()) { - jobGraph.addUserArtifact(entry.getKey(), entry.getValue()); - } - - streamGraph.getUserJarBlobKeys().forEach(jobGraph::addUserJarBlobKey); - jobGraph.setClasspaths(streamGraph.getClasspath()); - - // set the ExecutionConfig last when it has been finalized - try { - jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); - } catch (IOException e) { - throw new IllegalConfigurationException( - "Could not serialize the ExecutionConfig." - + "This indicates that non-serializable types (like custom serializers) were registered"); - } - jobGraph.setJobConfiguration(streamGraph.getJobConfiguration()); - - addVertexIndexPrefixInVertexName(jobVertexBuildContext, new AtomicInteger(0), jobGraph); + addVertexIndexPrefixInVertexName(jobVertexBuildContext, new AtomicInteger(0)); setVertexDescription(jobVertexBuildContext); // Wait for the serialization of operator coordinators and stream config. - serializeOperatorCoordinatorsAndStreamConfig( - jobGraph, serializationExecutor, jobVertexBuildContext); - - if (!streamGraph.getJobStatusHooks().isEmpty()) { - jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks()); - } + serializeOperatorCoordinatorsAndStreamConfig(serializationExecutor, jobVertexBuildContext); return jobGraph; } public static void serializeOperatorCoordinatorsAndStreamConfig( - JobGraph jobGraph, - Executor serializationExecutor, - JobVertexBuildContext jobVertexBuildContext) { + Executor serializationExecutor, JobVertexBuildContext jobVertexBuildContext) { try { FutureUtils.combineAll( - jobVertexBuildContext.getOperatorInfos().values().stream() - .map( - operatorInfo -> - operatorInfo - .getVertexConfig() - .triggerSerializationAndReturnFuture( - serializationExecutor)) + jobVertexBuildContext.getChainInfosInOrder().values().stream() + .flatMap( + chainInfo -> + serializationOperatorConfigs( + chainInfo, serializationExecutor)) .collect(Collectors.toList())) .get(); - waitForSerializationFuturesAndUpdateJobVertices(jobGraph, jobVertexBuildContext); + waitForSerializationFuturesAndUpdateJobVertices(jobVertexBuildContext); } catch (Exception e) { throw new FlinkRuntimeException("Error in serialization.", e); } } + private static Stream<CompletableFuture<StreamConfig>> serializationOperatorConfigs( + OperatorChainInfo chainInfo, Executor serializationExecutor) { + return chainInfo.getOperatorInfos().values().stream() + .map( + operatorInfo -> + operatorInfo + .getVertexConfig() + .triggerSerializationAndReturnFuture( + serializationExecutor)); + } + /** * Creates an instance of {@link OperatorID} based on the provided operator unique identifier * (UID). @@ -306,7 +285,7 @@ public class StreamingJobGraphGenerator { } private static void waitForSerializationFuturesAndUpdateJobVertices( - JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) + JobVertexBuildContext jobVertexBuildContext) throws ExecutionException, InterruptedException { for (Map.Entry< JobVertexID, @@ -316,7 +295,8 @@ public class StreamingJobGraphGenerator { .getCoordinatorSerializationFuturesPerJobVertex() .entrySet()) { final JobVertexID jobVertexId = futuresPerJobVertex.getKey(); - final JobVertex jobVertex = jobGraph.findVertexByID(jobVertexId); + final JobVertex jobVertex = + jobVertexBuildContext.getJobGraph().findVertexByID(jobVertexId); Preconditions.checkState( jobVertex != null, @@ -329,9 +309,7 @@ public class StreamingJobGraphGenerator { } public static void addVertexIndexPrefixInVertexName( - JobVertexBuildContext jobVertexBuildContext, - AtomicInteger vertexIndexId, - JobGraph jobGraph) { + JobVertexBuildContext jobVertexBuildContext, AtomicInteger vertexIndexId) { if (!jobVertexBuildContext.getStreamGraph().isVertexNameIncludeIndexPrefix()) { return; } @@ -342,7 +320,9 @@ public class StreamingJobGraphGenerator { // JobVertexBuildContext only contains incrementally generated jobVertex instances. The // setName method needs to set the names based on the topological order of all jobVertices, // so it relies on the job graph to compute a difference set. - jobGraph.getVerticesSortedTopologicallyFromSources() + jobVertexBuildContext + .getJobGraph() + .getVerticesSortedTopologicallyFromSources() .forEach( vertex -> { if (jobVertexIds.contains(vertex.getID())) { @@ -576,6 +556,7 @@ public class StreamingJobGraphGenerator { List<StreamEdge> edgeList = inEdges.getValue(); jobVertexBuildContext + .getChainInfo(vertex) .getOperatorInfo(vertex) .getVertexConfig() .setInPhysicalEdges(edgeList); @@ -583,69 +564,15 @@ public class StreamingJobGraphGenerator { } private Map<Integer, OperatorChainInfo> buildChainedInputsAndGetHeadInputs() { - - final Map<Integer, ChainedSourceInfo> chainedSources = new HashMap<>(); final Map<Integer, OperatorChainInfo> chainEntryPoints = new HashMap<>(); for (Integer sourceNodeId : streamGraph.getSourceIDs()) { final StreamNode sourceNode = streamGraph.getStreamNode(sourceNodeId); - - if (sourceNode.getOperatorFactory() != null - && sourceNode.getOperatorFactory() instanceof SourceOperatorFactory - && sourceNode.getOutEdges().size() == 1) { - // as long as only NAry ops support this chaining, we need to skip the other parts - final StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0); - final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId()); - final ChainingStrategy targetChainingStrategy = - Preconditions.checkNotNull(target.getOperatorFactory()) - .getChainingStrategy(); - - if (targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES - && isChainableInput(sourceOutEdge, streamGraph)) { - final OperatorInfo operatorInfo = - jobVertexBuildContext.createAndGetOperatorInfo(sourceNodeId); - final OperatorID opId = - new OperatorID(jobVertexBuildContext.getHash(sourceNodeId)); - final StreamConfig.SourceInputConfig inputConfig = - new StreamConfig.SourceInputConfig(sourceOutEdge); - final StreamConfig operatorConfig = new StreamConfig(new Configuration()); - setOperatorConfig( - sourceNodeId, - operatorConfig, - Collections.emptyMap(), - jobVertexBuildContext); - setOperatorChainedOutputsConfig( - operatorConfig, Collections.emptyList(), jobVertexBuildContext); - // we cache the non-chainable outputs here, and set the non-chained config later - operatorInfo.addNonChainableOutputs(Collections.emptyList()); - - operatorConfig.setChainIndex(0); // sources are always first - operatorConfig.setOperatorID(opId); - operatorConfig.setOperatorName(sourceNode.getOperatorName()); - chainedSources.put( - sourceNodeId, new ChainedSourceInfo(operatorConfig, inputConfig)); - - final SourceOperatorFactory<?> sourceOpFact = - (SourceOperatorFactory<?>) sourceNode.getOperatorFactory(); - final OperatorCoordinator.Provider coord = - sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId); - - final OperatorChainInfo chainInfo = - chainEntryPoints.computeIfAbsent( - sourceOutEdge.getTargetId(), - (k) -> - new OperatorChainInfo( - sourceOutEdge.getTargetId(), - chainedSources, - streamGraph)); - chainInfo.addCoordinatorProvider(coord); - chainInfo.recordChainedNode(sourceNodeId); - continue; - } + if (isChainableSource(sourceNode, streamGraph)) { + createSourceChainInfo(sourceNode, chainEntryPoints, jobVertexBuildContext); + } else { + chainEntryPoints.put(sourceNodeId, new OperatorChainInfo(sourceNodeId)); } - - chainEntryPoints.put( - sourceNodeId, new OperatorChainInfo(sourceNodeId, chainedSources, streamGraph)); } return chainEntryPoints; @@ -673,18 +600,33 @@ public class StreamingJobGraphGenerator { info.getStartNodeId(), 1, // operators start at position 1 because 0 is for chained source inputs info, - chainEntryPoints); + chainEntryPoints, + true, + serializationExecutor, + jobVertexBuildContext, + null); } } - private List<StreamEdge> createChain( + public static List<StreamEdge> createChain( final Integer currentNodeId, final int chainIndex, final OperatorChainInfo chainInfo, - final Map<Integer, OperatorChainInfo> chainEntryPoints) { + final Map<Integer, OperatorChainInfo> chainEntryPoints, + final boolean canCreateNewChain, + final Executor serializationExecutor, + final JobVertexBuildContext jobVertexBuildContext, + final @Nullable Consumer<Integer> visitedStreamNodeConsumer) { Integer startNodeId = chainInfo.getStartNodeId(); - if (!builtVertices.contains(startNodeId)) { + if (!jobVertexBuildContext.getJobVerticesInOrder().containsKey(startNodeId)) { + StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); + + // Adaptive graph generator needs to subscribe the visited stream node id to + // generate hashes for it. + if (visitedStreamNodeConsumer != null) { + visitedStreamNodeConsumer.accept(currentNodeId); + } List<StreamEdge> transitiveOutEdges = new ArrayList<StreamEdge>(); @@ -721,7 +663,11 @@ public class StreamingJobGraphGenerator { chainable.getTargetId(), chainIndex + 1, chainInfo, - chainEntryPoints)); + chainEntryPoints, + canCreateNewChain, + serializationExecutor, + jobVertexBuildContext, + visitedStreamNodeConsumer)); // Mark upstream nodes in the same chain as outputBlocking if (targetNodeAttribute != null && targetNodeAttribute.isNoOutputUntilEndOfInput()) { @@ -731,13 +677,24 @@ public class StreamingJobGraphGenerator { for (StreamEdge nonChainable : nonChainableOutputs) { transitiveOutEdges.add(nonChainable); - createChain( - nonChainable.getTargetId(), - 1, // operators start at position 1 because 0 is for chained source inputs - chainEntryPoints.computeIfAbsent( - nonChainable.getTargetId(), - (k) -> chainInfo.newChain(nonChainable.getTargetId())), - chainEntryPoints); + // Used to control whether a new chain can be created, this value is true in the + // full graph generation algorithm and false in the progressive generation + // algorithm. In the future, this variable can be a boolean type function to adapt + // to more adaptive scenarios. + if (canCreateNewChain) { + createChain( + nonChainable.getTargetId(), + 1, // operators start at position 1 because 0 is for chained source + // inputs + chainEntryPoints.computeIfAbsent( + nonChainable.getTargetId(), + (k) -> chainInfo.newChain(nonChainable.getTargetId())), + chainEntryPoints, + canCreateNewChain, + serializationExecutor, + jobVertexBuildContext, + visitedStreamNodeConsumer); + } } chainInfo.addChainedName( @@ -777,18 +734,25 @@ public class StreamingJobGraphGenerator { .addOutputFormat(currentOperatorId, currentNode.getOutputFormat()); } OperatorInfo operatorInfo = - jobVertexBuildContext.createAndGetOperatorInfo(currentNodeId); + chainInfo.createAndGetOperatorInfo(currentNodeId, currentOperatorId); - StreamConfig config = - currentNodeId.equals(startNodeId) - ? createJobVertex(startNodeId, chainInfo) - : new StreamConfig(new Configuration()); + StreamConfig config; + if (currentNodeId.equals(startNodeId)) { + JobVertex jobVertex = jobVertexBuildContext.getJobVertex(startNodeId); + if (jobVertex == null) { + jobVertex = + createJobVertex( + chainInfo, serializationExecutor, jobVertexBuildContext); + } + config = new StreamConfig(jobVertex.getConfiguration()); + } else { + config = new StreamConfig(new Configuration()); + } tryConvertPartitionerForDynamicGraph( chainableOutputs, nonChainableOutputs, jobVertexBuildContext); config.setAttribute(currentNodeAttribute); - setOperatorConfig( - currentNodeId, config, chainInfo.getChainedSources(), jobVertexBuildContext); + setOperatorConfig(currentNodeId, config, chainInfo, jobVertexBuildContext); setOperatorChainedOutputsConfig(config, chainableOutputs, jobVertexBuildContext); // we cache the non-chainable outputs here, and set the non-chained config later @@ -923,6 +887,83 @@ public class StreamingJobGraphGenerator { }); } + public static JobGraph createAndInitializeJobGraph( + StreamGraph streamGraph, @Nullable JobID jobId) { + JobGraph jobGraph = new JobGraph(jobId, streamGraph.getJobName()); + jobGraph.setJobType(streamGraph.getJobType()); + jobGraph.setDynamic(streamGraph.isDynamic()); + + jobGraph.enableApproximateLocalRecovery( + streamGraph.getCheckpointConfig().isApproximateLocalRecoveryEnabled()); + + jobGraph.setSnapshotSettings(streamGraph.getCheckpointingSettings()); + jobGraph.setSavepointRestoreSettings(streamGraph.getSavepointRestoreSettings()); + for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : + streamGraph.getUserArtifacts().entrySet()) { + jobGraph.addUserArtifact(entry.getKey(), entry.getValue()); + } + + streamGraph.getUserJarBlobKeys().forEach(jobGraph::addUserJarBlobKey); + jobGraph.setClasspaths(streamGraph.getClasspath()); + + ExecutionConfig executionConfig = streamGraph.getExecutionConfig(); + if (executionConfig == null) { + jobGraph.setSerializedExecutionConfig(streamGraph.getSerializedExecutionConfig()); + } else { + try { + jobGraph.setExecutionConfig(streamGraph.getExecutionConfig()); + } catch (IOException e) { + throw new IllegalConfigurationException( + "Could not serialize the ExecutionConfig." + + "This indicates that non-serializable types (like custom serializers) were registered"); + } + } + + jobGraph.setJobConfiguration(streamGraph.getJobConfiguration()); + + if (!streamGraph.getJobStatusHooks().isEmpty()) { + jobGraph.setJobStatusHooks(streamGraph.getJobStatusHooks()); + } + + return jobGraph; + } + + public static void createSourceChainInfo( + StreamNode sourceNode, + Map<Integer, OperatorChainInfo> chainEntryPoints, + JobVertexBuildContext jobVertexBuildContext) { + Integer sourceNodeId = sourceNode.getId(); + StreamEdge sourceOutEdge = sourceNode.getOutEdges().get(0); + + final OperatorChainInfo chainInfo = + chainEntryPoints.computeIfAbsent( + sourceOutEdge.getTargetId(), + (k) -> new OperatorChainInfo(sourceOutEdge.getTargetId())); + final OperatorID opId = new OperatorID(jobVertexBuildContext.getHash(sourceNodeId)); + final OperatorInfo operatorInfo = chainInfo.createAndGetOperatorInfo(sourceNodeId, opId); + final StreamConfig.SourceInputConfig inputConfig = + new StreamConfig.SourceInputConfig(sourceOutEdge); + final StreamConfig operatorConfig = new StreamConfig(new Configuration()); + setOperatorConfig(sourceNodeId, operatorConfig, chainInfo, jobVertexBuildContext); + setOperatorChainedOutputsConfig( + operatorConfig, Collections.emptyList(), jobVertexBuildContext); + // we cache the non-chainable outputs here, and set the non-chained config later + operatorInfo.addNonChainableOutputs(Collections.emptyList()); + + operatorConfig.setChainIndex(0); // sources are always first + operatorConfig.setOperatorID(opId); + operatorConfig.setOperatorName(sourceNode.getOperatorName()); + + final SourceOperatorFactory<?> sourceOpFact = + (SourceOperatorFactory<?>) + Preconditions.checkNotNull(sourceNode.getOperatorFactory()); + final OperatorCoordinator.Provider coord = + sourceOpFact.getCoordinatorProvider(sourceNode.getOperatorName(), opId); + + chainInfo.addChainedSource(sourceNode, new ChainedSourceInfo(operatorConfig, inputConfig)); + chainInfo.addCoordinatorProvider(coord); + } + private static void checkAndReplaceReusableHybridPartitionType( NonChainedOutput reusableOutput) { if (reusableOutput.getPartitionType() == ResultPartitionType.HYBRID_SELECTIVE) { @@ -1009,10 +1050,15 @@ public class StreamingJobGraphGenerator { return preferredResources; } - private StreamConfig createJobVertex(Integer streamNodeId, OperatorChainInfo chainInfo) { + public static JobVertex createJobVertex( + OperatorChainInfo chainInfo, + Executor serializationExecutor, + JobVertexBuildContext jobVertexBuildContext) { + + Integer streamNodeId = chainInfo.getStartNodeId(); JobVertex jobVertex; - StreamNode streamNode = streamGraph.getStreamNode(streamNodeId); + StreamNode streamNode = jobVertexBuildContext.getStreamGraph().getStreamNode(streamNodeId); byte[] hash = jobVertexBuildContext.getHash(streamNodeId); @@ -1104,23 +1150,23 @@ public class StreamingJobGraphGenerator { } jobVertexBuildContext.addJobVertex(streamNodeId, jobVertex); - builtVertices.add(streamNodeId); - jobGraph.addVertex(jobVertex); + jobVertexBuildContext.getJobGraph().addVertex(jobVertex); jobVertex.setParallelismConfigured( chainInfo.getAllChainedNodes().stream() .anyMatch(StreamNode::isParallelismConfigured)); - return new StreamConfig(jobVertex.getConfiguration()); + return jobVertex; } public static void setOperatorConfig( Integer vertexId, StreamConfig config, - Map<Integer, ChainedSourceInfo> chainedSources, + OperatorChainInfo chainInfo, JobVertexBuildContext jobVertexBuildContext) { StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); - OperatorInfo operatorInfo = jobVertexBuildContext.getOperatorInfo(vertexId); + OperatorInfo operatorInfo = chainInfo.getOperatorInfo(vertexId); + Map<Integer, ChainedSourceInfo> chainedSources = chainInfo.getChainedSources(); StreamNode vertex = streamGraph.getStreamNode(vertexId); config.setVertexID(vertexId); @@ -1301,20 +1347,22 @@ public class StreamingJobGraphGenerator { final Map<Integer, Map<StreamEdge, NonChainedOutput>> opIntermediateOutputs, JobVertexBuildContext jobVertexBuildContext) { // set non chainable output config - jobVertexBuildContext - .getOperatorInfos() - .forEach( - (vertexId, operatorInfo) -> { - Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge = - opIntermediateOutputs.computeIfAbsent( - vertexId, ignored -> new HashMap<>()); - setOperatorNonChainedOutputsConfig( - vertexId, - operatorInfo.getVertexConfig(), - operatorInfo.getNonChainableOutputs(), - outputsConsumedByEdge, - jobVertexBuildContext); - }); + for (OperatorChainInfo chainInfo : jobVertexBuildContext.getChainInfosInOrder().values()) { + chainInfo + .getOperatorInfos() + .forEach( + (vertexId, operatorInfo) -> { + Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge = + opIntermediateOutputs.computeIfAbsent( + vertexId, ignored -> new HashMap<>()); + setOperatorNonChainedOutputsConfig( + vertexId, + operatorInfo.getVertexConfig(), + operatorInfo.getNonChainableOutputs(), + outputsConsumedByEdge, + jobVertexBuildContext); + }); + } } private void setAllVertexNonChainedOutputsConfigs( @@ -1327,6 +1375,7 @@ public class StreamingJobGraphGenerator { setVertexNonChainedOutputsConfig( startNodeId, jobVertexBuildContext + .getChainInfo(startNodeId) .getOperatorInfo(startNodeId) .getVertexConfig(), jobVertexBuildContext @@ -1626,18 +1675,43 @@ public class StreamingJobGraphGenerator { } public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) { + return isChainable(edge, streamGraph, false); + } + + public static boolean isChainable( + StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) { StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); - return downStreamVertex.getInEdges().size() == 1 && isChainableInput(edge, streamGraph); + return downStreamVertex.getInEdges().size() == 1 + && isChainableInput(edge, streamGraph, allowChainWithDefaultParallelism); } - public static boolean isChainableInput(StreamEdge edge, StreamGraph streamGraph) { + public static boolean isChainableSource(StreamNode streamNode, StreamGraph streamGraph) { + if (streamNode.getOperatorFactory() == null + || !(streamNode.getOperatorFactory() instanceof SourceOperatorFactory) + || streamNode.getOutEdges().size() != 1) { + return false; + } + final StreamEdge sourceOutEdge = streamNode.getOutEdges().get(0); + final StreamNode target = streamGraph.getStreamNode(sourceOutEdge.getTargetId()); + final ChainingStrategy targetChainingStrategy = + Preconditions.checkNotNull(target.getOperatorFactory()).getChainingStrategy(); + return targetChainingStrategy == ChainingStrategy.HEAD_WITH_SOURCES + && isChainableInput(sourceOutEdge, streamGraph, false); + } + + private static boolean isChainableInput( + StreamEdge edge, StreamGraph streamGraph, boolean allowChainWithDefaultParallelism) { StreamNode upStreamVertex = streamGraph.getSourceVertex(edge); StreamNode downStreamVertex = streamGraph.getTargetVertex(edge); if (!(streamGraph.isChainingEnabled() && upStreamVertex.isSameSlotSharingGroup(downStreamVertex) - && areOperatorsChainable(upStreamVertex, downStreamVertex, streamGraph) + && areOperatorsChainable( + upStreamVertex, + downStreamVertex, + streamGraph, + allowChainWithDefaultParallelism) && arePartitionerAndExchangeModeChainable( edge.getPartitioner(), edge.getExchangeMode(), streamGraph.isDynamic()))) { @@ -1673,7 +1747,10 @@ public class StreamingJobGraphGenerator { @VisibleForTesting static boolean areOperatorsChainable( - StreamNode upStreamVertex, StreamNode downStreamVertex, StreamGraph streamGraph) { + StreamNode upStreamVertex, + StreamNode downStreamVertex, + StreamGraph streamGraph, + boolean allowChainWithDefaultParallelism) { StreamOperatorFactory<?> upStreamOperator = upStreamVertex.getOperatorFactory(); StreamOperatorFactory<?> downStreamOperator = downStreamVertex.getOperatorFactory(); if (downStreamOperator == null || upStreamOperator == null) { @@ -1723,8 +1800,18 @@ public class StreamingJobGraphGenerator { "Unknown chaining strategy: " + downStreamOperator.getChainingStrategy()); } - // Only vertices with the same parallelism can be chained. - isChainable &= upStreamVertex.getParallelism() == downStreamVertex.getParallelism(); + // When allowChainWithDefaultParallelism is true, any vertex with default parallelism can + // be chained, otherwise only vertices with the same parallelism can be chained. + if (allowChainWithDefaultParallelism) { + isChainable &= + (upStreamVertex.getParallelism() == downStreamVertex.getParallelism() + || upStreamVertex.getParallelism() + == ExecutionConfig.PARALLELISM_DEFAULT + || downStreamVertex.getParallelism() + == ExecutionConfig.PARALLELISM_DEFAULT); + } else { + isChainable &= upStreamVertex.getParallelism() == downStreamVertex.getParallelism(); + } if (!streamGraph.isChainingOfOperatorsWithDifferentMaxParallelismEnabled()) { isChainable &= @@ -1784,18 +1871,16 @@ public class StreamingJobGraphGenerator { } } - public static void setSlotSharingAndCoLocation( - JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) { - setSlotSharing(jobGraph, jobVertexBuildContext); + public static void setSlotSharingAndCoLocation(JobVertexBuildContext jobVertexBuildContext) { + setSlotSharing(jobVertexBuildContext); setCoLocation(jobVertexBuildContext); } - private static void setSlotSharing( - JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) { + private static void setSlotSharing(JobVertexBuildContext jobVertexBuildContext) { StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); final Map<String, SlotSharingGroup> specifiedSlotSharingGroups = new HashMap<>(); final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups = - buildVertexRegionSlotSharingGroups(jobGraph, jobVertexBuildContext); + buildVertexRegionSlotSharingGroups(jobVertexBuildContext); final Map<Integer, JobVertex> jobVertices = jobVertexBuildContext.getJobVerticesInOrder(); for (Map.Entry<Integer, JobVertex> entry : jobVertices.entrySet()) { @@ -1849,10 +1934,11 @@ public class StreamingJobGraphGenerator { * in the same slot sharing group. */ private static Map<JobVertexID, SlotSharingGroup> buildVertexRegionSlotSharingGroups( - JobGraph jobGraph, JobVertexBuildContext jobVertexBuildContext) { + JobVertexBuildContext jobVertexBuildContext) { StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); final Map<JobVertexID, SlotSharingGroup> vertexRegionSlotSharingGroups = new HashMap<>(); - final SlotSharingGroup defaultSlotSharingGroup = new SlotSharingGroup(); + final SlotSharingGroup defaultSlotSharingGroup = + jobVertexBuildContext.getDefaultSlotSharingGroup(); streamGraph .getSlotSharingGroupResource(StreamGraphGenerator.DEFAULT_SLOT_SHARING_GROUP) .ifPresent(defaultSlotSharingGroup::setResourceProfile); @@ -1861,7 +1947,8 @@ public class StreamingJobGraphGenerator { streamGraph.isAllVerticesInSameSlotSharingGroupByDefault(); final Iterable<DefaultLogicalPipelinedRegion> regions = - DefaultLogicalTopology.fromJobGraph(jobGraph).getAllPipelinedRegions(); + DefaultLogicalTopology.fromJobGraph(jobVertexBuildContext.getJobGraph()) + .getAllPipelinedRegions(); for (DefaultLogicalPipelinedRegion region : regions) { final SlotSharingGroup regionSlotSharingGroup; if (allRegionsInSameSlotSharingGroup) { @@ -1969,8 +2056,17 @@ public class StreamingJobGraphGenerator { final StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); final Map<Integer, Map<Integer, StreamConfig>> vertexChainedConfigs = jobVertexBuildContext.getChainedConfigs(); - final Set<Integer> groupOperatorIds = + // In the progressive job graph generation algorithm, if the user specified the + // SlotSharingGroupResource or the AllVerticesInSameSlotSharingGroupByDefault is set to + // true, job vertices generated in different phase may be assigned to the same + // slotSharingGroup. Therefore, we need to filter out the job vertices that belong to the + // current phase. + final Set<JobVertexID> jobVertexIds = slotSharingGroup.getJobVertexIds().stream() + .filter(vertexOperators::containsKey) + .collect(Collectors.toSet()); + final Set<Integer> groupOperatorIds = + jobVertexIds.stream() .flatMap((vid) -> vertexOperators.get(vid).stream()) .collect(Collectors.toSet()); @@ -1994,10 +2090,14 @@ public class StreamingJobGraphGenerator { .getManagedMemorySlotScopeUseCases().stream()) .collect(Collectors.toSet()); - for (JobVertexID jobVertexID : slotSharingGroup.getJobVertexIds()) { + for (JobVertexID jobVertexID : jobVertexIds) { + final int headOperatorNodeId = vertexHeadOperators.get(jobVertexID); for (int operatorNodeId : vertexOperators.get(jobVertexID)) { final StreamConfig operatorConfig = - jobVertexBuildContext.getOperatorInfo(operatorNodeId).getVertexConfig(); + jobVertexBuildContext + .getChainInfo(headOperatorNodeId) + .getOperatorInfo(operatorNodeId) + .getVertexConfig(); final Map<ManagedMemoryUseCase, Integer> operatorScopeUseCaseWeights = streamGraph .getStreamNode(operatorNodeId) @@ -2015,9 +2115,11 @@ public class StreamingJobGraphGenerator { } // need to refresh the chained task configs because they are serialized - final int headOperatorNodeId = vertexHeadOperators.get(jobVertexID); final StreamConfig vertexConfig = - jobVertexBuildContext.getOperatorInfo(headOperatorNodeId).getVertexConfig(); + jobVertexBuildContext + .getChainInfo(headOperatorNodeId) + .getOperatorInfo(headOperatorNodeId) + .getVertexConfig(); vertexConfig.setTransitiveChainedTaskConfigs( vertexChainedConfigs.get(headOperatorNodeId)); } @@ -2050,10 +2152,6 @@ public class StreamingJobGraphGenerator { } } - public static void configureCheckpointing(StreamGraph streamGraph, JobGraph jobGraph) { - jobGraph.setSnapshotSettings(streamGraph.getCheckpointingSettings()); - } - private static String nameWithChainedSourcesInfo( String operatorName, Collection<ChainedSourceInfo> chainedSourceInfos) { return chainedSourceInfos.isEmpty() diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java index d6ef01424e3..a4bd1f507f6 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/JobVertexBuildContext.java @@ -19,8 +19,10 @@ package org.apache.flink.streaming.api.graph.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.JobVertexID; +import org.apache.flink.runtime.jobmanager.scheduler.SlotSharingGroup; import org.apache.flink.runtime.operators.coordination.OperatorCoordinator; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -42,6 +44,8 @@ import java.util.concurrent.atomic.AtomicBoolean; @Internal public class JobVertexBuildContext { + private final JobGraph jobGraph; + private final StreamGraph streamGraph; /** @@ -51,9 +55,6 @@ public class JobVertexBuildContext { */ private final Map<Integer, OperatorChainInfo> chainInfosInOrder; - /** The {@link OperatorInfo}s, key is the id of the stream node. */ - private final Map<Integer, OperatorInfo> operatorInfos; - // This map's key represents the starting node id of each chain. Note that this includes not // only the usual head node of the chain but also the ids of chain sources which are used by // multi-input. @@ -83,11 +84,16 @@ public class JobVertexBuildContext { private final List<Map<Integer, byte[]>> legacyHashes; + private final SlotSharingGroup defaultSlotSharingGroup; + public JobVertexBuildContext( + JobGraph jobGraph, StreamGraph streamGraph, AtomicBoolean hasHybridResultPartition, Map<Integer, byte[]> hashes, - List<Map<Integer, byte[]>> legacyHashes) { + List<Map<Integer, byte[]>> legacyHashes, + SlotSharingGroup defaultSlotSharingGroup) { + this.jobGraph = jobGraph; this.streamGraph = streamGraph; this.hashes = hashes; this.legacyHashes = legacyHashes; @@ -97,7 +103,7 @@ public class JobVertexBuildContext { this.hasHybridResultPartition = hasHybridResultPartition; this.coordinatorSerializationFuturesPerJobVertex = new HashMap<>(); this.chainedConfigs = new HashMap<>(); - this.operatorInfos = new HashMap<>(); + this.defaultSlotSharingGroup = defaultSlotSharingGroup; } public void addChainInfo(Integer startNodeId, OperatorChainInfo chainInfo) { @@ -112,20 +118,6 @@ public class JobVertexBuildContext { return chainInfosInOrder; } - public OperatorInfo getOperatorInfo(Integer nodeId) { - return operatorInfos.get(nodeId); - } - - public OperatorInfo createAndGetOperatorInfo(Integer nodeId) { - OperatorInfo operatorInfo = new OperatorInfo(); - operatorInfos.put(nodeId, operatorInfo); - return operatorInfo; - } - - public Map<Integer, OperatorInfo> getOperatorInfos() { - return operatorInfos; - } - public StreamGraph getStreamGraph() { return streamGraph; } @@ -189,4 +181,12 @@ public class JobVertexBuildContext { } return hashes; } + + public JobGraph getJobGraph() { + return jobGraph; + } + + public SlotSharingGroup getDefaultSlotSharingGroup() { + return defaultSlotSharingGroup; + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java index 8e776b746a1..45c5173c20d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorChainInfo.java @@ -28,6 +28,7 @@ import org.apache.flink.streaming.api.graph.StreamGraph; import org.apache.flink.streaming.api.graph.StreamNode; import java.util.ArrayList; +import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -41,29 +42,29 @@ public class OperatorChainInfo { private final Map<Integer, ResourceSpec> chainedMinResources; private final Map<Integer, ResourceSpec> chainedPreferredResources; private final Map<Integer, String> chainedNames; + + /** The {@link OperatorInfo}s, key is the id of the stream node. */ + private final Map<Integer, OperatorInfo> chainedOperatorInfos; + private final List<OperatorCoordinator.Provider> coordinatorProviders; - private final StreamGraph streamGraph; private final List<StreamNode> chainedNodes; private final List<StreamEdge> transitiveOutEdges; private final List<StreamEdge> transitiveInEdges; private InputOutputFormatContainer inputOutputFormatContainer = null; - public OperatorChainInfo( - int startNodeId, - Map<Integer, ChainedSourceInfo> chainedSources, - StreamGraph streamGraph) { + public OperatorChainInfo(int startNodeId) { this.startNodeId = startNodeId; this.chainedOperatorHashes = new HashMap<>(); this.coordinatorProviders = new ArrayList<>(); - this.chainedSources = chainedSources; + this.chainedSources = new HashMap<>(); this.chainedMinResources = new HashMap<>(); this.chainedPreferredResources = new HashMap<>(); this.chainedNames = new HashMap<>(); - this.streamGraph = streamGraph; this.chainedNodes = new ArrayList<>(); this.transitiveOutEdges = new ArrayList<>(); this.transitiveInEdges = new ArrayList<>(); + this.chainedOperatorInfos = new HashMap<>(); } public Integer getStartNodeId() { @@ -88,9 +89,11 @@ public class OperatorChainInfo { public OperatorID addNodeToChain( int currentNodeId, String operatorName, JobVertexBuildContext jobVertexBuildContext) { - recordChainedNode(currentNodeId); + StreamGraph streamGraph = jobVertexBuildContext.getStreamGraph(); StreamNode streamNode = streamGraph.getStreamNode(currentNodeId); + recordChainedNode(streamNode); + List<ChainedOperatorHashInfo> operatorHashes = chainedOperatorHashes.computeIfAbsent(startNodeId, k -> new ArrayList<>()); @@ -116,13 +119,12 @@ public class OperatorChainInfo { return transitiveOutEdges; } - public void recordChainedNode(int currentNodeId) { - StreamNode streamNode = streamGraph.getStreamNode(currentNodeId); + public void recordChainedNode(StreamNode streamNode) { chainedNodes.add(streamNode); } public OperatorChainInfo newChain(Integer startNodeId) { - return new OperatorChainInfo(startNodeId, chainedSources, streamGraph); + return new OperatorChainInfo(startNodeId); } public List<StreamNode> getAllChainedNodes() { @@ -141,8 +143,9 @@ public class OperatorChainInfo { return inputOutputFormatContainer; } - public void addChainedSource(Integer sourceNodeId, ChainedSourceInfo chainedSourceInfo) { - chainedSources.put(sourceNodeId, chainedSourceInfo); + public void addChainedSource(StreamNode sourceNode, ChainedSourceInfo chainedSourceInfo) { + recordChainedNode(sourceNode); + chainedSources.put(sourceNode.getId(), chainedSourceInfo); } public void addChainedMinResources(Integer sourceNodeId, ResourceSpec resourceSpec) { @@ -180,4 +183,18 @@ public class OperatorChainInfo { public List<StreamEdge> getTransitiveInEdges() { return transitiveInEdges; } + + public OperatorInfo getOperatorInfo(Integer nodeId) { + return chainedOperatorInfos.get(nodeId); + } + + public OperatorInfo createAndGetOperatorInfo(Integer nodeId, OperatorID operatorId) { + OperatorInfo operatorInfo = new OperatorInfo(operatorId); + chainedOperatorInfos.put(nodeId, operatorInfo); + return operatorInfo; + } + + public Map<Integer, OperatorInfo> getOperatorInfos() { + return Collections.unmodifiableMap(chainedOperatorInfos); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java index f79ca90a3cf..0b29a2a9245 100644 --- a/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java +++ b/flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/util/OperatorInfo.java @@ -19,6 +19,7 @@ package org.apache.flink.streaming.api.graph.util; import org.apache.flink.annotation.Internal; +import org.apache.flink.runtime.jobgraph.OperatorID; import org.apache.flink.streaming.api.graph.StreamConfig; import org.apache.flink.streaming.api.graph.StreamEdge; @@ -29,29 +30,19 @@ import java.util.List; @Internal public class OperatorInfo { - private StreamConfig vertexConfig; + private final OperatorID operatorId; - // This is used to cache the chainable outputs, to set the chainable outputs config after - // all job vertices are created. - private final List<StreamEdge> chainableOutputs; + private StreamConfig vertexConfig; // This is used to cache the non-chainable outputs, to set the non-chainable outputs config // after all job vertices are created. private final List<StreamEdge> nonChainableOutputs; - public OperatorInfo() { - this.chainableOutputs = new ArrayList<>(); + public OperatorInfo(OperatorID operatorId) { + this.operatorId = operatorId; this.nonChainableOutputs = new ArrayList<>(); } - public List<StreamEdge> getChainableOutputs() { - return chainableOutputs; - } - - public void addChainableOutputs(List<StreamEdge> chainableOutputs) { - this.chainableOutputs.addAll(chainableOutputs); - } - public List<StreamEdge> getNonChainableOutputs() { return nonChainableOutputs; } @@ -67,4 +58,8 @@ public class OperatorInfo { public void setVertexConfig(StreamConfig vertexConfig) { this.vertexConfig = vertexConfig; } + + public OperatorID getOperatorId() { + return operatorId; + } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java index aaf963a6bd2..ce3de3c5e46 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGeneratorTest.java @@ -1010,9 +1010,13 @@ class StreamingJobGraphGeneratorTest { streamGraph.getStreamNodes().stream() .sorted(Comparator.comparingInt(StreamNode::getId)) .collect(Collectors.toList()); - assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph)) + assertThat( + areOperatorsChainable( + streamNodes.get(0), streamNodes.get(1), streamGraph, false)) .isTrue(); - assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph)) + assertThat( + areOperatorsChainable( + streamNodes.get(1), streamNodes.get(2), streamGraph, false)) .isFalse(); } @@ -1058,9 +1062,13 @@ class StreamingJobGraphGeneratorTest { streamGraph.getStreamNodes().stream() .sorted(Comparator.comparingInt(StreamNode::getId)) .collect(Collectors.toList()); - assertThat(areOperatorsChainable(streamNodes.get(0), streamNodes.get(1), streamGraph)) + assertThat( + areOperatorsChainable( + streamNodes.get(0), streamNodes.get(1), streamGraph, false)) .isFalse(); - assertThat(areOperatorsChainable(streamNodes.get(1), streamNodes.get(2), streamGraph)) + assertThat( + areOperatorsChainable( + streamNodes.get(1), streamNodes.get(2), streamGraph, false)) .isTrue(); }
