This is an automated email from the ASF dual-hosted git repository. chesnay pushed a commit to branch release-1.11 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 7fdd48864dab360c6f7c01e5f422fb2e02d661b8 Author: Chesnay Schepler <[email protected]> AuthorDate: Wed May 20 14:11:09 2020 +0200 [FLINK-17558][tests] Simplify partition tracker setup --- .../TaskExecutorPartitionLifecycleTest.java | 49 ++++++++-------------- 1 file changed, 18 insertions(+), 31 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java index 9cc5d69..9ca4397 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorPartitionLifecycleTest.java @@ -212,13 +212,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionReleaseAfterJobMasterDisconnect() throws Exception { + final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<JobID> releasePartitionsForJobFuture = new CompletableFuture<>(); - partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete); - return releasePartitionsForJobFuture; - }, - (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsForJobFuture) -> { + partitionTracker -> partitionTracker.setStopTrackingAndReleaseAllPartitionsConsumer(releasePartitionsForJobFuture::complete), + (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> { taskExecutorGateway.disconnectJobManager(jobId, new Exception("test")); assertThat(releasePartitionsForJobFuture.get(), equalTo(jobId)); @@ -228,13 +225,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionReleaseAfterReleaseCall() throws Exception { + final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<Collection<ResultPartitionID>> releasePartitionsFuture = new CompletableFuture<>(); - partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete); - return releasePartitionsFuture; - }, - (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> { + partitionTracker -> partitionTracker.setStopTrackingAndReleasePartitionsConsumer(releasePartitionsFuture::complete), + (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> { final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(); taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.singleton(resultPartitionId), Collections.emptySet()); @@ -246,13 +240,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testPartitionPromotion() throws Exception { + final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<Collection<ResultPartitionID>> promotePartitionsFuture = new CompletableFuture<>(); - partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete); - return promotePartitionsFuture; - }, - (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, promotePartitionsFuture) -> { + partitionTracker -> partitionTracker.setPromotePartitionsConsumer(promotePartitionsFuture::complete), + (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> { final ResultPartitionID resultPartitionId = resultPartitionDeploymentDescriptor.getShuffleDescriptor().getResultPartitionID(); taskExecutorGateway.releaseOrPromotePartitions(jobId, Collections.emptySet(), Collections.singleton(resultPartitionId)); @@ -264,13 +255,10 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { @Test public void testClusterPartitionRelease() throws Exception { + final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>(); testPartitionRelease( - partitionTracker -> { - final CompletableFuture<Collection<IntermediateDataSetID>> releasePartitionsFuture = new CompletableFuture<>(); - partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete); - return releasePartitionsFuture; - }, - (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway, releasePartitionsFuture) -> { + partitionTracker -> partitionTracker.setReleaseClusterPartitionsConsumer(releasePartitionsFuture::complete), + (jobId, resultPartitionDeploymentDescriptor, taskExecutor, taskExecutorGateway) -> { final IntermediateDataSetID dataSetId = resultPartitionDeploymentDescriptor.getResultId(); taskExecutorGateway.releaseClusterPartitions(Collections.singleton(dataSetId), timeout); @@ -341,7 +329,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { final TestingTaskExecutorPartitionTracker partitionTracker = new TestingTaskExecutorPartitionTracker(); final CompletableFuture<ResultPartitionID> startTrackingFuture = new CompletableFuture<>(); partitionTracker.setStartTrackingPartitionsConsumer((jobId, partitionInfo) -> startTrackingFuture.complete(partitionInfo.getResultPartitionId())); - C partitionTrackerSetupResult = partitionTrackerSetup.accept(partitionTracker); + partitionTrackerSetup.accept(partitionTracker); final TestingTaskExecutor taskExecutor = createTestingTaskExecutor(taskManagerServices, partitionTracker); @@ -421,8 +409,7 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { jobId, taskResultPartitionDescriptor, taskExecutor, - taskExecutorGateway, - partitionTrackerSetupResult); + taskExecutorGateway); } finally { RpcUtils.terminateRpcEndpoint(taskExecutor, timeout); } @@ -478,12 +465,12 @@ public class TaskExecutorPartitionLifecycleTest extends TestLogger { } @FunctionalInterface - private interface PartitionTrackerSetup<C> { - C accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception; + private interface PartitionTrackerSetup { + void accept(TestingTaskExecutorPartitionTracker partitionTracker) throws Exception; } @FunctionalInterface - private interface TestAction<C> { - void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway, C partitionTrackerSetupResult) throws Exception; + private interface TestAction { + void accept(JobID jobId, ResultPartitionDeploymentDescriptor resultPartitionDeploymentDescriptor, TaskExecutor taskExecutor, TaskExecutorGateway taskExecutorGateway) throws Exception; } }
