This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7ceee2395330c9c723ae83a85de2cf44f8378efc Author: Stephan Ewen <se...@apache.org> AuthorDate: Fri May 29 08:55:36 2020 +0200 [FLINK-16986][coordination][refactor] Change executor in OperatorCoordinatorSchedulerTest This prepares the test to be ready to run with proper main-thread-execution in the OperatorCoordinators. --- .../OperatorCoordinatorSchedulerTest.java | 41 +++++++++++++++++++--- .../coordination/TestingOperatorCoordinator.java | 8 +++++ .../runtime/scheduler/SchedulerTestingUtils.java | 10 ++++-- 3 files changed, 52 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java index 8e5404a..6ec4dd7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java @@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.Checkpoints; import org.apache.flink.runtime.checkpoint.CompletedCheckpoint; import org.apache.flink.runtime.checkpoint.OperatorState; import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata; +import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor; import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService; @@ -64,6 +65,7 @@ import java.util.Collections; import java.util.Optional; import java.util.Random; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ScheduledExecutorService; import java.util.function.Consumer; import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith; @@ -204,6 +206,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext(); final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0); + executor.triggerAll(); // process event sending assertThat(result, futureFailedWith(TaskNotRunningException.class)); } @@ -214,6 +217,7 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { final OperatorCoordinator.Context context = getCoordinator(scheduler).getContext(); final CompletableFuture<?> result = context.sendEvent(new TestOperatorEvent(), 0); + executor.triggerAll(); // process event sending assertThat(result, futureFailedWith(TestException.class)); } @@ -482,7 +486,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { } final DefaultScheduler scheduler = schedulerBuilder.build(); - scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread()); + + final ComponentMainThreadExecutor mainThreadExecutor = new ComponentMainThreadExecutorServiceAdapter( + (ScheduledExecutorService) executor, Thread.currentThread()); + scheduler.setMainThreadExecutor(mainThreadExecutor); this.createdScheduler = scheduler; return scheduler; @@ -529,7 +536,10 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { private void failAndRedeployTask(DefaultScheduler scheduler, int subtask) { failTask(scheduler, subtask); + + executor.triggerAll(); executor.triggerScheduledTasks(); + executor.triggerAll(); // guard the test assumptions: This must lead to a restarting and redeploying assertEquals(ExecutionState.DEPLOYING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask)); @@ -547,8 +557,13 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { scheduler.handleGlobalFailure(reason); SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler); - executor.triggerScheduledTasks(); // this handles the restart / redeploy + // make sure we propagate all asynchronous and delayed actions + executor.triggerAll(); + executor.triggerScheduledTasks(); + executor.triggerAll(); + SchedulerTestingUtils.setAllExecutionsToRunning(scheduler); + executor.triggerAll(); // guard the test assumptions: This must bring the tasks back to RUNNING assertEquals(ExecutionState.RUNNING, SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0)); @@ -564,7 +579,17 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { private CompletableFuture<CompletedCheckpoint> triggerCheckpoint(DefaultScheduler scheduler) throws Exception { final CompletableFuture<CompletedCheckpoint> future = SchedulerTestingUtils.triggerCheckpoint(scheduler); - executor.triggerAll(); + final TestingOperatorCoordinator coordinator = getCoordinator(scheduler); + + // the Checkpoint Coordinator executes parts of the logic in its timer thread, and delegates some calls + // to the scheduler executor. so we need to do a mix of waiting for the timer thread and working off + // tasks in the scheduler executor. + // we can drop this here once the CheckpointCoordinator also runs in a 'main thread executor'. + while (!(coordinator.hasTriggeredCheckpoint() || future.isDone())) { + executor.triggerAll(); + Thread.sleep(1); + } + return future; } @@ -585,7 +610,15 @@ public class OperatorCoordinatorSchedulerTest extends TestLogger { acknowledgeCurrentCheckpoint(scheduler); // wait until checkpoint has completed - return checkpointFuture.get().getCheckpointID(); + final long checkpointId = checkpointFuture.get().getCheckpointID(); + + // now wait until it has been acknowledged + while (!testingOperatorCoordinator.hasCompleteCheckpoint()) { + executor.triggerAll(); + Thread.sleep(1); + } + + return checkpointId; } // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java index 5d93bcc..e914afe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java @@ -121,10 +121,18 @@ class TestingOperatorCoordinator implements OperatorCoordinator { return triggeredCheckpoints.take(); } + public boolean hasTriggeredCheckpoint() { + return !triggeredCheckpoints.isEmpty(); + } + public long getLastCheckpointComplete() throws InterruptedException { return lastCheckpointComplete.take(); } + public boolean hasCompleteCheckpoint() throws InterruptedException { + return !lastCheckpointComplete.isEmpty(); + } + // ------------------------------------------------------------------------ // The provider for this coordinator implementation // ------------------------------------------------------------------------ diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java index 6f62067..1686c5a 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java @@ -90,6 +90,7 @@ import java.util.stream.StreamSupport; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; /** @@ -265,9 +266,12 @@ public class SchedulerTestingUtils { public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) { final JobID jid = scheduler.getJobId(); - getAllCurrentExecutionAttempts(scheduler).forEach( - (attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED)) - ); + for (final ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) { + final boolean setToRunning = scheduler.updateTaskExecutionState( + new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED)); + + assertTrue("could not switch task to RUNNING", setToRunning); + } } public static void acknowledgePendingCheckpoint(final DefaultScheduler scheduler, final long checkpointId) throws CheckpointException {