This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch release-1.13 in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push: new 950ea33 [FLINK-22547][tests] Harden OperatorCoordinatorHolderTest. 950ea33 is described below commit 950ea33cb6ec5716ff26750eddf8a7bd4b869d0e Author: Stephan Ewen <se...@apache.org> AuthorDate: Tue Jul 13 17:01:39 2021 +0200 [FLINK-22547][tests] Harden OperatorCoordinatorHolderTest. Ensure that the 'FutureCompletedAfterSendingEventsCoordinator' cannot exit before it has completed the triggered checkpoint (completed the checkpoint future). --- .../OperatorCoordinatorHolderTest.java | 35 ++++++++++++++++------ 1 file changed, 26 insertions(+), 9 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java index cfe31a4..d3b7994 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java @@ -18,6 +18,7 @@ package org.apache.flink.runtime.operators.coordination; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; @@ -348,8 +349,11 @@ public class OperatorCoordinatorHolderTest extends TestLogger { final OperatorCoordinatorHolder holder = createCoordinatorHolder(sender, coordinatorCtor, mainThreadExecutor); - // give the coordinator some time to emit some events - Thread.sleep(new Random().nextInt(10) + 20); + // give the coordinator some time to emit some events. This isn't strictly necessary, + // but it randomly alters the timings between the coordinator's thread (event sender) and + // the main thread (holder). This should produce a flaky test if we missed some corner + // cases. + Thread.sleep(new Random().nextInt(10)); executor.triggerAll(); // trigger the checkpoint - this should also shut the valve as soon as the future is @@ -358,8 +362,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger { holder.checkpointCoordinator(0L, checkpointFuture); executor.triggerAll(); - // give the coordinator some time to emit some events - Thread.sleep(new Random().nextInt(10) + 10); + // give the coordinator some time to emit some events. Same as above, this adds some + // randomization + Thread.sleep(new Random().nextInt(10)); holder.close(); executor.triggerAll(); @@ -563,7 +568,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger { @Override public void checkpointCoordinator(long checkpointId, CompletableFuture<byte[]> result) throws Exception { - // before returning from this methof, we wait on a condition. + // before returning from this method, we wait on a condition. // that way, we simulate a "context switch" just at the time when the // future would be returned and make the other thread complete the future and send an // event before this method returns @@ -601,7 +606,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger { private static final class FutureCompletedAfterSendingEventsCoordinator extends CheckpointEventOrderTestBaseCoordinator { - @Nullable private CompletableFuture<byte[]> checkpoint; + private final OneShotLatch checkpointCompleted = new OneShotLatch(); + + @Nullable private volatile CompletableFuture<byte[]> checkpoint; private int num; @@ -623,11 +630,21 @@ public class OperatorCoordinatorHolderTest extends TestLogger { subtaskGateways[1].sendEvent(new TestOperatorEvent(num++)); subtaskGateways[2].sendEvent(new TestOperatorEvent(num++)); - if (checkpoint != null) { - checkpoint.complete(intToBytes(num)); - checkpoint = null; + final CompletableFuture<byte[]> chkpnt = this.checkpoint; + if (chkpnt != null) { + chkpnt.complete(intToBytes(num)); + checkpointCompleted.trigger(); + this.checkpoint = null; } } + + @Override + public void close() throws Exception { + // we need to ensure that we don't close this before we have actually completed the + // triggered checkpoint, to ensure the test conditions are robust. + checkpointCompleted.await(); + super.close(); + } } private abstract static class CheckpointEventOrderTestBaseCoordinator