tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432648
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -437,43 +391,20 @@ public JobMaster(
final IntermediateDataSetID intermediateResultId,
final ResultPartitionID resultPartitionId) {
- final Execution execution =
executionGraph.getRegisteredExecutions().get(resultPartitionId.getProducerId());
- if (execution != null) {
- return
CompletableFuture.completedFuture(execution.getState());
- }
- else {
- final IntermediateResult intermediateResult =
-
executionGraph.getAllIntermediateResults().get(intermediateResultId);
-
- if (intermediateResult != null) {
- // Try to find the producing execution
- Execution producerExecution = intermediateResult
-
.getPartitionById(resultPartitionId.getPartitionId())
- .getProducer()
- .getCurrentExecutionAttempt();
-
- if
(producerExecution.getAttemptId().equals(resultPartitionId.getProducerId())) {
- return
CompletableFuture.completedFuture(producerExecution.getState());
- } else {
- return
FutureUtils.completedExceptionally(new
PartitionProducerDisposedException(resultPartitionId));
- }
- } else {
- return FutureUtils.completedExceptionally(new
IllegalArgumentException("Intermediate data set with ID "
- + intermediateResultId + " not
found."));
- }
+ try {
+ return
CompletableFuture.completedFuture(schedulerNG.requestPartitionState(intermediateResultId,
resultPartitionId));
+ } catch (PartitionProducerDisposedException e) {
Review comment:
I would also log the exception here before it leaves the component.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services