This is an automated email from the ASF dual-hosted git repository.
sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.13 by this push:
new 950ea33 [FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.
950ea33 is described below
commit 950ea33cb6ec5716ff26750eddf8a7bd4b869d0e
Author: Stephan Ewen <[email protected]>
AuthorDate: Tue Jul 13 17:01:39 2021 +0200
[FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.
Ensure that the 'FutureCompletedAfterSendingEventsCoordinator' cannot be closed
before it has completed the triggered checkpoint (i.e., completed the checkpoint future).
---
.../OperatorCoordinatorHolderTest.java | 35 ++++++++++++++++------
1 file changed, 26 insertions(+), 9 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index cfe31a4..d3b7994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -18,6 +18,7 @@
package org.apache.flink.runtime.operators.coordination;
+import org.apache.flink.core.testutils.OneShotLatch;
import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
import
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
import
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -348,8 +349,11 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
final OperatorCoordinatorHolder holder =
createCoordinatorHolder(sender, coordinatorCtor,
mainThreadExecutor);
- // give the coordinator some time to emit some events
- Thread.sleep(new Random().nextInt(10) + 20);
+ // give the coordinator some time to emit some events. This isn't
strictly necessary,
+ // but it randomly alters the timings between the coordinator's thread
(event sender) and
+ // the main thread (holder). This should produce a flaky test if we
missed some corner
+ // cases.
+ Thread.sleep(new Random().nextInt(10));
executor.triggerAll();
// trigger the checkpoint - this should also shut the valve as soon as
the future is
@@ -358,8 +362,9 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
holder.checkpointCoordinator(0L, checkpointFuture);
executor.triggerAll();
- // give the coordinator some time to emit some events
- Thread.sleep(new Random().nextInt(10) + 10);
+ // give the coordinator some time to emit some events. Same as above,
this adds some
+ // randomization
+ Thread.sleep(new Random().nextInt(10));
holder.close();
executor.triggerAll();
@@ -563,7 +568,7 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
@Override
public void checkpointCoordinator(long checkpointId,
CompletableFuture<byte[]> result)
throws Exception {
- // before returning from this methof, we wait on a condition.
+ // before returning from this method, we wait on a condition.
// that way, we simulate a "context switch" just at the time when
the
// future would be returned and make the other thread complete the
future and send an
// event before this method returns
@@ -601,7 +606,9 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
private static final class FutureCompletedAfterSendingEventsCoordinator
extends CheckpointEventOrderTestBaseCoordinator {
- @Nullable private CompletableFuture<byte[]> checkpoint;
+ private final OneShotLatch checkpointCompleted = new OneShotLatch();
+
+ @Nullable private volatile CompletableFuture<byte[]> checkpoint;
private int num;
@@ -623,11 +630,21 @@ public class OperatorCoordinatorHolderTest extends
TestLogger {
subtaskGateways[1].sendEvent(new TestOperatorEvent(num++));
subtaskGateways[2].sendEvent(new TestOperatorEvent(num++));
- if (checkpoint != null) {
- checkpoint.complete(intToBytes(num));
- checkpoint = null;
+ final CompletableFuture<byte[]> chkpnt = this.checkpoint;
+ if (chkpnt != null) {
+ chkpnt.complete(intToBytes(num));
+ checkpointCompleted.trigger();
+ this.checkpoint = null;
}
}
+
+ @Override
+ public void close() throws Exception {
+ // we need to ensure that we don't close this before we have
actually completed the
+ // triggered checkpoint, to ensure the test conditions are robust.
+ checkpointCompleted.await();
+ super.close();
+ }
}
private abstract static class CheckpointEventOrderTestBaseCoordinator