[FLINK-9406] Use equals in JobMaster#requestPartitionState

Use equals instead of referential equality in JobMaster#requestPartitionState
when comparing the producerExecution attempt id with the result partition
producer id.

This closes #6057.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/777f50a9
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/777f50a9
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/777f50a9

Branch: refs/heads/master
Commit: 777f50a91aa60cee9c863b5fca0fd481de8b01b9
Parents: 0643797
Author: Till Rohrmann <[email protected]>
Authored: Wed May 23 00:47:45 2018 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Wed May 23 00:58:57 2018 +0200

----------------------------------------------------------------------
 .../flink/runtime/jobmaster/JobMaster.java      |  2 +-
 .../flink/runtime/jobmaster/JobMasterTest.java  | 94 ++++++++++++++++++++
 2 files changed, 95 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/777f50a9/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
index 1df9d89..aad81a1 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
@@ -619,7 +619,7 @@ public class JobMaster extends 
FencedRpcEndpoint<JobMasterId> implements JobMast
                                                .getProducer()
                                                .getCurrentExecutionAttempt();
 
-                               if (producerExecution.getAttemptId() == 
resultPartitionId.getProducerId()) {
+                               if 
(producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
                                        return 
CompletableFuture.completedFuture(producerExecution.getState());
                                } else {
                                        return 
FutureUtils.completedExceptionally(new 
PartitionProducerDisposedException(resultPartitionId));

http://git-wip-us.apache.org/repos/asf/flink/blob/777f50a9/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
----------------------------------------------------------------------
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
index 99cdc16..552c0f0 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java
@@ -38,14 +38,22 @@ import 
org.apache.flink.runtime.checkpoint.StandaloneCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.StandaloneCompletedCheckpointStore;
 import org.apache.flink.runtime.checkpoint.TestingCheckpointRecoveryFactory;
 import org.apache.flink.runtime.checkpoint.savepoint.SavepointV2;
+import org.apache.flink.runtime.clusterframework.types.AllocationID;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor;
 import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor;
+import org.apache.flink.runtime.execution.ExecutionState;
 import org.apache.flink.runtime.executiongraph.ArchivedExecutionGraph;
+import org.apache.flink.runtime.executiongraph.ExecutionAttemptID;
 import org.apache.flink.runtime.heartbeat.HeartbeatServices;
 import org.apache.flink.runtime.heartbeat.TestingHeartbeatServices;
 import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
 import 
org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionID;
+import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
+import org.apache.flink.runtime.jobgraph.DistributionPattern;
+import org.apache.flink.runtime.jobgraph.IntermediateDataSetID;
 import org.apache.flink.runtime.jobgraph.JobGraph;
 import org.apache.flink.runtime.jobgraph.JobVertex;
 import org.apache.flink.runtime.jobgraph.SavepointRestoreSettings;
@@ -67,6 +75,7 @@ import 
org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
 import org.apache.flink.runtime.taskexecutor.slot.SlotOffer;
 import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation;
+import org.apache.flink.runtime.taskmanager.TaskExecutionState;
 import org.apache.flink.runtime.taskmanager.TaskManagerLocation;
 import org.apache.flink.runtime.testtasks.NoOpInvokable;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
@@ -96,6 +105,7 @@ import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
 
 import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.contains;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.hasSize;
 import static org.hamcrest.Matchers.is;
@@ -615,6 +625,90 @@ public class JobMasterTest extends TestLogger {
                }
        }
 
+       /**
+        * Tests the {@link 
JobMaster#requestPartitionState(IntermediateDataSetID, ResultPartitionID)}
+        * call for a finished result partition.
+        */
+       @Test
+       public void testRequestPartitionState() throws Exception {
+               final JobGraph producerConsumerJobGraph = 
producerConsumerJobGraph();
+               final JobMaster jobMaster = createJobMaster(
+                       JobMasterConfiguration.fromConfiguration(configuration),
+                       producerConsumerJobGraph,
+                       haServices,
+                       new TestingJobManagerSharedServicesBuilder().build(),
+                       heartbeatServices);
+
+               jobMaster.start(jobMasterId, testingTimeout);
+
+               try {
+                       final TestingResourceManagerGateway 
testingResourceManagerGateway = new TestingResourceManagerGateway();
+
+                       final CompletableFuture<AllocationID> 
allocationIdFuture = new CompletableFuture<>();
+                       
testingResourceManagerGateway.setRequestSlotConsumer(slotRequest -> 
allocationIdFuture.complete(slotRequest.getAllocationId()));
+
+                       
rpcService.registerGateway(testingResourceManagerGateway.getAddress(), 
testingResourceManagerGateway);
+
+                       final CompletableFuture<TaskDeploymentDescriptor> 
tddFuture = new CompletableFuture<>();
+                       final TestingTaskExecutorGateway 
testingTaskExecutorGateway = new TestingTaskExecutorGatewayBuilder()
+                               
.setSubmitTaskConsumer((taskDeploymentDescriptor, jobMasterId) -> {
+                    tddFuture.complete(taskDeploymentDescriptor);
+                    return 
CompletableFuture.completedFuture(Acknowledge.get());
+                })
+                               .createTestingTaskExecutorGateway();
+                       
rpcService.registerGateway(testingTaskExecutorGateway.getAddress(), 
testingTaskExecutorGateway);
+
+                       final JobMasterGateway jobMasterGateway = 
jobMaster.getSelfGateway(JobMasterGateway.class);
+
+                       
rmLeaderRetrievalService.notifyListener(testingResourceManagerGateway.getAddress(),
 testingResourceManagerGateway.getFencingToken().toUUID());
+
+                       final AllocationID allocationId = 
allocationIdFuture.get();
+
+                       final LocalTaskManagerLocation taskManagerLocation = 
new LocalTaskManagerLocation();
+                       
jobMasterGateway.registerTaskManager(testingTaskExecutorGateway.getAddress(), 
taskManagerLocation, testingTimeout).get();
+
+                       final SlotOffer slotOffer = new SlotOffer(allocationId, 
0, ResourceProfile.UNKNOWN);
+
+                       final Collection<SlotOffer> slotOffers = 
jobMasterGateway.offerSlots(taskManagerLocation.getResourceID(), 
Collections.singleton(slotOffer), testingTimeout).get();
+
+                       assertThat(slotOffers, hasSize(1));
+                       assertThat(slotOffers, contains(slotOffer));
+
+                       // obtain tdd for the result partition ids
+                       final TaskDeploymentDescriptor tdd = tddFuture.get();
+
+                       assertThat(tdd.getProducedPartitions(), hasSize(1));
+                       final ResultPartitionDeploymentDescriptor partition = 
tdd.getProducedPartitions().iterator().next();
+
+                       final ExecutionAttemptID executionAttemptId = 
tdd.getExecutionAttemptId();
+                       final ExecutionAttemptID copiedExecutionAttemptId = new 
ExecutionAttemptID(executionAttemptId.getLowerPart(), 
executionAttemptId.getUpperPart());
+
+                       // finish the producer task
+                       jobMasterGateway.updateTaskExecutionState(new 
TaskExecutionState(producerConsumerJobGraph.getJobID(), executionAttemptId, 
ExecutionState.FINISHED)).get();
+
+                       // request the state of the result partition of the 
producer
+                       final CompletableFuture<ExecutionState> 
partitionStateFuture = 
jobMasterGateway.requestPartitionState(partition.getResultId(), new 
ResultPartitionID(partition.getPartitionId(), copiedExecutionAttemptId));
+
+                       assertThat(partitionStateFuture.get(), 
equalTo(ExecutionState.FINISHED));
+               } finally {
+                       RpcUtils.terminateRpcEndpoint(jobMaster, 
testingTimeout);
+               }
+       }
+
+       private JobGraph producerConsumerJobGraph() {
+               final JobVertex producer = new JobVertex("Producer");
+               producer.setInvokableClass(NoOpInvokable.class);
+               final JobVertex consumer = new JobVertex("Consumer");
+               consumer.setInvokableClass(NoOpInvokable.class);
+
+               consumer.connectNewDataSetAsInput(producer, 
DistributionPattern.POINTWISE, ResultPartitionType.BLOCKING);
+
+               final JobGraph jobGraph = new JobGraph(producer, consumer);
+               jobGraph.setAllowQueuedScheduling(true);
+
+               return jobGraph;
+       }
+
        private File createSavepoint(long savepointId) throws IOException {
                final File savepointFile = temporaryFolder.newFile();
                final SavepointV2 savepoint = new SavepointV2(savepointId, 
Collections.emptyList(), Collections.emptyList());

Reply via email to