This is an automated email from the ASF dual-hosted git repository.
zhijiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new dd3717e [FLINK-18252][checkpointing] Fix savepoint overtaking output
data.
dd3717e is described below
commit dd3717e8383c39eb086f21fba79db0381dfac248
Author: Arvid Heise <[email protected]>
AuthorDate: Thu Jun 11 21:18:11 2020 +0200
[FLINK-18252][checkpointing] Fix savepoint overtaking output data.
Currently, a checkpoint/savepoint barrier is always sent as a priority
event to the output partitions, where it overtakes data. After the fix, a
barrier is a priority event iff it is unaligned.
Also, CheckpointCoordinator now only sets the unaligned flag if the barrier
belongs to a checkpoint (not a savepoint).
Ultimately, the unaligned checkpoint config option is not used by
SubtaskCheckpointCoordinatorImpl except for initializing the channel state
writer. The source of truth is now the CheckpointOptions.
---
.../runtime/checkpoint/CheckpointCoordinator.java | 2 +-
.../tasks/SubtaskCheckpointCoordinatorImpl.java | 16 +++------
.../tasks/SubtaskCheckpointCoordinatorTest.java | 40 +++++++++++++++++++---
3 files changed, 41 insertions(+), 17 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
index da518ee..735c343 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
@@ -732,7 +732,7 @@ public class CheckpointCoordinator {
props.getCheckpointType(),
checkpointStorageLocation.getLocationReference(),
isExactlyOnceMode,
- unalignedCheckpointsEnabled);
+ props.getCheckpointType() == CheckpointType.CHECKPOINT
&& unalignedCheckpointsEnabled);
// send the messages to the tasks that trigger their checkpoint
for (Execution execution: executions) {
diff --git
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
index d904c87..b2fe147 100644
---
a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
+++
b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorImpl.java
@@ -82,7 +82,6 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
private final AsyncExceptionHandler asyncExceptionHandler;
private final ChannelStateWriter channelStateWriter;
private final StreamTaskActionExecutor actionExecutor;
- private final boolean unalignedCheckpointEnabled;
private final BiFunctionWithException<ChannelStateWriter, Long,
CompletableFuture<Void>, IOException> prepareInputSnapshot;
/** The IDs of the checkpoint for which we are notified aborted. */
private final Set<Long> abortedCheckpointIds;
@@ -139,7 +138,6 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
executorService,
env,
asyncExceptionHandler,
- unalignedCheckpointEnabled,
prepareInputSnapshot,
maxRecordAbortedCheckpoints,
unalignedCheckpointEnabled ?
openChannelStateWriter(taskName, checkpointStorage) : ChannelStateWriter.NO_OP);
@@ -154,7 +152,6 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
ExecutorService executorService,
Environment env,
AsyncExceptionHandler asyncExceptionHandler,
- boolean unalignedCheckpointEnabled,
BiFunctionWithException<ChannelStateWriter, Long,
CompletableFuture<Void>, IOException> prepareInputSnapshot,
int maxRecordAbortedCheckpoints,
ChannelStateWriter channelStateWriter) throws
IOException {
@@ -167,7 +164,6 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
this.asyncExceptionHandler =
checkNotNull(asyncExceptionHandler);
this.actionExecutor = checkNotNull(actionExecutor);
this.channelStateWriter = checkNotNull(channelStateWriter);
- this.unalignedCheckpointEnabled = unalignedCheckpointEnabled;
this.prepareInputSnapshot = prepareInputSnapshot;
this.abortedCheckpointIds =
createAbortedCheckpointSetWithLimitSize(maxRecordAbortedCheckpoints);
this.lastCheckpointId = -1L;
@@ -253,10 +249,10 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
// Step (2): Send the checkpoint barrier downstream
operatorChain.broadcastEvent(
new CheckpointBarrier(metadata.getCheckpointId(),
metadata.getTimestamp(), options),
- unalignedCheckpointEnabled);
+ options.isUnalignedCheckpoint());
// Step (3): Prepare to spill the in-flight buffers for input
and output
- if (includeChannelState(options)) {
+ if (options.isUnalignedCheckpoint()) {
prepareInflightDataSnapshot(metadata.getCheckpointId());
}
@@ -326,15 +322,11 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
@Override
public void initCheckpoint(long id, CheckpointOptions
checkpointOptions) {
- if (includeChannelState(checkpointOptions)) {
+ if (checkpointOptions.isUnalignedCheckpoint()) {
channelStateWriter.start(id, checkpointOptions);
}
}
- private boolean includeChannelState(CheckpointOptions
checkpointOptions) {
- return unalignedCheckpointEnabled &&
!checkpointOptions.getCheckpointType().isSavepoint();
- }
-
@Override
public void close() throws IOException {
List<AsyncCheckpointRunnable> asyncCheckpointRunnables = null;
@@ -499,7 +491,7 @@ class SubtaskCheckpointCoordinatorImpl implements
SubtaskCheckpointCoordinator {
long checkpointId = checkpointMetaData.getCheckpointId();
long started = System.nanoTime();
- ChannelStateWriteResult channelStateWriteResult =
includeChannelState(checkpointOptions) ?
+ ChannelStateWriteResult channelStateWriteResult =
checkpointOptions.isUnalignedCheckpoint() ?
channelStateWriter.getAndRemoveWriteResult(checkpointId) :
ChannelStateWriteResult.EMPTY;
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
index 580ab54..f7655ea 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/SubtaskCheckpointCoordinatorTest.java
@@ -27,6 +27,7 @@ import org.apache.flink.runtime.checkpoint.CheckpointOptions;
import org.apache.flink.runtime.checkpoint.CheckpointType;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriter;
import org.apache.flink.runtime.checkpoint.channel.ChannelStateWriterImpl;
+import org.apache.flink.runtime.event.AbstractEvent;
import org.apache.flink.runtime.io.network.api.writer.NonRecordWriter;
import org.apache.flink.runtime.jobgraph.OperatorID;
import org.apache.flink.runtime.operators.testutils.DummyEnvironment;
@@ -58,6 +59,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.Executors;
import java.util.concurrent.RunnableFuture;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicReference;
import static org.apache.flink.runtime.checkpoint.CheckpointType.CHECKPOINT;
import static org.apache.flink.runtime.checkpoint.CheckpointType.SAVEPOINT;
@@ -75,7 +77,6 @@ public class SubtaskCheckpointCoordinatorTest {
@Test
public void testInitCheckpoint() throws IOException {
assertTrue(initCheckpoint(true, CHECKPOINT));
- assertFalse(initCheckpoint(true, SAVEPOINT));
assertFalse(initCheckpoint(false, CHECKPOINT));
assertFalse(initCheckpoint(false, SAVEPOINT));
}
@@ -91,7 +92,9 @@ public class SubtaskCheckpointCoordinatorTest {
MockWriter writer = new MockWriter();
SubtaskCheckpointCoordinator coordinator =
coordinator(unalignedCheckpointEnabled, writer);
- coordinator.initCheckpoint(1L, new
CheckpointOptions(checkpointType,
CheckpointStorageLocationReference.getDefault()));
+ CheckpointStorageLocationReference locationReference =
CheckpointStorageLocationReference.getDefault();
+ CheckpointOptions options = new
CheckpointOptions(checkpointType, locationReference, true,
unalignedCheckpointEnabled);
+ coordinator.initCheckpoint(1L, options);
return writer.started;
}
@@ -120,9 +123,39 @@ public class SubtaskCheckpointCoordinatorTest {
}
@Test
+ public void testSavepointNotResultingInPriorityEvents() throws
Exception {
+ MockEnvironment mockEnvironment =
MockEnvironment.builder().build();
+
+ SubtaskCheckpointCoordinator coordinator = new
MockSubtaskCheckpointCoordinatorBuilder()
+ .setUnalignedCheckpointEnabled(true)
+ .setEnvironment(mockEnvironment)
+ .build();
+
+ AtomicReference<Boolean> broadcastedPriorityEvent = new
AtomicReference<>(null);
+ final OperatorChain<?, ?> operatorChain = new OperatorChain(
+ new
MockStreamTaskBuilder(mockEnvironment).build(),
+ new NonRecordWriter<>()) {
+ @Override
+ public void broadcastEvent(AbstractEvent event, boolean
isPriorityEvent) throws IOException {
+ super.broadcastEvent(event, isPriorityEvent);
+ broadcastedPriorityEvent.set(isPriorityEvent);
+ }
+ };
+
+ coordinator.checkpointState(
+ new CheckpointMetaData(0, 0),
+ new CheckpointOptions(SAVEPOINT,
CheckpointStorageLocationReference.getDefault()),
+ new CheckpointMetrics(),
+ operatorChain,
+ () -> false);
+
+ assertEquals(false, broadcastedPriorityEvent.get());
+ }
+
+ @Test
public void testSkipChannelStateForSavepoints() throws Exception {
SubtaskCheckpointCoordinator coordinator = new
MockSubtaskCheckpointCoordinatorBuilder()
- .setUnalignedCheckpointEnabled(false)
+ .setUnalignedCheckpointEnabled(true)
.setPrepareInputSnapshot((u1, u2) -> {
fail("should not prepare input snapshot for
savepoint");
return null;
@@ -414,7 +447,6 @@ public class SubtaskCheckpointCoordinatorTest {
newDirectExecutorService(),
new DummyEnvironment(),
(message, unused) -> fail(message),
- unalignedCheckpointEnabled,
(unused1, unused2) ->
CompletableFuture.completedFuture(null),
0,
channelStateWriter