tillrohrmann commented on a change in pull request #8318: [FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434123
 
 

 ##########
 File path: flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
 ##########
 @@ -611,26 +473,11 @@ public void declineCheckpoint(DeclineCheckpoint decline) 
{
                        JobVertexID jobVertexId,
                        KeyGroupRange keyGroupRange,
                        String registrationName) {
-               if (jobGraph.getJobID().equals(jobId)) {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Key value state unregistered for job 
{} under name {}.",
-                                       jobGraph.getJobID(), registrationName);
-                       }
-
-                       try {
-                               
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
-                                       jobVertexId, keyGroupRange, 
registrationName);
-
-                               return 
CompletableFuture.completedFuture(Acknowledge.get());
-                       } catch (Exception e) {
-                               log.error("Failed to notify KvStateRegistry 
about unregistration {}.", registrationName, e);
-                               return FutureUtils.completedExceptionally(e);
-                       }
-               } else {
-                       if (log.isDebugEnabled()) {
-                               log.debug("Notification about key-value state 
deregistration for unknown job {} received.", jobId);
-                       }
-                       return FutureUtils.completedExceptionally(new 
FlinkJobNotFoundException(jobId));
+               try {
+                       schedulerNG.notifyKvStateUnregistered(jobId, 
jobVertexId, keyGroupRange, registrationName);
+                       return 
CompletableFuture.completedFuture(Acknowledge.get());
+               } catch (FlinkJobNotFoundException e) {
 
 Review comment:
   and here

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services

Reply via email to