This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 0dc6326 [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with Adaptive Scheduler. 0dc6326 is described below commit 0dc632681defaa1d66d3b2e884f311121467d894 Author: Stephan Ewen <se...@apache.org> AuthorDate: Fri Apr 23 17:17:01 2021 +0200 [FLINK-22433][tests] Make CoordinatorEventsExactlyOnceITCase work with Adaptive Scheduler. The test previously relied on an implicit contract that instances of OperatorCoordinators are never recreated on the same JobManager. That implicit contract is no longer true with the Adaptive Scheduler. This change adjusts the test to no longer make that assumption. This closes #15739 --- .../CoordinatorEventsExactlyOnceITCase.java | 112 ++++++++++++++++----- 1 file changed, 86 insertions(+), 26 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java index edc9ede..2337115 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/CoordinatorEventsExactlyOnceITCase.java @@ -64,11 +64,13 @@ import javax.annotation.Nullable; import java.io.IOException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import java.util.ArrayDeque; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CountDownLatch; @@ -169,6 +171,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { @Test public void test() throws Exception { + // this captures variables 
communicated across instances, recoveries, etc. + TestScript.reset(); + final int numEvents1 = 200; final int numEvents2 = 5; final int delay1 = 1; @@ -296,19 +301,23 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { private final int delay; private final int maxNumber; + private final int failAtMessage; private int nextNumber; private CompletableFuture<byte[]> requestedCheckpoint; private CompletableFuture<byte[]> nextToComplete; - private final int failAtMessage; - private boolean failedBefore; - - private final ArrayDeque<CountDownLatch> recoveredTaskRunning = new ArrayDeque<>(); - private SubtaskGateway subtaskGateway; private boolean workLoopRunning; + /** + * This contains all variables that are necessary to track the progress of the test, and + * which need to be tracked across instances of this coordinator (some scheduler + * implementations may re-instantiate the ExecutionGraph and the coordinators around global + * failures). + */ + private final TestScript testScript; + private EventSendingCoordinator(Context context, String name, int numEvents, int delay) { checkArgument(delay > 0); checkArgument(numEvents >= 3); @@ -316,6 +325,9 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { this.context = context; this.maxNumber = numEvents; this.delay = delay; + + this.testScript = TestScript.getForOperator(name); + this.mailboxExecutor = Executors.newSingleThreadExecutor( new DispatcherThreadFactory( @@ -349,17 +361,12 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { String.format("Don't recognize event '%s' from task %d.", event, subtask)); } - // We complete all events that were enqueued. We may need to complete - // multiple ones here, because it can happen that after a failure no real recovery - // happens that results in an event being sent (and this method being called), but that - // immediately another failure comes, triggered by the other operator coordinator (or - // its task). 
- synchronized (recoveredTaskRunning) { - for (CountDownLatch latch : recoveredTaskRunning) { - latch.countDown(); - } - recoveredTaskRunning.clear(); - } + // this unblocks all the delayed actions that were kicked off while the previous + // task was still running (if there was a previous task). this is part of simulating + // the extreme race where the coordinator thread stalls for so long that a new + // task execution attempt gets deployed before the last events targeted at the old task + // were sent. + testScript.signalRecoveredTaskReady(); // first, we hand this over to the mailbox thread, so we preserve order on operations, // even if the action is only to do a thread safe scheduling into the scheduledExecutor @@ -375,13 +382,13 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { @Override public void subtaskFailed(int subtask, @Nullable Throwable reason) { - // we need to create and enqueue this outside the mailbox, so that the - // enqueuing is strictly ordered with the completion (which also happens outside - // the mail box executor). + // we need to create and register this outside the mailbox so that the + // registration is not affected by the artificial stall on the mailbox, but happens + // strictly before the tasks are restored and the operator events are received (to + // trigger the latches) which also happens outside the mailbox. + final CountDownLatch successorIsRunning = new CountDownLatch(1); - synchronized (recoveredTaskRunning) { - recoveredTaskRunning.addLast(successorIsRunning); - } + testScript.registerHookToNotifyAfterTaskRecovered(successorIsRunning); // simulate a heavy thread race here: the mailbox has a last enqueued action before the // cancellation is processed. But through a race, the mailbox freezes for a while and in @@ -483,7 +490,12 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { System.exit(-1); } - // schedule the next step + // schedule the next step.
we do this here, after the previous step concluded, rather + // than scheduling a periodic action. Otherwise, the periodic task would enqueue many + // actions while the mailbox stalls and process them all instantaneously after the + // un-stalling. That wouldn't break the test, but it voids the differences in event + // sending delays between the different coordinators, which are part of provoking the + // situation that requires checkpoint alignment between the coordinators' event streams. scheduleSingleAction(); } @@ -515,8 +527,8 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } private void checkWhetherToTriggerFailure() { - if (nextNumber >= failAtMessage && !failedBefore) { - failedBefore = true; + if (nextNumber >= failAtMessage && !testScript.hasAlreadyFailed()) { + testScript.recordHasFailed(); context.failJob(new Exception("test failure")); } } @@ -623,6 +635,54 @@ public class CoordinatorEventsExactlyOnceITCase extends TestLogger { } // ------------------------------------------------------------------------ + // dedicated class to hold the "test script" + // ------------------------------------------------------------------------ + + private static final class TestScript { + + private static final Map<String, TestScript> MAP_FOR_OPERATOR = new HashMap<>(); + + static TestScript getForOperator(String operatorName) { + return MAP_FOR_OPERATOR.computeIfAbsent(operatorName, (key) -> new TestScript()); + } + + static void reset() { + MAP_FOR_OPERATOR.clear(); + } + + private final Collection<CountDownLatch> recoveredTaskRunning = new ArrayList<>(); + private boolean failedBefore; + + void recordHasFailed() { + this.failedBefore = true; + } + + boolean hasAlreadyFailed() { + return failedBefore; + } + + void registerHookToNotifyAfterTaskRecovered(CountDownLatch latch) { + synchronized (recoveredTaskRunning) { + recoveredTaskRunning.add(latch); + } + } + + void signalRecoveredTaskReady() { + // We complete all latches that were 
registered. We may need to complete + // multiple ones here, because it can happen that after a previous failure, the next + // execution fails immediately again, before even registering at the coordinator. + // in that case, we have multiple latches from multiple failure notifications waiting + // to be completed. + synchronized (recoveredTaskRunning) { + for (CountDownLatch latch : recoveredTaskRunning) { + latch.countDown(); + } + recoveredTaskRunning.clear(); + } + } + } + + // ------------------------------------------------------------------------ // serialization shenanigans // ------------------------------------------------------------------------