This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit e2cffe19a044d6decfae83e14ce59eca1e4c449f
Author: Xintong Song <[email protected]>
AuthorDate: Thu May 28 16:23:31 2020 +0800

    [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment.
    
    This closes #12399.
---
 .../TaskSubmissionTestEnvironment.java             | 25 +++++++++++-----------
 1 file changed, 13 insertions(+), 12 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index 14adabf..e79b720 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        jobMasterGateway = testingJobMasterGateway;
                }
 
-               TaskManagerActions taskManagerActions;
-               if (taskManagerActionListeners.size() == 0) {
-                       taskManagerActions = new NoOpTaskManagerActions();
-               } else {
-                       TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(taskSlotTable, jobMasterGateway);
-                       for (Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
-                               
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
-                       }
-                       taskManagerActions = testTaskManagerActions;
-               }
-
                this.testingRpcService = testingRpcService;
                final DefaultJobTable jobTable = DefaultJobTable.create();
-               registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
 
                TaskExecutorLocalStateStoresManager localStateStoresManager = 
new TaskExecutorLocalStateStoresManager(
                        false,
@@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                taskExecutor.waitUntilStarted();
 
                this.threadSafeTaskSlotTable = new 
ThreadSafeTaskSlotTable<>(taskSlotTable, 
taskExecutor.getMainThreadExecutableForTesting());
+
+               TaskManagerActions taskManagerActions;
+               if (taskManagerActionListeners.size() == 0) {
+                       taskManagerActions = new NoOpTaskManagerActions();
+               } else {
+                       TestTaskManagerActions testTaskManagerActions = new 
TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway);
+                       for (Tuple3<ExecutionAttemptID, ExecutionState, 
CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) {
+                               
testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, 
listenerTuple.f2);
+                       }
+                       taskManagerActions = testTaskManagerActions;
+               }
+
+               registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
        }
 
        static void registerJobMasterConnection(

Reply via email to