tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434123
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -611,26 +473,11 @@ public void declineCheckpoint(DeclineCheckpoint decline)
{
JobVertexID jobVertexId,
KeyGroupRange keyGroupRange,
String registrationName) {
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state unregistered for job
{} under name {}.",
- jobGraph.getJobID(), registrationName);
- }
-
- try {
-
executionGraph.getKvStateLocationRegistry().notifyKvStateUnregistered(
- jobVertexId, keyGroupRange,
registrationName);
-
- return
CompletableFuture.completedFuture(Acknowledge.get());
- } catch (Exception e) {
- log.error("Failed to notify KvStateRegistry
about unregistration {}.", registrationName, e);
- return FutureUtils.completedExceptionally(e);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Notification about key-value state
deregistration for unknown job {} received.", jobId);
- }
- return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
+ try {
+ schedulerNG.notifyKvStateUnregistered(jobId,
jobVertexId, keyGroupRange, registrationName);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ } catch (FlinkJobNotFoundException e) {
Review comment:
and here
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services