This is an automated email from the ASF dual-hosted git repository.
dwysakowicz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f5ef5ae [FLINK-24161] Fix interplay of stop-with-savepoint w/o drain
with final checkpoints
f5ef5ae is described below
commit f5ef5ae8f120f3e70ff3fe093fb4c45e92beaabd
Author: Dawid Wysakowicz <[email protected]>
AuthorDate: Wed Sep 8 16:41:16 2021 +0200
[FLINK-24161] Fix interplay of stop-with-savepoint w/o drain with final
checkpoints
This closes #17200
---
.../flink/streaming/runtime/tasks/StreamTask.java | 25 +++-
.../tasks/StreamTaskFinalCheckpointsTest.java | 149 ++++++++++++++++++++-
2 files changed, 166 insertions(+), 8 deletions(-)
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
index 46decbb..0e58538 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/StreamTask.java
@@ -32,6 +32,7 @@ import
org.apache.flink.runtime.checkpoint.CheckpointFailureReason;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
import org.apache.flink.runtime.checkpoint.CheckpointOptions;
+import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.InputChannelInfo;
import
org.apache.flink.runtime.checkpoint.channel.SequentialChannelStateReader;
@@ -529,6 +530,15 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
}
protected void endData() throws Exception {
+
+ if (syncSavepointWithoutDrain != null &&
areCheckpointsWithFinishedTasksEnabled()) {
+ throw new FlinkRuntimeException(
+ "We run out of data to process while waiting for a
synchronous savepoint"
+ + " to be finished. This can lead to a deadlock
waiting for a final"
+ + " checkpoint after a synchronous savepoint,
which will never be"
+ + " triggered.");
+ }
+
advanceToEndOfEventTime();
// finish all operators in a chain effect way
operatorChain.finishOperators(actionExecutor);
@@ -1281,19 +1291,28 @@ public abstract class StreamTask<OUT, OP extends
StreamOperator<OUT>>
CheckpointMetricsBuilder checkpointMetrics)
throws Exception {
+ final CheckpointType checkpointType =
checkpointOptions.getCheckpointType();
LOG.debug(
"Starting checkpoint {} {} on task {}",
checkpointMetaData.getCheckpointId(),
- checkpointOptions.getCheckpointType(),
+ checkpointType,
getName());
+ if (checkpointType.isSynchronous()
+ && !checkpointType.shouldDrain()
+ && endOfDataReceived
+ && areCheckpointsWithFinishedTasksEnabled()) {
+ LOG.debug("Can not trigger a stop-with-savepoint w/o drain if a
task is finishing.");
+ return false;
+ }
+
if (isRunning) {
actionExecutor.runThrowing(
() -> {
- if
(checkpointOptions.getCheckpointType().isSynchronous()) {
+ if (checkpointType.isSynchronous()) {
setSynchronousSavepoint(
checkpointMetaData.getCheckpointId(),
-
checkpointOptions.getCheckpointType().shouldDrain());
+ checkpointType.shouldDrain());
}
if (areCheckpointsWithFinishedTasksEnabled()
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
index 932ec2c..5a60391 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/StreamTaskFinalCheckpointsTest.java
@@ -23,6 +23,7 @@ import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeutils.base.StringSerializer;
+import org.apache.flink.runtime.checkpoint.CheckpointException;
import org.apache.flink.runtime.checkpoint.CheckpointMetaData;
import org.apache.flink.runtime.checkpoint.CheckpointMetrics;
import org.apache.flink.runtime.checkpoint.CheckpointMetricsBuilder;
@@ -61,6 +62,7 @@ import java.util.concurrent.Future;
import static
org.apache.flink.api.common.typeinfo.BasicTypeInfo.STRING_TYPE_INFO;
import static
org.apache.flink.runtime.state.CheckpointStorageLocationReference.getDefault;
+import static org.apache.flink.util.ExceptionUtils.assertThrowable;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
import static org.junit.Assert.assertArrayEquals;
@@ -367,7 +369,7 @@ public class StreamTaskFinalCheckpointsTest {
// trigger the synchronous savepoint
CompletableFuture<Boolean> savepointFuture =
- triggerStopWithSavepoint(testHarness, syncSavepointId);
+ triggerStopWithSavepointDrain(testHarness,
syncSavepointId);
// The checkpoint 6 would be triggered successfully.
testHarness.finishProcessing();
@@ -466,7 +468,7 @@ public class StreamTaskFinalCheckpointsTest {
// trigger the synchronous savepoint
CompletableFuture<Boolean> savepointFuture =
- triggerStopWithSavepoint(testHarness, syncSavepointId);
+ triggerStopWithSavepointDrain(testHarness,
syncSavepointId);
// The checkpoint 6 would be triggered successfully.
testHarness.finishProcessing();
@@ -482,6 +484,130 @@ public class StreamTaskFinalCheckpointsTest {
}
@Test
+ public void
testTriggerStopWithSavepointNoDrainWhenWaitingForFinalCheckpointOnSourceTask()
+ throws Exception {
+ int finalCheckpointId = 6;
+ int syncSavepointId = 7;
+ CompletingCheckpointResponder checkpointResponder =
+ new CompletingCheckpointResponder() {
+
+ private CheckpointMetrics metrics;
+ private TaskStateSnapshot stateSnapshot;
+
+ @Override
+ public void acknowledgeCheckpoint(
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ long checkpointId,
+ CheckpointMetrics checkpointMetrics,
+ TaskStateSnapshot subtaskState) {
+ // do not acknowledge any checkpoints straightaway
+ if (checkpointId == finalCheckpointId) {
+ metrics = checkpointMetrics;
+ stateSnapshot = subtaskState;
+ }
+ }
+
+ @Override
+ public void declineCheckpoint(
+ JobID jobID,
+ ExecutionAttemptID executionAttemptID,
+ long checkpointId,
+ CheckpointException checkpointException) {
+ // acknowledge the last pending checkpoint once the synchronous savepoint is
+ // declined.
+ if (syncSavepointId == checkpointId) {
+ super.acknowledgeCheckpoint(
+ jobID,
+ executionAttemptID,
+ finalCheckpointId,
+ metrics,
+ stateSnapshot);
+ }
+ }
+ };
+
+ try (StreamTaskMailboxTestHarness<String> testHarness =
+ new
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+ .modifyStreamConfig(
+ config -> {
+ config.setCheckpointingEnabled(true);
+ config.getConfiguration()
+ .set(
+
ExecutionCheckpointingOptions
+
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
+ true);
+ })
+ .setCheckpointResponder(checkpointResponder)
+ .setupOutputForSingletonOperatorChain(
+ new StreamSource<>(new
ImmediatelyFinishingSource()))
+ .build()) {
+ checkpointResponder.setHandlers(
+ testHarness.streamTask::notifyCheckpointCompleteAsync,
+ testHarness.streamTask::notifyCheckpointAbortAsync);
+
+ // start task thread
+ testHarness.streamTask.runMailboxLoop();
+
+ // trigger the final checkpoint
+ CompletableFuture<Boolean> checkpointFuture =
+ triggerCheckpoint(testHarness, finalCheckpointId);
+
+ // trigger the synchronous savepoint w/o drain, which should be declined
+ CompletableFuture<Boolean> savepointFuture =
+ triggerStopWithSavepointNoDrain(testHarness,
syncSavepointId);
+
+ // The checkpoint 6 would be triggered successfully.
+ testHarness.finishProcessing();
+ assertTrue(checkpointFuture.isDone());
+ assertTrue(savepointFuture.isDone());
+ assertFalse(savepointFuture.get());
+ testHarness.getTaskStateManager().getWaitForReportLatch().await();
+ assertEquals(
+ finalCheckpointId,
testHarness.getTaskStateManager().getReportedCheckpointId());
+ assertEquals(
+ finalCheckpointId,
+
testHarness.getTaskStateManager().getNotifiedCompletedCheckpointId());
+ }
+ }
+
+ @Test
+ public void
testTriggerSourceFinishesWhileStoppingWithSavepointWithoutDrain() throws
Exception {
+ try (StreamTaskMailboxTestHarness<String> testHarness =
+ new
StreamTaskMailboxTestHarnessBuilder<>(SourceStreamTask::new, STRING_TYPE_INFO)
+ .modifyStreamConfig(
+ config -> {
+ config.setCheckpointingEnabled(true);
+ config.getConfiguration()
+ .set(
+
ExecutionCheckpointingOptions
+
.ENABLE_CHECKPOINTS_AFTER_TASKS_FINISH,
+ true);
+ })
+ .setupOutputForSingletonOperatorChain(
+ new StreamSource<>(new
ImmediatelyFinishingSource()))
+ .build()) {
+
+ // trigger the synchronous savepoint w/o drain
+ triggerStopWithSavepointNoDrain(testHarness, 1);
+
+ // start task thread
+ testHarness.streamTask.runMailboxLoop();
+ } catch (Exception ex) {
+ assertThrowable(
+ ex,
+ (e ->
+ e.getMessage()
+ .equals(
+ "We run out of data to process
while waiting for a "
+ + "synchronous savepoint
to be finished. This "
+ + "can lead to a deadlock
waiting for a final "
+ + "checkpoint after a
synchronous savepoint, "
+ + "which will never be
triggered.")));
+ }
+ }
+
+ @Test
public void testTriggeringAlignedNoTimeoutCheckpointWithFinishedChannels()
throws Exception {
testTriggeringCheckpointWithFinishedChannels(
CheckpointOptions.alignedNoTimeout(
@@ -680,16 +806,29 @@ public class StreamTaskFinalCheckpointsTest {
checkpointOptions);
}
- static CompletableFuture<Boolean> triggerStopWithSavepoint(
+ static CompletableFuture<Boolean> triggerStopWithSavepointDrain(
+ StreamTaskMailboxTestHarness<String> testHarness, long
checkpointId) {
+ return triggerStopWithSavepoint(
+ testHarness, checkpointId, CheckpointType.SAVEPOINT_TERMINATE);
+ }
+
+ static CompletableFuture<Boolean> triggerStopWithSavepointNoDrain(
StreamTaskMailboxTestHarness<String> testHarness, long
checkpointId) {
+ return triggerStopWithSavepoint(
+ testHarness, checkpointId, CheckpointType.SAVEPOINT_SUSPEND);
+ }
+
+ static CompletableFuture<Boolean> triggerStopWithSavepoint(
+ StreamTaskMailboxTestHarness<String> testHarness,
+ long checkpointId,
+ CheckpointType checkpointType) {
testHarness.getTaskStateManager().getWaitForReportLatch().reset();
return testHarness
.getStreamTask()
.triggerCheckpointAsync(
new CheckpointMetaData(checkpointId, checkpointId *
1000),
CheckpointOptions.alignedNoTimeout(
- CheckpointType.SAVEPOINT_TERMINATE,
-
CheckpointStorageLocationReference.getDefault()));
+ checkpointType,
CheckpointStorageLocationReference.getDefault()));
}
static void processMailTillCheckpointSucceeds(