This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.14 by this push:
     new 97fe172  [FLINK-24444][runtime][tests] Wait until checkpoints stopped triggering
97fe172 is described below

commit 97fe1723d7e9071a0a3c93612a50a2dc9652e1d6
Author: David Moravek <[email protected]>
AuthorDate: Sun Jan 16 12:31:18 2022 +0100

    [FLINK-24444][runtime][tests] Wait until checkpoints stopped triggering
---
 .../coordination/OperatorCoordinatorSchedulerTest.java      | 13 +++++++++++--
 1 file changed, 11 insertions(+), 2 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 88cd7e6..1d8b59d 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
 import org.apache.flink.core.testutils.CommonTestUtils;
 import org.apache.flink.metrics.MetricGroup;
 import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
 import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -105,7 +106,6 @@ import static org.junit.Assert.fail;
  * Tests for the integration of the {@link OperatorCoordinator} with the scheduler, to ensure the
  * relevant actions are leading to the right method invocations on the coordinator.
  */
-@SuppressWarnings("serial")
 public class OperatorCoordinatorSchedulerTest extends TestLogger {
 
     private final JobVertexID testVertexId = new JobVertexID();
@@ -774,10 +774,19 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger {
                 SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
     }
 
-    private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason) {
+    private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable reason)
+            throws InterruptedException {
         scheduler.handleGlobalFailure(reason);
         SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
 
+        // make sure the checkpoint is no longer triggering (this means that the operator event
+        // valve has been closed)
+        final CheckpointCoordinator checkpointCoordinator =
+                scheduler.getExecutionGraph().getCheckpointCoordinator();
+        while (checkpointCoordinator != null && checkpointCoordinator.isTriggering()) {
+            Thread.sleep(1);
+        }
+
         // make sure we propagate all asynchronous and delayed actions
         executor.triggerAll();
         executor.triggerScheduledTasks();

Reply via email to