This is an automated email from the ASF dual-hosted git repository.

xtsong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a3b51f8  [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
a3b51f8 is described below

commit a3b51f805f247e6aeb681bb36b38da777ba25d79
Author: Xintong Song <[email protected]>
AuthorDate: Fri Jun 12 10:30:35 2020 +0800

    [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
    
    This closes #12623.
---
 .../TaskExecutorPartitionLifecycleTest.java        | 16 ++++++----
 .../TaskSubmissionTestEnvironment.java             | 37 ++++++++++++++--------
 2 files changed, 33 insertions(+), 20 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9a3afa1..578ac92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -165,13 +165,6 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        }).build();
 
                final DefaultJobTable jobTable = DefaultJobTable.create();
-               TaskSubmissionTestEnvironment.registerJobMasterConnection(
-                       jobTable,
-                       jobId,
-                       rpc,
-                       jobMasterGateway,
-                       new NoOpTaskManagerActions(),
-                       timeout);
 
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setJobTable(jobTable)
@@ -196,6 +189,15 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        taskExecutor.start();
                        taskExecutor.waitUntilStarted();
 
+                       
TaskSubmissionTestEnvironment.registerJobMasterConnection(
+                               jobTable,
+                               jobId,
+                               rpc,
+                               jobMasterGateway,
+                               new NoOpTaskManagerActions(),
+                               timeout,
+                               
taskExecutor.getMainThreadExecutableForTesting());
+
                        final TaskExecutorGateway taskExecutorGateway = 
taskExecutor.getSelfGateway(TaskExecutorGateway.class);
 
                        trackerIsTrackingPartitions.set(true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index e79b720..d7fcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -42,6 +42,7 @@ import 
org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
 import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
 import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
 import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.RpcUtils;
 import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -170,7 +171,14 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        taskManagerActions = testTaskManagerActions;
                }
 
-               registerJobMasterConnection(jobTable, jobId, testingRpcService, 
jobMasterGateway, taskManagerActions, timeout);
+               registerJobMasterConnection(
+                       jobTable,
+                       jobId,
+                       testingRpcService,
+                       jobMasterGateway,
+                       taskManagerActions,
+                       timeout,
+                       taskExecutor.getMainThreadExecutableForTesting());
        }
 
        static void registerJobMasterConnection(
@@ -179,18 +187,21 @@ class TaskSubmissionTestEnvironment implements 
AutoCloseable {
                        RpcService testingRpcService,
                        JobMasterGateway jobMasterGateway,
                        TaskManagerActions taskManagerActions,
-                       Time timeout) {
-               final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> 
TestingJobServices.newBuilder().build());
-               job.connect(
-                       ResourceID.generate(),
-                       jobMasterGateway,
-                       taskManagerActions,
-                       new TestCheckpointResponder(),
-                       new TestGlobalAggregateManager(),
-                       new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, 
testingRpcService.getExecutor(), timeout),
-                       TestingPartitionProducerStateChecker.newBuilder()
-                               .setPartitionProducerStateFunction((jobID, 
intermediateDataSetID, resultPartitionID) -> 
CompletableFuture.completedFuture(ExecutionState.RUNNING))
-                               .build());
+                       Time timeout,
+                       MainThreadExecutable mainThreadExecutable) {
+               mainThreadExecutable.runAsync(() -> {
+                       final JobTable.Job job = jobTable.getOrCreateJob(jobId, 
() -> TestingJobServices.newBuilder().build());
+                       job.connect(
+                               ResourceID.generate(),
+                               jobMasterGateway,
+                               taskManagerActions,
+                               new TestCheckpointResponder(),
+                               new TestGlobalAggregateManager(),
+                               new 
RpcResultPartitionConsumableNotifier(jobMasterGateway, 
testingRpcService.getExecutor(), timeout),
+                               
TestingPartitionProducerStateChecker.newBuilder()
+                                       
.setPartitionProducerStateFunction((jobID, intermediateDataSetID, 
resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
+                                       .build());
+               });
        }
 
        public TestingTaskExecutor getTaskExecutor() {

Reply via email to