tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434081
 
 

 ##########
 File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -582,26 +458,12 @@ public void declineCheckpoint(DeclineCheckpoint decline) 
{
                        final String registrationName,
                        final KvStateID kvStateId,
                        final InetSocketAddress kvStateServerAddress) {
-               if (jobGraph.getJobID().equals(jobId)) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state registered for job 
{} under name {}.",
-                                       jobGraph.getJobID(), registrationName);
-                       }
-
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
-                                       jobVertexId, keyGroupRange, 
registrationName, kvStateId, kvStateServerAddress);
 
-                               return 
CompletableFuture.completedFuture(Acknowledge.get());
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about registration {}.", registrationName, e);
-                               return FutureUtils.completedExceptionally(e);
-                       }
-               } else {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Notification about key-value state 
registration for unknown job {} received.", jobId);
-                       }
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+               try {
+                       schedulerNG.notifyKvStateRegistered(jobId, jobVertexId, 
keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               } catch (FlinkJobNotFoundException e) {
 
 Review comment:
   same here

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to