[
https://issues.apache.org/jira/browse/FLINK-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16320363#comment-16320363
]
ASF GitHub Bot commented on FLINK-8332:
---------------------------------------
Github user tillrohrmann commented on a diff in the pull request:
https://github.com/apache/flink/pull/5219#discussion_r160697901
--- Diff:
flink-clients/src/main/java/org/apache/flink/client/CliFrontend.java ---
@@ -659,128 +655,107 @@ protected int savepoint(String[] args) {
return 0;
}
- if (options.isDispose()) {
- // Discard
- return disposeSavepoint(options);
- } else {
- // Trigger
- String[] cleanedArgs = options.getArgs();
- JobID jobId;
+ CustomCommandLine<?> customCommandLine =
getActiveCustomCommandLine(options.getCommandLine());
- if (cleanedArgs.length >= 1) {
- String jobIdString = cleanedArgs[0];
- try {
- jobId = new
JobID(StringUtils.hexStringToByte(jobIdString));
- } catch (Exception e) {
- return handleArgException(new
IllegalArgumentException(
- "Error: The value for
the Job ID is not a valid ID."));
- }
+ ClusterClient clusterClient =
customCommandLine.retrieveCluster(options.getCommandLine(), config,
configurationDirectory);
+
+ try {
+ if (options.isDispose()) {
+ // Discard
+ return disposeSavepoint(clusterClient,
options.getSavepointPath());
} else {
- return handleArgException(new
IllegalArgumentException(
+ // Trigger
+ String[] cleanedArgs = options.getArgs();
+ JobID jobId;
+
+ if (cleanedArgs.length >= 1) {
+ String jobIdString = cleanedArgs[0];
+ try {
+ jobId = new
JobID(StringUtils.hexStringToByte(jobIdString));
+ } catch (Exception e) {
+ return handleArgException(new
IllegalArgumentException(
+ "Error: The value for
the Job ID is not a valid ID."));
+ }
+ } else {
+ return handleArgException(new
IllegalArgumentException(
"Error: The value for the Job
ID is not a valid ID. " +
- "Specify a Job
ID to trigger a savepoint."));
- }
+ "Specify a Job ID to
trigger a savepoint."));
+ }
- String savepointDirectory = null;
- if (cleanedArgs.length >= 2) {
- savepointDirectory = cleanedArgs[1];
- }
+ String savepointDirectory = null;
+ if (cleanedArgs.length >= 2) {
+ savepointDirectory = cleanedArgs[1];
+ }
- // Print superfluous arguments
- if (cleanedArgs.length >= 3) {
- logAndSysout("Provided more arguments than
required. Ignoring not needed arguments.");
- }
+ // Print superfluous arguments
+ if (cleanedArgs.length >= 3) {
+ logAndSysout("Provided more arguments
than required. Ignoring not needed arguments.");
+ }
- return triggerSavepoint(options, jobId,
savepointDirectory);
+ return triggerSavepoint(clusterClient, jobId,
savepointDirectory);
+ }
+ } catch (Exception e) {
+ return handleError(e);
+ } finally {
+ try {
+ clusterClient.shutdown();
+ } catch (Exception e) {
+ LOG.info("Could not shutdown the cluster
client.", e);
+ }
}
}
/**
* Sends a {@link
org.apache.flink.runtime.messages.JobManagerMessages.TriggerSavepoint}
* message to the job manager.
*/
- private int triggerSavepoint(SavepointOptions options, JobID jobId,
String savepointDirectory) {
- try {
- CustomCommandLine<?> activeCommandLine =
getActiveCustomCommandLine(options.getCommandLine());
- ClusterClient client =
activeCommandLine.retrieveCluster(options.getCommandLine(), config,
configurationDirectory);
- try {
- logAndSysout("Triggering savepoint for job " +
jobId + ".");
- CompletableFuture<String> savepointPathFuture =
client.triggerSavepoint(jobId, savepointDirectory);
-
- String savepointPath;
- try {
- logAndSysout("Waiting for response...");
- savepointPath =
savepointPathFuture.get();
- }
- catch (ExecutionException ee) {
- Throwable cause =
ExceptionUtils.stripExecutionException(ee);
- throw new FlinkException("Triggering a
savepoint for the job " + jobId + " failed.", cause);
- }
-
- logAndSysout("Savepoint completed. Path: " +
savepointPath);
- logAndSysout("You can resume your program from
this savepoint with the run command.");
+ private int triggerSavepoint(ClusterClient clusterClient, JobID jobId,
String savepointDirectory) throws Exception {
+ logAndSysout("Triggering savepoint for job " + jobId + ".");
+ CompletableFuture<String> savepointPathFuture =
clusterClient.triggerSavepoint(jobId, savepointDirectory);
- return 0;
- }
- finally {
- client.shutdown();
- }
+ String savepointPath;
+ try {
+ logAndSysout("Waiting for response...");
+ savepointPath = savepointPathFuture.get();
}
- catch (Throwable t) {
- return handleError(t);
+ catch (ExecutionException ee) {
+ Throwable cause =
ExceptionUtils.stripExecutionException(ee);
+ throw new FlinkException("Triggering a savepoint for
the job " + jobId + " failed.", cause);
}
+
+ logAndSysout("Savepoint completed. Path: " + savepointPath);
+ logAndSysout("You can resume your program from this savepoint
with the run command.");
+
+ return 0;
}
/**
* Sends a {@link
org.apache.flink.runtime.messages.JobManagerMessages.DisposeSavepoint}
* message to the job manager.
*/
- private int disposeSavepoint(SavepointOptions options) {
- try {
- String savepointPath = options.getSavepointPath();
- if (savepointPath == null) {
- throw new IllegalArgumentException("Missing
required argument: savepoint path. " +
- "Usage: bin/flink savepoint -d
<savepoint-path>");
- }
-
- ActorGateway jobManager = getJobManagerGateway(options);
+ private int disposeSavepoint(ClusterClient clusterClient, String
savepointPath) {
+ Preconditions.checkNotNull(savepointPath, "Missing required
argument: savepoint path. " +
+ "Usage: bin/flink savepoint -d <savepoint-path>");
- logAndSysout("Disposing savepoint '" + savepointPath +
"'.");
+ logAndSysout("Disposing savepoint '" + savepointPath + "'.");
- Object msg = new DisposeSavepoint(savepointPath);
- Future<Object> response = jobManager.ask(msg,
clientTimeout);
+ CompletableFuture<Acknowledge> disposeFuture = null;
--- End diff --
You're right. Will change it.
> Move dispose savepoint into ClusterClient
> -----------------------------------------
>
> Key: FLINK-8332
> URL: https://issues.apache.org/jira/browse/FLINK-8332
> Project: Flink
> Issue Type: Sub-task
> Components: Client
> Affects Versions: 1.5.0
> Reporter: Till Rohrmann
> Assignee: Till Rohrmann
> Labels: flip-6
> Fix For: 1.5.0
>
>
> Currently, the {{CliFrontend}} sends the command for disposing a savepoint.
> In order to better abstract this functionality we should move it to the
> {{ClusterClient}}. That way we can have different implementations of the
> {{ClusterClient}} (Flip-6 and old code) which are used by the same
> {{CliFrontend}}.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)