Github user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5220#discussion_r160719791
  
    --- Diff: 
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -598,189 +535,154 @@ protected int cancel(String[] args) {
                                jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
                                targetDirectory = null;
                        } catch (Exception e) {
    -                           LOG.error("Missing JobID in the command line 
arguments.");
    -                           System.out.println("Error: Specify a Job ID to 
cancel a job.");
    -                           return 1;
    +                           throw new CliArgsException("Missing JobID in 
the command line arguments: " + e.getMessage());
                        }
                } else {
    -                   LOG.error("Missing JobID in the command line 
arguments.");
    -                   System.out.println("Error: Specify a Job ID to cancel a 
job.");
    -                   return 1;
    +                   throw new CliArgsException("Missing JobID in the 
command line arguments.");
                }
     
    +           final CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(commandLine);
    +           final ClusterClient client = 
activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory);
    +
                try {
    -                   CustomCommandLine<?> activeCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
    -                   ClusterClient client = 
activeCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
    -                   try {
    -                           if (withSavepoint) {
    -                                   if (targetDirectory == null) {
    -                                           logAndSysout("Cancelling job " 
+ jobId + " with savepoint to default savepoint directory.");
    -                                   } else {
    -                                           logAndSysout("Cancelling job " 
+ jobId + " with savepoint to " + targetDirectory + '.');
    -                                   }
    -                                   String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
    -                                   logAndSysout("Cancelled job " + jobId + 
". Savepoint stored in " + savepointPath + '.');
    +                   if (withSavepoint) {
    +                           if (targetDirectory == null) {
    +                                   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to default savepoint directory.");
                                } else {
    -                                   logAndSysout("Cancelling job " + jobId 
+ '.');
    -                                   client.cancel(jobId);
    -                                   logAndSysout("Cancelled job " + jobId + 
'.');
    +                                   logAndSysout("Cancelling job " + jobId 
+ " with savepoint to " + targetDirectory + '.');
                                }
    +                           String savepointPath = 
client.cancelWithSavepoint(jobId, targetDirectory);
    +                           logAndSysout("Cancelled job " + jobId + ". 
Savepoint stored in " + savepointPath + '.');
    +                   } else {
    +                           logAndSysout("Cancelling job " + jobId + '.');
    +                           client.cancel(jobId);
    +                           logAndSysout("Cancelled job " + jobId + '.');
    +                   }
     
    -                           return 0;
    -                   } finally {
    +                   return 0;
    +           } finally {
    +                   try {
                                client.shutdown();
    +                   } catch (Exception e) {
    +                           LOG.info("Could not properly shut down the 
client.", e);
                        }
                }
    -           catch (Throwable t) {
    -                   return handleError(t);
    -           }
        }
     
        /**
         * Executes the SAVEPOINT action.
         *
         * @param args Command line arguments for the cancel action.
         */
    -   protected int savepoint(String[] args) {
    +   protected int savepoint(String[] args) throws CliArgsException {
                LOG.info("Running 'savepoint' command.");
     
    -           SavepointOptions options;
    -           try {
    -                   options = CliFrontendParser.parseSavepointCommand(args);
    -           } catch (CliArgsException e) {
    -                   return handleArgException(e);
    -           } catch (Throwable t) {
    -                   return handleError(t);
    -           }
    +           SavepointOptions options = 
CliFrontendParser.parseSavepointCommand(args);
     
                // evaluate help flag
                if (options.isPrintHelp()) {
                        CliFrontendParser.printHelpForSavepoint();
                        return 0;
                }
     
    -           if (options.isDispose()) {
    -                   // Discard
    -                   return disposeSavepoint(options);
    -           } else {
    -                   // Trigger
    -                   String[] cleanedArgs = options.getArgs();
    -                   JobID jobId;
    +           CustomCommandLine<?> customCommandLine = 
getActiveCustomCommandLine(options.getCommandLine());
     
    -                   if (cleanedArgs.length >= 1) {
    -                           String jobIdString = cleanedArgs[0];
    -                           try {
    -                                   jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
    -                           } catch (Exception e) {
    -                                   return handleArgException(new 
IllegalArgumentException(
    -                                                   "Error: The value for 
the Job ID is not a valid ID."));
    -                           }
    +           ClusterClient clusterClient = 
customCommandLine.retrieveCluster(options.getCommandLine(), config, 
configurationDirectory);
    +
    +           try {
    +                   if (options.isDispose()) {
    +                           // Discard
    +                           return disposeSavepoint(clusterClient, 
options.getSavepointPath());
                        } else {
    -                           return handleArgException(new 
IllegalArgumentException(
    -                                           "Error: The value for the Job 
ID is not a valid ID. " +
    -                                                           "Specify a Job 
ID to trigger a savepoint."));
    -                   }
    +                           // Trigger
    +                           String[] cleanedArgs = options.getArgs();
    +                           JobID jobId;
    +
    +                           if (cleanedArgs.length >= 1) {
    +                                   String jobIdString = cleanedArgs[0];
    +                                   try {
    +                                           jobId = new 
JobID(StringUtils.hexStringToByte(jobIdString));
    +                                   } catch (Exception ignored) {
    +                                           throw new 
CliArgsException("Error: The value for the Job ID is not a valid ID.");
    +                                   }
    +                           } else {
    +                                   throw new CliArgsException("Error: The 
value for the Job ID is not a valid ID. " +
    +                                           "Specify a Job ID to trigger a 
savepoint.");
    +                           }
     
    -                   String savepointDirectory = null;
    -                   if (cleanedArgs.length >= 2) {
    -                           savepointDirectory = cleanedArgs[1];
    -                   }
    +                           String savepointDirectory = null;
    +                           if (cleanedArgs.length >= 2) {
    +                                   savepointDirectory = cleanedArgs[1];
    +                           }
     
    -                   // Print superfluous arguments
    -                   if (cleanedArgs.length >= 3) {
    -                           logAndSysout("Provided more arguments than 
required. Ignoring not needed arguments.");
    -                   }
    +                           // Print superfluous arguments
    +                           if (cleanedArgs.length >= 3) {
    +                                   logAndSysout("Provided more arguments 
than required. Ignoring not needed arguments.");
    +                           }
     
    -                   return triggerSavepoint(options, jobId, 
savepointDirectory);
    +                           return triggerSavepoint(clusterClient, jobId, 
savepointDirectory);
    +                   }
    +           } catch (Exception e) {
    +                   return handleError(e);
    --- End diff --
    
    Oversight from my side.


---

Reply via email to