This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7ceee2395330c9c723ae83a85de2cf44f8378efc
Author: Stephan Ewen <se...@apache.org>
AuthorDate: Fri May 29 08:55:36 2020 +0200

    [FLINK-16986][coordination][refactor] Change executor in OperatorCoordinatorSchedulerTest
    
    This prepares the test to run with proper main-thread execution in the
    OperatorCoordinators.
---
 .../OperatorCoordinatorSchedulerTest.java          | 41 +++++++++++++++++++---
 .../coordination/TestingOperatorCoordinator.java   |  8 +++++
 .../runtime/scheduler/SchedulerTestingUtils.java   | 10 ++++--
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
index 8e5404a..6ec4dd7 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/OperatorCoordinatorSchedulerTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.runtime.checkpoint.Checkpoints;
 import org.apache.flink.runtime.checkpoint.CompletedCheckpoint;
 import org.apache.flink.runtime.checkpoint.OperatorState;
 import org.apache.flink.runtime.checkpoint.metadata.CheckpointMetadata;
+import org.apache.flink.runtime.concurrent.ComponentMainThreadExecutor;
 import 
org.apache.flink.runtime.concurrent.ComponentMainThreadExecutorServiceAdapter;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import 
org.apache.flink.runtime.concurrent.ManuallyTriggeredScheduledExecutorService;
@@ -64,6 +65,7 @@ import java.util.Collections;
 import java.util.Optional;
 import java.util.Random;
 import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ScheduledExecutorService;
 import java.util.function.Consumer;
 
 import static org.apache.flink.core.testutils.FlinkMatchers.futureFailedWith;
@@ -204,6 +206,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                final OperatorCoordinator.Context context = 
getCoordinator(scheduler).getContext();
 
                final CompletableFuture<?> result = context.sendEvent(new 
TestOperatorEvent(), 0);
+               executor.triggerAll(); // process event sending
 
                assertThat(result, 
futureFailedWith(TaskNotRunningException.class));
        }
@@ -214,6 +217,7 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
 
                final OperatorCoordinator.Context context = 
getCoordinator(scheduler).getContext();
                final CompletableFuture<?> result = context.sendEvent(new 
TestOperatorEvent(), 0);
+               executor.triggerAll();  // process event sending
 
                assertThat(result, futureFailedWith(TestException.class));
        }
@@ -482,7 +486,10 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                }
 
                final DefaultScheduler scheduler = schedulerBuilder.build();
-               
scheduler.setMainThreadExecutor(ComponentMainThreadExecutorServiceAdapter.forMainThread());
+
+               final ComponentMainThreadExecutor mainThreadExecutor = new 
ComponentMainThreadExecutorServiceAdapter(
+                       (ScheduledExecutorService) executor, 
Thread.currentThread());
+               scheduler.setMainThreadExecutor(mainThreadExecutor);
 
                this.createdScheduler = scheduler;
                return scheduler;
@@ -529,7 +536,10 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
 
        private void failAndRedeployTask(DefaultScheduler scheduler, int 
subtask) {
                failTask(scheduler, subtask);
+
+               executor.triggerAll();
                executor.triggerScheduledTasks();
+               executor.triggerAll();
 
                // guard the test assumptions: This must lead to a restarting 
and redeploying
                assertEquals(ExecutionState.DEPLOYING, 
SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, subtask));
@@ -547,8 +557,13 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                scheduler.handleGlobalFailure(reason);
                SchedulerTestingUtils.setAllExecutionsToCancelled(scheduler);
 
-               executor.triggerScheduledTasks();   // this handles the restart 
/ redeploy
+               // make sure we propagate all asynchronous and delayed actions
+               executor.triggerAll();
+               executor.triggerScheduledTasks();
+               executor.triggerAll();
+
                SchedulerTestingUtils.setAllExecutionsToRunning(scheduler);
+               executor.triggerAll();
 
                // guard the test assumptions: This must bring the tasks back 
to RUNNING
                assertEquals(ExecutionState.RUNNING, 
SchedulerTestingUtils.getExecutionState(scheduler, testVertexId, 0));
@@ -564,7 +579,17 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
 
        private CompletableFuture<CompletedCheckpoint> 
triggerCheckpoint(DefaultScheduler scheduler) throws Exception {
                final CompletableFuture<CompletedCheckpoint> future = 
SchedulerTestingUtils.triggerCheckpoint(scheduler);
-               executor.triggerAll();
+               final TestingOperatorCoordinator coordinator = 
getCoordinator(scheduler);
+
+               // the Checkpoint Coordinator executes parts of the logic in 
its timer thread, and delegates some calls
+               // to the scheduler executor. so we need to do a mix of waiting 
for the timer thread and working off
+               // tasks in the scheduler executor.
+               // we can drop this here once the CheckpointCoordinator also 
runs in a 'main thread executor'.
+               while (!(coordinator.hasTriggeredCheckpoint() || 
future.isDone())) {
+                       executor.triggerAll();
+                       Thread.sleep(1);
+               }
+
                return future;
        }
 
@@ -585,7 +610,15 @@ public class OperatorCoordinatorSchedulerTest extends 
TestLogger {
                acknowledgeCurrentCheckpoint(scheduler);
 
                // wait until checkpoint has completed
-               return checkpointFuture.get().getCheckpointID();
+               final long checkpointId = 
checkpointFuture.get().getCheckpointID();
+
+               // now wait until it has been acknowledged
+               while (!testingOperatorCoordinator.hasCompleteCheckpoint()) {
+                       executor.triggerAll();
+                       Thread.sleep(1);
+               }
+
+               return checkpointId;
        }
 
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
index 5d93bcc..e914afe 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/operators/coordination/TestingOperatorCoordinator.java
@@ -121,10 +121,18 @@ class TestingOperatorCoordinator implements 
OperatorCoordinator {
                return triggeredCheckpoints.take();
        }
 
+       public boolean hasTriggeredCheckpoint() {
+               return !triggeredCheckpoints.isEmpty();
+       }
+
        public long getLastCheckpointComplete() throws InterruptedException {
                return lastCheckpointComplete.take();
        }
 
+       public boolean hasCompleteCheckpoint() {
+               return !lastCheckpointComplete.isEmpty(); // only peeks; unlike getLastCheckpointComplete() it does not take() the value
+       }
+
        // 
------------------------------------------------------------------------
        //  The provider for this coordinator implementation
        // 
------------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
index 6f62067..1686c5a 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/scheduler/SchedulerTestingUtils.java
@@ -90,6 +90,7 @@ import java.util.stream.StreamSupport;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 /**
@@ -265,9 +266,12 @@ public class SchedulerTestingUtils {
 
        public static void setAllExecutionsToCancelled(final DefaultScheduler scheduler) {
                final JobID jid = scheduler.getJobId();
-               getAllCurrentExecutionAttempts(scheduler).forEach(
-                       (attemptId) -> scheduler.updateTaskExecutionState(new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED))
-               );
+               for (final ExecutionAttemptID attemptId : getAllCurrentExecutionAttempts(scheduler)) {
+                       final boolean setToCancelled = scheduler.updateTaskExecutionState(
+                                       new TaskExecutionState(jid, attemptId, ExecutionState.CANCELED));
+                       // fail fast if the scheduler did not accept the CANCELED update; the test setup would be broken
+                       assertTrue("could not switch task to CANCELED", setToCancelled);
+               }
        }
 
        public static void acknowledgePendingCheckpoint(final DefaultScheduler 
scheduler, final long checkpointId) throws CheckpointException {

Reply via email to