This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e676442 [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase
work with Adaptive Scheduler.
e676442 is described below
commit e676442b9faa1ec0b668e8394dd2353ac2de01c6
Author: Stephan Ewen <[email protected]>
AuthorDate: Fri Apr 23 17:17:01 2021 +0200
[FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with
Adaptive Scheduler.
The test previously relied on an implicit contract that instances of
OperatorCoordinators are never recreated
on the same JobManager. That implicit contract is no longer true with the
Adaptive Scheduler.
This change adjusts the test to no longer make that assumption.
This closes #15739
---
.../CoordinatorEventsExactlyOnceITCase.java | 112 ++++++++++++++++-----
1 file changed, 86 insertions(+), 26 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
index edc9ede..2337115 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java
@@ -64,11 +64,13 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.ByteOrder;
-import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.Random;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
@@ -169,6 +171,9 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
@Test
public void test() throws Exception {
+ // this captures variables communicated across instances, recoveries,
etc.
+ TestScript.reset();
+
final int numEvents1 = 200;
final int numEvents2 = 5;
final int delay1 = 1;
@@ -296,19 +301,23 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
private final int delay;
private final int maxNumber;
+ private final int failAtMessage;
private int nextNumber;
private CompletableFuture<byte[]> requestedCheckpoint;
private CompletableFuture<byte[]> nextToComplete;
- private final int failAtMessage;
- private boolean failedBefore;
-
- private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new
ArrayDeque<>();
-
private SubtaskGateway subtaskGateway;
private boolean workLoopRunning;
+ /**
+ * This contains all variables that are necessary to track the
progress of the test, and
+ * which need to be tracked across instances of this coordinator (some
scheduler
+ * implementations may re-instantiate the ExecutionGraph and the
coordinators around global
+ * failures).
+ */
+ private final TestScript testScript;
+
private EventSendingCoordinator(Context context, String name, int
numEvents, int delay) {
checkArgument(delay > 0);
checkArgument(numEvents >= 3);
@@ -316,6 +325,9 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
this.context = context;
this.maxNumber = numEvents;
this.delay = delay;
+
+ this.testScript = TestScript.getForOperator(name);
+
this.mailboxExecutor =
Executors.newSingleThreadExecutor(
new DispatcherThreadFactory(
@@ -349,17 +361,12 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
String.format("Don't recognize event '%s' from task
%d.", event, subtask));
}
- // We complete all events that were enqueued. We may need to
complete
- // multiple ones here, because it can happen that after a failure
no real recovery
- // happens that results in an event being sent (and this method
being called), but that
- // immediately another failure comes, triggered by the other
operator coordinator (or
- // its task).
- synchronized (recoveredTaskRunning) {
- for (CountDownLatch latch : recoveredTaskRunning) {
- latch.countDown();
- }
- recoveredTaskRunning.clear();
- }
+ // this unblocks all the delayed actions that were kicked off
while the previous
+ // task was still running (if there was a previous task). this is
part of simulating
+ // the extreme race where the coordinator thread stalls for so
long that a new
+ // task execution attempt gets deployed before the last events
targeted at the old task
+ // were sent.
+ testScript.signalRecoveredTaskReady();
// first, we hand this over to the mailbox thread, so we preserve
order on operations,
// even if the action is only to do a thread safe scheduling into
the scheduledExecutor
@@ -375,13 +382,13 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
@Override
public void subtaskFailed(int subtask, @Nullable Throwable reason) {
- // we need to create and enqueue this outside the mailbox, so that
the
- // enqueuing is strictly ordered with the completion (which also
happens outside
- // the mail box executor).
+ // we need to create and register this outside the mailbox so that
the
+ // registration is not affected by the artificial stall on the
mailbox, but happens
+ // strictly before the tasks are restored and the operator events
are received (to
+ // trigger the latches) which also happens outside the mailbox.
+
final CountDownLatch successorIsRunning = new CountDownLatch(1);
- synchronized (recoveredTaskRunning) {
- recoveredTaskRunning.addLast(successorIsRunning);
- }
+
testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning);
// simulate a heavy thread race here: the mailbox has a last
enqueued action before the
// cancellation is processed. But through a race, the mailbox
freezes for a while and in
@@ -483,7 +490,12 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
System.exit(-1);
}
- // schedule the next step
+ // schedule the next step. we do this here, after the previous
step concluded, rather
+ // than scheduling a periodic action. Otherwise, the periodic task
would enqueue many
+ // actions while the mailbox stalls and process them all
instantaneously after the
+ // un-stalling. That wouldn't break the test, but it voids the
differences in event
+ // sending delays between the different coordinators, which are
part of provoking the
+ // situation that requires checkpoint alignment between the
coordinators' event streams.
scheduleSingleAction();
}
@@ -515,8 +527,8 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
}
private void checkWhetherToTriggerFailure() {
- if (nextNumber >= failAtMessage && !failedBefore) {
- failedBefore = true;
+ if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed())
{
+ testScript.recordHasFailed();
context.failJob(new Exception("test failure"));
}
}
@@ -623,6 +635,54 @@ public class CoordinatorEventsExactlyOnceITCase extends
TestLogger {
}
// ------------------------------------------------------------------------
+ // dedicated class to hold the "test script"
+ // ------------------------------------------------------------------------
+
+ private static final class TestScript {
+
+ private static final Map<String, TestScript> MAP_FOR_OPERATOR = new
HashMap<>();
+
+ static TestScript getForOperator(String operatorName) {
+ return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new
TestScript());
+ }
+
+ static void reset() {
+ MAP_FOR_OPERATOR.clear();
+ }
+
+ private final Collection<CountDownLatch> recoveredTaskRunning = new
ArrayList<>();
+ private boolean failedBefore;
+
+ void recordHasFailed() {
+ this.failedBefore = true;
+ }
+
+ boolean hasAlreadyFailed() {
+ return failedBefore;
+ }
+
+ void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) {
+ synchronized (recoveredTaskRunning) {
+ recoveredTaskRunning.add(latch);
+ }
+ }
+
+ void signalRecoveredTaskReady() {
+ // We complete all latches that were registered. We may need to
complete
+ // multiple ones here, because it can happen that after a previous
failure, the next
+ // execution fails immediately again, before even registering at
the coordinator.
+ // in that case, we have multiple latches from multiple failure
notifications waiting
+ // to be completed.
+ synchronized (recoveredTaskRunning) {
+ for (CountDownLatch latch : recoveredTaskRunning) {
+ latch.countDown();
+ }
+ recoveredTaskRunning.clear();
+ }
+ }
+ }
+
+ // ------------------------------------------------------------------------
// serialization shenannigans
// ------------------------------------------------------------------------