1u0 commented on a change in pull request #9772: [FLINK-14199] Only use 
dedicated/named classes for mailbox letters. 
URL: https://github.com/apache/flink/pull/9772#discussion_r333371175
 
 

 ##########
 File path: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
 ##########
 @@ -726,32 +729,39 @@ public StreamStatusMaintainer 
getStreamStatusMaintainer() {
                        CheckpointOptions checkpointOptions,
                        boolean advanceToEndOfEventTime) {
 
-               return mailboxProcessor.getMainMailboxExecutor().submit(() -> {
-                       try {
-                               // No alignment if we inject a checkpoint
-                               CheckpointMetrics checkpointMetrics = new 
CheckpointMetrics()
-                                       .setBytesBufferedInAlignment(0L)
-                                       .setAlignmentDurationNanos(0L);
-
-                               boolean success = 
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics, 
advanceToEndOfEventTime);
-                               if (!success) {
-                                       
declineCheckpoint(checkpointMetaData.getCheckpointId());
-                               }
-                               return success;
-                       } catch (Exception e) {
-                               // propagate exceptions only if the task is 
still in "running" state
-                               if (isRunning) {
-                                       Exception exception = new 
Exception("Could not perform checkpoint " + 
checkpointMetaData.getCheckpointId() +
-                                               " for operator " + getName() + 
'.', e);
-                                       handleCheckpointException(exception);
-                                       throw exception;
-                               } else {
-                                       LOG.debug("Could not perform checkpoint 
{} for operator {} while the " +
-                                               "invokable was not in state 
running.", checkpointMetaData.getCheckpointId(), getName(), e);
-                                       return false;
-                               }
+               return 
mailboxProcessor.getMainMailboxExecutor().submit(namedCallable(
+                               () -> checkpoint(checkpointMetaData, 
checkpointOptions, advanceToEndOfEventTime),
+                               () -> "checkpoint " + checkpointMetaData + " 
with options " + checkpointOptions));
+       }
+
+       private Boolean checkpoint(
 
 Review comment:
   Additionally, rename the method to be more descriptive (and to be consistent 
with other similar methods)?
   For example as `triggerCheckpoint()`.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to