GitHub user tillrohrmann commented on a diff in the pull request:

    https://github.com/apache/flink/pull/5220#discussion_r160719791

    --- Diff: flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java ---
    @@ -598,189 +535,154 @@ protected int cancel(String[] args) {
                     jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
                     targetDirectory = null;
                 } catch (Exception e) {
    -                LOG.error("Missing JobID in the command line arguments.");
    -                System.out.println("Error: Specify a Job ID to cancel a job.");
    -                return 1;
    +                throw new CliArgsException("Missing JobID in the command line arguments: " + e.getMessage());
                 }
             } else {
    -            LOG.error("Missing JobID in the command line arguments.");
    -            System.out.println("Error: Specify a Job ID to cancel a job.");
    -            return 1;
    +            throw new CliArgsException("Missing JobID in the command line arguments.");
             }
    +        final CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(commandLine);
    +        final ClusterClient client = activeCommandLine.retrieveCluster(commandLine, config, configurationDirectory);
    +
             try {
    -            CustomCommandLine<?> activeCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    -            ClusterClient client = activeCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
    -            try {
    -                if (withSavepoint) {
    -                    if (targetDirectory == null) {
    -                        logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
    -                    } else {
    -                        logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.');
    -                    }
    -                    String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory);
    -                    logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
    +            if (withSavepoint) {
    +                if (targetDirectory == null) {
    +                    logAndSysout("Cancelling job " + jobId + " with savepoint to default savepoint directory.");
                     } else {
    -                    logAndSysout("Cancelling job " + jobId + '.');
    -                    client.cancel(jobId);
    -                    logAndSysout("Cancelled job " + jobId + '.');
    +                    logAndSysout("Cancelling job " + jobId + " with savepoint to " + targetDirectory + '.');
                     }
    +                String savepointPath = client.cancelWithSavepoint(jobId, targetDirectory);
    +                logAndSysout("Cancelled job " + jobId + ". Savepoint stored in " + savepointPath + '.');
    +            } else {
    +                logAndSysout("Cancelling job " + jobId + '.');
    +                client.cancel(jobId);
    +                logAndSysout("Cancelled job " + jobId + '.');
    +            }
    -                return 0;
    -            } finally {
    +            return 0;
    +        } finally {
    +            try {
                     client.shutdown();
    +            } catch (Exception e) {
    +                LOG.info("Could not properly shut down the client.", e);
                 }
             }
    -        catch (Throwable t) {
    -            return handleError(t);
    -        }
         }
         /**
          * Executes the SAVEPOINT action.
          *
          * @param args Command line arguments for the cancel action.
          */
    -    protected int savepoint(String[] args) {
    +    protected int savepoint(String[] args) throws CliArgsException {
             LOG.info("Running 'savepoint' command.");
    -        SavepointOptions options;
    -        try {
    -            options = CliFrontendParser.parseSavepointCommand(args);
    -        } catch (CliArgsException e) {
    -            return handleArgException(e);
    -        } catch (Throwable t) {
    -            return handleError(t);
    -        }
    +        SavepointOptions options = CliFrontendParser.parseSavepointCommand(args);
             // evaluate help flag
             if (options.isPrintHelp()) {
                 CliFrontendParser.printHelpForSavepoint();
                 return 0;
             }
    -        if (options.isDispose()) {
    -            // Discard
    -            return disposeSavepoint(options);
    -        } else {
    -            // Trigger
    -            String[] cleanedArgs = options.getArgs();
    -            JobID jobId;
    +        CustomCommandLine<?> customCommandLine = getActiveCustomCommandLine(options.getCommandLine());
    -            if (cleanedArgs.length >= 1) {
    -                String jobIdString = cleanedArgs[0];
    -                try {
    -                    jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    -                } catch (Exception e) {
    -                    return handleArgException(new IllegalArgumentException(
    -                            "Error: The value for the Job ID is not a valid ID."));
    -                }
    +        ClusterClient clusterClient = customCommandLine.retrieveCluster(options.getCommandLine(), config, configurationDirectory);
    +
    +        try {
    +            if (options.isDispose()) {
    +                // Discard
    +                return disposeSavepoint(clusterClient, options.getSavepointPath());
                 } else {
    -                return handleArgException(new IllegalArgumentException(
    -                        "Error: The value for the Job ID is not a valid ID. " +
    -                        "Specify a Job ID to trigger a savepoint."));
    -            }
    +                // Trigger
    +                String[] cleanedArgs = options.getArgs();
    +                JobID jobId;
    +
    +                if (cleanedArgs.length >= 1) {
    +                    String jobIdString = cleanedArgs[0];
    +                    try {
    +                        jobId = new JobID(StringUtils.hexStringToByte(jobIdString));
    +                    } catch (Exception ignored) {
    +                        throw new CliArgsException("Error: The value for the Job ID is not a valid ID.");
    +                    }
    +                } else {
    +                    throw new CliArgsException("Error: The value for the Job ID is not a valid ID. " +
    +                        "Specify a Job ID to trigger a savepoint.");
    +                }
    -            String savepointDirectory = null;
    -            if (cleanedArgs.length >= 2) {
    -                savepointDirectory = cleanedArgs[1];
    -            }
    +                String savepointDirectory = null;
    +                if (cleanedArgs.length >= 2) {
    +                    savepointDirectory = cleanedArgs[1];
    +                }
    -            // Print superfluous arguments
    -            if (cleanedArgs.length >= 3) {
    -                logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
    -            }
    +                // Print superfluous arguments
    +                if (cleanedArgs.length >= 3) {
    +                    logAndSysout("Provided more arguments than required. Ignoring not needed arguments.");
    +                }
    -            return triggerSavepoint(options, jobId, savepointDirectory);
    +                return triggerSavepoint(clusterClient, jobId, savepointDirectory);
    +            }
    +        } catch (Exception e) {
    +            return handleError(e);
    --- End diff --

    Oversight from my side.

---