This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 90a7dab894b12b85f21d4cfc637d0d770841cfe5
Author: Till Rohrmann <[email protected]>
AuthorDate: Thu Jun 11 16:17:51 2020 +0200

    [FLINK-18137] Handle discarding of triggering checkpoint correctly
    
    Previously, discarding a triggering checkpoint could cause an NPE, which would
    stop the processing of subsequent checkpoint requests. This commit changes this
    behaviour by checking for this condition and creating a proper exception in case
    a triggering checkpoint is being discarded.
    
    This closes #12611.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  | 52 ++++++++++++--------
 .../CheckpointCoordinatorTriggeringTest.java       | 57 ++++++++++++++++++++++
 2 files changed, 89 insertions(+), 20 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..6c033d9 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -544,27 +544,39 @@ public class CheckpointCoordinator {
                                                final PendingCheckpoint 
checkpoint =
                                                        
FutureUtils.getWithoutException(pendingCheckpointCompletableFuture);
 
-                                               if (throwable == null && 
checkpoint != null && !checkpoint.isDiscarded()) {
-                                                       // no exception, no 
discarding, everything is OK
-                                                       final long checkpointId 
= checkpoint.getCheckpointId();
-                                                       snapshotTaskState(
-                                                               timestamp,
-                                                               checkpointId,
-                                                               
checkpoint.getCheckpointStorageLocation(),
-                                                               request.props,
-                                                               executions,
-                                                               
request.advanceToEndOfTime);
-
-                                                       
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
-
-                                                       onTriggerSuccess();
+                                               Preconditions.checkState(
+                                                       checkpoint != null || 
throwable != null,
+                                                       "Either the pending 
checkpoint needs to be created or an error must have been occurred.");
+
+                                               if (throwable != null) {
+                                                       // the initialization 
might not be finished yet
+                                                       if (checkpoint == null) 
{
+                                                               
onTriggerFailure(request, throwable);
+                                                       } else {
+                                                               
onTriggerFailure(checkpoint, throwable);
+                                                       }
                                                } else {
-                                                               // the 
initialization might not be finished yet
-                                                               if (checkpoint 
== null) {
-                                                                       
onTriggerFailure(request, throwable);
-                                                               } else {
-                                                                       
onTriggerFailure(checkpoint, throwable);
-                                                               }
+                                                       if 
(checkpoint.isDiscarded()) {
+                                                               
onTriggerFailure(
+                                                                       
checkpoint,
+                                                                       new 
CheckpointException(
+                                                                               
CheckpointFailureReason.TRIGGER_CHECKPOINT_FAILURE,
+                                                                               
checkpoint.getFailureCause()));
+                                                       } else {
+                                                               // no 
exception, no discarding, everything is OK
+                                                               final long 
checkpointId = checkpoint.getCheckpointId();
+                                                               
snapshotTaskState(
+                                                                       
timestamp,
+                                                                       
checkpointId,
+                                                                       
checkpoint.getCheckpointStorageLocation(),
+                                                                       
request.props,
+                                                                       
executions,
+                                                                       
request.advanceToEndOfTime);
+
+                                                               
coordinatorsToCheckpoint.forEach((ctx) -> 
ctx.afterSourceBarrierInjection(checkpointId));
+
+                                                               
onTriggerSuccess();
+                                                       }
                                                }
                                        },
                                        timer);
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
index 140441d..3dca350 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinatorTriggeringTest.java
@@ -21,8 +21,10 @@ package org.apache.flink.runtime.checkpoint;
 import org.apache.flink.api.common.JobID;
 import org.apache.flink.api.common.JobStatus;
 import org.apache.flink.core.io.SimpleVersionedSerializer;
+import org.apache.flink.core.testutils.OneShotLatch;
 import 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder;
 import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutor;
+import org.apache.flink.runtime.concurrent.ScheduledExecutorServiceAdapter;
 import org.apache.flink.runtime.executiongraph.Execution;
 import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.executiongraph.ExecutionVertex;
@@ -31,6 +33,7 @@ import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguratio
 import 
org.apache.flink.runtime.jobgraph.tasks.CheckpointCoordinatorConfiguration.CheckpointCoordinatorConfigurationBuilder;
 import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint;
 import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.ExecutorUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Before;
@@ -45,14 +48,19 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.function.Predicate;
 
 import static 
org.apache.flink.runtime.checkpoint.CheckpointCoordinatorTestingUtils.mockExecutionVertex;
 import static org.apache.flink.util.Preconditions.checkNotNull;
+import static org.hamcrest.CoreMatchers.instanceOf;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
@@ -529,6 +537,48 @@ checkpointCoordinator.startCheckpointScheduler();
                assertEquals(0, 
checkpointCoordinator.getTriggerRequestQueue().size());
        }
 
