kl0u commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r492682632



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        public void setKeyContextElement(StreamRecord record) throws Exception {
                owner.internalSetKeyContextElement(record, stateKeySelector);
        }
+
+       @Override
+       public void endInput() throws Exception {
+               if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
   I am asking because this check did not exist in any of the methods removed
by this PR that handled the input-closing logic.

   Judging from the code, the check is probably safe. Still, to be on the safe
side, it may make sense to remove it so that the logic stays the same as
before. Feel free to disagree and leave it as is.
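
   To make the concern concrete, here is a minimal, self-contained sketch of the two variants. It rests on assumptions: that the check in question is the `inputId == 1` condition, and that the body of the `if` simply forwards to `((BoundedOneInput) owner).endInput()`, which the hunk above does not show. `BoundedOneInput` is declared locally as a stand-in, and `RecordingOwner`, `GuardedInput`, `UnguardedInput`, and `EndInputSketch` are hypothetical names for illustration, not Flink classes.

```java
// Local stand-in for Flink's BoundedOneInput so the sketch compiles on its own.
interface BoundedOneInput {
    void endInput() throws Exception;
}

// Hypothetical owner that counts how often endInput() is forwarded to it.
class RecordingOwner implements BoundedOneInput {
    int endInputCalls = 0;

    @Override
    public void endInput() {
        endInputCalls++;
    }
}

// Variant from the diff: forwards only when inputId == 1.
// The forwarding call in the body is an assumption; the hunk cuts off before it.
class GuardedInput {
    private final Object owner;
    private final int inputId;

    GuardedInput(Object owner, int inputId) {
        this.owner = owner;
        this.inputId = inputId;
    }

    public void endInput() throws Exception {
        if (owner instanceof BoundedOneInput && inputId == 1) {
            ((BoundedOneInput) owner).endInput();
        }
    }
}

// Variant without the inputId condition (the suggested alternative).
class UnguardedInput {
    private final Object owner;

    UnguardedInput(Object owner) {
        this.owner = owner;
    }

    public void endInput() throws Exception {
        if (owner instanceof BoundedOneInput) {
            ((BoundedOneInput) owner).endInput();
        }
    }
}

class EndInputSketch {
    public static void main(String[] args) throws Exception {
        RecordingOwner guardedOwner = new RecordingOwner();
        new GuardedInput(guardedOwner, 2).endInput();

        RecordingOwner unguardedOwner = new RecordingOwner();
        new UnguardedInput(unguardedOwner).endInput();

        System.out.println("guarded calls: " + guardedOwner.endInputCalls);
        System.out.println("unguarded calls: " + unguardedOwner.endInputCalls);
    }
}
```

   In this sketch, an input with any id other than 1 never notifies the owner in the guarded variant, while the unguarded variant always does. If `inputId` can only ever be 1 for a `BoundedOneInput` owner, the two behave identically and the extra condition is harmless.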




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.


