[ https://issues.apache.org/jira/browse/FLINK-4512?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568224#comment-15568224 ]
ASF GitHub Bot commented on FLINK-4512:
---------------------------------------
Github user uce commented on a diff in the pull request:
https://github.com/apache/flink/pull/2608#discussion_r82971228
--- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java ---
@@ -282,29 +279,71 @@ public boolean isShutdown() {
      // Handling checkpoints and messages
      // --------------------------------------------------------------------------------------------
-     public Future<String> triggerSavepoint(long timestamp) throws Exception {
-         CheckpointTriggerResult result = triggerCheckpoint(timestamp, CheckpointProperties.forStandardSavepoint());
+     /**
+      * Triggers a savepoint with the default savepoint directory as a target.
+      *
+      * @param timestamp The timestamp for the savepoint.
+      * @return A future to the completed checkpoint
+      * @throws IllegalStateException If no default savepoint directory has been configured
+      * @throws Exception             Failures during triggering are forwarded
+      */
+     public Future<CompletedCheckpoint> triggerSavepoint(long timestamp) throws Exception {
+         return triggerSavepoint(timestamp, null);
+     }
+
+     /**
+      * Triggers a savepoint with the given savepoint directory as a target.
+      *
+      * @param timestamp The timestamp for the savepoint.
+      * @param savepointDirectory Target directory for the savepoint.
+      * @return A future to the completed checkpoint
+      * @throws IllegalStateException If no savepoint directory has been
+      *                               specified and no default savepoint directory has been
+      *                               configured
+      * @throws Exception             Failures during triggering are forwarded
+      */
+     public Future<CompletedCheckpoint> triggerSavepoint(long timestamp, String savepointDirectory) throws Exception {
+         String targetDirectory;
+         if (savepointDirectory != null) {
+             targetDirectory = savepointDirectory;
+         } else if (this.savepointDirectory != null) {
+             targetDirectory = this.savepointDirectory;
+         } else {
+             throw new IllegalStateException("No savepoint directory configured. " +
+                     "You can either specify a directory when triggering this savepoint or " +
+                     "configure a cluster-wide default via key '" +
+                     ConfigConstants.SAVEPOINT_DIRECTORY_KEY + "'.");
+         }
+
+         CheckpointProperties props = CheckpointProperties.forStandardSavepoint();
+         CheckpointTriggerResult result = triggerCheckpoint(timestamp, props, targetDirectory);
          if (result.isSuccess()) {
-             PendingSavepoint savepoint = (PendingSavepoint) result.getPendingCheckpoint();
-             return savepoint.getCompletionFuture();
-         }
-         else {
-             return Futures.failed(new Exception("Failed to trigger savepoint: " + result.getFailureReason().message()));
+             return result.getPendingCheckpoint().getCompletionFuture();
+         } else {
+             CompletableFuture<CompletedCheckpoint> failed = new FlinkCompletableFuture<>();
--- End diff --
I was looking for that one, but didn't find it. Thanks!
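
For readers skimming the diff, the directory fallback in the new two-argument triggerSavepoint can be tried in isolation. The following is only a minimal standalone sketch, not the Flink code: the class SavepointTargetResolver, the method resolveTargetDirectory, and the example paths are hypothetical names chosen for illustration, and the error message is shortened.

```java
public class SavepointTargetResolver {

    /**
     * Picks the savepoint target directory: an explicitly requested directory
     * wins, then the configured default; if neither is set the call fails.
     * Hypothetical helper for illustration only, not part of Flink.
     */
    public static String resolveTargetDirectory(String requested, String configuredDefault) {
        if (requested != null) {
            return requested;
        } else if (configuredDefault != null) {
            return configuredDefault;
        } else {
            throw new IllegalStateException("No savepoint directory configured.");
        }
    }

    public static void main(String[] args) {
        // Explicit directory takes precedence over the default.
        System.out.println(resolveTargetDirectory("hdfs:///savepoints/manual", "hdfs:///savepoints/default"));
        // No explicit directory: fall back to the default.
        System.out.println(resolveTargetDirectory(null, "hdfs:///savepoints/default"));
        // Neither is set: the call is rejected.
        try {
            resolveTargetDirectory(null, null);
        } catch (IllegalStateException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Passing null from the one-argument overload, as the diff does, means the same fallback covers both entry points.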
> Add option for persistent checkpoints
> -------------------------------------
>
> Key: FLINK-4512
> URL: https://issues.apache.org/jira/browse/FLINK-4512
> Project: Flink
> Issue Type: Sub-task
> Components: State Backends, Checkpointing
> Reporter: Ufuk Celebi
> Assignee: Ufuk Celebi
>
> Allow periodic checkpoints to be persisted by writing out their metadata.
> This is what we currently do for savepoints, but in the future checkpoints
> and savepoints are likely to diverge with respect to the guarantees they give
> for updatability, etc.
> This means that the difference between persistent checkpoints and savepoints
> in the long term will be that persistent checkpoints can only be restored
> with the same job settings (like parallelism, etc.)
> Regular and persisted checkpoints should behave differently with respect to
> disposal in *globally* terminal job states (FINISHED, CANCELLED, FAILED):
> regular checkpoints are cleaned up in all of these cases, whereas persistent
> checkpoints are cleaned up only on FINISHED, possibly with an option to
> customize the behaviour on CANCELLED or FAILED.
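
The disposal rule described in the issue can be written down as a small decision function. This is a sketch under assumptions, not Flink's implementation: CheckpointDisposalSketch, TerminalState, shouldDispose, and the extraDisposalStates parameter (standing in for the proposed option to customize behaviour on CANCELLED or FAILED) are all hypothetical names.

```java
import java.util.EnumSet;
import java.util.Set;

public class CheckpointDisposalSketch {

    /** The globally terminal job states named in the issue. */
    public enum TerminalState { FINISHED, CANCELLED, FAILED }

    /**
     * Decides whether a checkpoint is cleaned up when the job reaches a
     * globally terminal state. Regular checkpoints are always cleaned up;
     * persistent ones only on FINISHED, plus any states the user opted into
     * (hypothetical customization hook).
     */
    public static boolean shouldDispose(
            boolean persistent,
            TerminalState state,
            Set<TerminalState> extraDisposalStates) {
        if (!persistent) {
            return true;
        }
        return state == TerminalState.FINISHED || extraDisposalStates.contains(state);
    }

    public static void main(String[] args) {
        Set<TerminalState> noExtras = EnumSet.noneOf(TerminalState.class);
        for (TerminalState state : TerminalState.values()) {
            System.out.println(state
                    + ": regular=" + shouldDispose(false, state, noExtras)
                    + ", persistent=" + shouldDispose(true, state, noExtras));
        }
    }
}
```

With an empty set of extra states, a persistent checkpoint survives CANCELLED and FAILED, which is what would allow it to be used for a later restore.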