[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15565535#comment-15565535 ]
ASF GitHub Bot commented on FLINK-4512:
---------------------------------------

Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2608#discussion_r82778557

    --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
    @@ -282,29 +279,71 @@ public boolean isShutdown() {
         //  Handling checkpoints and messages
         // --------------------------------------------------------------------------------------------

    -    public Future<String> triggerSavepoint(long timestamp) throws Exception {
    -        CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
    +    /**
    +     * Triggers a savepoint with the default savepoint directory as a target.
    +     *
    +     * @param timestamp The timestamp for the savepoint.
    +     * @return A future to the completed checkpoint
    +     * @throws IllegalStateException If no default savepoint directory has been configured
    +     * @throws Exception             Failures during triggering are forwarded
    +     */
    +    public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
    +        return triggerSavepoint(timestamp, null);
    +    }
    +
    +    /**
    +     * Triggers a savepoint with the given savepoint directory as a target.
    +     *
    +     * @param timestamp The timestamp for the savepoint.
    +     * @param savepointDirectory Target directory for the savepoint.
    +     * @return A future to the completed checkpoint
    +     * @throws IllegalStateException If no savepoint directory has been
    +     *                               specified and no default savepoint directory has been
    +     *                               configured
    +     * @throws Exception             Failures during triggering are forwarded
    +     */
    +    public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
    +        String targetDirectory;
    +        if (savepointDirectory != null) {
    +            targetDirectory = savepointDirectory;
    +        } else if (this.savepointDirectory != null) {
    +            targetDirectory = this.savepointDirectory;
    +        } else {
    +            throw new IllegalStateException("No savepoint directory configured. " +
    +                "You can either specify a directory when triggering this savepoint or " +
    +                "configure a cluster-wide default via key '" +
    +                ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
    +        }
    +
    +        CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
    +        CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);

             if (result.isSuccess()) {
    -            PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
    -            return savepoint.getCompletionFuture();
    -        }
    -        else {
    -            return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    +            return result.getPendingCheckpoint().getCompletionFuture();
    +        } else {
    +            CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
    +            failed.completeExceptionally(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
    --- End diff --

    Not adding the complete stack trace is on purpose, right? I'm wondering whether this could not help to debug problems later.

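
    To make the question above concrete, here is a minimal sketch of how an original failure could be kept attached to the failed future. It is not the code from the pull request: it uses the JDK's java.util.concurrent.CompletableFuture as a stand-in for FlinkCompletableFuture, the class and method names (SavepointFailureSketch, failed) are hypothetical, and it assumes a Throwable is available at the failure site, whereas the diff only shows a message string from result.getFailureReason().message().

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical illustration only; not part of Flink or of this pull request.
public class SavepointFailureSketch {

    /**
     * Builds an already-failed future whose exception carries the original
     * failure as its cause, so the full stack trace stays reachable later.
     */
    public static <T> CompletableFuture<T> failed(String reason, Throwable cause) {
        CompletableFuture<T> future = new CompletableFuture<>();
        // The two-argument Exception constructor chains the cause instead of
        // flattening it into the message text.
        future.completeExceptionally(
            new Exception("Failed to trigger savepoint: " + reason, cause));
        return future;
    }

    public static void main(String[] args) {
        // Assumed root cause, invented for the example.
        Throwable root = new IllegalStateException("hypothetical root cause");
        CompletableFuture<String> future = failed("example reason", root);
        try {
            future.join();
        } catch (CompletionException e) {
            // e.getCause() is the wrapping Exception; its own cause is the
            // original failure, printed as a "Caused by:" section.
            e.getCause().printStackTrace();
        }
    }
}
```

    With the message-only variant shown in the diff, a caller sees only the decline message. Chaining a cause would help debugging only where an underlying Throwable actually exists at the point of failure.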
> Add option for persistent checkpoints
> -------------------------------------
>
>                 Key: FLINK-4512
>                 URL: https://issues.apache.org/jira/browse/FLINK-4512
>             Project: Flink
>          Issue Type: Sub-task
>          Components: State Backends, Checkpointing
>            Reporter: Ufuk Celebi
>            Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their meta data.
> This is what we currently do for savepoints, but in the future checkpoints
> and savepoints are likely to diverge with respect to guarantees they give for
> updatability, etc.
> This means that the difference between persistent checkpoints and savepoints
> in the long term will be that persistent checkpoints can only be restored
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED):
> regular checkpoints are cleaned up in all of these cases whereas persistent
> checkpoints only on FINISHED. Maybe with the option to customize behaviour on
> CANCELLED or FAILED.

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)