This is an automated email from the ASF dual-hosted git repository.
jqin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a9cf18b [FLINK-21133][connector/checkpoint] Fix the
stop-with-savepoint case in FLIP-27 source by stopping the mailbox loop in
SourceOperatorStreamTask#finishTask().
a9cf18b is described below
commit a9cf18b4d25f130e0bd24d51b128bbcf71892b45
Author: Jiangjie (Becket) Qin <[email protected]>
AuthorDate: Mon Mar 29 16:06:31 2021 +0800
[FLINK-21133][connector/checkpoint] Fix the stop-with-savepoint case in
FLIP-27 source by stopping the mailbox loop in
SourceOperatorStreamTask#finishTask().
---
.../runtime/tasks/SourceOperatorStreamTask.java | 5 ++
.../flink/test/checkpointing/SavepointITCase.java | 54 ++++++++++++++++++++++
2 files changed, 59 insertions(+)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
index 1c8589f..7b3b06b 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SourceOperatorStreamTask.java
@@ -108,6 +108,11 @@ public class SourceOperatorStreamTask<T> extends StreamTask<T, SourceOperator<T,
}
@Override
+ protected void finishTask() throws Exception {
+ mailboxProcessor.allActionsCompleted();
+ }
+
+ @Override
public Future<Boolean> triggerCheckpointAsync(
CheckpointMetaData checkpointMetaData, CheckpointOptions
checkpointOptions) {
if (!isExternallyInducedSource) {
diff --git a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
index 44b206f..fa3709d 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/checkpointing/SavepointITCase.java
@@ -154,6 +154,60 @@ public class SavepointITCase extends TestLogger {
}
}
+ @Test
+ public void testStopWithSavepointForFlip27SourceWithDrain() throws
Exception {
+ testStopWithSavepointForFlip27Source(true);
+ }
+
+ @Test
+ public void testStopWithSavepointForFlip27SourceWithoutDrain() throws
Exception {
+ testStopWithSavepointForFlip27Source(false);
+ }
+
+ private void testStopWithSavepointForFlip27Source(boolean drain) throws
Exception {
+ final int numTaskManagers = 2;
+ final int numSlotsPerTaskManager = 2;
+
+ final MiniClusterResourceFactory clusterFactory =
+ new MiniClusterResourceFactory(
+ numTaskManagers, numSlotsPerTaskManager,
getFileBasedCheckpointsConfig());
+
+ StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
+ env.setParallelism(1);
+
+ BoundedPassThroughOperator<Long> operator =
+ new BoundedPassThroughOperator<>(ChainingStrategy.ALWAYS);
+ DataStream<Long> stream =
+ env.fromSequence(0, Long.MAX_VALUE)
+ .transform("pass-through",
BasicTypeInfo.LONG_TYPE_INFO, operator);
+ stream.addSink(new DiscardingSink<>());
+
+ final JobGraph jobGraph = env.getStreamGraph().getJobGraph();
+ final JobID jobId = jobGraph.getJobID();
+
+ MiniClusterWithClientResource cluster = clusterFactory.get();
+ cluster.before();
+ ClusterClient<?> client = cluster.getClusterClient();
+
+ try {
+ BoundedPassThroughOperator.resetForTest(1, true);
+
+ client.submitJob(jobGraph).get();
+
+ BoundedPassThroughOperator.getProgressLatch().await();
+
+ client.stopWithSavepoint(jobId, drain, null).get();
+
+ if (drain) {
+ Assert.assertTrue(BoundedPassThroughOperator.inputEnded);
+ } else {
+ Assert.assertFalse(BoundedPassThroughOperator.inputEnded);
+ }
+ } finally {
+ cluster.after();
+ }
+ }
+
/**
* Triggers a savepoint for a job that uses the FsStateBackend. We expect that all checkpoint
* files are written to a new savepoint directory.