This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.11
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 7fdd48864dab360c6f7c01e5f422fb2e02d661b8
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 20 14:11:09 2020 +0200

    [FLINK-17558][tests] Simplify partition tracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 49 ++++++++--------------
 1 file changed, 18 insertions(+), 31 deletions(-)

diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index 9cc5d69..9ca4397 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -212,13 +212,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
        @Test
        public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception {
+               final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>();
-                               partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
-                               return releasePartitionsForJobFuture;
-                       },
-                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> {
+                       partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
                                taskExecutorGateway.disconnectJobManager(jobId, new Exception("test"));
 
                                assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId));
@@ -228,13 +225,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
        @Test
        public void testPartitionReleaseAfterReleaseCall() throws Exception {
+               final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>();
-                               partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
-                               return releasePartitionsFuture;
-                       },
-                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+                       partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
                                final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
 
                                taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(resultPartitionId), Collections.emptySet());
@@ -246,13 +240,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
        @Test
        public void testPartitionPromotion() throws Exception {
+               final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>();
-                               partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete);
-                               return promotePartitionsFuture;
-                       },
-                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, promotePartitionsFuture) -> {
+                       partitionTracker -> partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete),
+                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
                                final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID();
 
                                taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(resultPartitionId));
@@ -264,13 +255,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
 
        @Test
        public void testClusterPartitionRelease() throws Exception {
+               final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>();
-                               partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete);
-                               return releasePartitionsFuture;
-                       },
-                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> {
+                       partitionTracker -> partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete),
+                       (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> {
                                final IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId();
 
                                taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout);
@@ -341,7 +329,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker();
                final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>();
                partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-               C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker);
+               partitionTrackerSetup.accept(partitionTracker);
 
                final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
@@ -421,8 +409,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
                                jobId,
                                taskResultPartitionDescriptor,
                                taskExecutor,
-                               taskExecutorGateway,
-                               partitionTrackerSetupResult);
+                               taskExecutorGateway);
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
                }
@@ -478,12 +465,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger {
        }
 
        @FunctionalInterface
-       private interface PartitionTrackerSetup<C> {
-               C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
+       private interface PartitionTrackerSetup {
+               void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception;
        }
 
        @FunctionalInterface
-       private interface TestAction<C> {
-               void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception;
+       private interface TestAction {
+               void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception;
        }
 }

Reply via email to