This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.10 in repository https://gitbox.apache.org/repos/asf/flink.git
commit cce971167d4de726dcdb7c9a6e4f0f648fb61266 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed May 20 14:11:09 2020 +0200 [FLINK-17558][tests] Simplify partition tracker setup --- .../TaskExecutorPartitionLifecycleTest.java | 43 ++++++++-------------- 1 file changed, 16 insertions(+), 27 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index b8f43d0..4f6ee44 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -206,14 +206,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception { + final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>(); - partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete); - return releasePartitionsForJobFuture; - }, - (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> { - + partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete), + (jobId, partitionId, taskExecutor, taskExecutorGateway) -> { taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")); assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId)); @@ -223,13 +219,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionReleaseAfterReleaseCall() throws Exception { + final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); - partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete); - return releasePartitionsFuture; - }, - (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> { + partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete), + (jobId, partitionId, taskExecutor, taskExecutorGateway) -> { taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(partitionId), Collections.emptySet()); assertThat(releasePartitionsFuture.get(), hasItems(partitionId)); @@ -239,13 +232,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionPromotion() throws Exception { + final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); - partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete); - return releasePartitionsFuture; - }, - (jobId, partitionId, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> { + partitionTracker -> partitionTracker.setPromotePartitionsConsumer(releasePartitionsFuture::complete), + (jobId, partitionId, taskExecutor, taskExecutorGateway) -> { taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(partitionId)); assertThat(releasePartitionsFuture.get(), hasItems(partitionId)); @@ -253,7 +243,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { ); } - private <C> void testPartitionRelease(PartitionTrackerSetup<C> partitionTrackerSetup, TestAction<C> testAction) throws Exception { + private void testPartitionRelease(PartitionTrackerSetup partitionTrackerSetup, TestAction testAction) throws Exception { final ResultPartitionDeploymentDescriptor taskResultPartitionDescriptor = PartitionTestUtils.createPartitionDeploymentDescriptor(ResultPartitionType.BLOCKING); @@ -314,7 +304,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId())); - C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker); + partitionTrackerSetup.accept(partitionTracker); final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker); @@ -394,8 +384,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { jobId, taskResultPartitionDescriptor.getShuffleDescriptor().getResultPartitionID(), taskExecutor, - taskExecutorGateway, - partitionTrackerSetupResult); + taskExecutorGateway); } finally { RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); } @@ -447,12 +436,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { } @FunctionalInterface - private interface PartitionTrackerSetup<C> { - C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception; + private interface PartitionTrackerSetup { + void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception; } @FunctionalInterface - private interface TestAction<C> { - void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception; + private interface TestAction { + void accept(JobID jobId, ResultPartitionID resultPartitionId, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception; } }
