yanghua commented on a change in pull request #8322: [FLINK-12364] Introduce a CheckpointFailureManager to centralized manage checkpoint failure
URL: https://github.com/apache/flink/pull/8322#discussion_r283764177
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 ##########
 @@ -435,6 +439,12 @@ public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
                         triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
                         return true;
                 } catch (CheckpointException e) {
+                       try {
+                               long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement();
 
 Review comment:
   > The rest of my comments still hold though: why not remove the throwing/catching of an exception that we can also report directly to the failure manager? I also think that makes the other counts accurate, because obtaining the id can then always happen under the lock.
   
   This try/catch block:
   
   ```
   try {
       long latestGeneratedCheckpointId = getCheckpointIdCounter().getAndIncrement();
       failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
   } catch (Exception e1) {
       LOG.warn("Get latest generated checkpoint id error : ", e1);
   }
   ```
   exists because `CheckpointIDCounter#getAndIncrement` throws a checked
   `Exception`.
   
   I think it would look better if we could change it to the following snippet:
   
   ```
   public boolean triggerCheckpoint(long timestamp, boolean isPeriodic) {
       try {
           triggerCheckpoint(timestamp, checkpointProperties, null, isPeriodic, false);
           return true;
       } catch (CheckpointException e) {
           long latestGeneratedCheckpointId = getCheckpointIdCounter().get();
           failureManager.handleCheckpointException(e, -1 * latestGeneratedCheckpointId);
           return false;
       }
   }
   ```
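
To make the difference between the two shapes concrete, here is a minimal, self-contained sketch. Every class in it (`SketchIdCounter`, `SketchFailureManager`, `SketchCoordinator`, `SketchCheckpointException`) is a hypothetical stand-in and not Flink's actual code. It assumes a `get()` that declares no checked exception and does not advance the counter, which is what the proposed snippet relies on.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical stand-in for CheckpointException. */
class SketchCheckpointException extends Exception {
    SketchCheckpointException(String message) {
        super(message);
    }
}

/** Hypothetical counter: getAndIncrement() declares a checked Exception, get() does not (assumption). */
class SketchIdCounter {
    private final AtomicLong next = new AtomicLong(1);

    long getAndIncrement() throws Exception {
        return next.getAndIncrement();
    }

    long get() {
        return next.get();
    }
}

/** Hypothetical failure manager that only records the ids reported to it. */
class SketchFailureManager {
    final List<Long> reportedIds = new ArrayList<>();

    void handleCheckpointException(SketchCheckpointException e, long checkpointId) {
        reportedIds.add(checkpointId);
    }
}

class SketchCoordinator {
    final SketchIdCounter counter = new SketchIdCounter();
    final SketchFailureManager failureManager = new SketchFailureManager();

    /** Always fails, so that both variants below exercise their failure path. */
    private void doTrigger() throws SketchCheckpointException {
        throw new SketchCheckpointException("trigger declined");
    }

    /** Shape from the diff: the checked Exception forces a nested try/catch. */
    boolean triggerWithGetAndIncrement() {
        try {
            doTrigger();
            return true;
        } catch (SketchCheckpointException e) {
            try {
                long id = counter.getAndIncrement();
                failureManager.handleCheckpointException(e, -1 * id);
            } catch (Exception e1) {
                System.err.println("Get latest generated checkpoint id error: " + e1);
            }
            return false;
        }
    }

    /** Proposed shape: a non-throwing get() lets the catch block report directly. */
    boolean triggerWithGet() {
        try {
            doTrigger();
            return true;
        } catch (SketchCheckpointException e) {
            failureManager.handleCheckpointException(e, -1 * counter.get());
            return false;
        }
    }
}
```

In this sketch, `triggerWithGetAndIncrement()` consumes a fresh id on every failure, while `triggerWithGet()` reports the same current value each time. Whether that difference is acceptable for the failure manager's counting is part of the question being discussed here.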
   
   If the reporting were done inside the real `triggerCheckpoint` method
   instead, it would take more code to report the exception and to generate a
   new checkpoint id. In addition, there are two locks in that method.
   
   What do you think?
