This is an automated email from the ASF dual-hosted git repository.

jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a9cf18b  [FLINK-21133][connector/checkpoint] Fix the 
stop-with-savepoint case in FLIP-27 source by stopping the mailbox loop in 
SourceOperatorStreamTask#finishTask().
a9cf18b is described below

commit a9cf18b4d25f130e0bd24d51b128bbcf71892b45
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Mar 29 16:06:31 2021 +0800

    [FLINK-21133][connector/checkpoint] Fix the stop-with-savepoint case in 
FLIP-27 source by stopping the mailbox loop in 
SourceOperatorStreamTask#finishTask().
---
 .../runtime/tasks/SourceOperatorStreamTask.java    |  5 ++
 .../flink/test/checkpointing/SavepointITCase.java  | 54 ++++++++++++++++++++++
 2 files changed, 59 insertions(+)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 1c8589f..7b3b06b 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -108,6 +108,11 @@ public class SourceOperatorStreamTask<T> extends 
StreamTask<T, SourceOperator<T,
     }
 
     @Override
+    protected void finishTask() throws Exception {
+        mailboxProcessor.allActionsCompleted();
+    }
+
+    @Override
     public Future<Boolean> triggerCheckpointAsync(
             CheckpointMetaData checkpointMetaData, CheckpointOptions 
checkpointOptions) {
         if (!isExternallyInducedSource) {
diff --git 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 44b206f..fa3709d 100644
--- 
a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ 
b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -154,6 +154,60 @@ public class SavepointITCase extends TestLogger {
         }
     }
 
+    @Test
+    public void testStopWithSavepointForFlip27SourceWithDrain() throws 
Exception {
+        testStopWithSavepointForFlip27Source(true);
+    }
+
+    @Test
+    public void testStopWithSavepointForFlip27SourceWithoutDrain() throws 
Exception {
+        testStopWithSavepointForFlip27Source(false);
+    }
+
+    private void testStopWithSavepointForFlip27Source(boolean drain) throws 
Exception {
+        final int numTaskManagers = 2;
+        final int numSlotsPerTaskManager = 2;
+
+        final MiniClusterResourceFactory clusterFactory =
+                new MiniClusterResourceFactory(
+                        numTaskManagers, numSlotsPerTaskManager, 
getFileBasedCheckpointsConfig());
+
+        StreamExecutionEnvironment env = 
StreamExecutionEnvironment.getExecutionEnvironment();
+        env.setParallelism(1);
+
+        BoundedPassThroughOperator<Long> operator =
+                new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+        DataStream<Long> stream =
+                env.fromSequence(0, Long.MAX_VALUE)
+                        .transform("pass-through", 
BasicTypeInfo.LONG_TYPE_INFO, operator);
+        stream.addSink(new DiscardingSink<>());
+
+        final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+        final JobID jobId = jobGraph.getJobID();
+
+        MiniClusterWithClientResource cluster = clusterFactory.get();
+        cluster.before();
+        ClusterClient<?> client = cluster.getClusterClient();
+
+        try {
+            BoundedPassThroughOperator.resetForTest(1, true);
+
+            client.submitJob(jobGraph).get();
+
+            BoundedPassThroughOperator.getProgressLatch().await();
+
+            client.stopWithSavepoint(jobId, drain, null).get();
+
+            if (drain) {
+                Assert.assertTrue(BoundedPassThroughOperator.inputEnded);
+            } else {
+                Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
+            }
+        } finally {
+            cluster.after();
+        }
+    }
+
     /**
      * Triggers a savepoint for a job that uses the FsStateBackend. We expect 
that all checkpoint
      * files are written to a new savepoint directory.

Reply via email to