kl0u commented on a change in pull request #13647:
URL: https://github.com/apache/flink/pull/13647#discussion_r505564569
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/translators/MultiInputTransformationTranslator.java
##########
@@ -51,7 +54,26 @@
protected Collection<Integer> translateForBatchInternal(
final AbstractMultipleInputTransformation<OUT>
transformation,
final Context context) {
- return translateInternal(transformation, context);
+ boolean isKeyed = transformation instanceof
KeyedMultipleInputTransformation;
+ boolean isInputSelectable = isInputSelectable(transformation);
Review comment:
Why not restructure this as follows:
```
Collection<Integer> ids = translateInternal(transformation, context);
if (isKeyed && !isInputSelectable) {
    transformation.setChainingStrategy(ChainingStrategy.HEAD);
    BatchExecutionUtils.applySortingInputs(transformation.getId(), context);
}
return ids;
```
This way the `if (...)` condition is checked only once. The same applies to
the other translators.
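To illustrate the shape of the suggestion, here is a minimal, self-contained
sketch. It does not use the real Flink classes: `TranslatorSketch`,
`Transformation`, and the `sortedInputs` flag are hypothetical stand-ins, and
setting `sortedInputs` only stands in for the
`BatchExecutionUtils.applySortingInputs(...)` call from the snippet above.

```java
import java.util.Collection;
import java.util.Collections;

// Hypothetical stand-ins only; none of these are the real Flink types.
public class TranslatorSketch {

    public enum ChainingStrategy { ALWAYS, HEAD }

    public static class Transformation {
        public final int id;
        public final boolean keyed;
        public final boolean inputSelectable;
        public ChainingStrategy chainingStrategy = ChainingStrategy.ALWAYS;
        public boolean sortedInputs = false;

        public Transformation(int id, boolean keyed, boolean inputSelectable) {
            this.id = id;
            this.keyed = keyed;
            this.inputSelectable = inputSelectable;
        }
    }

    // Shared translation path, used regardless of the batch-specific handling.
    public static Collection<Integer> translateInternal(Transformation t) {
        return Collections.singletonList(t.id);
    }

    // Batch path: translate first, then apply both batch adjustments
    // behind a single condition check.
    public static Collection<Integer> translateForBatch(Transformation t) {
        Collection<Integer> ids = translateInternal(t);
        if (t.keyed && !t.inputSelectable) {
            t.chainingStrategy = ChainingStrategy.HEAD;
            t.sortedInputs = true; // stands in for applySortingInputs(...)
        }
        return ids;
    }
}
```

With this shape, a keyed transformation that is not input-selectable ends up
with the `HEAD` chaining strategy and sorted inputs, while any other
transformation passes through `translateInternal` unchanged.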
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamNode.java
##########
@@ -336,15 +341,30 @@ public void setUserHash(String userHash) {
this.userHash = userHash;
}
- @VisibleForTesting
public void setSortedInputs(boolean sortedInputs) {
this.sortedInputs = sortedInputs;
}
- boolean getSortedInputs() {
+ public boolean getSortedInputs() {
return sortedInputs;
}
+ public void setStateBackend(StateBackend stateBackend) {
Review comment:
From what I understand, the flow is that the translator sets the batch
`StateBackend` and the `timerService` on the `StreamNode` so that the
`StreamJobGraphGenerator` can pick them up. Why not set the state backend and
the timer service at the `StreamGraph` level (e.g. in
`StreamGraphGenerator.configureStreamGraph()`), from where the
`StreamJobGraphGenerator` can pick them up?
This would likely reduce the changes needed in the `StreamNode` and the
`StreamJobGraphGenerator`.
WDYT @dawidwys ?
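A rough sketch of the graph-level alternative, under stated assumptions: the
`Graph` class, `configureGraph`, and `generateVertexConfigs` below are
hypothetical stand-ins for `StreamGraph`,
`StreamGraphGenerator.configureStreamGraph()`, and the
`StreamJobGraphGenerator`, and the backend and timer service are modelled as
plain strings rather than real Flink objects.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-ins only; none of these are the real Flink types.
public class GraphLevelConfigSketch {

    // Stand-in for StreamGraph: settings live on the graph, not on each node.
    public static class Graph {
        public final List<Integer> nodeIds = new ArrayList<>();
        public String stateBackend = "default-backend";
        public String timerService = "default-timers";
    }

    // Stand-in for configureStreamGraph(): batch settings are chosen once here.
    public static void configureGraph(Graph graph, boolean batchMode) {
        if (batchMode) {
            graph.stateBackend = "batch-backend";
            graph.timerService = "batch-timers";
        }
    }

    // Stand-in for the job graph generator: it reads the graph-level settings
    // for every node, so the nodes need no setters of their own.
    public static Map<Integer, String> generateVertexConfigs(Graph graph) {
        Map<Integer, String> configs = new LinkedHashMap<>();
        for (int id : graph.nodeIds) {
            configs.put(id, graph.stateBackend + "/" + graph.timerService);
        }
        return configs;
    }
}
```

One consequence of this shape is that every node sees the same backend and
timer service, so it only fits if the batch settings do not need to differ
per node.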
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]