This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit a8145e547e3a760960b4fd17c532770f281193bc
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 27 +++++++++++++++-------
 1 file changed, 19 insertions(+), 8 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9ca4397..b538b36 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -268,7 +268,25 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                );
        }
 
-       private <C> void testPartitionRelease(PartitionTrackerSetup<C> 
partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+       private void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
+               final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
+               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
+               partitionTracker.setStartTrackingPartitionsConsumer((jobId, 
partitionInfo) -> 
startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+               partitionTrackerSetup.accept(partitionTracker);
+
+               internalTestPartitionRelease(
+                       partitionTracker,
+                       new NettyShuffleEnvironmentBuilder().build(),
+                       startTrackingFuture,
+                       testAction
+               );
+       }
+
+       private void internalTestPartitionRelease(
+                       TaskExecutorPartitionTracker partitionTracker,
+                       ShuffleEnvironment<?, ?> shuffleEnvironment,
+                       CompletableFuture<ResultPartitionID> 
startTrackingFuture,
+                       TestAction testAction) throws Exception {
 
                final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
                        
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -301,8 +319,6 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
-               final ShuffleEnvironment<?, ?> shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder().build();
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskSlotTable(taskSlotTable)
                        .setTaskStateManager(localStateStoresManager)
@@ -326,11 +342,6 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                        })
                        .build();
 
-               final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
-               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
-               partitionTracker.setStartTrackingPartitionsConsumer((jobId, 
partitionInfo) -> 
startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-               partitionTrackerSetup.accept(partitionTracker);
-
                final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
                final CompletableFuture<SlotReport> initialSlotReportFuture = 
new CompletableFuture<>();

Reply via email to