This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch release-1.13
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.13 by this push:
     new 950ea33  [FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.
950ea33 is described below

commit 950ea33cb6ec5716ff26750eddf8a7bd4b869d0e
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Tue Jul 13 17:01:39 2021 +0200

    [FLINK-22547][tests] Harden OperatorCoordinatorHolderTest.
    
    Ensure that the 'FutureCompletedAfterSendingEventsCoordinator' cannot exit before it has completed
    the triggered checkpoint (completed the checkpoint future).
---
 .../OperatorCoordinatorHolderTest.java             | 35 ++++++++++++++++------
 1 file changed, 26 insertions(+), 9 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
index cfe31a4..d3b7994 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorHolderTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.runtime.operators.coordination;
 
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -348,8 +349,11 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
         final OperatorCoordinatorHolder holder =
                 createCoordinatorHolder(sender, coordinatorCtor, 
mainThreadExecutor);
 
-        // give the coordinator some time to emit some events
-        Thread.sleep(new Random().nextInt(10) + 20);
+        // give the coordinator some time to emit some events. This isn't 
strictly necessary,
+        // but it randomly alters the timings between the coordinator's thread 
(event sender) and
+        // the main thread (holder). This should produce a flaky test if we 
missed some corner
+        // cases.
+        Thread.sleep(new Random().nextInt(10));
         executor.triggerAll();
 
         // trigger the checkpoint - this should also shut the valve as soon as 
the future is
@@ -358,8 +362,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
         holder.checkpointCoordinator(0L, checkpointFuture);
         executor.triggerAll();
 
-        // give the coordinator some time to emit some events
-        Thread.sleep(new Random().nextInt(10) + 10);
+        // give the coordinator some time to emit some events. Same as above, 
this adds some
+        // randomization
+        Thread.sleep(new Random().nextInt(10));
         holder.close();
         executor.triggerAll();
 
@@ -563,7 +568,7 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
         @Override
         public void checkpointCoordinator(long checkpointId, 
CompletableFuture<byte[]> result)
                 throws Exception {
-            // before returning from this methof, we wait on a condition.
+            // before returning from this method, we wait on a condition.
             // that way, we simulate a "context switch" just at the time when 
the
             // future would be returned and make the other thread complete the 
future and send an
             // event before this method returns
@@ -601,7 +606,9 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
     private static final class FutureCompletedAfterSendingEventsCoordinator
             extends CheckpointEventOrderTestBaseCoordinator {
 
-        @Nullable private CompletableFuture<byte[]> checkpoint;
+        private final OneShotLatch checkpointCompleted = new OneShotLatch();
+
+        @Nullable private volatile CompletableFuture<byte[]> checkpoint;
 
         private int num;
 
@@ -623,11 +630,21 @@ public class OperatorCoordinatorHolderTest extends TestLogger {
             subtaskGateways[1].sendEvent(new TestOperatorEvent(num++));
             subtaskGateways[2].sendEvent(new TestOperatorEvent(num++));
 
-            if (checkpoint != null) {
-                checkpoint.complete(intToBytes(num));
-                checkpoint = null;
+            final CompletableFuture<byte[]> chkpnt = this.checkpoint;
+            if (chkpnt != null) {
+                chkpnt.complete(intToBytes(num));
+                checkpointCompleted.trigger();
+                this.checkpoint = null;
             }
         }
+
+        @Override
+        public void close() throws Exception {
+            // Do not close before the event loop has completed the triggered checkpoint future
+            // (checkpointCompleted is triggered there); otherwise the test conditions are racy.
+            checkpointCompleted.await();
+            super.close();
+        }
     }
 
     private abstract static class CheckpointEventOrderTestBaseCoordinator

Reply via email to