This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new e7c634f [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC
e7c634f is described below
commit e7c634f223c0a2c180ca5abe14a4661115f0afe6
Author: Yun Tang <[email protected]>
AuthorDate: Wed Jun 17 21:10:02 2020 +0800
[FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing checkpoint aborted by coordinator RPC
When the CheckpointCoordinator aborts a checkpoint via RPC, the task skips
executing that checkpoint even if it had already been triggered. However, the
task must still broadcast a CancelCheckpointMarker before returning; otherwise
downstream tasks may keep waiting for barrier alignment, which can end in a
deadlock.
This closes #12664.
---
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 4 +-
.../tasks/SubtaskCheckpointCoordinatorTest.java | 75 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 1 deletion(-)
diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index b2fe147..0d6d638 100644
--- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -235,9 +235,11 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
return;
}
- // Step (0): Record the last triggered checkpointId.
+ // Step (0): Record the last triggered checkpointId and abort
the sync phase of checkpoint if necessary.
lastCheckpointId = metadata.getCheckpointId();
if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+ // broadcast cancel checkpoint marker to avoid
downstream back-pressure due to checkpoint barrier align.
+ operatorChain.broadcastEvent(new
CancelCheckpointMarker(metadata.getCheckpointId()));
LOG.info("Checkpoint {} has been notified as aborted,
would not trigger any checkpoint.", metadata.getCheckpointId());
return;
}
diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index f7655ea..00ef9a1 100644
--- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,6 +18,9 @@
package org.apache.flink.streaming.runtime.tasks;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.metrics.MetricGroup;
@@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import
org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
import org.apache.flink.runtime.state.SnapshotResult;
import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamMap;
import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
import org.apache.flink.streaming.util.MockStreamTaskBuilder;
@@ -53,6 +63,8 @@ import org.apache.flink.util.ExceptionUtils;
import org.junit.Test;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -219,6 +231,69 @@ public class SubtaskCheckpointCoordinatorTest {
}
@Test
+ public void
testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception
{
+ OneInputStreamTaskTestHarness<String, String> testHarness =
+ new OneInputStreamTaskTestHarness<>(
+ OneInputStreamTask::new,
+ 1,
+ 1,
+ BasicTypeInfo.STRING_TYPE_INFO,
+ BasicTypeInfo.STRING_TYPE_INFO);
+
+ testHarness.setupOutputForSingletonOperatorChain();
+ StreamConfig streamConfig = testHarness.getStreamConfig();
+ streamConfig.setStreamOperator(new MapOperator());
+
+ testHarness.invoke();
+ testHarness.waitForTaskRunning();
+
+ MockEnvironment mockEnvironment =
MockEnvironment.builder().build();
+ SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new
MockSubtaskCheckpointCoordinatorBuilder()
+ .setEnvironment(mockEnvironment)
+ .build();
+
+ TestPooledBufferProvider bufferProvider = new
TestPooledBufferProvider(1, 4096);
+ ArrayList<Object> recordOrEvents = new ArrayList<>();
+ StreamElementSerializer<String> stringStreamElementSerializer =
new StreamElementSerializer<>(StringSerializer.INSTANCE);
+ ResultPartitionWriter resultPartitionWriter = new
RecordOrEventCollectingResultPartitionWriter<>(
+ recordOrEvents, bufferProvider,
stringStreamElementSerializer);
+
mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
+
+ OneInputStreamTask<String, String> task = testHarness.getTask();
+ OperatorChain<String, OneInputStreamOperator<String, String>>
operatorChain = new OperatorChain<>(
+ task,
StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment));
+ long checkpointId = 42L;
+ // notify checkpoint aborted before execution.
+
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId,
operatorChain, () -> true);
+ subtaskCheckpointCoordinator.checkpointState(
+ new CheckpointMetaData(checkpointId,
System.currentTimeMillis()),
+ CheckpointOptions.forCheckpointWithDefaultLocation(),
+ new CheckpointMetrics(),
+ operatorChain,
+ () -> true);
+
+ assertEquals(1, recordOrEvents.size());
+ Object recordOrEvent = recordOrEvents.get(0);
+ // ensure CancelCheckpointMarker is broadcast downstream.
+ assertTrue(recordOrEvent instanceof CancelCheckpointMarker);
+ assertEquals(checkpointId, ((CancelCheckpointMarker)
recordOrEvent).getCheckpointId());
+ testHarness.endInput();
+ testHarness.waitForTaskCompletion();
+ }
+
+ private static class MapOperator extends StreamMap<String, String> {
+ private static final long serialVersionUID = 1L;
+
+ public MapOperator() {
+ super((MapFunction<String, String>) value -> value);
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) throws
Exception {
+ }
+ }
+
+ @Test
public void testNotifyCheckpointAbortedDuringAsyncPhase() throws
Exception {
MockEnvironment mockEnvironment =
MockEnvironment.builder().build();
SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator =
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()