rkhachatryan commented on code in PR #27634:
URL: https://github.com/apache/flink/pull/27634#discussion_r2823356173


##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java:
##########
@@ -1162,9 +1163,22 @@ public InputStatus pollNext(ReaderOutput<Integer> out) {
                             out.collect(3, 3);
                             out.collect(4, 4);
                             emitted = true;
-                            return InputStatus.END_OF_INPUT;
                         }
-                        return InputStatus.END_OF_INPUT;
+
+                        // We're using SingleSplitEnumerator below which DOES send operator events
+                        // to the source tasks. Therefore, if we finish this task prematurely (by
+                        // returning END_OF_INPUT) such event delivery might fail, causing job and
+                        // test failure. Usually, this doesn't happen because usually the task
+                        // finishes before the RPC starts.
+                        // To avoid flakiness, we intentionally wait for NO_MORE_SPLITS event on TM.
+                        return noMoreSplits
+                                ? InputStatus.END_OF_INPUT
+                                : InputStatus.NOTHING_AVAILABLE;
+                    }
+
+                    @Override
+                    public void notifyNoMoreSplits() {
+                        this.noMoreSplits = true;

Review Comment:
   Good question. Yes, it should go via the mailbox, so the event is handled on the task's mailbox thread:
   ```
   mainMailboxExecutor.execute(
           () -> operatorChain.dispatchOperatorEvent(operator, event),
   ```
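
To illustrate why dispatching through a mailbox matters for the plain `noMoreSplits` field, here is a minimal standalone sketch. It does not use Flink classes: `MailboxSketch`, `Reader`, `Status`, and `Outcome` are hypothetical names, and a single-threaded `ExecutorService` stands in for the task mailbox as an assumption. The point is only that when both the poll and the event handler run on the same thread, the flag needs no `volatile` or locking.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class MailboxSketch {
    // Hypothetical stand-in for the two InputStatus values used in the test.
    enum Status { NOTHING_AVAILABLE, END_OF_INPUT }

    // Hypothetical reader: the flag is a plain field because only the
    // mailbox thread ever reads or writes it.
    static class Reader {
        private boolean noMoreSplits;
        final List<String> handlerThreads = new ArrayList<>();

        Status pollNext() {
            handlerThreads.add(Thread.currentThread().getName());
            return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
        }

        void notifyNoMoreSplits() {
            handlerThreads.add(Thread.currentThread().getName());
            noMoreSplits = true;
        }
    }

    // Result holder so callers can inspect what happened.
    static class Outcome {
        final Status before;
        final Status after;
        final int distinctHandlerThreads;

        Outcome(Status before, Status after, int distinctHandlerThreads) {
            this.before = before;
            this.after = after;
            this.distinctHandlerThreads = distinctHandlerThreads;
        }
    }

    static Outcome run() throws Exception {
        // Assumption: a single-threaded executor models the mailbox.
        ExecutorService mailbox = Executors.newSingleThreadExecutor();
        Reader reader = new Reader();
        Callable<Status> poll = reader::pollNext;
        try {
            // Poll before the event arrives: nothing to report yet.
            Status before = mailbox.submit(poll).get();

            // A separate "RPC" thread only enqueues the event; it never
            // touches the reader directly.
            Thread rpc = new Thread(() -> mailbox.execute(reader::notifyNoMoreSplits));
            rpc.start();
            rpc.join();

            // The executor is FIFO, so this poll runs after the event handler.
            Status after = mailbox.submit(poll).get();
            int distinct = new HashSet<>(reader.handlerThreads).size();
            return new Outcome(before, after, distinct);
        } finally {
            mailbox.shutdown();
        }
    }

    public static void main(String[] args) throws Exception {
        Outcome o = run();
        System.out.println(o.before + " -> " + o.after
                + ", handler threads: " + o.distinctHandlerThreads);
    }
}
```

In this sketch the RPC thread only submits work, so every access to `noMoreSplits` happens on the single executor thread. Whether the same holds in the test depends on the reader being driven only from the task's mailbox thread.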


