This is an automated email from the ASF dual-hosted git repository.
chesnay pushed a commit to branch release-1.14
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.14 by this push:
new 97fe172 [FLINK-24444][runtime][tests] Wait until checkpoints stopped
triggering
97fe172 is described below
commit 97fe1723d7e9071a0a3c93612a50a2dc9652e1d6
Author: David Moravek <[email protected]>
AuthorDate: Sun Jan 16 12:31:18 2022 +0100
[FLINK-24444][runtime][tests] Wait until checkpoints stopped triggering
---
.../coordination/OperatorCoordinatorSchedulerTest.java | 13 +++++++++++--
1 file changed, 11 insertions(+), 2 deletions(-)
diff --git
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 88cd7e6..1d8b59d 100644
---
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.core.fs.CloseableRegistry;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.metrics.MetricGroup;
import org.apache.flink.runtime.OperatorIDPair;
+import org.apache.flink.runtime.checkpoint.CheckpointCoordinator;
import org.apache.flink.runtime.checkpoint.Checkpoints;
import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
import org.apache.flink.runtime.checkpoint.OperatorState;
@@ -105,7 +106,6 @@ import static org.junit.Assert.fail;
* Tests for the integration of the {@link OperatorCoordinator} with the
scheduler, to ensure the
* relevant actions are leading to the right method invocations on the
coordinator.
*/
-@SuppressWarnings("serial")
public class OperatorCoordinatorSchedulerTest extends TestLogger {
private final JobVertexID testVertexId = new JobVertexID();
@@ -774,10 +774,19 @@ public class OperatorCoordinatorSchedulerTest extends
TestLogger {
SchedulerTestingUtils.getExecutionState(scheduler,
testVertexId, subtask));
}
- private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable
reason) {
+ private void failGlobalAndRestart(DefaultScheduler scheduler, Throwable
reason)
+ throws InterruptedException {
scheduler.handleGlobalFailure(reason);
SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
+ // make sure the checkpoint is no longer triggering (this means that
the operator event
+ // valve has been closed)
+ final CheckpointCoordinator checkpointCoordinator =
+ scheduler.getExecutionGraph().getCheckpointCoordinator();
+ while (checkpointCoordinator != null &&
checkpointCoordinator.isTriggering()) {
+ Thread.sleep(1);
+ }
+
// make sure we propagate all asynchronous and delayed actions
executor.triggerAll();
executor.triggerScheduledTasks();