Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/4789#discussion_r143728931
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -726,35 +724,29 @@ protected int savepoint(String[] args) {
*/
private int triggerSavepoint(SavepointOptions options, JobID jobId,
String savepointDirectory) {
try {
- ActorGateway jobManager = getJobManagerGateway(options);
-
- logAndSysout("Triggering savepoint for job " + jobId +
".");
- Future<Object> response = jobManager.ask(new
TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
- new FiniteDuration(1, TimeUnit.HOURS));
-
- Object result;
+ CustomCommandLine<?> activeCommandLine =
getActiveCustomCommandLine(options.getCommandLine());
+ ClusterClient client =
activeCommandLine.retrieveCluster(options.getCommandLine(), config,
configurationDirectory);
try {
- logAndSysout("Waiting for response...");
- result = Await.result(response,
FiniteDuration.Inf());
- }
- catch (Exception e) {
- throw new Exception("Triggering a savepoint for
the job " + jobId + " failed.", e);
- }
+ logAndSysout("Triggering savepoint for job " +
jobId + ".");
+ CompletableFuture<String> savepointPathFuture =
client.triggerSavepoint(jobId, savepointDirectory);
- if (result instanceof TriggerSavepointSuccess) {
- TriggerSavepointSuccess success =
(TriggerSavepointSuccess) result;
- logAndSysout("Savepoint completed. Path: " +
success.savepointPath());
+ String savepointPath;
+ try {
+ logAndSysout("Waiting for response...");
+ savepointPath =
savepointPathFuture.get();
+ }
+ catch (ExecutionException ee) {
+ Throwable cause =
ExceptionUtils.stripExecutionException(ee);
+ throw new Exception("Triggering a
savepoint for the job " + jobId + " failed.", cause);
--- End diff --
`FlinkException`
---