This is an automated email from the ASF dual-hosted git repository.

zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new dd3717e  [FLINK-18252][checkpointing] Fix savepoint overtaking output 
data.
dd3717e is described below

commit dd3717e8383c39eb086f21fba79db0381dfac248
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Jun 11 21:18:11 2020 +0200

    [FLINK-18252][checkpointing] Fix savepoint overtaking output data.
    
    Currently, a checkpoint/savepoint barrier is always sent as a priority 
event to the output partitions, where it overtakes the buffered output data. 
After the fix, a barrier is a priority event iff it is unaligned.
    Also, CheckpointCoordinator now only sets the unaligned flag if the barrier 
belongs to a checkpoint (not a savepoint).
    Finally, the unaligned checkpoint config option is no longer used by 
SubtaskCheckpointCoordinatorImpl except for initializing the channel state 
writer. The source of truth is now the CheckpointOptions.
---
 .../runtime/checkpoint/CheckpointCoordinator.java  |  2 +-
 .../tasks/SubtaskCheckpointCoordinatorImpl.java    | 16 +++------
 .../tasks/SubtaskCheckpointCoordinatorTest.java    | 40 +++++++++++++++++++---
 3 files changed, 41 insertions(+), 17 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..735c343 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -732,7 +732,7 @@ public class CheckpointCoordinator {
                        props.getCheckpointType(),
                        checkpointStorageLocation.getLocationReference(),
                        isExactlyOnceMode,
-                       unalignedCheckpointsEnabled);
+                       props.getCheckpointType() == CheckpointType.CHECKPOINT 
&& unalignedCheckpointsEnabled);
 
                // send the messages to the tasks that trigger their checkpoint
                for (Execution execution: executions) {
diff --git 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d904c87..b2fe147 100644
--- 
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++ 
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -82,7 +82,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
        private final AsyncExceptionHandler asyncExceptionHandler;
        private final ChannelStateWriter channelStateWriter;
        private final StreamTaskActionExecutor actionExecutor;
-       private final boolean unalignedCheckpointEnabled;
        private final BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot;
        /** The IDs of the checkpoint for which we are notified aborted. */
        private final Set<Long> abortedCheckpointIds;
@@ -139,7 +138,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        executorService,
                        env,
                        asyncExceptionHandler,
-                       unalignedCheckpointEnabled,
                        prepareInputSnapshot,
                        maxRecordAbortedCheckpoints,
                        unalignedCheckpointEnabled ? 
openChannelStateWriter(taskName, checkpointStorage) : ChannelStateWriter.NO_OP);
@@ -154,7 +152,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                        ExecutorService executorService,
                        Environment env,
                        AsyncExceptionHandler asyncExceptionHandler,
-                       boolean unalignedCheckpointEnabled,
                        BiFunctionWithException<ChannelStateWriter, Long, 
CompletableFuture<Void>, IOException> prepareInputSnapshot,
                        int maxRecordAbortedCheckpoints,
                        ChannelStateWriter channelStateWriter) throws 
IOException {
@@ -167,7 +164,6 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                this.asyncExceptionHandler = 
checkNotNull(asyncExceptionHandler);
                this.actionExecutor = checkNotNull(actionExecutor);
                this.channelStateWriter = checkNotNull(channelStateWriter);
-               this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
                this.prepareInputSnapshot = prepareInputSnapshot;
                this.abortedCheckpointIds = 
createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
                this.lastCheckpointId = -1L;
@@ -253,10 +249,10 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                // Step (2): Send the checkpoint barrier downstream
                operatorChain.broadcastEvent(
                        new CheckpointBarrier(metadata.getCheckpointId(), 
metadata.getTimestamp(), options),
-                       unalignedCheckpointEnabled);
+                       options.isUnalignedCheckpoint());
 
                // Step (3): Prepare to spill the in-flight buffers for input 
