tillrohrmann commented on a change in pull request #8318: 
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph 
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432286
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -392,43 +379,10 @@ public JobMaster(
                        final JobVertexID vertexID,
                        final ExecutionAttemptID executionAttempt) {
 
-               final Execution execution = 
executionGraph.getRegisteredExecutions().get(executionAttempt);
-               if (execution == null) {
-                       // can happen when JobManager had already unregistered 
this execution upon on task failure,
-                       // but TaskManager get some delay to aware of that 
situation
-                       if (log.isDebugEnabled()) {
-                               log.debug("Can not find Execution for attempt 
{}.", executionAttempt);
-                       }
-                       // but we should TaskManager be aware of this
-                       return FutureUtils.completedExceptionally(new 
Exception("Can not find Execution for attempt " + executionAttempt));
-               }
-
-               final ExecutionJobVertex vertex = 
executionGraph.getJobVertex(vertexID);
-               if (vertex == null) {
-                       log.error("Cannot find execution vertex for vertex ID 
{}.", vertexID);
-                       return FutureUtils.completedExceptionally(new 
Exception("Cannot find execution vertex for vertex ID " + vertexID));
-               }
-
-               if (vertex.getSplitAssigner() == null) {
-                       log.error("No InputSplitAssigner for vertex ID {}.", 
vertexID);
-                       return FutureUtils.completedExceptionally(new 
Exception("No InputSplitAssigner for vertex ID " + vertexID));
-               }
-
-               final InputSplit nextInputSplit = execution.getNextInputSplit();
-
-               if (log.isDebugEnabled()) {
-                       log.debug("Send next input split {}.", nextInputSplit);
-               }
-
                try {
-                       final byte[] serializedInputSplit = 
InstantiationUtil.serializeObject(nextInputSplit);
-                       return CompletableFuture.completedFuture(new 
SerializedInputSplit(serializedInputSplit));
-               } catch (Exception ex) {
-                       log.error("Could not serialize the next input split of 
class {}.", nextInputSplit.getClass(), ex);
-                       IOException reason = new IOException("Could not 
serialize the next input split of class " +
-                                       nextInputSplit.getClass() + ".", ex);
-                       vertex.fail(reason);
-                       return FutureUtils.completedExceptionally(reason);
+                       return 
CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, 
executionAttempt));
+               } catch (IOException e) {
 
 Review comment:
   I would also add a logging statement logging the `e` because it might 
potentially leave the distributed component here.

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to