dawidwys commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r494127458



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        public void setKeyContextElement(StreamRecord record) throws Exception {
                owner.internalSetKeyContextElement(record, stateKeySelector);
        }
+
+       @Override
+       public void endInput() throws Exception {
+               if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       If you say so. 
   
   BTW, I think this discussion is somewhat unrelated to the PR, but ...
   I find the contract between `BoundedOneInput` and `BoundedMultiInput` undefined/ill-defined.
   
   Are these combinations valid? In the current implementation:
   1. `TwoInputOperator implements BoundedOneInput` -> whenever any input finishes, we call `BoundedOneInput#endInput`; imo this is confusing
   2. `OneInputOperator implements BoundedMultiInput` -> we call `BoundedMultiInput#endInput(1)` <- sort of ok, imo
   3. `MultiInputOperator implements BoundedOneInput` -> see 1.
   4. `MultiInputOperator implements BoundedMultiInput, BoundedOneInput` -> we only ever call `BoundedOneInput#endInput`, for every finished input, which imo is misleading
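   
   To make case 4 concrete, here is a minimal sketch, not the actual Flink code. The two interfaces are simplified stand-ins for the Flink ones, `notifyInputFinished` and `BothOperator` are hypothetical names, and the dispatch only mirrors the behaviour as I describe it above.

```java
import java.util.ArrayList;
import java.util.List;

public class EndInputDispatchSketch {

    // Simplified stand-ins for the Flink interfaces of the same names.
    interface BoundedOneInput {
        void endInput() throws Exception;
    }

    interface BoundedMultiInput {
        void endInput(int inputId) throws Exception;
    }

    // Hypothetical dispatcher (an assumption, not Flink's code): if the
    // operator implements BoundedOneInput, that callback wins for every
    // finished input; BoundedMultiInput is only used otherwise.
    static void notifyInputFinished(Object operator, int inputId) throws Exception {
        if (operator instanceof BoundedOneInput) {
            ((BoundedOneInput) operator).endInput();
        } else if (operator instanceof BoundedMultiInput) {
            ((BoundedMultiInput) operator).endInput(inputId);
        }
    }

    // Case 4: an operator that implements both interfaces and records
    // which callback was invoked.
    static class BothOperator implements BoundedOneInput, BoundedMultiInput {
        final List<String> calls = new ArrayList<>();

        @Override
        public void endInput() {
            calls.add("endInput()");
        }

        @Override
        public void endInput(int inputId) {
            calls.add("endInput(" + inputId + ")");
        }
    }

    public static void main(String[] args) throws Exception {
        BothOperator op = new BothOperator();
        notifyInputFinished(op, 1); // first input finishes
        notifyInputFinished(op, 2); // second input finishes
        System.out.println(op.calls);
    }
}
```

   Under these assumptions it prints `[endInput(), endInput()]`: the operator is never told which input finished, even though it implements `BoundedMultiInput`.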
   
   I know that this PR does not fix it. In the PR I do ignore options 1 and 2, though.