and output
-               if (includeChannelState(options)) {
+               if (options.isUnalignedCheckpoint()) {
                        prepareInflightDataSnapshot(metadata.getCheckpointId());
                }
 
@@ -326,15 +322,11 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
 
        @Override
        public void initCheckpoint(long id, CheckpointOptions 
checkpointOptions) {
-               if (includeChannelState(checkpointOptions)) {
+               if (checkpointOptions.isUnalignedCheckpoint()) {
                        channelStateWriter.start(id, checkpointOptions);
                }
        }
 
-       private boolean includeChannelState(CheckpointOptions 
checkpointOptions) {
-               return unalignedCheckpointEnabled && 
!checkpointOptions.getCheckpointType().isSavepoint();
-       }
-
        @Override
        public void close() throws IOException {
                List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
@@ -499,7 +491,7 @@ class SubtaskCheckpointCoordinatorImpl implements 
SubtaskCheckpointCoordinator {
                long checkpointId = checkpointMetaData.getCheckpointId();
                long started = System.nanoTime();
 
-               ChannelStateWriteResult channelStateWriteResult = 
includeChannelState(checkpointOptions) ?
+               ChannelStateWriteResult channelStateWriteResult = 
checkpointOptions.isUnalignedCheckpoint() ?
                                                                
channelStateWriter.getAndRemoveWriteResult(checkpointId) :
                                                                
ChannelStateWriteResult.EMPTY;
 
diff --git 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 580ab54..f7655ea 100644
--- 
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++ 
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
 import org.apache.flink.runtime.checkpoint.CheckpointType;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
 import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
+import org.apache.flink.runtime.event.AbstractEvent;
 import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
 import org.apache.flink.runtime.jobgraph.OperatorID;
 import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -58,6 +59,7 @@ import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Executors;
 import java.util.concurrent.RunnableFuture;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
 
 import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
 import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
@@ -75,7 +77,6 @@ public class SubtaskCheckpointCoordinatorTest {
        @Test
        public void testInitCheckpoint() throws IOException {
                assertTrue(initCheckpoint(true, CHECKPOINT));
-               assertFalse(initCheckpoint(true, SAVEPOINT));
                assertFalse(initCheckpoint(false, CHECKPOINT));
                assertFalse(initCheckpoint(false, SAVEPOINT));
        }
@@ -91,7 +92,9 @@ public class SubtaskCheckpointCoordinatorTest {
 
                MockWriter writer = new MockWriter();
                SubtaskCheckpointCoordinator coordinator = 
coordinator(unalignedCheckpointEnabled, writer);
-               coordinator.initCheckpoint(1L, new 
CheckpointOptions(checkpointType, 
CheckpointStorageLocationReference.getDefault()));
+               CheckpointStorageLocationReference locationReference = 
CheckpointStorageLocationReference.getDefault();
+               CheckpointOptions options = new 
CheckpointOptions(checkpointType, locationReference, true, 
unalignedCheckpointEnabled);
+               coordinator.initCheckpoint(1L, options);
                return writer.started;
        }
 
@@ -120,9 +123,39 @@ public class SubtaskCheckpointCoordinatorTest {
        }
 
        @Test
+       public void testSavepointNotResultingInPriorityEvents() throws 
Exception {
+               MockEnvironment mockEnvironment = 
MockEnvironment.builder().build();
+
+               SubtaskCheckpointCoordinator coordinator = new 
MockSubtaskCheckpointCoordinatorBuilder()
+                               .setUnalignedCheckpointEnabled(true)
+                               .setEnvironment(mockEnvironment)
+                               .build();
+
+               AtomicReference<Boolean> broadcastedPriorityEvent = new 
AtomicReference<>(null);
+               final OperatorChain<?, ?> operatorChain = new OperatorChain(
+                               new 
MockStreamTaskBuilder(mockEnvironment).build(),
+                               new NonRecordWriter<>()) {
+                       @Override
+                       public void broadcastEvent(AbstractEvent event, boolean 
isPriorityEvent) throws IOException {
+                               super.broadcastEvent(event, isPriorityEvent);
+                               broadcastedPriorityEvent.set(isPriorityEvent);
+                       }
+               };
+
+               coordinator.checkpointState(
+                               new CheckpointMetaData(0, 0),
+                               new CheckpointOptions(SAVEPOINT, 
CheckpointStorageLocationReference.getDefault()),
+                               new CheckpointMetrics(),
+                               operatorChain,
+                               () -> false);
+
+               assertEquals(false, broadcastedPriorityEvent.get());
+       }
+
+       @Test
        public void testSkipChannelStateForSavepoints() throws Exception {
                SubtaskCheckpointCoordinator coordinator = new 
MockSubtaskCheckpointCoordinatorBuilder()
-                       .setUnalignedCheckpointEnabled(false)
+                       .setUnalignedCheckpointEnabled(true)
                        .setPrepareInputSnapshot((u1, u2) -> {
                                fail("should not prepare input snapshot for 
savepoint");
                                return null;
@@ -414,7 +447,6 @@ public class SubtaskCheckpointCoordinatorTest {
                        newDirectExecutorService(),
                        new DummyEnvironment(),
                        (message, unused) -> fail(message),
-                       unalignedCheckpointEnabled,
                        (unused1, unused2) -> 
CompletableFuture.completedFuture(null),
                        0,
                        channelStateWriter

Reply via email to