tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434081
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -582,26 +458,12 @@ public void declineCheckpoint(DeclineCheckpoint decline)
{
final String registrationName,
final KvStateID kvStateId,
final InetSocketAddress kvStateServerAddress) {
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Key value state registered for job
{} under name {}.",
- jobGraph.getJobID(), registrationName);
- }
-
- try {
-
executionGraph.getKvStateLocationRegistry().notifyKvStateRegistered(
- jobVertexId, keyGroupRange,
registrationName, kvStateId, kvStateServerAddress);
- return
CompletableFuture.completedFuture(Acknowledge.get());
- } catch (Exception e) {
- log.error("Failed to notify KvStateRegistry
about registration {}.", registrationName, e);
- return FutureUtils.completedExceptionally(e);
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Notification about key-value state
registration for unknown job {} received.", jobId);
- }
- return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
+ try {
+ schedulerNG.notifyKvStateRegistered(jobId, jobVertexId,
keyGroupRange, registrationName, kvStateId, kvStateServerAddress);
+ return
CompletableFuture.completedFuture(Acknowledge.get());
+ } catch (FlinkJobNotFoundException e) {
Review comment:
same here
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services