GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4789#discussion_r143728931
  
    --- Diff: flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -726,35 +724,29 @@ protected int savepoint(String[] args) {
         */
        private int triggerSavepoint(SavepointOptions options, JobID jobId, 
String savepointDirectory) {
                try {
    -                   ActorGateway jobManager = getJobManagerGateway(options);
    -
    -                   logAndSysout("Triggering savepoint for job " + jobId + 
".");
    -                   Future<Object> response = jobManager.ask(new 
TriggerSavepoint(jobId, Option.apply(savepointDirectory)),
    -                                   new FiniteDuration(1, TimeUnit.HOURS));
    -
    -                   Object result;
    +                   CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
    +                   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
                        try {
    -                           logAndSysout("Waiting for response...");
    -                           result = Await.result(response, 
FiniteDuration.Inf());
    -                   }
    -                   catch (Exception e) {
    -                           throw new Exception("Triggering a savepoint for 
the job " + jobId + " failed.", e);
    -                   }
    +                           logAndSysout("Triggering savepoint for job " + 
jobId + ".");
    +                           CompletableFuture<String> savepointPathFuture = 
client.triggerSavepoint(jobId, savepointDirectory);
     
    -                   if (result instanceof TriggerSavepointSuccess) {
    -                           TriggerSavepointSuccess success = 
(TriggerSavepointSuccess) result;
    -                           logAndSysout("Savepoint completed. Path: " + 
success.savepointPath());
    +                           String savepointPath;
    +                           try {
    +                                   logAndSysout("Waiting for response...");
    +                                   savepointPath = 
savepointPathFuture.get();
    +                           }
    +                           catch (ExecutionException ee) {
    +                                   Throwable cause = 
ExceptionUtils.stripExecutionException(ee);
    +                                   throw new Exception("Triggering a 
savepoint for the job " + jobId + " failed.", cause);
    --- End diff --
    
    Please throw a `FlinkException` here instead of a generic `Exception`.


---

Reply via email to