This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 0efeea3ac5d3c04b830e87209b3e7357c8826931 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed May 20 14:13:23 2020 +0200 [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup --- .../TaskExecutorPartitionLifecycleTest.java | 25 ++++++++++++++++------ 1 file changed, 18 insertions(+), 7 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index 4f6ee44..81b651f 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -244,6 +244,24 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { } private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception { + final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); + final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); + partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId())); + partitionTrackerSetup.accept(partitionTracker); + + internalTestPartitionRelease( + partitionTracker, + new NettyShuffleEnvironmentBuilder().build(), + startTrackingFuture, + testAction + ); + } + + private void internalTestPartitionRelease( + TaskExecutorPartitionTracker partitionTracker, + ShuffleEnvironment<?, ?> shuffleEnvironment, + CompletableFuture<ResultPartitionID> startTrackingFuture, + TestAction testAction) throws Exception { final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING); @@ -276,8 +294,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { new File[]{tmp.newFolder()}, Executors.directExecutor()); - final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build(); - final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder() .setTaskSlotTable(taskSlotTable) .setTaskStateManager(localStateStoresManager) @@ -301,11 +317,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { }) .build(); - final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); - final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); - partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId())); - partitionTrackerSetup.accept(partitionTracker); - final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker); final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
