This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.11 by this push:
     new e7c634f   [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker 
while executing checkpoint aborted by coordinator RPC
e7c634f is described below

commit e7c634f223c0a2c180ca5abe14a4661115f0afe6
Author: Yun Tang <[email protected]>
AuthorDate: Wed Jun 17 21:10:02 2020 +0800

     [FLINK-18238][checkpoint] Broadcast CancelCheckpointMarker while executing 
checkpoint aborted by coordinator RPC
    
    When the CheckpointCoordinator sends an abort-checkpoint RPC, the task skips
    executing the respective checkpoint, even if that checkpoint was already
    triggered. However, it must still broadcast a CancelCheckpointMarker before
    exiting; otherwise the downstream side could keep waiting for barrier
    alignment indefinitely, resulting in a deadlock.
    
    This closes #12664.
---
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    |  4 +-
 .../tasks/SubtaskCheckpointCoordinatorTest.java    | 75 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 1 deletion(-)

diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index b2fe147..0d6d638 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -235,9 +235,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        return;
                }
 
-               // Step (0): Record the last triggered checkpointId.
+               // Step (0): Record the last triggered checkpointId and abort 
the sync phase of checkpoint if necessary.
                lastCheckpointId = metadata.getCheckpointId();
                if (checkAndClearAbortedStatus(metadata.getCheckpointId())) {
+                       // broadcast cancel checkpoint marker to avoid 
downstream back-pressure due to checkpoint barrier align.
+                       operatorChain.broadcastEvent(new 
CancelCheckpointMarker(metadata.getCheckpointId()));
                        LOG.info("Checkpoint {} has been notified as aborted, 
would not trigger any checkpoint.", metadata.getCheckpointId());
                        return;
                }
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index f7655ea..00ef9a1 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -18,6 +18,9 @@
 
 package org.apache.flink.streaming.runtime.tasks;
 
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
+import org.apache.flink.api.common.typeutils.base.StringSerializer;
 import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.metrics.MetricGroup;
@@ -28,7 +31,11 @@ import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
 import org.apache.flink.runtime.event.AbstractEvent;
+import org.apache.flink.runtime.io.network.api.CancelCheckpointMarker;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
+import 
org.apache.flink.runtime.io.network.api.writer.RecordOrEventCollectingResultPartitionWriter;
+import org.apache.flink.runtime.io.network.api.writer.ResultPartitionWriter;
+import org.apache.flink.runtime.io.network.util.TestPooledBufferProvider;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
 import org.apache.flink.runtime.operators.testutils.MockEnvironment;
@@ -39,12 +46,15 @@ import org.apache.flink.runtime.state.KeyedStateHandle;
 import org.apache.flink.runtime.state.SnapshotResult;
 import org.apache.flink.runtime.state.TestCheckpointStorageWorkerView;
 import org.apache.flink.runtime.state.TestTaskStateManager;
+import org.apache.flink.streaming.api.graph.StreamConfig;
 import org.apache.flink.streaming.api.operators.AbstractStreamOperator;
 import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
 import org.apache.flink.streaming.api.operators.OperatorSnapshotFutures;
+import org.apache.flink.streaming.api.operators.StreamMap;
 import org.apache.flink.streaming.api.operators.StreamTaskStateInitializer;
 import org.apache.flink.streaming.api.watermark.Watermark;
 import org.apache.flink.streaming.runtime.streamrecord.LatencyMarker;
+import org.apache.flink.streaming.runtime.streamrecord.StreamElementSerializer;
 import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
 import org.apache.flink.streaming.runtime.tasks.StreamTaskTest.NoOpStreamTask;
 import org.apache.flink.streaming.util.MockStreamTaskBuilder;
@@ -53,6 +63,8 @@ import org.apache.flink.util.ExceptionUtils;
 import org.junit.Test;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.ExecutionException;
@@ -219,6 +231,69 @@ public class SubtaskCheckpointCoordinatorTest {
        }
 
        @Test
+       public void 
testBroadcastCancelCheckpointMarkerOnAbortingFromCoordinator() throws Exception 
{
+               OneInputStreamTaskTestHarness<String, String> testHarness =
+                       new OneInputStreamTaskTestHarness<>(
+                               OneInputStreamTask::new,
+                               1,
+                               1,
+                               BasicTypeInfo.STRING_TYPE_INFO,
+                               BasicTypeInfo.STRING_TYPE_INFO);
+
+               testHarness.setupOutputForSingletonOperatorChain();
+               StreamConfig streamConfig = testHarness.getStreamConfig();
+               streamConfig.setStreamOperator(new MapOperator());
+
+               testHarness.invoke();
+               testHarness.waitForTaskRunning();
+
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+               SubtaskCheckpointCoordinator subtaskCheckpointCoordinator = new 
MockSubtaskCheckpointCoordinatorBuilder()
+                       .setEnvironment(mockEnvironment)
+                       .build();
+
+               TestPooledBufferProvider bufferProvider = new 
TestPooledBufferProvider(1, 4096);
+               ArrayList<Object> recordOrEvents = new ArrayList<>();
+               StreamElementSerializer<String> stringStreamElementSerializer = 
new StreamElementSerializer<>(StringSerializer.INSTANCE);
+               ResultPartitionWriter resultPartitionWriter = new 
RecordOrEventCollectingResultPartitionWriter<>(
+                       recordOrEvents, bufferProvider, 
stringStreamElementSerializer);
+               
mockEnvironment.addOutputs(Collections.singletonList(resultPartitionWriter));
+
+               OneInputStreamTask<String, String> task = testHarness.getTask();
+               OperatorChain<String, OneInputStreamOperator<String, String>> 
operatorChain = new OperatorChain<>(
+                       task, 
StreamTask.createRecordWriterDelegate(streamConfig, mockEnvironment));
+               long checkpointId = 42L;
+               // notify checkpoint aborted before execution.
+               
subtaskCheckpointCoordinator.notifyCheckpointAborted(checkpointId, 
operatorChain, () -> true);
+               subtaskCheckpointCoordinator.checkpointState(
+                       new CheckpointMetaData(checkpointId, 
System.currentTimeMillis()),
+                       CheckpointOptions.forCheckpointWithDefaultLocation(),
+                       new CheckpointMetrics(),
+                       operatorChain,
+                       () -> true);
+
+               assertEquals(1, recordOrEvents.size());
+               Object recordOrEvent = recordOrEvents.get(0);
+               // ensure CancelCheckpointMarker is broadcast downstream.
+               assertTrue(recordOrEvent instanceof CancelCheckpointMarker);
+               assertEquals(checkpointId, ((CancelCheckpointMarker) 
recordOrEvent).getCheckpointId());
+               testHarness.endInput();
+               testHarness.waitForTaskCompletion();
+       }
+
+       private static class MapOperator extends StreamMap<String, String> {
+               private static final long serialVersionUID = 1L;
+
+               public MapOperator() {
+                       super((MapFunction<String, String>) value -> value);
+               }
+
+               @Override
+               public void notifyCheckpointAborted(long checkpointId) throws 
Exception {
+               }
+       }
+
+       @Test
        public void testNotifyCheckpointAbortedDuringAsyncPhase() throws 
Exception {
                MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
                SubtaskCheckpointCoordinatorImpl subtaskCheckpointCoordinator = 
(SubtaskCheckpointCoordinatorImpl) new MockSubtaskCheckpointCoordinatorBuilder()

Reply via email to