pnowojski commented on a change in pull request #16589:
URL: https://github.com/apache/flink/pull/16589#discussion_r681421317



##########
File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/MultipleInputStreamTask.java
##########
@@ -195,6 +195,22 @@ protected void createInputProcessor(
         // EndOfPartitionEvent, we would not complement barriers for the
         // unfinished network inputs, and the checkpoint would be triggered
         // after received all the EndOfPartitionEvent.
+        if (options.getCheckpointType().shouldDrain()) {
+            CompletableFuture<Void> sourcesStopped =
+                    FutureUtils.waitForAll(
+                            operatorChain.getSourceTaskInputs().stream()
+                                    .map(s -> s.getOperator().stop())
+                                    .collect(Collectors.toList()));
+
+            return sourcesStopped.thenCompose(
+                    ignore -> triggerSourcesCheckpointInMailbox(metadata, 
options));

Review comment:
       Yes. We have learnt it the hard way in other places in the code, that 
all exceptions, even the least likely should be handled and should bubble up. 
Including NPE, type castings or other clearly unexpected errors. Without 
`assertNoException` tests/code can fail in a weird way or deadlock for example, 
obscuring the root problem (some trivial NPE). On top of that submitting mail 
to mailbox can fail as well with `RejectedExecutionException`. 
   
   Speaking of.  With `assertNoException`, `RejectedExecutionException` should 
probably be explicitly ignored here? If task was cancelled in the mean time, 
before we managed to submit mailbox action, I think in the old code `Task` 
would ignore this exception because of `CANCELLING` state. In new one we should 
probably do the same thing.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to