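[FLINK-9406] Use equals in JobMaster#requestPartitionState

Use equals instead of referential equality (==) in JobMaster#requestPartitionState when comparing the producer execution's attempt id with the result partition's producer id. Two ExecutionAttemptID instances can carry the same value while being distinct objects (for example, a copy of the id), in which case == evaluated to false and the request wrongly failed with a PartitionProducerDisposedException.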
This closes #6057. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/777f50a9 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/777f50a9 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/777f50a9 Branch: refs/heads/master Commit: 777f50a91aa60cee9c863b5fca0fd481de8b01b9 Parents: 0643797 Author: Till Rohrmann <[email protected]> Authored: Wed May 23 00:47:45 2018 +0200 Committer: Till Rohrmann <[email protected]> Committed: Wed May 23 00:58:57 2018 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 2 +- .../flink/runtime/jobmaster/JobMasterTest.java | 94 ++++++++++++++++++++ 2 files changed, 95 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/777f50a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 1df9d89..aad81a1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -619,7 +619,7 @@ public class JobMaster extends FencedRpcEndpoint<JobMasterId> implements JobMast .getProducer() .getCurrentExecutionAttempt(); - if (producerExecution.getAttemptId() == resultPartitionId.getProducerId()) { + if (producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) { return CompletableFuture.completedFuture(producerExecution.getState()); } else { return FutureUtils.completedExceptionally(new PartitionProducerDisposedException(resultPartitionId)); 
http://git-wip-us.apache.org/repos/asf/flink/blob/777f50a9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 99cdc16..552c0f0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -38,14 +38,22 @@ import org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore; import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory; import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2; +import org.apache.flink.runtime.clusterframework.types.AllocationID; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; +import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph; +import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.io.network.partition.ResultPartitionID; +import org.apache.flink.runtime.io.network.partition.ResultPartitionType; +import org.apache.flink.runtime.jobgraph.DistributionPattern; +import 
org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.JobVertex; import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings; @@ -67,6 +75,7 @@ import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; +import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.util.TestingFatalErrorHandler; @@ -96,6 +105,7 @@ import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.contains; import static org.hamcrest.Matchers.equalTo; import static org.hamcrest.Matchers.hasSize; import static org.hamcrest.Matchers.is; @@ -615,6 +625,90 @@ public class JobMasterTest extends TestLogger { } } + /** + * Tests the {@link JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)} + * call for a finished result partition. 
+ */ + @Test + public void testRequestPartitionState() throws Exception { + final JobGraph producerConsumerJobGraph = producerConsumerJobGraph(); + final JobMaster jobMaster = createJobMaster( + JobMasterConfiguration.fromConfiguration(configuration), + producerConsumerJobGraph, + haServices, + new TestingJobManagerSharedServicesBuilder().build(), + heartbeatServices); + + jobMaster.start(jobMasterId, testingTimeout); + + try { + final TestingResourceManagerGateway testingResourceManagerGateway = new TestingResourceManagerGateway(); + + final CompletableFuture<AllocationID> allocationIdFuture = new CompletableFuture<>(); + testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> allocationIdFuture.complete(slotRequest.getAllocationId())); + + rpcService.registerGateway(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway); + + final CompletableFuture<TaskDeploymentDescriptor> tddFuture = new CompletableFuture<>(); + final TestingTaskExecutorGateway testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder() + .setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> { + tddFuture.complete(taskDeploymentDescriptor); + return CompletableFuture.completedFuture(Acknowledge.get()); + }) + .createTestingTaskExecutorGateway(); + rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), testingTaskExecutorGateway); + + final JobMasterGateway jobMasterGateway = jobMaster.getSelfGateway(JobMasterGateway.class); + + rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(), testingResourceManagerGateway.getFencingToken().toUUID()); + + final AllocationID allocationId = allocationIdFuture.get(); + + final LocalTaskManagerLocation taskManagerLocation = new LocalTaskManagerLocation(); + jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), taskManagerLocation, testingTimeout).get(); + + final SlotOffer slotOffer = new SlotOffer(allocationId, 0, ResourceProfile.UNKNOWN); + 
+ final Collection<SlotOffer> slotOffers = jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), Collections.singleton(slotOffer), testingTimeout).get(); + + assertThat(slotOffers, hasSize(1)); + assertThat(slotOffers, contains(slotOffer)); + + // obtain tdd for the result partition ids + final TaskDeploymentDescriptor tdd = tddFuture.get(); + + assertThat(tdd.getProducedPartitions(), hasSize(1)); + final ResultPartitionDeploymentDescriptor partition = tdd.getProducedPartitions().iterator().next(); + + final ExecutionAttemptID executionAttemptId = tdd.getExecutionAttemptId(); + final ExecutionAttemptID copiedExecutionAttemptId = new ExecutionAttemptID(executionAttemptId.getLowerPart(), executionAttemptId.getUpperPart()); + + // finish the producer task + jobMasterGateway.updateTaskExecutionState(new TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, ExecutionState.FINISHED)).get(); + + // request the state of the result partition of the producer + final CompletableFuture<ExecutionState> partitionStateFuture = jobMasterGateway.requestPartitionState(partition.getResultId(), new ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId)); + + assertThat(partitionStateFuture.get(), equalTo(ExecutionState.FINISHED)); + } finally { + RpcUtils.terminateRpcEndpoint(jobMaster, testingTimeout); + } + } + + private JobGraph producerConsumerJobGraph() { + final JobVertex producer = new JobVertex("Producer"); + producer.setInvokableClass(NoOpInvokable.class); + final JobVertex consumer = new JobVertex("Consumer"); + consumer.setInvokableClass(NoOpInvokable.class); + + consumer.connectNewDataSetAsInput(producer, DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING); + + final JobGraph jobGraph = new JobGraph(producer, consumer); + jobGraph.setAllowQueuedScheduling(true); + + return jobGraph; + } + private File createSavepoint(long savepointId) throws IOException { final File savepointFile = temporaryFolder.newFile(); final 
SavepointV2 savepoint = new SavepointV2(savepointId, Collections.emptyList(), Collections.emptyList());
