This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 90a7dab894b12b85f21d4cfc637d0d770841cfe5 Author: Till Rohrmann <[email protected]> AuthorDate: Thu Jun 11 16:17:51 2020 +0200 [FLINK-18137] Handle discarding of triggering checkpoint correctly Previously, discarding a triggering checkpoint could cause an NPE, which would stop the processing of subsequent checkpoint requests. This commit changes this behaviour by checking for this condition and instantiating a proper exception in case a triggering checkpoint is being discarded. This closes #12611. --- .../runtime/checkpoint/CheckpointCoordinator.java | 52 ++++++++++++-------- .../CheckpointCoordinatorTriggeringTest.java | 57 ++++++++++++++++++++++ 2 files changed, 89 insertions(+), 20 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java index da518ee..6c033d9 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java @@ -544,27 +544,39 @@ public class CheckpointCoordinator { final PendingCheckpoint checkpoint = FutureUtils.getWithoutException(pendingCheckpointCompletableFuture); - if (throwable == null && checkpoint != null && !checkpoint.isDiscarded()) { - // no exception, no discarding, everything is OK - final long checkpointId = checkpoint.getCheckpointId(); - snapshotTaskState( - timestamp, - checkpointId, - checkpoint.getCheckpointStorageLocation(), - request.props, - executions, - request.advanceToEndOfTime); - - coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId)); - - onTriggerSuccess(); + Preconditions.checkState( + checkpoint != null || throwable != null, + "Either the pending checkpoint needs to be created or an error must have been occurred."); + + if (throwable != null) { + // the initialization might not be finished yet + 
if (checkpoint == null) { + onTriggerFailure(request, throwable); + } else { + onTriggerFailure(checkpoint, throwable); + } } else { - // the initialization might not be finished yet - if (checkpoint == null) { - onTriggerFailure(request, throwable); - } else { - onTriggerFailure(checkpoint, throwable); - } + if (checkpoint.isDiscarded()) { + onTriggerFailure( + checkpoint, + new CheckpointException( + CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE, + checkpoint.getFailureCause())); + } else { + // no exception, no discarding, everything is OK + final long checkpointId = checkpoint.getCheckpointId(); + snapshotTaskState( + timestamp, + checkpointId, + checkpoint.getCheckpointStorageLocation(), + request.props, + executions, + request.advanceToEndOfTime); + + coordinatorsToCheckpoint.forEach((ctx) -> ctx.afterSourceBarrierInjection(checkpointId)); + + onTriggerSuccess(); + } } }, timer); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java index 140441d..3dca350 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java @@ -21,8 +21,10 @@ package org.apache.flink.runtime.checkpoint; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobStatus; import org.apache.flink.core.io.SimpleVersionedSerializer; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor; +import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter; import org.apache.flink.runtime.executiongraph.Execution; import 
org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; @@ -31,6 +33,7 @@ import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio import org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.util.ExceptionUtils; +import org.apache.flink.util.ExecutorUtils; import org.apache.flink.util.TestLogger; import org.junit.Before; @@ -45,14 +48,19 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.Executor; +import java.util.concurrent.Executors; import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicInteger; import java.util.function.Predicate; import static org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex; import static org.apache.flink.util.Preconditions.checkNotNull; +import static org.hamcrest.CoreMatchers.instanceOf; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import static org.mockito.ArgumentMatchers.any; @@ -529,6 +537,48 @@ checkpointCoordinator.startCheckpointScheduler(); assertEquals(0, checkpointCoordinator.getTriggerRequestQueue().size()); } + /** + * This test only fails eventually. 
+ */ + @Test + public void discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws Exception { + final ExecutionVertex executionVertex = mockExecutionVertex(new ExecutionAttemptID()); + + final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor(); + final CheckpointCoordinator checkpointCoordinator = new CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder() + .setTasks(new ExecutionVertex[]{executionVertex}) + .setTimer(new ScheduledExecutorServiceAdapter(scheduledExecutorService)) + .setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder() + .build()) + .build(); + + final CompletableFuture<String> masterHookCheckpointFuture = new CompletableFuture<>(); + final OneShotLatch triggerCheckpointLatch = new OneShotLatch(); + checkpointCoordinator.addMasterHook(new TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch)); + + try { + checkpointCoordinator.triggerCheckpoint(false); + final CompletableFuture<CompletedCheckpoint> secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false); + + triggerCheckpointLatch.await(); + masterHookCheckpointFuture.complete("Completed"); + + // discard triggering checkpoint + checkpointCoordinator.abortPendingCheckpoints(new CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED)); + + try { + // verify that the second checkpoint request will be executed and eventually times out + secondCheckpoint.get(); + fail("Expected the second checkpoint to fail."); + } catch (ExecutionException ee) { + assertThat(ExceptionUtils.stripExecutionException(ee), instanceOf(CheckpointException.class)); + } + } finally { + checkpointCoordinator.shutdown(JobStatus.FINISHED); + ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, scheduledExecutorService); + } + } + private CheckpointCoordinator createCheckpointCoordinator() { return new CheckpointCoordinatorBuilder() .setTimer(manuallyTriggeredScheduledExecutor) @@ -568,9 +618,15 
@@ checkpointCoordinator.startCheckpointScheduler(); new CheckpointCoordinatorTestingUtils.StringSerializer(); private final CompletableFuture<String> checkpointFuture; + private final OneShotLatch triggerCheckpointLatch; private TestingMasterHook(CompletableFuture<String> checkpointFuture) { + this(checkpointFuture, new OneShotLatch()); + } + + private TestingMasterHook(CompletableFuture<String> checkpointFuture, OneShotLatch triggerCheckpointLatch) { this.checkpointFuture = checkpointFuture; + this.triggerCheckpointLatch = triggerCheckpointLatch; } @Override @@ -582,6 +638,7 @@ checkpointCoordinator.startCheckpointScheduler(); @Override public CompletableFuture<String> triggerCheckpoint( long checkpointId, long timestamp, Executor executor) { + triggerCheckpointLatch.trigger(); return checkpointFuture; }
