tillrohrmann commented on a change in pull request #17443:
URL: https://github.com/apache/flink/pull/17443#discussion_r741964620
##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -121,6 +122,8 @@ You can resume your program from this savepoint with the
run command.
The savepoint folder is optional and needs to be specified if
[state.savepoints.dir]({{< ref "docs/deployment/config"
>}}#state-savepoints-dir) isn't set.
+The savepoint timeout is optional and checkpoint timeout will take effect if
isn't set.
Review comment:
```suggestion
The savepoint timeout `--savepointTimeout` is optional and checkpoint
timeout will take effect if it isn't set.
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -30,13 +31,18 @@
private boolean dispose;
private String disposeSavepointPath;
private String jarFile;
+ private long savepointTimeout;
Review comment:
Since we are not storing the unit explicitly:
```suggestion
private long savepointTimeoutMs;
```
##########
File path: docs/content.zh/docs/deployment/cli.md
##########
@@ -172,6 +176,8 @@ Savepoint completed. Path:
file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
We have to use `--savepointPath` to specify the savepoint folder if
[state.savepoints.dir]({{< ref "docs/deployment/config"
>}}#state-savepoints-dir) isn't set.
+The savepoint timeout is optional and checkpoint timeout will take effect if
isn't set.
Review comment:
```suggestion
The savepoint timeout `--savepointTimeout` is optional and checkpoint
timeout will take effect if it isn't set.
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");
+ static final Option SAVEPOINT_TIMEOUT_OPTION =
+ new Option("st", "savepointTimeout", true, "timeout of
savepoint.");
Review comment:
```suggestion
new Option("st", "savepointTimeout", true, "The maximum
completion time a savepoint is allowed to take before it is failed.");
```
##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -170,6 +174,8 @@ Savepoint completed. Path:
file:/tmp/flink-savepoints/savepoint-cca7bc-bb1e257f0
We have to use `--savepointPath` to specify the savepoint folder if
[state.savepoints.dir]({{< ref "docs/deployment/config"
>}}#state-savepoints-dir) isn't set.
+The savepoint timeout is optional and checkpoint timeout will take effect if
isn't set.
Review comment:
```suggestion
The savepoint timeout `--savepointTimeout` is optional and checkpoint
timeout will take effect if it isn't set.
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String
savepointDirectory);
+ /**
+ * Triggers a savepoint for the job identified by the job id. The
savepoint will be written to
+ * the given savepoint directory, or {@link
+ *
org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it
is null.
+ *
+ * @param jobId job id
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return path future where the savepoint is located
+ */
+ CompletableFuture<String> triggerSavepoint(
+ JobID jobId, @Nullable String savepointDirectory, long
savepointTimeout);
Review comment:
```suggestion
JobID jobId, @Nullable String savepointDirectory, long
savepointTimeoutMs);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
final CheckpointProperties properties =
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a synchronous savepoint with the given savepoint directory as
a target.
+ *
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+ final boolean terminate, @Nullable final String targetLocation,
long savepointTimeout) {
Review comment:
```suggestion
final boolean terminate, @Nullable final String targetLocation,
long savepointTimeoutMs) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -514,6 +558,16 @@ public boolean isShutdown() {
@Nullable String externalSavepointLocation,
boolean isPeriodic) {
+ return triggerCheckpoint(props, externalSavepointLocation, 0,
isPeriodic);
+ }
+
+ @VisibleForTesting
+ public CompletableFuture<CompletedCheckpoint> triggerCheckpoint(
+ CheckpointProperties props,
+ @Nullable String externalSavepointLocation,
+ long savepointTimeout,
Review comment:
```suggestion
long savepointTimeoutMs,
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
Review comment:
I am wondering whether it would be clearer if we passed
`checkpointTimeout` instead of `0` here.
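A minimal sketch of the two call styles, not the PR's code. The class and method names are hypothetical; the only behavior taken from the diff is that a timeout `<= 0` falls back to the checkpoint timeout.

```java
// Hypothetical stand-in for the coordinator; not Flink's CheckpointCoordinator.
final class TimeoutFallbackSketch {
    private final long checkpointTimeoutMs;

    TimeoutFallbackSketch(long checkpointTimeoutMs) {
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    // Sentinel rule from the Javadoc in the diff: <= 0 means "use the checkpoint timeout".
    long resolveTimeoutMs(long savepointTimeoutMs) {
        return savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
    }

    // Sentinel style: the reader has to know what 0 means.
    long legacyWithSentinel() {
        return resolveTimeoutMs(0);
    }

    // Explicit style: the fallback value is visible at the call site.
    long legacyWithExplicitDefault() {
        return resolveTimeoutMs(checkpointTimeoutMs);
    }
}
```

Both legacy variants resolve to the same value; the explicit one just doesn't require knowing the sentinel convention.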
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException
getCheckpointException(
@Nullable String externalSavepointLocation,
boolean isPeriodic) {
+ this(props, externalSavepointLocation, 0, isPeriodic);
+ }
+
+ CheckpointTriggerRequest(
+ CheckpointProperties props,
+ @Nullable String externalSavepointLocation,
+ long savepointTimeout,
+ boolean isPeriodic) {
+
this.timestamp = System.currentTimeMillis();
this.props = checkNotNull(props);
this.externalSavepointLocation = externalSavepointLocation;
+ this.savepointTimeout = savepointTimeout;
Review comment:
We should either use `Duration` or put the unit in the name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
final boolean cancelJob,
@RpcTimeout final Time timeout);
+ /**
+ * Triggers taking a savepoint of the executed job.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> triggerSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean cancelJob,
+ @RpcTimeout final Time timeout);
Review comment:
Let's remove the old `triggerSavepoint` method.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
return null;
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ JobID jobId,
+ boolean advanceToEndOfEventTime,
+ @Nullable String savepointDirectory,
+ long savepointTimeout) {
+ return null;
Review comment:
Same here: `FutureUtils.unsupportedOperationFuture()`.
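A sketch of why a failed future is safer than `null` in a test stub. It uses only the JDK so that it stays self-contained: `unsupportedOperationFuture()` below is a local stand-in, written on the assumption that the Flink helper named above returns a future already failed with `UnsupportedOperationException`. The stub signature is hypothetical.

```java
import java.util.concurrent.CompletableFuture;

final class UnsupportedFutureSketch {
    // Local, JDK-only stand-in for the helper named in the review comment.
    static <T> CompletableFuture<T> unsupportedOperationFuture() {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(
                new UnsupportedOperationException("Not supported by this test client."));
        return future;
    }

    // Hypothetical stub: callers get a future they can chain on, not a null.
    static CompletableFuture<String> stopWithSavepoint(
            String savepointDirectory, long savepointTimeoutMs) {
        return unsupportedOperationFuture();
    }
}
```

A caller that chains `thenApply` on the result sees an exceptional completion with a descriptive cause instead of a `NullPointerException`.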
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+ @Nullable final String targetLocation, final long
savepointTimeout) {
Review comment:
If we used a `Duration` type in the `CheckpointCoordinator`, then we
wouldn't have to rely on users of this class providing the timeout in the
correct unit.
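A rough illustration of the `Duration` variant, with hypothetical names throughout. It assumes the same fallback rule as the Javadoc in the diff (a non-positive timeout means "use the checkpoint timeout").

```java
import java.time.Duration;

// Hypothetical sketch; not Flink's CheckpointCoordinator.
final class DurationTimeoutSketch {
    private final Duration checkpointTimeout;

    DurationTimeoutSketch(Duration checkpointTimeout) {
        this.checkpointTimeout = checkpointTimeout;
    }

    // The type carries the unit, so callers cannot pass seconds where millis are expected.
    Duration effectiveSavepointTimeout(Duration savepointTimeout) {
        if (savepointTimeout.isZero() || savepointTimeout.isNegative()) {
            return checkpointTimeout;
        }
        return savepointTimeout;
    }
}
```

Callers would then write `Duration.ofSeconds(30)` or `Duration.ofMillis(30_000)` and get the same result either way.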
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
final boolean cancelJob,
@RpcTimeout final Time timeout);
+ /**
+ * Triggers taking a savepoint of the executed job.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> triggerSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean cancelJob,
+ @RpcTimeout final Time timeout);
Review comment:
Alternatively we could rename the old method to
`triggerSavepointWithDefaultTimeout` or so.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
public String getJarFilePath() {
return jarFile;
}
+
+ public long getSavepointTimeout() {
Review comment:
```suggestion
public long getSavepointTimeoutMs() {
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
final boolean advanceToEndOfEventTime,
@Nullable final String savepointDirectory);
+ /**
+ * Stops a program on Flink cluster whose job-manager is configured in
this client's
+ * configuration. Stopping works only for streaming programs. Be aware,
that the program might
+ * continue to run for a while after sending the stop command, because
after sources stopped to
+ * emit data all operators need to finish processing.
+ *
+ * @param jobId the job ID of the streaming program to stop
+ * @param advanceToEndOfEventTime flag indicating if the source should
inject a {@code
+ * MAX_WATERMARK} in the pipeline
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return a {@link CompletableFuture} containing the path where the
savepoint is located
+ */
+ CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final boolean advanceToEndOfEventTime,
+ @Nullable final String savepointDirectory,
+ final long savepointTimeout);
Review comment:
```suggestion
final long savepointTimeoutMs);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+ @Nullable final String targetLocation, final long
savepointTimeout) {
Review comment:
```suggestion
@Nullable final String targetLocation, final long
savepointTimeoutMs) {
```
##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the
run command.
The savepoint folder is optional and needs to be specified if
[state.savepoints.dir]({{< ref "docs/deployment/config"
>}}#state-savepoints-dir) isn't set.
+The savepoint timeout is optional and checkpoint timeout will take effect if
isn't set.
Review comment:
```suggestion
The savepoint timeout `--savepointTimeout` is optional and checkpoint
timeout will take effect if it isn't set.
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
final boolean advanceToEndOfEventTime,
@Nullable final String savepointDirectory);
+ /**
+ * Stops a program on Flink cluster whose job-manager is configured in
this client's
+ * configuration. Stopping works only for streaming programs. Be aware,
that the program might
+ * continue to run for a while after sending the stop command, because
after sources stopped to
+ * emit data all operators need to finish processing.
+ *
+ * @param jobId the job ID of the streaming program to stop
+ * @param advanceToEndOfEventTime flag indicating if the source should
inject a {@code
+ * MAX_WATERMARK} in the pipeline
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit description is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph,
long initializationTi
jobId, gateway -> gateway.stopWithSavepoint(targetDirectory,
terminate, timeout));
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ final Time timeout) {
+ return performOperationOnJobMasterGateway(
+ jobId,
+ gateway ->
+ gateway.stopWithSavepoint(
+ targetDirectory, savepointTimeout, terminate,
timeout));
Review comment:
Same here for `stopWithSavepoint`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
throw new UnsupportedOperationException();
}
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param jobId ID of the job for which the savepoint should be triggered.
+ * @param targetDirectory Target directory for the savepoint.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout Timeout for the asynchronous operation
+ * @return A future to the {@link CompletedCheckpoint#getExternalPointer()
external pointer} of
+ * the savepoint.
+ */
+ default CompletableFuture<String> triggerSavepoint(
+ JobID jobId,
+ String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ @RpcTimeout Time timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Stops the job with a savepoint.
+ *
+ * @param jobId ID of the job for which the savepoint should be triggered.
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ default CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final String targetDirectory,
+ final boolean terminate,
+ @RpcTimeout final Time timeout) {
+ throw new UnsupportedOperationException();
+ }
Review comment:
Can we get rid of this method?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
throw new UnsupportedOperationException();
}
Review comment:
Can we get rid of this method?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
return null;
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ JobID jobId,
+ boolean advanceToEndOfEventTime,
+ @Nullable String savepointDirectory,
+ long savepointTimeout) {
+ return null;
+ }
+
@Override
public CompletableFuture<String> triggerSavepoint(
JobID jobId, @Nullable String savepointDirectory) {
return null;
}
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ JobID jobId, @Nullable String savepointDirectory, long
savepointTimeout) {
+ return null;
Review comment:
Let's return `FutureUtils.unsupportedOperationFuture()`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(String targetDirectory,
boolean terminate) {
+ return stopWithSavepoint(targetDirectory, 0, terminate);
+ }
+
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory, final boolean terminate) {
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
return targetDirectory;
}
+ long getSavepointTimeout() {
Review comment:
```suggestion
long getSavepointTimeoutMs() {
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
/** Optional target directory for the savepoint. Overwrites cluster
default. */
private final String targetDirectory;
+ private final long savepointTimeout;
Review comment:
```suggestion
private final long savepointTimeoutMs;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
final CheckpointProperties properties =
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a synchronous savepoint with the given savepoint directory as
a target.
+ *
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String
savepointDirectory);
+ /**
+ * Triggers a savepoint for the job identified by the job id. The
savepoint will be written to
+ * the given savepoint directory, or {@link
+ *
org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it
is null.
+ *
+ * @param jobId job id
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit description is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
final CheckpointProperties properties =
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a synchronous savepoint with the given savepoint directory as
a target.
+ *
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+ final boolean terminate, @Nullable final String targetLocation,
long savepointTimeout) {
+
+ final CheckpointProperties properties =
+
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+ return triggerSavepointInternal(properties, targetLocation,
savepointTimeout);
}
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
final CheckpointProperties checkpointProperties,
- @Nullable final String targetLocation) {
+ @Nullable final String targetLocation,
+ long savepointTimeout) {
Review comment:
```suggestion
long savepointTimeoutMs) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph,
long initializationTi
jobId, gateway -> gateway.triggerSavepoint(targetDirectory,
cancelJob, timeout));
}
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ JobID jobId,
+ String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ Time timeout) {
Review comment:
Can we replace the old `triggerSavepoint` method completely?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
final boolean terminate,
@RpcTimeout final Time timeout);
+ /**
+ * Stops the job with a savepoint.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> stopWithSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ @RpcTimeout final Time timeout);
Review comment:
And `stopWithSavepoint`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean
isConnectingToResourceManager(ResourceManagerId resourceManagerI
public CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory, final boolean terminate,
final Time timeout) {
- return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+ return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+ }
+
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ final Time timeout) {
+
+ return schedulerNG.stopWithSavepoint(targetDirectory,
savepointTimeout, terminate);
Review comment:
Same here.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");
+ static final Option SAVEPOINT_TIMEOUT_OPTION =
+ new Option("st", "savepointTimeout", true, "timeout of
savepoint.");
Review comment:
What is the unit of the savepoint timeout? ms? If so, I would suggest
making this explicit via `savepointTimeoutMs` and also mentioning the unit in
the description.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean
isConnectingToResourceManager(ResourceManagerId resourceManagerI
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory, final boolean cancelJob,
final Time timeout) {
- return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+ return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ @Nullable String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ Time timeout) {
+ return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout,
cancelJob);
Review comment:
Can we replace the old `triggerSavepoint` method here? This should not
be public API.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(String targetDirectory,
boolean terminate) {
+ return stopWithSavepoint(targetDirectory, 0, terminate);
+ }
+
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory, final boolean terminate) {
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate) {
Review comment:
If we have overloaded methods, then I like to keep the parameter prefix
the same across the different variants. I think this is easier for
people to use. So in this case, I would put `savepointTimeout` at the end.
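A small sketch of that ordering, with hypothetical names and a string return value used only to make the argument order visible; it isn't the scheduler's real signature.

```java
// Hypothetical sketch; not Flink's SchedulerBase.
final class OverloadOrderSketch {
    // Existing variant: (targetDirectory, terminate).
    static String stopWithSavepoint(String targetDirectory, boolean terminate) {
        // 0 keeps the "fall back to the checkpoint timeout" meaning from the diff.
        return stopWithSavepoint(targetDirectory, terminate, 0L);
    }

    // New variant: same prefix, with the additional parameter appended at the end.
    static String stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs) {
        return targetDirectory + "|" + terminate + "|" + savepointTimeoutMs;
    }
}
```

With a shared prefix, moving from the short overload to the long one only means appending an argument.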
`checkpointTimeout` instead of `0` here.
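To illustrate the difference, here is a minimal, self-contained sketch. It is
not the actual `CheckpointCoordinator`: the class `TimeoutSketch` and its
methods are hypothetical and only model the timeout handling. With the `0`
sentinel the callee has to know that `<= 0` means "fall back to the checkpoint
timeout", whereas passing the value explicitly makes the intent visible at the
call site.

```java
/** Hypothetical stand-in for the coordinator; only the timeout handling is modelled. */
final class TimeoutSketch {
    private final long checkpointTimeoutMs;

    TimeoutSketch(long checkpointTimeoutMs) {
        this.checkpointTimeoutMs = checkpointTimeoutMs;
    }

    long checkpointTimeoutMs() {
        return checkpointTimeoutMs;
    }

    /** Sentinel variant: values <= 0 silently fall back to the checkpoint timeout. */
    long resolveWithSentinel(long savepointTimeoutMs) {
        return savepointTimeoutMs > 0 ? savepointTimeoutMs : checkpointTimeoutMs;
    }

    /** Explicit variant: the caller always passes the timeout it actually wants. */
    long resolveExplicit(long timeoutMs) {
        if (timeoutMs <= 0) {
            throw new IllegalArgumentException("timeout must be positive, got " + timeoutMs);
        }
        return timeoutMs;
    }
}
```

With the explicit variant, the old code path would call
`resolveExplicit(checkpointTimeoutMs())` rather than passing `0`.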
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -2069,9 +2127,19 @@ private static CheckpointException
getCheckpointException(
@Nullable String externalSavepointLocation,
boolean isPeriodic) {
+ this(props, externalSavepointLocation, 0, isPeriodic);
+ }
+
+ CheckpointTriggerRequest(
+ CheckpointProperties props,
+ @Nullable String externalSavepointLocation,
+ long savepointTimeout,
+ boolean isPeriodic) {
+
this.timestamp = System.currentTimeMillis();
this.props = checkNotNull(props);
this.externalSavepointLocation = externalSavepointLocation;
+ this.savepointTimeout = savepointTimeout;
Review comment:
Either use `Duration` or put the unit in the name.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
final boolean cancelJob,
@RpcTimeout final Time timeout);
+ /**
+ * Triggers taking a savepoint of the executed job.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> triggerSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean cancelJob,
+ @RpcTimeout final Time timeout);
Review comment:
Let's remove the old `triggerSavepoint` method.
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
return null;
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ JobID jobId,
+ boolean advanceToEndOfEventTime,
+ @Nullable String savepointDirectory,
+ long savepointTimeout) {
+ return null;
Review comment:
Same here: `FutureUtils.unsupportedOperationFuture()`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+ @Nullable final String targetLocation, final long
savepointTimeout) {
Review comment:
If the `CheckpointCoordinator` used a `Duration` type, we wouldn't have to
rely on users of this class providing the timeout in the correct unit.
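A rough sketch of what a `Duration`-based signature could look like. The class
`SavepointTimeoutResolver` and its `resolve` method are hypothetical names used
only for illustration; the fallback to the checkpoint timeout mirrors the
"`<= 0` means checkpoint timeout" rule from the Javadoc in this PR.

```java
import java.time.Duration;
import java.util.Optional;

/** Hypothetical helper: picks the effective savepoint timeout without a raw long. */
final class SavepointTimeoutResolver {
    private final Duration checkpointTimeout;

    SavepointTimeoutResolver(Duration checkpointTimeout) {
        this.checkpointTimeout = checkpointTimeout;
    }

    /** Returns the savepoint timeout if it is positive, otherwise the checkpoint timeout. */
    Duration resolve(Optional<Duration> savepointTimeout) {
        return savepointTimeout
                .filter(t -> !t.isZero() && !t.isNegative())
                .orElse(checkpointTimeout);
    }
}
```

Callers then write `Duration.ofSeconds(30)` or `Duration.ofMillis(500)` and the
unit can no longer be confused.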
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -234,6 +234,22 @@ void failSlot(
final boolean cancelJob,
@RpcTimeout final Time timeout);
+ /**
+ * Triggers taking a savepoint of the executed job.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> triggerSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean cancelJob,
+ @RpcTimeout final Time timeout);
Review comment:
Alternatively we could rename the old method to
`triggerSavepointWithDefaultTimeout` or so.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/SavepointOptions.java
##########
@@ -54,4 +60,8 @@ public String getSavepointPath() {
public String getJarFilePath() {
return jarFile;
}
+
+ public long getSavepointTimeout() {
Review comment:
```suggestion
public long getSavepointTimeoutMs() {
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
final boolean advanceToEndOfEventTime,
@Nullable final String savepointDirectory);
+ /**
+ * Stops a program on Flink cluster whose job-manager is configured in
this client's
+ * configuration. Stopping works only for streaming programs. Be aware,
that the program might
+ * continue to run for a while after sending the stop command, because
after sources stopped to
+ * emit data all operators need to finish processing.
+ *
+ * @param jobId the job ID of the streaming program to stop
+ * @param advanceToEndOfEventTime flag indicating if the source should
inject a {@code
+ * MAX_WATERMARK} in the pipeline
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return a {@link CompletableFuture} containing the path where the
savepoint is located
+ */
+ CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final boolean advanceToEndOfEventTime,
+ @Nullable final String savepointDirectory,
+ final long savepointTimeout);
Review comment:
```suggestion
final long savepointTimeoutMs);
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -450,7 +450,25 @@ public boolean isShutdown() {
@Nullable final String targetLocation) {
final CheckpointProperties properties =
CheckpointProperties.forSavepoint(!unalignedCheckpointsEnabled);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSavepoint(
+ @Nullable final String targetLocation, final long
savepointTimeout) {
Review comment:
```suggestion
@Nullable final String targetLocation, final long
savepointTimeoutMs) {
```
##########
File path: docs/content/docs/deployment/cli.md
##########
@@ -119,6 +120,8 @@ You can resume your program from this savepoint with the
run command.
The savepoint folder is optional and needs to be specified if
[state.savepoints.dir]({{< ref "docs/deployment/config"
>}}#state-savepoints-dir) isn't set.
+The savepoint timeout is optional and checkpoint timeout will take effect if
isn't set.
Review comment:
```suggestion
The savepoint timeout `--savepointTimeout` is optional; the checkpoint
timeout takes effect if it isn't set.
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -156,6 +156,26 @@
final boolean advanceToEndOfEventTime,
@Nullable final String savepointDirectory);
+ /**
+ * Stops a program on Flink cluster whose job-manager is configured in
this client's
+ * configuration. Stopping works only for streaming programs. Be aware,
that the program might
+ * continue to run for a while after sending the stop command, because
after sources stopped to
+ * emit data all operators need to finish processing.
+ *
+ * @param jobId the job ID of the streaming program to stop
+ * @param advanceToEndOfEventTime flag indicating if the source should
inject a {@code
+ * MAX_WATERMARK} in the pipeline
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit description is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -716,6 +731,20 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph,
long initializationTi
jobId, gateway -> gateway.stopWithSavepoint(targetDirectory,
terminate, timeout));
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ final Time timeout) {
+ return performOperationOnJobMasterGateway(
+ jobId,
+ gateway ->
+ gateway.stopWithSavepoint(
+ targetDirectory, savepointTimeout, terminate,
timeout));
Review comment:
Same here for `stopWithSavepoint`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
throw new UnsupportedOperationException();
}
+ /**
+ * Triggers a savepoint with the given savepoint directory as a target.
+ *
+ * @param jobId ID of the job for which the savepoint should be triggered.
+ * @param targetDirectory Target directory for the savepoint.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param timeout Timeout for the asynchronous operation
+ * @return A future to the {@link CompletedCheckpoint#getExternalPointer()
external pointer} of
+ * the savepoint.
+ */
+ default CompletableFuture<String> triggerSavepoint(
+ JobID jobId,
+ String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ @RpcTimeout Time timeout) {
+ throw new UnsupportedOperationException();
+ }
+
+ /**
+ * Stops the job with a savepoint.
+ *
+ * @param jobId ID of the job for which the savepoint should be triggered.
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ default CompletableFuture<String> stopWithSavepoint(
+ final JobID jobId,
+ final String targetDirectory,
+ final boolean terminate,
+ @RpcTimeout final Time timeout) {
+ throw new UnsupportedOperationException();
+ }
Review comment:
Can we get rid of this method?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/webmonitor/RestfulGateway.java
##########
@@ -145,19 +145,60 @@
throw new UnsupportedOperationException();
}
Review comment:
Can we get rid of this method?
##########
File path:
flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
##########
@@ -264,12 +264,27 @@ public String getWebInterfaceURL() {
return null;
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ JobID jobId,
+ boolean advanceToEndOfEventTime,
+ @Nullable String savepointDirectory,
+ long savepointTimeout) {
+ return null;
+ }
+
@Override
public CompletableFuture<String> triggerSavepoint(
JobID jobId, @Nullable String savepointDirectory) {
return null;
}
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ JobID jobId, @Nullable String savepointDirectory, long
savepointTimeout) {
+ return null;
Review comment:
Let's return `FutureUtils.unsupportedOperationFuture()`.
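The reason is that a `null` future makes callers fail with a
`NullPointerException` far from the cause, while a failed future reports the
problem through the normal future API. Below is a JDK-only sketch of the idea;
`UnsupportedFutures.unsupported` is a hypothetical helper written for this
example and is not meant to describe how `FutureUtils` is implemented.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical JDK-only helper that mimics an "unsupported operation" future. */
final class UnsupportedFutures {
    private UnsupportedFutures() {}

    /** Returns a future that is already completed exceptionally. */
    static <T> CompletableFuture<T> unsupported(String operation) {
        CompletableFuture<T> future = new CompletableFuture<>();
        future.completeExceptionally(
                new UnsupportedOperationException(operation + " is not supported"));
        return future;
    }
}
```

A test stub would then return `UnsupportedFutures.unsupported("triggerSavepoint")`
instead of `null`.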
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(String targetDirectory,
boolean terminate) {
+ return stopWithSavepoint(targetDirectory, 0, terminate);
+ }
+
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory, final boolean terminate) {
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -57,6 +64,10 @@ String getTargetDirectory() {
return targetDirectory;
}
+ long getSavepointTimeout() {
Review comment:
```suggestion
long getSavepointTimeoutMs() {
```
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
##########
@@ -33,6 +34,8 @@
/** Optional target directory for the savepoint. Overwrites cluster
default. */
private final String targetDirectory;
+ private final long savepointTimeout;
Review comment:
```suggestion
private final long savepointTimeoutMs;
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
final CheckpointProperties properties =
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a synchronous savepoint with the given savepoint directory as
a target.
+ *
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit is missing.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
##########
@@ -167,6 +187,20 @@
*/
CompletableFuture<String> triggerSavepoint(JobID jobId, @Nullable String
savepointDirectory);
+ /**
+ * Triggers a savepoint for the job identified by the job id. The
savepoint will be written to
+ * the given savepoint directory, or {@link
+ *
org.apache.flink.configuration.CheckpointingOptions#SAVEPOINT_DIRECTORY} if it
is null.
+ *
+ * @param jobId job id
+ * @param savepointDirectory directory the savepoint should be written to
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
Review comment:
Unit description is missing.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/CheckpointCoordinator.java
##########
@@ -469,12 +487,34 @@ public boolean isShutdown() {
final CheckpointProperties properties =
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
- return triggerSavepointInternal(properties, targetLocation);
+ return triggerSavepointInternal(properties, targetLocation, 0);
+ }
+
+ /**
+ * Triggers a synchronous savepoint with the given savepoint directory as
a target.
+ *
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param targetLocation Target location for the savepoint, optional. If
null, the state
+ * backend's configured default will be used.
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @return A future to the completed checkpoint
+ * @throws IllegalStateException If no savepoint directory has been
specified and no default
+ * savepoint directory has been configured
+ */
+ public CompletableFuture<CompletedCheckpoint> triggerSynchronousSavepoint(
+ final boolean terminate, @Nullable final String targetLocation,
long savepointTimeout) {
+
+ final CheckpointProperties properties =
+
CheckpointProperties.forSyncSavepoint(!unalignedCheckpointsEnabled, terminate);
+
+ return triggerSavepointInternal(properties, targetLocation,
savepointTimeout);
}
private CompletableFuture<CompletedCheckpoint> triggerSavepointInternal(
final CheckpointProperties checkpointProperties,
- @Nullable final String targetLocation) {
+ @Nullable final String targetLocation,
+ long savepointTimeout) {
Review comment:
```suggestion
long savepointTimeoutMs) {
```
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/dispatcher/Dispatcher.java
##########
@@ -706,6 +706,21 @@ JobManagerRunner createJobManagerRunner(JobGraph jobGraph,
long initializationTi
jobId, gateway -> gateway.triggerSavepoint(targetDirectory,
cancelJob, timeout));
}
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ JobID jobId,
+ String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ Time timeout) {
Review comment:
Can we replace the old `triggerSavepoint` method completely?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java
##########
@@ -256,6 +272,23 @@ void failSlot(
final boolean terminate,
@RpcTimeout final Time timeout);
+ /**
+ * Stops the job with a savepoint.
+ *
+ * @param targetDirectory to which to write the savepoint data or null if
the default savepoint
+ * directory should be used
+ * @param savepointTimeout Timeout for the savepoint. If it <= 0,
checkpoint timeout will take
+ * effect.
+ * @param terminate flag indicating if the job should terminate or just
suspend
+ * @param timeout for the rpc call
+ * @return Future which is completed with the savepoint path once completed
+ */
+ CompletableFuture<String> stopWithSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ @RpcTimeout final Time timeout);
Review comment:
And `stopWithSavepoint`.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -800,7 +809,17 @@ private boolean
isConnectingToResourceManager(ResourceManagerId resourceManagerI
public CompletableFuture<String> stopWithSavepoint(
@Nullable final String targetDirectory, final boolean terminate,
final Time timeout) {
- return schedulerNG.stopWithSavepoint(targetDirectory, terminate);
+ return stopWithSavepoint(targetDirectory, 0, terminate, timeout);
+ }
+
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate,
+ final Time timeout) {
+
+ return schedulerNG.stopWithSavepoint(targetDirectory,
savepointTimeout, terminate);
Review comment:
Same here.
##########
File path:
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
##########
@@ -132,6 +132,9 @@
static final Option SAVEPOINT_DISPOSE_OPTION =
new Option("d", "dispose", true, "Path of savepoint to dispose.");
+ static final Option SAVEPOINT_TIMEOUT_OPTION =
+ new Option("st", "savepointTimeout", true, "timeout of
savepoint.");
Review comment:
What is the unit of the savepoint timeout? Milliseconds? If so, I would
suggest making this explicit via `savepointTimeoutMs` and also mentioning the
unit in the description.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java
##########
@@ -788,7 +788,16 @@ private boolean
isConnectingToResourceManager(ResourceManagerId resourceManagerI
public CompletableFuture<String> triggerSavepoint(
@Nullable final String targetDirectory, final boolean cancelJob,
final Time timeout) {
- return schedulerNG.triggerSavepoint(targetDirectory, cancelJob);
+ return triggerSavepoint(targetDirectory, 0, cancelJob, timeout);
+ }
+
+ @Override
+ public CompletableFuture<String> triggerSavepoint(
+ @Nullable String targetDirectory,
+ long savepointTimeout,
+ boolean cancelJob,
+ Time timeout) {
+ return schedulerNG.triggerSavepoint(targetDirectory, savepointTimeout,
cancelJob);
Review comment:
Can we replace the old `triggerSavepoint` method here? This should not
be public API.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/scheduler/SchedulerBase.java
##########
@@ -935,9 +941,16 @@ public void reportCheckpointMetrics(
executionGraphHandler.reportCheckpointMetrics(attemptId, id, metrics);
}
+ @Override
+ public CompletableFuture<String> stopWithSavepoint(String targetDirectory,
boolean terminate) {
+ return stopWithSavepoint(targetDirectory, 0, terminate);
+ }
+
@Override
public CompletableFuture<String> stopWithSavepoint(
- @Nullable final String targetDirectory, final boolean terminate) {
+ @Nullable final String targetDirectory,
+ final long savepointTimeout,
+ final boolean terminate) {
Review comment:
If we have overloaded methods, I like to keep the parameter prefix the same
across the different variants; I think this is easier for people to use. So
in this case, I would put `savepointTimeout` at the end.
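A small sketch of the ordering I mean. `OverloadOrderSketch` is a hypothetical
class, not the real `SchedulerBase`, and the returned string only makes the
argument order observable. Both overloads share the prefix
`(targetDirectory, terminate)` and the new parameter is appended.

```java
import java.util.concurrent.CompletableFuture;

/** Hypothetical scheduler-like class; only the shape of the overloads matters. */
final class OverloadOrderSketch {
    /** Assumed sentinel meaning "use the checkpoint timeout", as in this PR's Javadoc. */
    static final long USE_CHECKPOINT_TIMEOUT_MS = 0L;

    /** Existing variant: delegates and appends the default for the new parameter. */
    CompletableFuture<String> stopWithSavepoint(String targetDirectory, boolean terminate) {
        return stopWithSavepoint(targetDirectory, terminate, USE_CHECKPOINT_TIMEOUT_MS);
    }

    /** New variant: same parameter prefix, with the timeout added at the end. */
    CompletableFuture<String> stopWithSavepoint(
            String targetDirectory, boolean terminate, long savepointTimeoutMs) {
        return CompletableFuture.completedFuture(
                targetDirectory + "|" + terminate + "|" + savepointTimeoutMs);
    }
}
```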