rkhachatryan commented on a change in pull request #12611:
URL: https://github.com/apache/flink/pull/12611#discussion_r438872294
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
- CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
- .whenCompleteAsync(
- (ignored, throwable) -> {
- final PendingCheckpoint
checkpoint =
-
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
- if (throwable == null &&
checkpoint != null && !checkpoint.isDiscarded()) {
- // no exception, no
discarding, everything is OK
- final long checkpointId
= checkpoint.getCheckpointId();
- snapshotTaskState(
- timestamp,
- checkpointId,
-
checkpoint.getCheckpointStorageLocation(),
- request.props,
- executions,
-
request.advanceToEndOfTime);
-
-
coordinatorsToCheckpoint.forEach((ctx) ->
ctx.afterSourceBarrierInjection(checkpointId));
-
- onTriggerSuccess();
- } else {
- // the
initialization might not be finished yet
- if (checkpoint
== null) {
-
onTriggerFailure(request, throwable);
+ FutureUtils.assertNoException(
+ CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
+ .whenCompleteAsync(
+ (ignored, throwable) -> {
+ final PendingCheckpoint
checkpoint =
+
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+ if (throwable == null
&& checkpoint != null) {
+ if
(checkpoint.isDiscarded()) {
+
onTriggerFailure(
+
checkpoint,
+
new CheckpointException(
+
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
Review comment:
I think `CHECKPOINT_DECLINED` would be more appropriate here because at
this point the checkpoint has already been triggered.
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
##########
@@ -529,6 +537,48 @@ public void
testTriggerCheckpointSnapshotMasterHookFailed() throws Exception {
assertEquals(0,
checkpointCoordinator.getTriggerRequestQueue().size());
}
+ /**
+ * This test only fails eventually.
+ */
+ @Test
+ public void
discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws
Exception {
+ final ExecutionVertex executionVertex = mockExecutionVertex(new
ExecutionAttemptID());
+
+ final ScheduledExecutorService scheduledExecutorService =
Executors.newSingleThreadScheduledExecutor();
+ final CheckpointCoordinator checkpointCoordinator = new
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+ .setTasks(new ExecutionVertex[]{executionVertex})
+ .setTimer(new
ScheduledExecutorServiceAdapter(scheduledExecutorService))
+
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+ .build())
+ .build();
+
+ final CompletableFuture<String> masterHookCheckpointFuture =
new CompletableFuture<>();
+ final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+ checkpointCoordinator.addMasterHook(new
TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+ try {
+ checkpointCoordinator.triggerCheckpoint(false);
+ final CompletableFuture<CompletedCheckpoint>
secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+ triggerCheckpointLatch.await();
+ masterHookCheckpointFuture.complete("Completed");
+
+ // discard triggering checkpoint
+ checkpointCoordinator.abortPendingCheckpoints(new
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+ try {
+ // verify that the second checkpoint request
will be executed and eventually times out
+ secondCheckpoint.get();
Review comment:
I guess I'm missing some changes because I see only one checkpoint in
this test.
I think there indeed should be a second checkpoint that is started after the
previous one is aborted. We can simulate the original problem without a latch
by completing `TestingMasterHook.checkpointFuture` after aborting the
checkpoint.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
- CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
- .whenCompleteAsync(
- (ignored, throwable) -> {
- final PendingCheckpoint
checkpoint =
-
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
- if (throwable == null &&
checkpoint != null && !checkpoint.isDiscarded()) {
- // no exception, no
discarding, everything is OK
- final long checkpointId
= checkpoint.getCheckpointId();
- snapshotTaskState(
- timestamp,
- checkpointId,
-
checkpoint.getCheckpointStorageLocation(),
- request.props,
- executions,
-
request.advanceToEndOfTime);
-
-
coordinatorsToCheckpoint.forEach((ctx) ->
ctx.afterSourceBarrierInjection(checkpointId));
-
- onTriggerSuccess();
- } else {
- // the
initialization might not be finished yet
- if (checkpoint
== null) {
-
onTriggerFailure(request, throwable);
+ FutureUtils.assertNoException(
Review comment:
Won't this kill the JVM on _any_ exception, regardless of whether it was
handled below or not?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -538,36 +538,45 @@ private void
startTriggeringCheckpoint(CheckpointTriggerRequest request) {
coordinatorsToCheckpoint, pendingCheckpoint, timer),
timer);
- CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
- .whenCompleteAsync(
- (ignored, throwable) -> {
- final PendingCheckpoint
checkpoint =
-
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
-
- if (throwable == null &&
checkpoint != null && !checkpoint.isDiscarded()) {
- // no exception, no
discarding, everything is OK
- final long checkpointId
= checkpoint.getCheckpointId();
- snapshotTaskState(
- timestamp,
- checkpointId,
-
checkpoint.getCheckpointStorageLocation(),
- request.props,
- executions,
-
request.advanceToEndOfTime);
-
-
coordinatorsToCheckpoint.forEach((ctx) ->
ctx.afterSourceBarrierInjection(checkpointId));
-
- onTriggerSuccess();
- } else {
- // the
initialization might not be finished yet
- if (checkpoint
== null) {
-
onTriggerFailure(request, throwable);
+ FutureUtils.assertNoException(
+ CompletableFuture.allOf(masterStatesComplete,
coordinatorCheckpointsComplete)
+ .whenCompleteAsync(
+ (ignored, throwable) -> {
+ final PendingCheckpoint
checkpoint =
+
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
+
+ if (throwable == null
&& checkpoint != null) {
+ if
(checkpoint.isDiscarded()) {
+
onTriggerFailure(
+
checkpoint,
+
new CheckpointException(
+
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+
checkpoint.getFailureCause()));
} else {
-
onTriggerFailure(checkpoint, throwable);
+ // no
exception, no discarding, everything is OK
+ final
long checkpointId = checkpoint.getCheckpointId();
+
snapshotTaskState(
+
timestamp,
+
checkpointId,
+
checkpoint.getCheckpointStorageLocation(),
+
request.props,
+
executions,
+
request.advanceToEndOfTime);
+
+
coordinatorsToCheckpoint.forEach((ctx) ->
ctx.afterSourceBarrierInjection(checkpointId));
+
+
onTriggerSuccess();
}
- }
- },
- timer);
+ } else {
+ // the
initialization might not be finished yet
+ if
(checkpoint == null) {
+
onTriggerFailure(request, throwable);
+ } else {
+
onTriggerFailure(checkpoint, throwable);
Review comment:
nit: an explicit `else if (throwable == null && checkpoint == null)` branch
would be clearer to me personally. As it is now, it implicitly relies on the
error handling above.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org