[ 
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16309496#comment-16309496
 ] 

ASF GitHub Bot commented on FLINK-8332:
---------------------------------------

Github user GJL commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5219#discussion_r159401004
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
    @@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
                        return 0;
                }
     
    -           if (options.isDispose()) {
    -                   // Discard
    -                   return disposeSavepoint(options);
    -           } else {
    -                   // Trigger
    -                   String[] cleanedArgs = options.getArgs();
    -                   JobID jobId;
    +           CustomCommandLine<?> customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
     
    -                   if (cleanedArgs.length >= 1) {
    -                           String jobIdString = cleanedArgs[0];
    -                           try {
    -                                   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
    -                           } catch (Exception e) {
    -                                   return handleArgException(new 
IllegalArgumentException(
    -                                                   "Error: The value for 
the Job ID is not a valid ID."));
    -                           }
    +           ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
    +
    +           try {
    +                   if (options.isDispose()) {
    +                           // Discard
    +                           return disposeSavepoint(clusterClient, 
options.getSavepointPath());
                        } else {
    -                           return handleArgException(new 
IllegalArgumentException(
    +                           // Trigger
    +                           String[] cleanedArgs = options.getArgs();
    +                           JobID jobId;
    +
    +                           if (cleanedArgs.length >= 1) {
    +                                   String jobIdString = cleanedArgs[0];
    +                                   try {
    +                                           jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
    +                                   } catch (Exception e) {
    +                                           return handleArgException(new 
IllegalArgumentException(
    +                                                   "Error: The value for 
the Job ID is not a valid ID."));
    +                                   }
    +                           } else {
    +                                   return handleArgException(new 
IllegalArgumentException(
                                                "Error: The value for the Job 
ID is not a valid ID. " +
    -                                                           "Specify a Job 
ID to trigger a savepoint."));
    -                   }
    +                                                   "Specify a Job ID to 
trigger a savepoint."));
    +                           }
     
    -                   String savepointDirectory = null;
    -                   if (cleanedArgs.length >= 2) {
    -                           savepointDirectory = cleanedArgs[1];
    -                   }
    +                           String savepointDirectory = null;
    +                           if (cleanedArgs.length >= 2) {
    +                                   savepointDirectory = cleanedArgs[1];
    +                           }
     
    -                   // Print superfluous arguments
    -                   if (cleanedArgs.length >= 3) {
    -                           logAndSysout("Provided more arguments than 
required. Ignoring not needed arguments.");
    -                   }
    +                           // Print superfluous arguments
    +                           if (cleanedArgs.length >= 3) {
    +                                   logAndSysout("Provided more arguments 
than required. Ignoring not needed arguments.");
    +                           }
     
    -                   return triggerSavepoint(options, jobId, 
savepointDirectory);
    +                           return triggerSavepoint(clusterClient, jobId, 
savepointDirectory);
    +                   }
    +           } catch (Exception e) {
    +                   return handleError(e);
    +           } finally {
    +                   try {
    +                           clusterClient.shutdown();
    +                   } catch (Exception e) {
    +                           LOG.info("Could not shutdown the cluster 
client.", e);
    +                   }
                }
        }
     
        /**
         * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
         * message to the job manager.
         */
    -   private int triggerSavepoint(SavepointOptions options, JobID jobId, 
String savepointDirectory) {
    -           try {
    -                   CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
    -                   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
    -                   try {
    -                           logAndSysout("Triggering savepoint for job " + 
jobId + ".");
    -                           CompletableFuture<String> savepointPathFuture = 
client.triggerSavepoint(jobId, savepointDirectory);
    -
    -                           String savepointPath;
    -                           try {
    -                                   logAndSysout("Waiting for response...");
    -                                   savepointPath = 
savepointPathFuture.get();
    -                           }
    -                           catch (ExecutionException ee) {
    -                                   Throwable cause = 
ExceptionUtils.stripExecutionException(ee);
    -                                   throw new FlinkException("Triggering a 
savepoint for the job " + jobId + " failed.", cause);
    -                           }
    -
    -                           logAndSysout("Savepoint completed. Path: " + 
savepointPath);
    -                           logAndSysout("You can resume your program from 
this savepoint with the run command.");
    +   private int triggerSavepoint(ClusterClient clusterClient, JobID jobId, 
String savepointDirectory) throws Exception {
    +           logAndSysout("Triggering savepoint for job " + jobId + ".");
    +           CompletableFuture<String> savepointPathFuture = 
clusterClient.triggerSavepoint(jobId, savepointDirectory);
     
    -                           return 0;
    -                   }
    -                   finally {
    -                           client.shutdown();
    -                   }
    +           String savepointPath;
    +           try {
    +                   logAndSysout("Waiting for response...");
    +                   savepointPath = savepointPathFuture.get();
                }
    -           catch (Throwable t) {
    -                   return handleError(t);
    +           catch (ExecutionException ee) {
    +                   Throwable cause = 
ExceptionUtils.stripExecutionException(ee);
    +                   throw new FlinkException("Triggering a savepoint for 
the job " + jobId + " failed.", cause);
                }
    +
    +           logAndSysout("Savepoint completed. Path: " + savepointPath);
    +           logAndSysout("You can resume your program from this savepoint 
with the run command.");
    +
    +           return 0;
        }
     
        /**
         * Sends a {@link 
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
         * message to the job manager.
         */
    -   private int disposeSavepoint(SavepointOptions options) {
    -           try {
    -                   String savepointPath = options.getSavepointPath();
    -                   if (savepointPath == null) {
    -                           throw new IllegalArgumentException("Missing 
required argument: savepoint path. " +
    -                                           "Usage: bin/flink savepoint -d 
<savepoint-path>");
    -                   }
    -
    -                   ActorGateway jobManager = getJobManagerGateway(options);
    +   private int disposeSavepoint(ClusterClient clusterClient, String 
savepointPath) {
    +           Preconditions.checkNotNull(savepointPath, "Missing required 
argument: savepoint path. " +
    +                   "Usage: bin/flink savepoint -d <savepoint-path>");
     
    -                   logAndSysout("Disposing savepoint '" + savepointPath + 
"'.");
    +           logAndSysout("Disposing savepoint '" + savepointPath + "'.");
     
    -                   Object msg = new DisposeSavepoint(savepointPath);
    -                   Future<Object> response = jobManager.ask(msg, 
clientTimeout);
    +           CompletableFuture<Acknowledge> disposeFuture = null;
    --- End diff --
    
    nit: the `null` initialization is not needed because you return in the `catch` 
block. The variable can even be declared `final`.
      


> Move dispose savepoint into ClusterClient
> -----------------------------------------
>
>                 Key: FLINK-8332
>                 URL: https://issues.apache.org/jira/browse/FLINK-8332
>             Project: Flink
>          Issue Type: Sub-task
>          Components: Client
>    Affects Versions: 1.5.0
>            Reporter: Till Rohrmann
>            Assignee: Till Rohrmann
>              Labels: flip-6
>             Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint. 
> In order to better abstract this functionality, we should move it to the 
> {{ClusterClient}}. That way, we can have different implementations of the 
> {{ClusterClient}} (Flip-6 and old code) that are used by the same 
> {{CliFrontend}}.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to