This is an automated email from the ASF dual-hosted git repository.

dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f5ef5ae  [FLINK-24161] Fix interplay of stop-with-savepoint w/o drain with final checkpoints
f5ef5ae is described below

commit f5ef5ae8f120f3e70ff3fe093fb4c45e92beaabd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Sep 8 16:41:16 2021 +0200

    [FLINK-24161] Fix interplay of stop-with-savepoint w/o drain with final checkpoints
    
    This closes #17200
---
 .../flink/streaming/runtime/tasks/StreamTask.java  |  25 +++-
 .../tasks/StreamTaskFinalCheckpointsTest.java      | 149 ++++++++++++++++++++-
 2 files changed, 166 insertions(+), 8 deletions(-)

diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 46decbb..0e58538 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -32,6 +32,7 @@ import 
org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
 import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
 import 
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
@@ -529,6 +530,15 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
     }
 
     protected void endData() throws Exception {
+
+        if (syncSavepointWithoutDrain != null && 
areCheckpointsWithFinishedTasksEnabled()) {
+            throw new FlinkRuntimeException(
+                    "We run out of data to process while waiting for a 
synchronous savepoint"
+                            + " to be finished. This can lead to a deadlock 
waiting for a final"
+                            + " checkpoint after a synchronous savepoint, 
which will never be"
+                            + " triggered.");
+        }
+
         advanceToEndOfEventTime();
         // finish all operators in a chain effect way
         operatorChain.finishOperators(actionExecutor);
@@ -1281,19 +1291,28 @@ public abstract class StreamTask<OUT, OP extends 
StreamOperator<OUT>>
             CheckpointMetricsBuilder checkpointMetrics)
             throws Exception {
 
+        final CheckpointType checkpointType = 
checkpointOptions.getCheckpointType();
         LOG.debug(
                 "Starting checkpoint {} {} on task {}",
                 checkpointMetaData.getCheckpointId(),
-                checkpointOptions.getCheckpointType(),
+                checkpointType,
                 getName());
 
+        if (checkpointType.isSynchronous()
+                && !checkpointType.shouldDrain()
+                && endOfDataReceived
+                && areCheckpointsWithFinishedTasksEnabled()) {
+            LOG.debug("Can not trigger a stop-with-savepoint w/o drain if a 
task is finishing.");
+            return false;
+        }
+
         if (isRunning) {
             actionExecutor.runThrowing(
                     () -> {
-                        if 
(checkpointOptions.getCheckpointType().isSynchronous()) {
+                        if (checkpointType.isSynchronous()) {
                             setSynchronousSavepoint(
                                     checkpointMetaData.getCheckpointId(),
-                                    
checkpointOptions.getCheckpointType().shouldDrain());
+                                    checkpointType.shouldDrain());
                         }
 
                         if (areCheckpointsWithFinishedTasksEnabled()
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 932ec2c..5a60391 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
 import org.apache.flink.api.common.state.ListStateDescriptor;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
 import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
 import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
 import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
@@ -61,6 +62,7 @@ import java.util.concurrent.Future;
 
 import static 
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
 import static 
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.util.ExceptionUtils.assertThrowable;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.Matchers.contains;
 import static org.junit.Assert.assertArrayEquals;
@@ -367,7 +369,7 @@ public class StreamTaskFinalCheckpointsTest {
 
                 // trigger the synchronous savepoint
                 CompletableFuture<Boolean> savepointFuture =
-                        triggerStopWithSavepoint(testHarness, syncSavepointId);
+                        triggerStopWithSavepointDrain(testHarness, 
syncSavepointId);
 
                 // The checkpoint 6 would be triggered successfully.
                 testHarness.finishProcessing();
@@ -466,7 +468,7 @@ public class StreamTaskFinalCheckpointsTest {
 
             // trigger the synchronous savepoint
             CompletableFuture<Boolean> savepointFuture =
-                    triggerStopWithSavepoint(testHarness, syncSavepointId);
+                    triggerStopWithSavepointDrain(testHarness, 
syncSavepointId);
 
             // The checkpoint 6 would be triggered successfully.
             testHarness.finishProcessing();
@@ -482,6 +484,130 @@ public class StreamTaskFinalCheckpointsTest {
     }
 
     @Test
+    public void 
testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask()
+            throws Exception {
+        int finalCheckpointId = 6;
+        int syncSavepointId = 7;
+        CompletingCheckpointResponder checkpointResponder =
+                new CompletingCheckpointResponder() {
+
+                    private CheckpointMetrics metrics;
+                    private TaskStateSnapshot stateSnapshot;
+
+                    @Override
+                    public void acknowledgeCheckpoint(
+                            JobID jobID,
+                            ExecutionAttemptID executionAttemptID,
+                            long checkpointId,
+                            CheckpointMetrics checkpointMetrics,
+                            TaskStateSnapshot subtaskState) {
+                        // do not acknowledge any checkpoints straightaway
+                        if (checkpointId == finalCheckpointId) {
+                            metrics = checkpointMetrics;
+                            stateSnapshot = subtaskState;
+                        }
+                    }
+
+                    @Override
+                    public void declineCheckpoint(
+                            JobID jobID,
+                            ExecutionAttemptID executionAttemptID,
+                            long checkpointId,
+                            CheckpointException checkpointException) {
+                        // acknowledge the last pending checkpoint once the synchronous savepoint is
+                        // declined.
+                        if (syncSavepointId == checkpointId) {
+                            super.acknowledgeCheckpoint(
+                                    jobID,
+                                    executionAttemptID,
+                                    finalCheckpointId,
+                                    metrics,
+                                    stateSnapshot);
+                        }
+                    }
+                };
+
+        try (StreamTaskMailboxTestHarness<String> testHarness =
+                new 
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                config -> {
+                                    config.setCheckpointingEnabled(true);
+                                    config.getConfiguration()
+                                            .set(
+                                                    
ExecutionCheckpointingOptions
+                                                            
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
+                                                    true);
+                                })
+                        .setCheckpointResponder(checkpointResponder)
+                        .setupOutputForSingletonOperatorChain(
+                                new StreamSource<>(new 
ImmediatelyFinishingSource()))
+                        .build()) {
+            checkpointResponder.setHandlers(
+                    testHarness.streamTask::notifyCheckpointCompleteAsync,
+                    testHarness.streamTask::notifyCheckpointAbortAsync);
+
+            // start task thread
+            testHarness.streamTask.runMailboxLoop();
+
+            // trigger the final checkpoint
+            CompletableFuture<Boolean> checkpointFuture =
+                    triggerCheckpoint(testHarness, finalCheckpointId);
+
+            // trigger the synchronous savepoint w/o drain, which should be declined
+            CompletableFuture<Boolean> savepointFuture =
+                    triggerStopWithSavepointNoDrain(testHarness, 
syncSavepointId);
+
+            // The checkpoint 6 would be triggered successfully.
+            testHarness.finishProcessing();
+            assertTrue(checkpointFuture.isDone());
+            assertTrue(savepointFuture.isDone());
+            assertFalse(savepointFuture.get());
+            testHarness.getTaskStateManager().getWaitForReportLatch().await();
+            assertEquals(
+                    finalCheckpointId, 
testHarness.getTaskStateManager().getReportedCheckpointId());
+            assertEquals(
+                    finalCheckpointId,
+                    
testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
+        }
+    }
+
+    @Test
+    public void 
testTriggerSourceFinishesWhileStoppingWithSavepointWithoutDrain() throws 
Exception {
+        try (StreamTaskMailboxTestHarness<String> testHarness =
+                new 
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+                        .modifyStreamConfig(
+                                config -> {
+                                    config.setCheckpointingEnabled(true);
+                                    config.getConfiguration()
+                                            .set(
+                                                    
ExecutionCheckpointingOptions
+                                                            
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
+                                                    true);
+                                })
+                        .setupOutputForSingletonOperatorChain(
+                                new StreamSource<>(new 
ImmediatelyFinishingSource()))
+                        .build()) {
+
+            // trigger the synchronous savepoint w/o drain
+            triggerStopWithSavepointNoDrain(testHarness, 1);
+
+            // start task thread
+            testHarness.streamTask.runMailboxLoop();
+        } catch (Exception ex) {
+            assertThrowable(
+                    ex,
+                    (e ->
+                            e.getMessage()
+                                    .equals(
+                                            "We run out of data to process 
while waiting for a "
+                                                    + "synchronous savepoint 
to be finished. This "
+                                                    + "can lead to a deadlock 
waiting for a final "
+                                                    + "checkpoint after a 
synchronous savepoint, "
+                                                    + "which will never be 
triggered.")));
+        }
+    }
+
+    @Test
     public void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels() 
throws Exception {
         testTriggeringCheckpointWithFinishedChannels(
                 CheckpointOptions.alignedNoTimeout(
@@ -680,16 +806,29 @@ public class StreamTaskFinalCheckpointsTest {
                         checkpointOptions);
     }
 
-    static CompletableFuture<Boolean> triggerStopWithSavepoint(
+    static CompletableFuture<Boolean> triggerStopWithSavepointDrain(
+            StreamTaskMailboxTestHarness<String> testHarness, long 
checkpointId) {
+        return triggerStopWithSavepoint(
+                testHarness, checkpointId, CheckpointType.SAVEPOINT_TERMINATE);
+    }
+
+    static CompletableFuture<Boolean> triggerStopWithSavepointNoDrain(
             StreamTaskMailboxTestHarness<String> testHarness, long 
checkpointId) {
+        return triggerStopWithSavepoint(
+                testHarness, checkpointId, CheckpointType.SAVEPOINT_SUSPEND);
+    }
+
+    static CompletableFuture<Boolean> triggerStopWithSavepoint(
+            StreamTaskMailboxTestHarness<String> testHarness,
+            long checkpointId,
+            CheckpointType checkpointType) {
         testHarness.getTaskStateManager().getWaitForReportLatch().reset();
         return testHarness
                 .getStreamTask()
                 .triggerCheckpointAsync(
                         new CheckpointMetaData(checkpointId, checkpointId * 
1000),
                         CheckpointOptions.alignedNoTimeout(
-                                CheckpointType.SAVEPOINT_TERMINATE,
-                                
CheckpointStorageLocationReference.getDefault()));
+                                checkpointType, 
CheckpointStorageLocationReference.getDefault()));
     }
 
     static void processMailTillCheckpointSucceeds(

Reply via email to