tillrohrmann commented on a change in pull request #8318:
[FLINK-12231][runtime] Introduce Scheduler interface and adapt ExecutionGraph
to it
URL: https://github.com/apache/flink/pull/8318#discussion_r280434680
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -809,80 +625,18 @@ public void heartbeatFromResourceManager(final
ResourceID resourceID) {
final boolean advanceToEndOfEventTime,
final Time timeout) {
- final CheckpointCoordinator checkpointCoordinator =
executionGraph.getCheckpointCoordinator();
-
- if (checkpointCoordinator == null) {
- return FutureUtils.completedExceptionally(new
IllegalStateException(
- String.format("Job %s is not a
streaming job.", jobGraph.getJobID())));
- }
-
- if (targetDirectory == null &&
!checkpointCoordinator.getCheckpointStorage().hasDefaultSavepointLocation()) {
- log.info("Trying to cancel job {} with savepoint, but
no savepoint directory configured.", jobGraph.getJobID());
-
- return FutureUtils.completedExceptionally(new
IllegalStateException(
- "No savepoint directory configured. You
can either specify a directory " +
- "while cancelling via
-s :targetDirectory or configure a cluster-wide " +
- "default via key '" +
CheckpointingOptions.SAVEPOINT_DIRECTORY.key() + "'."));
- }
-
- final long now = System.currentTimeMillis();
-
- // we stop the checkpoint coordinator so that we are guaranteed
- // to have only the data of the synchronous savepoint committed.
- // in case of failure, and if the job restarts, the coordinator
- // will be restarted by the CheckpointCoordinatorDeActivator.
- checkpointCoordinator.stopCheckpointScheduler();
-
- final CompletableFuture<String> savepointFuture =
checkpointCoordinator
- .triggerSynchronousSavepoint(now,
advanceToEndOfEventTime, targetDirectory)
- .handleAsync((completedCheckpoint, throwable)
-> {
- if (throwable != null) {
- log.info("Failed during
stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(),
throwable.getMessage());
- throw new
CompletionException(throwable);
- }
- return
completedCheckpoint.getExternalPointer();
- }, getMainThreadExecutor());
-
- final CompletableFuture<JobStatus> terminationFuture =
executionGraph
- .getTerminationFuture()
- .handleAsync((jobstatus, throwable) -> {
-
- if (throwable != null) {
- log.info("Failed during
stopping job {} with a savepoint. Reason: {}", jobGraph.getJobID(),
throwable.getMessage());
- throw new
CompletionException(throwable);
- } else if(jobstatus !=
JobStatus.FINISHED) {
- log.info("Failed during
stopping job {} with a savepoint. Reason: Reached state {} instead of
FINISHED.", jobGraph.getJobID(), jobstatus);
- throw new
CompletionException(new FlinkException("Reached state " + jobstatus + " instead
of FINISHED."));
- }
- return jobstatus;
- }, getMainThreadExecutor());
-
- return savepointFuture.thenCompose((path) ->
- terminationFuture.thenApply((jobStatus -> path)));
- }
-
- private void startCheckpointScheduler(final CheckpointCoordinator
checkpointCoordinator) {
- if (checkpointCoordinator.isPeriodicCheckpointingConfigured()) {
- try {
-
checkpointCoordinator.startCheckpointScheduler();
- } catch (IllegalStateException ignored) {
- // Concurrent shut down of the coordinator
- }
- }
+ return schedulerNG.stopWithSavepoint(targetDirectory,
advanceToEndOfEventTime);
}
@Override
public CompletableFuture<OperatorBackPressureStatsResponse>
requestOperatorBackPressureStats(final JobVertexID jobVertexId) {
- final ExecutionJobVertex jobVertex =
executionGraph.getJobVertex(jobVertexId);
- if (jobVertex == null) {
- return FutureUtils.completedExceptionally(new
FlinkException("JobVertexID not found " +
- jobVertexId));
+ try {
+ final Optional<OperatorBackPressureStats>
operatorBackPressureStats =
schedulerNG.requestOperatorBackPressureStats(jobVertexId);
+ return
CompletableFuture.completedFuture(OperatorBackPressureStatsResponse.of(
+ operatorBackPressureStats.orElse(null)));
+ } catch (FlinkException e) {
Review comment:
A logging statement is missing: this `catch (FlinkException e)` block should log the exception before handling it.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
users@infra.apache.org
With regards,
Apache Git Services