noorall commented on code in PR #25366:
URL: https://github.com/apache/flink/pull/25366#discussion_r1774908113
##########
flink-runtime/src/main/java/org/apache/flink/streaming/api/graph/StreamingJobGraphGenerator.java:
##########
@@ -593,15 +616,22 @@ private Map<Integer, OperatorChainInfo>
buildChainedInputsAndGetHeadInputs(
if (targetChainingStrategy ==
ChainingStrategy.HEAD_WITH_SOURCES
&& isChainableInput(sourceOutEdge, streamGraph)) {
+ // we cache the non-chainable outputs here, and set the
non-chained config later
+ OperatorInfo operatorInfo = new OperatorInfo();
+ operatorInfo.setChainableOutputs(Collections.emptyList());
Review Comment:
> maybe nonChainable?
>
> ```
> OperatorInfo operatorInfo = new OperatorInfo();
> jobVertexBuildContext.addOperatorInfo(sourceNodeId, operatorInfo);
>
> final OperatorID opId = new OperatorID(hashes.get(sourceNodeId));
> final StreamConfig.SourceInputConfig inputConfig =
> new
StreamConfig.SourceInputConfig(sourceOutEdge);
> final StreamConfig operatorConfig = new StreamConfig(new Configuration());
>
> setOperatorConfig(
> sourceNodeId,
> operatorConfig,
> Collections.emptyMap(),
> jobVertexBuildContext);
> setOperatorChainedOutputsConfig(
> operatorConfig, Collections.emptyList(),
jobVertexBuildContext);
>
> // we cache the non-chainable outputs here, and set the non-chained config
later
> operatorInfo.setNonChainableOutputs(Collections.emptyList());
> ```
>
> And a unit test case is needed
Thank you for the reminder and I have corrected this part. However, there is
no need for an additional unit test because the property in OperatorInfo is an
empty list by default. Therefore, it doesn't matter whether it is set or not.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]