This is an automated email from the ASF dual-hosted git repository. xtsong pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 10b284201337750422b663e8b69b7d867825da40 Author: Xintong Song <[email protected]> AuthorDate: Thu May 28 16:23:31 2020 +0800 [FLINK-15687][runtime][test] Make TaskManagerActions access task slot table on rpc main thread in TaskSubmissionTestEnvironment. This closes #12399. --- .../TaskSubmissionTestEnvironment.java | 25 +++++++++++----------- 1 file changed, 13 insertions(+), 12 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java index 14adabf..e79b720 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java @@ -137,20 +137,8 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { jobMasterGateway = testingJobMasterGateway; } - TaskManagerActions taskManagerActions; - if (taskManagerActionListeners.size() == 0) { - taskManagerActions = new NoOpTaskManagerActions(); - } else { - TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(taskSlotTable, jobMasterGateway); - for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) { - testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); - } - taskManagerActions = testTaskManagerActions; - } - this.testingRpcService = testingRpcService; final DefaultJobTable jobTable = DefaultJobTable.create(); - registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); TaskExecutorLocalStateStoresManager localStateStoresManager = new TaskExecutorLocalStateStoresManager( false, @@ -170,6 +158,19 @@ class TaskSubmissionTestEnvironment implements AutoCloseable { taskExecutor.waitUntilStarted(); this.threadSafeTaskSlotTable = new ThreadSafeTaskSlotTable<>(taskSlotTable, taskExecutor.getMainThreadExecutableForTesting()); + + TaskManagerActions taskManagerActions; + if (taskManagerActionListeners.size() == 0) { + taskManagerActions = new NoOpTaskManagerActions(); + } else { + TestTaskManagerActions testTaskManagerActions = new TestTaskManagerActions(threadSafeTaskSlotTable, jobMasterGateway); + for (Tuple3<ExecutionAttemptID, ExecutionState, CompletableFuture<Void>> listenerTuple : taskManagerActionListeners) { + testTaskManagerActions.addListener(listenerTuple.f0, listenerTuple.f1, listenerTuple.f2); + } + taskManagerActions = testTaskManagerActions; + } + + registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout); } static void registerJobMasterConnection(
