This is an automated email from the ASF dual-hosted git repository.
lindong pushed a commit to branch release-1.16
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.16 by this push:
new 37bcc5b5c7e [FLINK-28941][Runtime/Checkpointing] Add concurrent
checkpoint support in Operator Coordinator
37bcc5b5c7e is described below
commit 37bcc5b5c7e5bb0b467f14b8effe978928211ef0
Author: yunfengzhou-hub <[email protected]>
AuthorDate: Tue Sep 6 21:36:59 2022 +0800
[FLINK-28941][Runtime/Checkpointing] Add concurrent checkpoint support in
Operator Coordinator
This closes #20754.
---
.../coordination/OperatorCoordinatorHolder.java | 17 +-
.../operators/coordination/SubtaskGatewayImpl.java | 99 +++++----
.../CoordinatorEventsExactlyOnceITCase.java | 4 +-
.../OperatorCoordinatorHolderTest.java | 20 +-
.../coordination/SubtaskGatewayImplTest.java | 4 +-
...ToStreamOperatorRecipientExactlyOnceITCase.java | 236 ++++++++++++++++++---
6 files changed, 286 insertions(+), 94 deletions(-)
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
index 17aa17a8a1e..7587795ba74 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolder.java
@@ -58,12 +58,6 @@ import static org.apache.flink.util.Preconditions.checkState;
*
* <h3>Exactly-one Mechanism</h3>
*
- * <p>This implementation can handle one checkpoint being triggered at a time.
If another checkpoint
- * is triggered while the triggering of the first one was not completed or
aborted, this class will
- * throw an exception. That is in line with the capabilities of the Checkpoint
Coordinator, which
- * can handle multiple concurrent checkpoints on the TaskManagers, but only
one concurrent
- * triggering phase.
- *
* <p>The mechanism for exactly once semantics is as follows:
*
* <ul>
@@ -94,6 +88,11 @@ import static org.apache.flink.util.Preconditions.checkState;
* AcknowledgeCheckpointEvent}, it would be sent out immediately.
* </ul>
*
+ * <p>This implementation can handle concurrent checkpoints. In the behavior
described above, if an
+ * event is generated after the coordinator has completed multiple
checkpoints, and before it
+ * receives an {@link AcknowledgeCheckpointEvent} for any of them, the event
is buffered until
+ * the coordinator has received an {@link AcknowledgeCheckpointEvent} for all
of these checkpoints.
+ *
* <p><b>IMPORTANT:</b> A critical assumption is that all events from the
scheduler to the Tasks are
* transported strictly in order. Events being sent from the coordinator after
the checkpoint
* barrier was injected must not overtake the checkpoint barrier. This is
currently guaranteed by
@@ -282,7 +281,7 @@ public class OperatorCoordinatorHolder
mainThreadExecutor.assertRunningInMainThread();
}
-
subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint);
+
subtaskGatewayMap.values().forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);
context.resetFailed();
// when initial savepoints are restored, this call comes before the
mainThreadExecutor
@@ -401,7 +400,9 @@ public class OperatorCoordinatorHolder
() ->
subtaskGatewayMap
.values()
-
.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint));
+ .forEach(
+ SubtaskGatewayImpl
+
::openGatewayAndUnmarkLastCheckpointIfAny));
}
// ------------------------------------------------------------------------
diff --git
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
index 86bf373a3f0..2fb2bffce3f 100644
---
a/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
+++
b/flink-runtime/src/main/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImpl.java
@@ -30,8 +30,10 @@ import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.concurrent.FutureUtils;
import java.io.IOException;
-import java.util.ArrayList;
+import java.util.LinkedList;
import java.util.List;
+import java.util.TreeMap;
+import java.util.TreeSet;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
@@ -39,9 +41,12 @@ import java.util.concurrent.CompletableFuture;
* Implementation of the {@link OperatorCoordinator.SubtaskGateway} interface
that access to
* subtasks for status and event sending via {@link SubtaskAccess}.
*
- * <p>Instances of this class can be temporarily closed, blocking events from
going through,
- * buffering them, and releasing them later. It is used for "alignment" of
operator event streams
- * with checkpoint barrier injection, similar to how the input channels are
aligned during a common
+ * <p>Instances of this class can be closed, blocking events from going
through, buffering them, and
+ * releasing them later. If the instance is closed for a specific checkpoint,
events arriving after
+ * that are blocked temporarily, and released after the checkpoint
finishes. If an event is
+ * blocked and buffered while there are multiple ongoing checkpoints, the event
is released after
+ * all these checkpoints finish. This is used for "alignment" of operator event
streams with
+ * checkpoint barrier injection, similar to how the input channels are aligned
during a common
* checkpoint.
*
* <p>The methods on the critical communication path, including
closing/reopening the gateway and
@@ -63,13 +68,13 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
private final IncompleteFuturesTracker incompleteFuturesTracker;
- private final List<BlockedEvent> blockedEvents;
+ private final TreeMap<Long, List<BlockedEvent>> blockedEventsMap;
- private long currentCheckpointId;
+ /** The ids of the checkpoints that have been marked but not unmarked yet.
*/
+ private final TreeSet<Long> currentMarkedCheckpointIds;
- private long lastCheckpointId;
-
- private boolean isClosed;
+ /** The id of the latest checkpoint that has ever been marked. */
+ private long latestAttemptedCheckpointId;
SubtaskGatewayImpl(
SubtaskAccess subtaskAccess,
@@ -78,10 +83,9 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
this.subtaskAccess = subtaskAccess;
this.mainThreadExecutor = mainThreadExecutor;
this.incompleteFuturesTracker = incompleteFuturesTracker;
- this.blockedEvents = new ArrayList<>();
- this.currentCheckpointId = NO_CHECKPOINT;
- this.lastCheckpointId = Long.MIN_VALUE;
- this.isClosed = false;
+ this.blockedEventsMap = new TreeMap<>();
+ this.currentMarkedCheckpointIds = new TreeSet<>();
+ this.latestAttemptedCheckpointId = NO_CHECKPOINT;
}
@Override
@@ -134,8 +138,8 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
CompletableFuture<Acknowledge> result) {
checkRunsInMainThread();
- if (isClosed) {
- blockedEvents.add(new BlockedEvent(sendAction, result));
+ if (!blockedEventsMap.isEmpty()) {
+ blockedEventsMap.lastEntry().getValue().add(new
BlockedEvent(sendAction, result));
} else {
callSendAction(sendAction, result);
}
@@ -180,21 +184,14 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
void markForCheckpoint(long checkpointId) {
checkRunsInMainThread();
- if (currentCheckpointId != NO_CHECKPOINT && currentCheckpointId !=
checkpointId) {
- throw new IllegalStateException(
- String.format(
- "Cannot mark for checkpoint %d, already marked for
checkpoint %d",
- checkpointId, currentCheckpointId));
- }
-
- if (checkpointId > lastCheckpointId) {
- currentCheckpointId = checkpointId;
- lastCheckpointId = checkpointId;
+ if (checkpointId > latestAttemptedCheckpointId) {
+ currentMarkedCheckpointIds.add(checkpointId);
+ latestAttemptedCheckpointId = checkpointId;
} else {
throw new IllegalStateException(
String.format(
"Regressing checkpoint IDs. Previous checkpointId
= %d, new checkpointId = %d",
- lastCheckpointId, checkpointId));
+ latestAttemptedCheckpointId, checkpointId));
}
}
@@ -207,8 +204,8 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
boolean tryCloseGateway(long checkpointId) {
checkRunsInMainThread();
- if (checkpointId == currentCheckpointId) {
- isClosed = true;
+ if (currentMarkedCheckpointIds.contains(checkpointId)) {
+ blockedEventsMap.putIfAbsent(checkpointId, new LinkedList<>());
return true;
}
@@ -221,38 +218,56 @@ class SubtaskGatewayImpl implements
OperatorCoordinator.SubtaskGateway {
// Gateways should always be marked and closed for a specific
checkpoint before it can be
// reopened for that checkpoint. If a gateway is to be opened for an
unforeseen checkpoint,
// exceptions should be thrown.
- if (lastCheckpointId < checkpointId) {
+ if (latestAttemptedCheckpointId < checkpointId) {
throw new IllegalStateException(
String.format(
- "Gateway closed for different checkpoint: closed
for = %d, expected = %d",
- currentCheckpointId, checkpointId));
+ "Trying to open gateway for unseen checkpoint: "
+ + "latest known checkpoint = %d, incoming
checkpoint = %d",
+ latestAttemptedCheckpointId, checkpointId));
}
// The message to open gateway with a specific checkpoint id might
arrive after the
// checkpoint has been aborted, or even after a new checkpoint has
started. In these cases
// this message should be ignored.
- if (currentCheckpointId == NO_CHECKPOINT || checkpointId <
lastCheckpointId) {
+ if (!currentMarkedCheckpointIds.contains(checkpointId)) {
return;
}
- openGatewayAndUnmarkCheckpoint();
+ if (blockedEventsMap.containsKey(checkpointId)) {
+ if (blockedEventsMap.firstKey() == checkpointId) {
+ for (BlockedEvent blockedEvent :
blockedEventsMap.firstEntry().getValue()) {
+ callSendAction(blockedEvent.sendAction,
blockedEvent.future);
+ }
+ } else {
+ blockedEventsMap
+ .floorEntry(checkpointId - 1)
+ .getValue()
+ .addAll(blockedEventsMap.get(checkpointId));
+ }
+ blockedEventsMap.remove(checkpointId);
+ }
+
+ currentMarkedCheckpointIds.remove(checkpointId);
}
/** Opens the gateway, releasing all buffered events. */
- void openGatewayAndUnmarkCheckpoint() {
+ void openGatewayAndUnmarkAllCheckpoint() {
checkRunsInMainThread();
- currentCheckpointId = NO_CHECKPOINT;
- if (!isClosed) {
- return;
+ for (List<BlockedEvent> blockedEvents : blockedEventsMap.values()) {
+ for (BlockedEvent blockedEvent : blockedEvents) {
+ callSendAction(blockedEvent.sendAction, blockedEvent.future);
+ }
}
- for (BlockedEvent blockedEvent : blockedEvents) {
- callSendAction(blockedEvent.sendAction, blockedEvent.future);
- }
- blockedEvents.clear();
+ blockedEventsMap.clear();
+ currentMarkedCheckpointIds.clear();
+ }
- isClosed = false;
+ void openGatewayAndUnmarkLastCheckpointIfAny() {
+ if (!currentMarkedCheckpointIds.isEmpty()) {
+ openGatewayAndUnmarkCheckpoint(currentMarkedCheckpointIds.last());
+ }
}
private void checkRunsInMainThread() {
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index 06b2466be93..020021c6b40 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -312,7 +312,7 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
protected int nextNumber;
protected CompletableFuture<byte[]> nextToComplete;
- private CompletableFuture<byte[]> requestedCheckpoint;
+ protected CompletableFuture<byte[]> requestedCheckpoint;
private SubtaskGateway subtaskGateway;
private boolean workLoopRunning;
@@ -683,7 +683,7 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new
TestScript());
}
- static void reset() {
+ public static void reset() {
MAP_FOR_OPERATOR.clear();
}
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index e6082f31d6e..cdd46f103e4 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -133,7 +133,7 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
}
@Test
- public void sourceBarrierInjectionReleasesBlockedEvents() throws Exception
{
+ public void acknowledgeCheckpointEventReleasesBlockedEvents() throws
Exception {
final EventReceivingTasks tasks =
EventReceivingTasks.createForRunningTasks();
final OperatorCoordinatorHolder holder =
createCoordinatorHolder(tasks,
TestingOperatorCoordinator::new);
@@ -183,19 +183,23 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
}
@Test
- public void triggeringFailsIfOtherTriggeringInProgress() throws Exception {
+ public void triggerConcurrentCheckpoints() throws Exception {
final EventReceivingTasks tasks =
EventReceivingTasks.createForRunningTasks();
final OperatorCoordinatorHolder holder =
createCoordinatorHolder(tasks,
TestingOperatorCoordinator::new);
- holder.checkpointCoordinator(11L, new CompletableFuture<>());
+ triggerAndCompleteCheckpoint(holder, 1111L);
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new
TestOperatorEvent(1337));
+ triggerAndCompleteCheckpoint(holder, 1112L);
+ getCoordinator(holder).getSubtaskGateway(0).sendEvent(new
TestOperatorEvent(1338));
+ assertThat(tasks.getSentEventsForSubtask(0)).isEmpty();
- final CompletableFuture<byte[]> future = new CompletableFuture<>();
- holder.checkpointCoordinator(12L, future);
+ holder.handleEventFromOperator(0, 0, new
AcknowledgeCheckpointEvent(1111L));
+ assertThat(tasks.getSentEventsForSubtask(0)).containsExactly(new
TestOperatorEvent(1337));
- assertThat(future).isCompletedExceptionally();
- assertThat(globalFailure).isNotNull();
- globalFailure = null;
+ holder.handleEventFromOperator(0, 0, new
AcknowledgeCheckpointEvent(1112L));
+ assertThat(tasks.getSentEventsForSubtask(0))
+ .containsExactly(new TestOperatorEvent(1337), new
TestOperatorEvent(1338));
}
@Test
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
index 459d8c34099..ad15f61eec6 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/SubtaskGatewayImplTest.java
@@ -139,7 +139,7 @@ public class SubtaskGatewayImplTest {
final CompletableFuture<Acknowledge> future1 =
gateway3.sendEvent(event1);
final CompletableFuture<Acknowledge> future2 =
gateway0.sendEvent(event2);
- gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkCheckpoint);
+
gateways.forEach(SubtaskGatewayImpl::openGatewayAndUnmarkAllCheckpoint);
assertThat(receiver.events)
.containsExactly(new EventWithSubtask(event1, 3), new
EventWithSubtask(event2, 0));
@@ -161,7 +161,7 @@ public class SubtaskGatewayImplTest {
gateway.tryCloseGateway(17L);
final CompletableFuture<Acknowledge> future = gateway.sendEvent(new
TestOperatorEvent());
- gateway.openGatewayAndUnmarkCheckpoint();
+ gateway.openGatewayAndUnmarkAllCheckpoint();
assertThat(future).isCompletedExceptionally();
}
diff --git
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
index a21d9dea702..5af4f469140 100644
---
a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
+++
b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/tasks/CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase.java
@@ -114,7 +114,12 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
env.setParallelism(1);
env.enableCheckpointing(100);
ManuallyClosedSourceFunction.shouldCloseSource = false;
-
EventSendingCoordinatorWithGuaranteedCheckpoint.isEventSentAfterFirstCheckpoint
= false;
+ EventReceivingOperator.shouldUnblockAllCheckpoint = false;
+ EventReceivingOperator.shouldUnblockNextCheckpoint = false;
+ EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+ .isCheckpointAbortedBeforeScriptFailure =
+ false;
+ TestScript.reset();
}
@Test
@@ -155,6 +160,19 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
.isTrue();
}
+ @Test
+ public void testConcurrentCheckpoint() throws Exception {
+ env.getCheckpointConfig().setMaxConcurrentCheckpoints(2);
+ executeAndVerifyResults(
+ env,
+ new
EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<>(
+ "eventReceiving", NUM_EVENTS, DELAY));
+ assertThat(
+
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+ .isCheckpointAbortedBeforeScriptFailure)
+ .isFalse();
+ }
+
private void executeAndVerifyResults(
StreamExecutionEnvironment env,
EventReceivingOperatorFactory<Long, Long> factory)
throws Exception {
@@ -162,10 +180,6 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
// when checkpoint barriers are injected into sources, the event
receiving operator has not
// started checkpoint yet.
env.addSource(new ManuallyClosedSourceFunction<>(),
TypeInformation.of(Long.class))
- .transform(
- "blockCheckpointBarrier",
- TypeInformation.of(Long.class),
- new BlockCheckpointBarrierOperator<>())
.disableChaining()
.transform(factory.name, TypeInformation.of(Long.class),
factory)
.addSink(new DiscardingSink<>());
@@ -197,29 +211,6 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
public void cancel() {}
}
- /**
- * A stream operator that blocks the checkpoint barrier until the
coordinator has sent events to
- * its subtask. It helps to guarantee that there are events being sent
when the coordinator has
- * completed the first checkpoint while the subtask has not yet.
- */
- private static class BlockCheckpointBarrierOperator<T> extends
AbstractStreamOperator<T>
- implements OneInputStreamOperator<T, T> {
-
- @Override
- public void processElement(StreamRecord<T> element) throws Exception {
- output.collect(element);
- }
-
- @Override
- public void snapshotState(StateSnapshotContext context) throws
Exception {
- super.snapshotState(context);
- while (!EventSendingCoordinatorWithGuaranteedCheckpoint
- .isEventSentAfterFirstCheckpoint) {
- Thread.sleep(100);
- }
- }
- }
-
/**
* A wrapper operator factory for {@link
EventSendingCoordinatorWithGuaranteedCheckpoint} and
* {@link EventReceivingOperator}.
@@ -232,7 +223,7 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
protected final int numEvents;
- private final int delay;
+ protected final int delay;
public EventReceivingOperatorFactory(String name, int numEvents, int
delay) {
this.name = name;
@@ -293,15 +284,15 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
private static class EventSendingCoordinatorWithGuaranteedCheckpoint
extends EventSendingCoordinator {
- /** Whether the coordinator has sent any event to its subtask after
any checkpoint. */
- private static boolean isEventSentAfterFirstCheckpoint;
-
/**
* The max number that the coordinator might send out before it
completes the first
* checkpoint.
*/
private final int maxNumberBeforeFirstCheckpoint;
+ /** Whether the coordinator has sent any event to its subtask after
any checkpoint. */
+ private boolean isEventSentAfterFirstCheckpoint;
+
/** Whether the coordinator has completed the first checkpoint. */
private boolean isCoordinatorFirstCheckpointCompleted;
@@ -312,6 +303,7 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
Context context, String name, int numEvents, int delay) {
super(context, name, numEvents, delay);
this.maxNumberBeforeFirstCheckpoint = new
Random().nextInt(numEvents / 6);
+ this.isEventSentAfterFirstCheckpoint = false;
this.isCoordinatorFirstCheckpointCompleted = false;
this.isJobFirstCheckpointCompleted = false;
}
@@ -331,6 +323,7 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
if (!isEventSentAfterFirstCheckpoint &&
isCoordinatorFirstCheckpointCompleted) {
isEventSentAfterFirstCheckpoint = true;
+ EventReceivingOperator.shouldUnblockAllCheckpoint = true;
}
}
@@ -382,12 +375,23 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
/**
* The stream operator that receives the events and accumulates the
numbers. The task is
* stateful and checkpoints the accumulator.
+ *
+ * <p>The operator also supports blocking the checkpoint process until
a certain signal is set
+ * (see {@link #shouldUnblockAllCheckpoint} and {@link
#shouldUnblockNextCheckpoint}). It helps
+ * to guarantee that there are events being sent when the coordinator has
completed a checkpoint
+ * while the subtask has not yet done so.
*/
private static class EventReceivingOperator<T> extends
AbstractStreamOperator<T>
implements OneInputStreamOperator<T, T>, OperatorEventHandler {
protected static final String ACCUMULATOR_NAME = "receivedIntegers";
+ /** Whether to unblock all the following checkpoints. */
+ private static boolean shouldUnblockAllCheckpoint;
+
+ /** Whether to unblock the next checkpoint. */
+ private static boolean shouldUnblockNextCheckpoint;
+
protected final ListAccumulator<Integer> accumulator = new
ListAccumulator<>();
protected ListState<Integer> state;
@@ -426,6 +430,14 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
@Override
public void snapshotState(StateSnapshotContext context) throws
Exception {
super.snapshotState(context);
+ while (!shouldUnblockAllCheckpoint &&
!shouldUnblockNextCheckpoint) {
+ Thread.sleep(100);
+ }
+
+ if (shouldUnblockNextCheckpoint) {
+ shouldUnblockNextCheckpoint = false;
+ }
+
state.update(accumulator.getLocalValue());
}
@@ -550,4 +562,164 @@ public class
CoordinatorEventsToStreamOperatorRecipientExactlyOnceITCase
getOperatorID(), new SerializedValue<>(new
StartEvent(lastValue)));
}
}
+
+ /**
+ * A wrapper operator factory for {@link
+ * EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint} and {@link
+ * EventReceivingOperator}.
+ */
+ private static class
EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint<IN, OUT>
+ extends EventReceivingOperatorFactory<IN, OUT> {
+
+ public EventReceivingOperatorFactoryWithGuaranteedConcurrentCheckpoint(
+ String name, int numEvents, int delay) {
+ super(name, numEvents, delay);
+ }
+
+ @Override
+ public OperatorCoordinator.Provider getCoordinatorProvider(
+ String operatorName, OperatorID operatorID) {
+ return new OperatorCoordinator.Provider() {
+
+ @Override
+ public OperatorID getOperatorId() {
+ return operatorID;
+ }
+
+ @Override
+ public OperatorCoordinator create(OperatorCoordinator.Context
context) {
+ return new
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(
+ context, name, numEvents, delay);
+ }
+ };
+ }
+ }
+
+ /**
+ * A subclass of {@link EventSendingCoordinator} that additionally
guarantees the following
+ * behavior around checkpoints.
+ *
+ * <ul>
+ * <li>The job must have completed two checkpoints before the
coordinator injects the failure.
+ * <li>The two checkpoints must have an overlapping period, i.e., the
second checkpoint must
+ * have started before the first checkpoint finishes.
+ * <li>The failure must be injected after the coordinator has completed
its second checkpoint
+ * and before it completes the third.
+ * <li>There must be events being sent when the coordinator has
completed the second
+ * checkpoint while the subtask has not.
+ * </ul>
+ *
+ * <p>In order for this class to work correctly, make sure to invoke {@link
+ *
org.apache.flink.streaming.api.environment.CheckpointConfig#setMaxConcurrentCheckpoints(int)}
+ * method with a parameter value larger than 1.
+ */
+ private static class
EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint
+ extends EventSendingCoordinator {
+
+ /** Whether there is a checkpoint aborted before the test script
failure is triggered. */
+ private static boolean isCheckpointAbortedBeforeScriptFailure;
+
+ /**
+ * The max number that the coordinator might send out before it
completes the second
+ * checkpoint.
+ */
+ private final int maxNumberBeforeSecondCheckpoint;
+
+ /** Whether the coordinator has sent out any event after the second
checkpoint. */
+ private boolean isEventSentAfterSecondCheckpoint;
+
+ /** Whether the coordinator has completed the first checkpoint. */
+ private boolean isCoordinatorFirstCheckpointCompleted;
+
+ /** Whether the job (both coordinator and operator) has completed the
first checkpoint. */
+ private boolean isJobFirstCheckpointCompleted;
+
+ /** Whether the coordinator has completed the second checkpoint. */
+ private boolean isCoordinatorSecondCheckpointCompleted;
+
+ /** Whether the job (both coordinator and operator) has completed the
second checkpoint. */
+ private boolean isJobSecondCheckpointCompleted;
+
+ public EventSendingCoordinatorWithGuaranteedConcurrentCheckpoint(
+ Context context, String name, int numEvents, int delay) {
+ super(context, name, numEvents, delay);
+ this.maxNumberBeforeSecondCheckpoint = new
Random().nextInt(numEvents / 6);
+ this.isEventSentAfterSecondCheckpoint = false;
+ this.isCoordinatorFirstCheckpointCompleted = false;
+ this.isJobFirstCheckpointCompleted = false;
+ this.isCoordinatorSecondCheckpointCompleted = false;
+ this.isJobSecondCheckpointCompleted = false;
+ }
+
+ @Override
+ protected void sendNextEvent() {
+ if (!isCoordinatorSecondCheckpointCompleted
+ && nextNumber > maxNumberBeforeSecondCheckpoint) {
+ return;
+ }
+
+ if (!isJobSecondCheckpointCompleted && nextNumber >=
maxNumberBeforeFailure) {
+ return;
+ }
+
+ super.sendNextEvent();
+
+ if (!isEventSentAfterSecondCheckpoint &&
isCoordinatorSecondCheckpointCompleted) {
+ isEventSentAfterSecondCheckpoint = true;
+ EventReceivingOperator.shouldUnblockAllCheckpoint = true;
+ }
+ }
+
+ @Override
+ protected void handleCheckpoint() {
+ if (nextToComplete != null) {
+ if (!isCoordinatorFirstCheckpointCompleted) {
+ isCoordinatorFirstCheckpointCompleted = true;
+ } else if (!isCoordinatorSecondCheckpointCompleted) {
+ isCoordinatorSecondCheckpointCompleted = true;
+ EventReceivingOperator.shouldUnblockNextCheckpoint = true;
+ }
+ }
+
+ super.handleCheckpoint();
+
+ if (nextToComplete != null
+ && isEventSentAfterSecondCheckpoint
+ && !testScript.hasAlreadyFailed()) {
+ testScript.recordHasFailed();
+ context.failJob(new Exception("test failure"));
+ }
+ }
+
+ @Override
+ public void resetToCheckpoint(long checkpointId, @Nullable byte[]
checkpointData)
+ throws Exception {
+ super.resetToCheckpoint(checkpointId, checkpointData);
+ runInMailbox(
+ () -> {
+ isCoordinatorFirstCheckpointCompleted = true;
+ isJobFirstCheckpointCompleted = true;
+ });
+ }
+
+ @Override
+ public void notifyCheckpointAborted(long checkpointId) {
+ if (!testScript.hasAlreadyFailed()) {
+ isCheckpointAbortedBeforeScriptFailure = true;
+ }
+ }
+
+ @Override
+ public void notifyCheckpointComplete(long checkpointId) {
+ super.notifyCheckpointComplete(checkpointId);
+ runInMailbox(
+ () -> {
+ if (!isJobFirstCheckpointCompleted) {
+ isJobFirstCheckpointCompleted = true;
+ } else if (!isJobSecondCheckpointCompleted) {
+ isJobSecondCheckpointCompleted = true;
+ }
+ });
+ }
+ }
}