tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280432286
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -392,43 +379,10 @@ public JobMaster(
final JobVertexID vertexID,
final ExecutionAttemptID executionAttempt) {
- final Execution execution = executionGraph.getRegisteredExecutions().get(executionAttempt);
- if (execution == null) {
- // can happen when JobManager had already unregistered this execution upon on task failure,
- // but TaskManager get some delay to aware of that situation
- if (log.isDebugEnabled()) {
- log.debug("Can not find Execution for attempt {}.", executionAttempt);
- }
- // but we should TaskManager be aware of this
- return FutureUtils.completedExceptionally(new Exception("Can not find Execution for attempt " + executionAttempt));
- }
-
- final ExecutionJobVertex vertex = executionGraph.getJobVertex(vertexID);
- if (vertex == null) {
- log.error("Cannot find execution vertex for vertex ID {}.", vertexID);
- return FutureUtils.completedExceptionally(new Exception("Cannot find execution vertex for vertex ID " + vertexID));
- }
-
- if (vertex.getSplitAssigner() == null) {
- log.error("No InputSplitAssigner for vertex ID {}.", vertexID);
- return FutureUtils.completedExceptionally(new Exception("No InputSplitAssigner for vertex ID " + vertexID));
- }
-
- final InputSplit nextInputSplit = execution.getNextInputSplit();
-
- if (log.isDebugEnabled()) {
- log.debug("Send next input split {}.", nextInputSplit);
- }
-
try {
- final byte[] serializedInputSplit = InstantiationUtil.serializeObject(nextInputSplit);
- return CompletableFuture.completedFuture(new SerializedInputSplit(serializedInputSplit));
- } catch (Exception ex) {
- log.error("Could not serialize the next input split of class {}.", nextInputSplit.getClass(), ex);
- IOException reason = new IOException("Could not serialize the next input split of class " +
- nextInputSplit.getClass() + ".", ex);
- vertex.fail(reason);
- return FutureUtils.completedExceptionally(reason);
+ return CompletableFuture.completedFuture(schedulerNG.requestNextInputSplit(vertexID, executionAttempt));
+ } catch (IOException e) {
Review comment:
I would also add a log statement here that logs `e`, because the exception
might leave this distributed component.
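
A minimal sketch of what this could look like, not the actual `JobMaster` code. It assumes a hypothetical `SplitSource` interface in place of `schedulerNG`, plain `String` IDs in place of `JobVertexID` and `ExecutionAttemptID`, and `java.util.logging` in place of the `log` field used in the diff, so that the snippet compiles on its own. The point is only that the exception is logged locally before it is handed back to the caller through the future.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.logging.Level;
import java.util.logging.Logger;

class InputSplitRequestSketch {

    // Stand-in for the JobMaster's logger (assumption: the real code uses its own `log` field).
    private static final Logger LOG =
            Logger.getLogger(InputSplitRequestSketch.class.getName());

    /** Hypothetical stand-in for the scheduler call shown in the diff. */
    interface SplitSource {
        byte[] requestNextInputSplit(String vertexId, String attemptId) throws IOException;
    }

    static CompletableFuture<byte[]> requestNextInputSplit(
            SplitSource scheduler, String vertexId, String attemptId) {
        try {
            return CompletableFuture.completedFuture(
                    scheduler.requestNextInputSplit(vertexId, attemptId));
        } catch (IOException e) {
            // Log here first: once the future is returned, the exception may only
            // surface on the remote side and leave no trace in this component's logs.
            LOG.log(Level.WARNING, "Error while requesting the next input split.", e);
            CompletableFuture<byte[]> failed = new CompletableFuture<>();
            failed.completeExceptionally(e);
            return failed;
        }
    }

    public static void main(String[] args) {
        // Failing source: the exception is logged and the future completes exceptionally.
        CompletableFuture<byte[]> result = requestNextInputSplit(
                (v, a) -> { throw new IOException("serialization failed"); },
                "vertex-1", "attempt-1");
        System.out.println(result.isCompletedExceptionally());
    }
}
```

The caller still receives the failure through the returned future; the log line only makes sure the local side keeps a record of it.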