akalash commented on a change in pull request #15496:
URL: https://github.com/apache/flink/pull/15496#discussion_r607961267
##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
##########
@@ -946,41 +952,35 @@ private boolean triggerCheckpoint(
CheckpointMetaData checkpointMetaData, CheckpointOptions checkpointOptions)
throws Exception {
FlinkSecurityManager.monitorUserSystemExitForCurrentThread();
+ long checkpointId = checkpointMetaData.getCheckpointId();
try {
// No alignment if we inject a checkpoint
CheckpointMetricsBuilder checkpointMetrics =
new CheckpointMetricsBuilder()
.setAlignmentDurationNanos(0L)
.setBytesProcessedDuringAlignment(0L);
- subtaskCheckpointCoordinator.initInputsCheckpoint(
- checkpointMetaData.getCheckpointId(), checkpointOptions);
+ subtaskCheckpointCoordinator.initInputsCheckpoint(checkpointId, checkpointOptions);
boolean success =
performCheckpoint(checkpointMetaData, checkpointOptions, checkpointMetrics);
if (!success) {
- declineCheckpoint(checkpointMetaData.getCheckpointId());
+ declineCheckpoint(checkpointId, CHECKPOINT_DECLINED_TASK_NOT_READY, null);
}
return success;
} catch (Exception e) {
- // propagate exceptions only if the task is still in "running" state
+ // Decline checkpoint globally only if the task is still in "running" state
if (isRunning) {
- throw new Exception(
- "Could not perform checkpoint "
- + checkpointMetaData.getCheckpointId()
- + " for operator "
- + getName()
- + '.',
- e);
+ declineCheckpoint(checkpointId, CHECKPOINT_DECLINED, e);
Review comment:
Replacing exception propagation with declineCheckpoint is not the goal of the current task, but this change highlights an inconsistency between the two approaches. Why does a checkpoint that fails asynchronously lead to declineCheckpoint, while a failure here (synchronously) takes a different path (exception propagation) yet ends with the same result, namely the job being canceled?
Perhaps I'm missing something, so let's discuss it here. Should declining a checkpoint work the same way in all cases, or should it differ between the synchronous and asynchronous cases?
--
This is an automated message from the Apache Git Service.