This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 0efeea3ac5d3c04b830e87209b3e7357c8826931
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 20 14:13:23 2020 +0200

    [FLINK-17558][tests] Extract ShuffleEnvironment/PartitionTracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 25 ++++++++++++++++------
 1 file changed, 18 insertions(+), 7 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 4f6ee44..81b651f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -244,6 +244,24 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
        }
 
        private void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
+               final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
+               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
+               partitionTracker.setStartTrackingPartitionsConsumer((jobId, 
partitionInfo) -> 
startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
+               partitionTrackerSetup.accept(partitionTracker);
+
+               internalTestPartitionRelease(
+                       partitionTracker,
+                       new NettyShuffleEnvironmentBuilder().build(),
+                       startTrackingFuture,
+                       testAction
+               );
+       }
+
+       private void internalTestPartitionRelease(
+                       TaskExecutorPartitionTracker partitionTracker,
+                       ShuffleEnvironment<?, ?> shuffleEnvironment,
+                       CompletableFuture<ResultPartitionID> 
startTrackingFuture,
+                       TestAction testAction) throws Exception {
 
                final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
                        
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -276,8 +294,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                        new File[]{tmp.newFolder()},
                        Executors.directExecutor());
 
-               final ShuffleEnvironment<?, ?> shuffleEnvironment = new 
NettyShuffleEnvironmentBuilder().build();
-
                final TaskManagerServices taskManagerServices = new 
TaskManagerServicesBuilder()
                        .setTaskSlotTable(taskSlotTable)
                        .setTaskStateManager(localStateStoresManager)
@@ -301,11 +317,6 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                        })
                        .build();
 
-               final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
-               final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
-               partitionTracker.setStartTrackingPartitionsConsumer((jobId, 
partitionInfo) -> 
startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-               partitionTrackerSetup.accept(partitionTracker);
-
                final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
                final CompletableFuture<SlotReport> initialSlotReportFuture = 
new CompletableFuture<>();

Reply via email to