wanglijie95 commented on code in PR #21963:
URL: https://github.com/apache/flink/pull/21963#discussion_r1116476465


##########
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -1060,14 +1111,97 @@ private void setVertexConfig(
         vertexConfigs.put(vertexID, config);
     }
 
+    private void setChainedOutputsConfig(
+            Integer vertexId, StreamConfig config, List<StreamEdge> 
chainableOutputs) {
+        // iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+        for (StreamEdge edge : chainableOutputs) {
+            if (edge.getOutputTag() != null) {
+                config.setTypeSerializerSideOut(
+                        edge.getOutputTag(),
+                        edge.getOutputTag()
+                                .getTypeInfo()
+                                
.createSerializer(streamGraph.getExecutionConfig()));
+            }
+        }
+        config.setChainedOutputs(chainableOutputs);
+    }
+
+    private void setOperatorNonChainedOutputsConfig(
+            Integer vertexId,
+            StreamConfig config,
+            List<StreamEdge> nonChainableOutputs,
+            Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge) {
+        // iterate edges, find sideOutput edges create and save serializers 
for each outputTag type
+        for (StreamEdge edge : nonChainableOutputs) {
+            if (edge.getOutputTag() != null) {
+                config.setTypeSerializerSideOut(
+                        edge.getOutputTag(),
+                        edge.getOutputTag()
+                                .getTypeInfo()
+                                
.createSerializer(streamGraph.getExecutionConfig()));
+            }
+        }
+
+        List<NonChainedOutput> deduplicatedOutputs =
+                mayReuseNonChainedOutputs(vertexId, nonChainableOutputs, 
outputsConsumedByEdge);
+        config.setNumberOfOutputs(deduplicatedOutputs.size());
+        config.setOperatorNonChainedOutputs(deduplicatedOutputs);
+    }
+
+    private void setVertexNonChainedOutputsConfig(
+            Integer startNodeId,
+            StreamConfig config,
+            List<StreamEdge> transitiveOutEdges,
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+
+        LinkedHashSet<NonChainedOutput> transitiveOutputs = new 
LinkedHashSet<>();
+        for (StreamEdge edge : transitiveOutEdges) {
+            NonChainedOutput output = 
opIntermediateOutputs.get(edge.getSourceId()).get(edge);
+            transitiveOutputs.add(output);
+            connect(startNodeId, edge, output);
+        }
+
+        config.setVertexNonChainedOutputs(new ArrayList<>(transitiveOutputs));
+    }
+
+    private void setAllOperatorNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+        // set non chainable output config
+        opNonChainableOutputsCache.forEach(
+                (vertexId, nonChainableOutputs) -> {
+                    Map<StreamEdge, NonChainedOutput> outputsConsumedByEdge =
+                            opIntermediateOutputs.computeIfAbsent(
+                                    vertexId, ignored -> new HashMap<>());
+                    setOperatorNonChainedOutputsConfig(
+                            vertexId,
+                            vertexConfigs.get(vertexId),
+                            nonChainableOutputs,
+                            outputsConsumedByEdge);
+                });
+    }
+
+    private void setAllVertexNonChainedOutputsConfigs(
+            final Map<Integer, Map<StreamEdge, NonChainedOutput>> 
opIntermediateOutputs) {
+        jobVertices
+                .keySet()
+                .forEach(
+                        startNodeId -> {
+                            setVertexNonChainedOutputsConfig(
+                                    startNodeId,
+                                    vertexConfigs.get(startNodeId),
+                                    
chainInfos.get(startNodeId).getTransitiveOutEdges(),
+                                    opIntermediateOutputs);
+                        });

Review Comment:
   The {} is needed because the `setVertexNonChainedOutputsConfig` has no 
return value.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to