rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147383681
##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);
Review Comment:
Could you explain how we prevent concurrent writes?
I'm concerned about the case where a contender JM runs through the same code
path.
IIUC, that was not possible before, because this method was called only during
job submission.
But now it will also be called for existing jobs, so it might be possible.
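
To make the concern concrete: `synchronized (lock)` only serializes threads inside one JVM, so it would not by itself stop a second JM process from doing the same read-modify-write. Below is a minimal sketch of one common guard, a versioned compare-and-set on the shared store. All names here (`VersionedStoreSketch`, `VersionedStore`, `replaceIfUnchanged`) are hypothetical and are not Flink APIs; the in-memory `AtomicReference` merely stands in for whatever shared storage backs the JobGraphStore. I'm not claiming this is how the store works today, only illustrating the kind of check I'd expect somewhere.

```java
import java.util.concurrent.atomic.AtomicReference;

public class VersionedStoreSketch {

    /** Immutable snapshot: the stored payload plus the version it was written at. */
    static final class Versioned {
        final String payload;
        final int version;

        Versioned(String payload, int version) {
            this.payload = payload;
            this.version = version;
        }
    }

    /** Hypothetical stand-in for shared storage visible to several JMs. */
    static final class VersionedStore {
        private final AtomicReference<Versioned> current;

        VersionedStore(String initialPayload) {
            this.current = new AtomicReference<>(new Versioned(initialPayload, 0));
        }

        Versioned read() {
            return current.get();
        }

        /** Writes only if nobody else has written since {@code expected} was read. */
        boolean replaceIfUnchanged(Versioned expected, String newPayload) {
            Versioned next = new Versioned(newPayload, expected.version + 1);
            return current.compareAndSet(expected, next);
        }
    }

    public static void main(String[] args) {
        VersionedStore store = new VersionedStore("graph");

        // Both JMs recover the same snapshot before either one writes.
        Versioned seenByLeader = store.read();
        Versioned seenByContender = store.read();

        boolean leaderWrote = store.replaceIfUnchanged(seenByLeader, "graph+requirementsA");
        // The contender's snapshot is now stale, so its write is rejected
        // instead of silently overwriting the leader's update.
        boolean contenderWrote = store.replaceIfUnchanged(seenByContender, "graph+requirementsB");

        System.out.println("leader wrote: " + leaderWrote);
        System.out.println("contender wrote: " + contenderWrote);
        System.out.println("stored: " + store.read().payload + " @v" + store.read().version);
    }
}
```

In this sketch the stale writer gets `false` back and has to re-read and retry (or give up), rather than losing the other writer's update. If the underlying state handle store already provides an equivalent guarantee, a pointer to where that happens would answer my question.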