pnowojski commented on a change in pull request #13423:
URL: https://github.com/apache/flink/pull/13423#discussion_r493272774



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -33,7 +33,7 @@
  * {@link AbstractStreamOperatorV2}.

Review comment:
       nit: maybe add the motivation behind this change to the commit message? For example, copy it from the JIRA ticket:
   ```
   As suggested in FLIP-140, we want to implement the sorting inside
   PushingAsyncDataInput.DataOutput.
   
   To do that, PushingAsyncDataInput.DataOutput is notified
   about the end of input. This also moves the propagation of that signal
   to the bounded operators from OperatorChain to the DataOutputs.
   ```
   ?
   
   
   

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/PushingAsyncDataInput.java
##########
@@ -57,5 +57,7 @@
                void emitStreamStatus(StreamStatus streamStatus) throws Exception;
 
                void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception;
+
+               default void endOutput() throws Exception {}

Review comment:
       Why is this exposed to the classes implementing `PushingAsyncDataInput`? I guess to make the implementation easier?
   
   I mean, there should be no reason for `StreamTaskNetworkInput` or `StreamTaskSourceInput` to call `output.endOutput()`, as the runtime can do it for them, and calling it prematurely can only lead to bugs.
   
   Maybe there is an easy way to hide it from `emitNext()`, for example by splitting it into two separate interfaces?
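A minimal sketch of what such a split could look like, assuming inputs only ever receive the narrow interface while the runtime keeps the wider one. All names here (`DataOutput` with a single `emitRecord`, `BoundedDataOutput`, `ListInput`, `drive`) are hypothetical simplifications, not the actual Flink types:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: every type and method name here is illustrative, not Flink's API.
final class SplitDataOutputSketch {

    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    /** Narrow view handed to inputs: they can only emit elements. */
    interface DataOutput<T> {
        void emitRecord(T record) throws Exception;
    }

    /** Wider view held only by the runtime: adds the end-of-input notification. */
    interface BoundedDataOutput<T> extends DataOutput<T> {
        void endOutput() throws Exception;
    }

    /** Inputs only ever see DataOutput, so they cannot call endOutput() prematurely. */
    interface Input<T> {
        InputStatus emitNext(DataOutput<T> output) throws Exception;
    }

    /** Bounded input that emits one buffered record per emitNext() call. */
    static final class ListInput<T> implements Input<T> {
        private final List<T> records;
        private int next = 0;

        ListInput(List<T> records) {
            this.records = records;
        }

        @Override
        public InputStatus emitNext(DataOutput<T> output) throws Exception {
            if (next < records.size()) {
                output.emitRecord(records.get(next++));
            }
            return next < records.size() ? InputStatus.MORE_AVAILABLE : InputStatus.END_OF_INPUT;
        }
    }

    /** Runtime side: the only code that holds a BoundedDataOutput and ends it. */
    static <T> void drive(Input<T> input, BoundedDataOutput<T> output) throws Exception {
        while (input.emitNext(output) == InputStatus.MORE_AVAILABLE) {
            // keep pulling records until the input reports END_OF_INPUT
        }
        output.endOutput();
    }

    static List<String> run(List<String> records) {
        List<String> events = new ArrayList<>();
        BoundedDataOutput<String> output = new BoundedDataOutput<String>() {
            @Override
            public void emitRecord(String value) {
                events.add("record:" + value);
            }

            @Override
            public void endOutput() {
                events.add("end");
            }
        };
        try {
            drive(new ListInput<>(records), output);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run(List.of("a", "b"))); // [record:a, record:b, end]
    }
}
```

The point of the split is that `endOutput()` is simply not reachable from `emitNext()`, so a premature call becomes a compile error rather than a runtime bug.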

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -354,5 +341,12 @@ public void emitStreamStatus(StreamStatus streamStatus) {
                public void emitLatencyMarker(LatencyMarker latencyMarker) throws Exception {
                        input.processLatencyMarker(latencyMarker);
                }
+
+               @Override
+               public void endOutput() throws Exception {

Review comment:
       I don't like that you have duplicated the `instanceof` checks in many places :(
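One way to avoid the duplication, sketched under the assumption that all `DataOutput` implementations can share a single static helper. `BoundedOneInput` and `BoundedMultiInput` below are local stand-ins shaped like Flink's interfaces, and `EndOfInputDispatcher` is a hypothetical name:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch only: BoundedOneInput/BoundedMultiInput are local stand-ins shaped like
// Flink's interfaces; EndOfInputDispatcher is a hypothetical helper name.
final class EndOfInputDispatcherSketch {

    interface BoundedOneInput {
        void endInput() throws Exception;
    }

    interface BoundedMultiInput {
        void endInput(int inputId) throws Exception;
    }

    /** The single place that inspects the operator type to deliver end-of-input. */
    static final class EndOfInputDispatcher {
        private EndOfInputDispatcher() {}

        static void endInput(Object operator, int inputId) throws Exception {
            if (operator instanceof BoundedOneInput) {
                ((BoundedOneInput) operator).endInput();
            } else if (operator instanceof BoundedMultiInput) {
                ((BoundedMultiInput) operator).endInput(inputId);
            }
            // Operators implementing neither interface ignore end-of-input.
        }
    }

    /** Different call sites (e.g. several DataOutput implementations) reuse the same helper. */
    static List<String> demo() {
        List<String> events = new ArrayList<>();
        BoundedOneInput single = () -> events.add("single ended");
        BoundedMultiInput multi = inputId -> events.add("multi input " + inputId + " ended");
        try {
            EndOfInputDispatcher.endInput(single, 1);
            EndOfInputDispatcher.endInput(multi, 2);
            EndOfInputDispatcher.endInput(new Object(), 1); // not bounded: nothing happens
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [single ended, multi input 2 ended]
    }
}
```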

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamMultipleInputProcessor.java
##########
@@ -268,7 +252,12 @@ public InputProcessor(
                }
 
                public InputStatus processInput() throws Exception {
-                       return taskInput.emitNext(dataOutput);
+                       InputStatus status = taskInput.emitNext(dataOutput);
+                       if (status == InputStatus.END_OF_INPUT) {
+                               dataOutput.endOutput();

Review comment:
       It looks strange: you ask `taskInput` to emit records to `dataOutput`, it returns `END_OF_INPUT`, and then you call `endOutput()` on the same `dataOutput` that you have just passed to `taskInput`.
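A possible alternative, sketched under the assumption that the processor is given a separate end-of-input channel that is never handed to the input. `EndOfInputListener` and this `InputProcessor` constructor are hypothetical, not the code in this PR:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: EndOfInputListener and this InputProcessor shape are illustrative only.
final class SeparateEndOfInputSketch {

    enum InputStatus { MORE_AVAILABLE, END_OF_INPUT }

    /** What the input emits records into. */
    interface DataOutput<T> {
        void emitRecord(T record) throws Exception;
    }

    /** Separate channel for the end-of-input signal, never handed to the input. */
    interface EndOfInputListener {
        void endInput(int inputId) throws Exception;
    }

    interface Input<T> {
        InputStatus emitNext(DataOutput<T> output) throws Exception;
    }

    static final class InputProcessor<T> {
        private final int inputId;
        private final Input<T> taskInput;
        private final DataOutput<T> dataOutput;
        private final EndOfInputListener endOfInputListener;

        InputProcessor(int inputId, Input<T> taskInput, DataOutput<T> dataOutput,
                EndOfInputListener endOfInputListener) {
            this.inputId = inputId;
            this.taskInput = taskInput;
            this.dataOutput = dataOutput;
            this.endOfInputListener = endOfInputListener;
        }

        InputStatus processInput() throws Exception {
            InputStatus status = taskInput.emitNext(dataOutput);
            if (status == InputStatus.END_OF_INPUT) {
                // The signal goes to a dedicated listener instead of back into dataOutput.
                endOfInputListener.endInput(inputId);
            }
            return status;
        }
    }

    static List<String> demo() {
        List<String> events = new ArrayList<>();
        // An input that emits a single record and is then exhausted.
        Input<String> input = output -> {
            output.emitRecord("x");
            return InputStatus.END_OF_INPUT;
        };
        InputProcessor<String> processor = new InputProcessor<String>(
                1,
                input,
                value -> events.add("record:" + value),
                id -> events.add("end of input " + id));
        try {
            processor.processInput();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // [record:x, end of input 1]
    }
}
```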

##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/AbstractInput.java
##########
@@ -68,4 +68,13 @@ public void processLatencyMarker(LatencyMarker latencyMarker) throws Exception {
        public void setKeyContextElement(StreamRecord record) throws Exception {
                owner.internalSetKeyContextElement(record, stateKeySelector);
        }
+
+       @Override
+       public void endInput() throws Exception {
+               if (owner instanceof BoundedOneInput && inputId == 1) {

Review comment:
       `inputId != 1` combined with `BoundedOneInput` is definitely a bug, so the code should fail. Maybe add a `checkState`/`checkArgument`?
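A minimal sketch of the fail-fast variant. Here `checkState` is a local stand-in with the same intent as Flink's `org.apache.flink.util.Preconditions#checkState`, the two `Bounded*` interfaces are local stand-ins, and `InputSketch` only mimics the `endInput()` method under review:

```java
// Sketch only: checkState is a local stand-in with the same intent as Flink's
// Preconditions#checkState; BoundedOneInput/BoundedMultiInput are local stand-ins,
// and InputSketch mimics just the reviewed endInput() method.
final class EndInputCheckSketch {

    interface BoundedOneInput {
        void endInput() throws Exception;
    }

    interface BoundedMultiInput {
        void endInput(int inputId) throws Exception;
    }

    static void checkState(boolean condition, String messageTemplate, Object... args) {
        if (!condition) {
            throw new IllegalStateException(String.format(messageTemplate, args));
        }
    }

    static final class InputSketch {
        private final Object owner;
        private final int inputId;

        InputSketch(Object owner, int inputId) {
            this.owner = owner;
            this.inputId = inputId;
        }

        void endInput() throws Exception {
            if (owner instanceof BoundedOneInput) {
                // A one-input operator only has input 1; any other id is a wiring bug.
                checkState(inputId == 1,
                        "BoundedOneInput operator received endInput() for input %s", inputId);
                ((BoundedOneInput) owner).endInput();
            } else if (owner instanceof BoundedMultiInput) {
                ((BoundedMultiInput) owner).endInput(inputId);
            }
        }
    }

    /** Returns true if endInput() fails fast for a BoundedOneInput owner and the given id. */
    static boolean rejects(int inputId) {
        BoundedOneInput owner = () -> { };
        try {
            new InputSketch(owner, inputId).endInput();
            return false;
        } catch (IllegalStateException e) {
            return true;
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(rejects(1)); // false
        System.out.println(rejects(2)); // true
    }
}
```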




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]
