This is an automated email from the ASF dual-hosted git repository.
xtsong pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/release-1.11 by this push:
new 15e7c26 [FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
15e7c26 is described below
commit 15e7c26ab6b9c9b2b1faa81495dc941e2f10bac6
Author: Xintong Song <[email protected]>
AuthorDate: Fri Jun 12 10:30:35 2020 +0800
[FLINK-15687][runtime][test] Fix test instability due to concurrent access to JobTable.
This closes #12623.
---
.../TaskExecutorPartitionLifecycleTest.java | 16 ++++++----
.../TaskSubmissionTestEnvironment.java | 37 ++++++++++++++--------
2 files changed, 33 insertions(+), 20 deletions(-)
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9a3afa1..578ac92 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -165,13 +165,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
}).build();
final DefaultJobTable jobTable = DefaultJobTable.create();
- TaskSubmissionTestEnvironment.registerJobMasterConnection(
- jobTable,
- jobId,
- rpc,
- jobMasterGateway,
- new NoOpTaskManagerActions(),
- timeout);
final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
.setJobTable(jobTable)
@@ -196,6 +189,15 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
taskExecutor.start();
taskExecutor.waitUntilStarted();
+ TaskSubmissionTestEnvironment.registerJobMasterConnection(
+ jobTable,
+ jobId,
+ rpc,
+ jobMasterGateway,
+ new NoOpTaskManagerActions(),
+ timeout,
+ taskExecutor.getMainThreadExecutableForTesting());
+
final TaskExecutorGateway taskExecutorGateway = taskExecutor.getSelfGateway(TaskExecutorGateway.class);
trackerIsTrackingPartitions.set(true);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
index e79b720..d7fcd02 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskSubmissionTestEnvironment.java
@@ -42,6 +42,7 @@ import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGateway;
import org.apache.flink.runtime.jobmaster.utils.TestingJobMasterGatewayBuilder;
import org.apache.flink.runtime.leaderretrieval.SettableLeaderRetrievalService;
import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.rpc.MainThreadExecutable;
import org.apache.flink.runtime.rpc.RpcService;
import org.apache.flink.runtime.rpc.RpcUtils;
import org.apache.flink.runtime.rpc.TestingRpcService;
@@ -170,7 +171,14 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
taskManagerActions = testTaskManagerActions;
}
- registerJobMasterConnection(jobTable, jobId, testingRpcService, jobMasterGateway, taskManagerActions, timeout);
+ registerJobMasterConnection(
+ jobTable,
+ jobId,
+ testingRpcService,
+ jobMasterGateway,
+ taskManagerActions,
+ timeout,
+ taskExecutor.getMainThreadExecutableForTesting());
}
static void registerJobMasterConnection(
@@ -179,18 +187,21 @@ class TaskSubmissionTestEnvironment implements AutoCloseable {
RpcService testingRpcService,
JobMasterGateway jobMasterGateway,
TaskManagerActions taskManagerActions,
- Time timeout) {
- final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
- job.connect(
- ResourceID.generate(),
- jobMasterGateway,
- taskManagerActions,
- new TestCheckpointResponder(),
- new TestGlobalAggregateManager(),
- new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
- TestingPartitionProducerStateChecker.newBuilder()
- .setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
- .build());
+ Time timeout,
+ MainThreadExecutable mainThreadExecutable) {
+ mainThreadExecutable.runAsync(() -> {
+ final JobTable.Job job = jobTable.getOrCreateJob(jobId, () -> TestingJobServices.newBuilder().build());
+ job.connect(
+ ResourceID.generate(),
+ jobMasterGateway,
+ taskManagerActions,
+ new TestCheckpointResponder(),
+ new TestGlobalAggregateManager(),
+ new RpcResultPartitionConsumableNotifier(jobMasterGateway, testingRpcService.getExecutor(), timeout),
+ TestingPartitionProducerStateChecker.newBuilder()
+ .setPartitionProducerStateFunction((jobID, intermediateDataSetID, resultPartitionID) -> CompletableFuture.completedFuture(ExecutionState.RUNNING))
+ .build());
+ });
}
public TestingTaskExecutor getTaskExecutor() {