This is an automated email from the ASF dual-hosted git repository.

chesnay pushed a commit to branch release-1.10
in repository https://gitbox.apache.org/repos/asf/flink.git

commit cce971167d4de726dcdb7c9a6e4f0f648fb61266
Author: Chesnay Schepler <[email protected]>
AuthorDate: Wed May 20 14:11:09 2020 +0200

    [FLINK-17558][tests] Simplify partition tracker setup
---
 .../TaskExecutorPartitionLifecycleTest.java        | 43 ++++++++--------------
 1 file changed, 16 insertions(+), 27 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
index b8f43d0..4f6ee44 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java
@@ -206,14 +206,10 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
 
        @Test
        public void testPartitionReleaseAfterJobMasterDisconnect() throws 
Exception {
+               final CompletableFuture<JobID> releasePartitionsForJobFuture = 
new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final CompletableFuture<JobID> 
releasePartitionsForJobFuture = new CompletableFuture<>();
-                               
partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete);
-                               return releasePartitionsForJobFuture;
-                       },
-                       (jobId, partitionId, taskExecutor, taskExecutorGateway, 
releasePartitionsForJobFuture) -> {
-
+                       partitionTracker -> 
partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete),
+                       (jobId, partitionId, taskExecutor, taskExecutorGateway) 
-> {
                                taskExecutorGateway.disconnectJobManager(jobId, 
new Exception("test"));
 
                                assertThat(releasePartitionsForJobFuture.get(), 
equalTo(jobId));
@@ -223,13 +219,10 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
 
        @Test
        public void testPartitionReleaseAfterReleaseCall() throws Exception {
+               final CompletableFuture<Collection<ResultPartitionID>> 
releasePartitionsFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final 
CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new 
CompletableFuture<>();
-                               
partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete);
-                               return releasePartitionsFuture;
-                       },
-                       (jobId, partitionId, taskExecutor, taskExecutorGateway, 
releasePartitionsFuture) -> {
+                       partitionTracker -> 
partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete),
+                       (jobId, partitionId, taskExecutor, taskExecutorGateway) 
-> {
                                
taskExecutorGateway.releaseOrPromotePartitions(jobId, 
Collections.singleton(partitionId), Collections.emptySet());
 
                                assertThat(releasePartitionsFuture.get(), 
hasItems(partitionId));
@@ -239,13 +232,10 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
 
        @Test
        public void testPartitionPromotion() throws Exception {
+               final CompletableFuture<Collection<ResultPartitionID>> 
releasePartitionsFuture = new CompletableFuture<>();
                testPartitionRelease(
-                       partitionTracker -> {
-                               final 
CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new 
CompletableFuture<>();
-                               
partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete);
-                               return releasePartitionsFuture;
-                       },
-                       (jobId, partitionId, taskExecutor, taskExecutorGateway, 
releasePartitionsFuture) -> {
+                       partitionTracker -> 
partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete),
+                       (jobId, partitionId, taskExecutor, taskExecutorGateway) 
-> {
                                
taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), 
Collections.singleton(partitionId));
 
                                assertThat(releasePartitionsFuture.get(), 
hasItems(partitionId));
@@ -253,7 +243,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                );
        }
 
-       private <C> void testPartitionRelease(PartitionTrackerSetup<C> 
partitionTrackerSetup, TestAction<C> testAction) throws Exception {
+       private void testPartitionRelease(PartitionTrackerSetup 
partitionTrackerSetup, TestAction testAction) throws Exception {
 
                final ResultPartitionDeploymentDescriptor 
taskResultPartitionDescriptor =
                        
PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING);
@@ -314,7 +304,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                final TestingTaskExecutorPartitionTracker partitionTracker = 
new TestingTaskExecutorPartitionTracker();
                final CompletableFuture<ResultPartitionID> startTrackingFuture 
= new CompletableFuture<>();
                partitionTracker.setStartTrackingPartitionsConsumer((jobId, 
partitionInfo) -> 
startTrackingFuture.complete(partitionInfo.getResultPartitionId()));
-               C partitionTrackerSetupResult = 
partitionTrackerSetup.accept(partitionTracker);
+               partitionTrackerSetup.accept(partitionTracker);
 
                final TestingTaskExecutor taskExecutor = 
createTestingTaskExecutor(taskManagerServices, partitionTracker);
 
@@ -394,8 +384,7 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
                                jobId,
                                
taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(),
                                taskExecutor,
-                               taskExecutorGateway,
-                               partitionTrackerSetupResult);
+                               taskExecutorGateway);
                } finally {
                        RpcUtils.terminateRpcEndpoint(taskExecutor, timeout);
                }
@@ -447,12 +436,12 @@ public class TaskExecutorPartitionLifecycleTest extends 
TestLogger {
        }
 
        @FunctionalInterface
-       private interface PartitionTrackerSetup<C> {
-               C accept(TestingTaskExecutorPartitionTracker partitionTracker) 
throws Exception;
+       private interface PartitionTrackerSetup {
+               void accept(TestingTaskExecutorPartitionTracker 
partitionTracker) throws Exception;
        }
 
        @FunctionalInterface
-       private interface TestAction<C> {
-               void accept(JobID jobId, ResultPartitionID resultPartitionId, 
TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C 
partitionTrackerSetupResult) throws Exception;
+       private interface TestAction {
+               void accept(JobID jobId, ResultPartitionID resultPartitionId, 
TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws 
Exception;
        }
 }

Reply via email to