+       /**
+        * This test only fails eventually.
+        */
+       @Test
+       public void 
discardingTriggeringCheckpointWillExecuteNextCheckpointRequest() throws 
Exception {
+               final ExecutionVertex executionVertex = mockExecutionVertex(new 
ExecutionAttemptID());
+
+               final ScheduledExecutorService scheduledExecutorService = 
Executors.newSingleThreadScheduledExecutor();
+               final CheckpointCoordinator checkpointCoordinator = new 
CheckpointCoordinatorTestingUtils.CheckpointCoordinatorBuilder()
+                       .setTasks(new ExecutionVertex[]{executionVertex})
+                       .setTimer(new 
ScheduledExecutorServiceAdapter(scheduledExecutorService))
+                       
.setCheckpointCoordinatorConfiguration(CheckpointCoordinatorConfiguration.builder()
+                               .build())
+                       .build();
+
+               final CompletableFuture<String> masterHookCheckpointFuture = 
new CompletableFuture<>();
+               final OneShotLatch triggerCheckpointLatch = new OneShotLatch();
+               checkpointCoordinator.addMasterHook(new 
TestingMasterHook(masterHookCheckpointFuture, triggerCheckpointLatch));
+
+               try {
+                       checkpointCoordinator.triggerCheckpoint(false);
+                       final CompletableFuture<CompletedCheckpoint> 
secondCheckpoint = checkpointCoordinator.triggerCheckpoint(false);
+
+                       triggerCheckpointLatch.await();
+                       masterHookCheckpointFuture.complete("Completed");
+
+                       // discard triggering checkpoint
+                       checkpointCoordinator.abortPendingCheckpoints(new 
CheckpointException(CheckpointFailureReason.CHECKPOINT_DECLINED));
+
+                       try {
+                               // verify that the second checkpoint request 
will be executed and eventually times out
+                               secondCheckpoint.get();
+                               fail("Expected the second checkpoint to fail.");
+                       } catch (ExecutionException ee) {
+                               
assertThat(ExceptionUtils.stripExecutionException(ee), 
instanceOf(CheckpointException.class));
+                       }
+               } finally {
+                       checkpointCoordinator.shutdown(JobStatus.FINISHED);
+                       ExecutorUtils.gracefulShutdown(10L, TimeUnit.SECONDS, 
scheduledExecutorService);
+               }
+       }
+
        private CheckpointCoordinator createCheckpointCoordinator() {
                return new CheckpointCoordinatorBuilder()
                        .setTimer(manuallyTriggeredScheduledExecutor)
@@ -568,9 +618,15 @@ checkpointCoordinator.startCheckpointScheduler();
                        new 
CheckpointCoordinatorTestingUtils.StringSerializer();
 
                private final CompletableFuture<String> checkpointFuture;
+               private final OneShotLatch triggerCheckpointLatch;
 
                private TestingMasterHook(CompletableFuture<String> 
checkpointFuture) {
+                       this(checkpointFuture, new OneShotLatch());
+               }
+
+               private TestingMasterHook(CompletableFuture<String> 
checkpointFuture, OneShotLatch triggerCheckpointLatch) {
                        this.checkpointFuture = checkpointFuture;
+                       this.triggerCheckpointLatch = triggerCheckpointLatch;
                }
 
                @Override
@@ -582,6 +638,7 @@ checkpointCoordinator.startCheckpointScheduler();
                @Override
                public CompletableFuture<String> triggerCheckpoint(
                        long checkpointId, long timestamp, Executor executor) {
+                       triggerCheckpointLatch.trigger();
                        return checkpointFuture;
                }
 

Reply via email to