This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit a8145e547e3a760960b4fd17c532770f281193bc
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9ca4397..b538b36 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -268,7 +268,25 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 		);
 	}
 
-	private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+	private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception {
+		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
+		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
+		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+		partitionTrackerSetup.accept(partitionTracker);
+
+		internalTestPartitionRelease(
+			partitionTracker,
+			new NettyShuffleEnvironmentBuilder().build(),
+			startTrackingFuture,
+			testAction
+		);
+	}
+
+	private void internalTestPartitionRelease(
+			TaskExecutorPartitionTracker partitionTracker,
+			ShuffleEnvironment<?, ?> shuffleEnvironment,
+			CompletableFuture<ResultPartitionID> startTrackingFuture,
+			TestAction testAction) throws Exception {
 		final ResultPartitionDeploymentDescriptor
 			taskResultPartitionDescriptor = PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -301,8 +319,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			new File[]{tmp.newFolder()},
 			Executors.directExecutor());
 
-		final ShuffleEnvironment<?, ?> shuffleEnvironment = new NettyShuffleEnvironmentBuilder().build();
-
 		final TaskManagerServices taskManagerServices = new TaskManagerServicesBuilder()
 			.setTaskSlotTable(taskSlotTable)
 			.setTaskStateManager(localStateStoresManager)
@@ -326,11 +342,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 			})
 			.build();
 
-		final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
-		final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
-		partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-		partitionTrackerSetup.accept(partitionTracker);
-
 		final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
 		final CompletableFuture<SlotReport> initialSlotReportFuture = new CompletableFuture<>();
