rkhachatryan commented on code in PR #22251:
URL: https://github.com/apache/flink/pull/22251#discussion_r1147384435


##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(

Review Comment:
   I think `java.util.NoSuchElementException` would be more appropriate here because it doesn't necessarily involve files.
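   For illustration, a minimal sketch of what I mean (keeping the message from the diff; the switch to `NoSuchElementException` is only my suggestion, not what the PR currently does):
   ```java
   import java.util.NoSuchElementException;

   // Sketch only: same null check as in the diff, but the exception type
   // no longer implies a file-based store.
   if (jobGraph == null) {
       throw new NoSuchElementException(
               String.format(
                       "JobGraph for job [%s] was not found in JobGraphStore"
                               + " and is needed for attaching JobResourceRequirements.",
                       jobId));
   }
   ```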



##########
flink-runtime/src/test/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStoreTest.java:
##########
@@ -368,6 +376,61 @@ public void testLocalCleanupShouldReleaseHandle() throws Exception {
         assertThat(actual, is(testingJobGraph.getJobID().toString()));
     }
 
+    @Test
+    public void testRecoverPersistedJobResourceRequirements() throws Exception {
+        final Map<String, RetrievableStateHandle<JobGraph>> handles = new HashMap<>();
+        final TestingStateHandleStore<JobGraph> stateHandleStore =
+                builder.setAddFunction(
+                                (key, state) -> {
+                                    final RetrievableStateHandle<JobGraph> handle =
+                                            jobGraphStorageHelper.store(state);
+                                    handles.put(key, handle);
+                                    return handle;
+                                })
+                        .setGetFunction(
+                                key -> {
+                                    final RetrievableStateHandle<JobGraph> handle =
+                                            handles.get(key);
+                                    if (handle != null) {
+                                        return handle;
+                                    }
+                                    throw new StateHandleStore.NotExistException("Does not exist.");
+                                })
+                        .build();
+
+        final JobResourceRequirements jobResourceRequirements =
+                JobResourceRequirements.newBuilder()
+                        .setParallelismForJobVertex(new JobVertexID(), 1, 1)
+                        .build();
+
+        final JobGraphStore jobGraphStore = createAndStartJobGraphStore(stateHandleStore);
+        jobGraphStore.putJobGraph(testingJobGraph);
+        jobGraphStore.putJobResourceRequirements(
+                testingJobGraph.getJobID(), jobResourceRequirements);
+
+        final Optional<JobResourceRequirements> maybeRecovered =
+                JobResourceRequirements.readFromJobGraph(
+                        Objects.requireNonNull(
+                                jobGraphStore.recoverJobGraph(testingJobGraph.getJobID())));
+        Assertions.assertThat(maybeRecovered).get().isEqualTo(jobResourceRequirements);

Review Comment:
   Maybe also check that new resource requirements overwrite already written ones?
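   Something along these lines, for example (a sketch only; it assumes the testing store is wired so that the second write is visible through `recoverJobGraph`, e.g. via the builder's replace hook):
   ```java
   final JobResourceRequirements updatedRequirements =
           JobResourceRequirements.newBuilder()
                   .setParallelismForJobVertex(new JobVertexID(), 1, 2)
                   .build();
   jobGraphStore.putJobResourceRequirements(testingJobGraph.getJobID(), updatedRequirements);

   // The latest write should win over the previously stored requirements.
   Assertions.assertThat(
                   JobResourceRequirements.readFromJobGraph(
                           Objects.requireNonNull(
                                   jobGraphStore.recoverJobGraph(testingJobGraph.getJobID()))))
           .get()
           .isEqualTo(updatedRequirements);
   ```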



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {

Review Comment:
   Could you explain why we need synchronization here?
   It is already performed inside `recoverJobGraph` and `putJobGraph`. The latter also checks the version before writing, so there shouldn't be consistency issues IIUC.
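   For illustration, without the extra lock the method could read as below (a sketch only, assuming the internal locking and version check in `recoverJobGraph`/`putJobGraph` are sufficient):
   ```java
   @Override
   public void putJobResourceRequirements(
           JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
       // recoverJobGraph and putJobGraph each take the store's lock themselves,
       // and putJobGraph checks the state handle version before replacing the entry.
       @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
       if (jobGraph == null) {
           throw new FileNotFoundException(
                   String.format(
                           "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
                           jobId));
       }
       JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
       putJobGraph(jobGraph);
   }
   ```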



##########
flink-runtime/src/main/java/org/apache/flink/runtime/jobmanager/DefaultJobGraphStore.java:
##########
@@ -241,6 +243,22 @@ public void putJobGraph(JobGraph jobGraph) throws Exception {
         LOG.info("Added {} to {}.", jobGraph, jobGraphStateHandleStore);
     }
 
+    @Override
+    public void putJobResourceRequirements(
+            JobID jobId, JobResourceRequirements jobResourceRequirements) throws Exception {
+        synchronized (lock) {
+            @Nullable final JobGraph jobGraph = recoverJobGraph(jobId);
+            if (jobGraph == null) {
+                throw new FileNotFoundException(
+                        String.format(
+                                "JobGraph for job [%s] was not found in JobGraphStore and is needed for attaching JobResourceRequirements.",
+                                jobId));
+            }
+            JobResourceRequirements.writeToJobGraph(jobGraph, jobResourceRequirements);
+            putJobGraph(jobGraph);

Review Comment:
   Could you explain how we prevent concurrent writes?
   I'm concerned about a case where a stand-by JM runs through the same code path.
   
   IIUC, that was not possible before, because it was called only during job submission. But now it will also be called for existing jobs.
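   For context, the kind of interleaving I have in mind (hypothetical, assuming two JMs reach `putJobResourceRequirements` for the same job):
   ```java
   // JM A: recoverJobGraph(jobId)                       -> reads graph with requirements R0
   // JM B: recoverJobGraph(jobId)                       -> also reads R0
   // JM A: writeToJobGraph(graph, R1); putJobGraph(...) -> store now holds R1
   // JM B: writeToJobGraph(graph, R2); putJobGraph(...) -> may overwrite R1 with R2 derived
   //                                                        from the stale read, unless the
   //                                                        version check rejects it
   ```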


