rkhachatryan commented on code in PR #27634:
URL: https://github.com/apache/flink/pull/27634#discussion_r2823356173
##########
flink-tests/src/test/java/org/apache/flink/test/streaming/runtime/SideOutputITCase.java:
##########
@@ -1162,9 +1163,22 @@ public InputStatus pollNext(ReaderOutput<Integer> out) {
out.collect(3, 3);
out.collect(4, 4);
emitted = true;
- return InputStatus.END_OF_INPUT;
}
- return InputStatus.END_OF_INPUT;
+
+ // We're using SingleSplitEnumerator below which DOES send operator events
+ // to the source tasks. Therefore, if we finish this task prematurely (by
+ // returning END_OF_INPUT) such event delivery might fail, causing job and
+ // test failure. Usually, this doesn't happen because the task
+ // finishes before the RPC starts.
+ // To avoid flakiness, we intentionally wait for the NO_MORE_SPLITS event on TM.
+ return noMoreSplits
+ ? InputStatus.END_OF_INPUT
+ : InputStatus.NOTHING_AVAILABLE;
+ }
+
+ @Override
+ public void notifyNoMoreSplits() {
+ this.noMoreSplits = true;
Review Comment:
Good question. Yes, it should go via mailbox:
```
mainMailboxExecutor.execute(
() -> operatorChain.dispatchOperatorEvent(operator, event),
```
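To illustrate why going via the mailbox matters here, below is a minimal, self-contained sketch of the pattern. It is a toy model and not Flink code: `MailboxSketch`, `Mailbox`, `Reader`, `Status` and `simulate` are hypothetical names invented for this example. The assumption it encodes is that another thread only enqueues the event, while the task thread both runs the enqueued action and calls `pollNext()`, so a plain (non-volatile) flag is enough.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Toy model of a mailbox-driven task. Hypothetical classes, not Flink's. */
class MailboxSketch {

    enum Status { END_OF_INPUT, NOTHING_AVAILABLE }

    /** Hypothetical reader shaped like the test source in the diff. */
    static class Reader {
        // Plain fields: in this model only the task thread reads or writes them.
        private boolean emitted;
        private boolean noMoreSplits;
        final List<Integer> output = new ArrayList<>();

        Status pollNext() {
            if (!emitted) {
                output.add(3);
                output.add(4);
                emitted = true;
            }
            // Finish only after the "no more splits" event has been handled.
            return noMoreSplits ? Status.END_OF_INPUT : Status.NOTHING_AVAILABLE;
        }

        void notifyNoMoreSplits() {
            noMoreSplits = true;
        }
    }

    /** Minimal mailbox: any thread may enqueue, only the task thread drains. */
    static class Mailbox {
        private final ConcurrentLinkedQueue<Runnable> queue = new ConcurrentLinkedQueue<>();

        void execute(Runnable action) {
            queue.add(action);
        }

        void drain() {
            Runnable action;
            while ((action = queue.poll()) != null) {
                action.run();
            }
        }
    }

    /** Runs one deterministic scenario; the calling thread plays the task thread. */
    static List<Status> simulate() throws InterruptedException {
        Reader reader = new Reader();
        Mailbox mailbox = new Mailbox();
        List<Status> statuses = new ArrayList<>();

        // The event has not been delivered yet, so the reader must not finish.
        statuses.add(reader.pollNext());

        // A separate "RPC" thread only enqueues the event; it never touches the reader.
        Thread rpc = new Thread(() -> mailbox.execute(reader::notifyNoMoreSplits));
        rpc.start();
        rpc.join();

        // The task thread runs the event, then polls again.
        mailbox.drain();
        statuses.add(reader.pollNext());
        return statuses;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(simulate()); // prints [NOTHING_AVAILABLE, END_OF_INPUT]
    }
}
```

In this sketch the thread-safe queue is the only shared state, which is the property the mailbox dispatch is relied on for in the comment above.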