tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280433723
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -501,76 +432,21 @@ public void acknowledgeCheckpoint(
final CheckpointMetrics checkpointMetrics,
final TaskStateSnapshot checkpointState) {
- final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
- final AcknowledgeCheckpoint ackMessage = new
AcknowledgeCheckpoint(
- jobID,
- executionAttemptID,
- checkpointId,
- checkpointMetrics,
- checkpointState);
-
- if (checkpointCoordinator != null) {
- getRpcService().execute(() -> {
- try {
-
checkpointCoordinator.receiveAcknowledgeMessage(ackMessage);
- } catch (Throwable t) {
- log.warn("Error while processing
checkpoint acknowledgement message", t);
- }
- });
- } else {
- String errorMessage = "Received AcknowledgeCheckpoint
message for job {} with no CheckpointCoordinator";
- if (executionGraph.getState() == JobStatus.RUNNING) {
- log.error(errorMessage, jobGraph.getJobID());
- } else {
- log.debug(errorMessage, jobGraph.getJobID());
- }
- }
+ schedulerNG.acknowledgeCheckpoint(jobID, executionAttemptID,
checkpointId, checkpointMetrics, checkpointState);
}
// TODO: This method needs a leader session ID
@Override
public void declineCheckpoint(DeclineCheckpoint decline) {
- final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
-
- if (checkpointCoordinator != null) {
- getRpcService().execute(() -> {
- try {
-
checkpointCoordinator.receiveDeclineMessage(decline);
- } catch (Exception e) {
- log.error("Error in
CheckpointCoordinator while processing {}", decline, e);
- }
- });
- } else {
- String errorMessage = "Received DeclineCheckpoint
message for job {} with no CheckpointCoordinator";
- if (executionGraph.getState() == JobStatus.RUNNING) {
- log.error(errorMessage, jobGraph.getJobID());
- } else {
- log.debug(errorMessage, jobGraph.getJobID());
- }
- }
+ schedulerNG.declineCheckpoint(decline);
}
@Override
public CompletableFuture<KvStateLocation> requestKvStateLocation(final
JobID jobId, final String registrationName) {
- // sanity check for the correct JobID
- if (jobGraph.getJobID().equals(jobId)) {
- if (log.isDebugEnabled()) {
- log.debug("Lookup key-value state for job {}
with registration " +
- "name {}.", jobGraph.getJobID(),
registrationName);
- }
-
- final KvStateLocationRegistry registry =
executionGraph.getKvStateLocationRegistry();
- final KvStateLocation location =
registry.getKvStateLocation(registrationName);
- if (location != null) {
- return
CompletableFuture.completedFuture(location);
- } else {
- return FutureUtils.completedExceptionally(new
UnknownKvStateLocation(registrationName));
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Request of key-value state location
for unknown job {} received.", jobId);
- }
- return FutureUtils.completedExceptionally(new
FlinkJobNotFoundException(jobId));
+ try {
+ return
CompletableFuture.completedFuture(schedulerNG.requestKvStateLocation(jobId,
registrationName));
+ } catch (UnknownKvStateLocation | FlinkJobNotFoundException e) {
Review comment:
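A logging statement is missing here. The removed code logged at debug level both when looking up the key-value state and when a request arrived for an unknown job; please log the exception in this catch block as well.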
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services