Zakelly commented on code in PR #23679:
URL: https://github.com/apache/flink/pull/23679#discussion_r1387577165


##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java:
##########
@@ -842,6 +844,85 @@ private void disposeSavepoint(
         logAndSysout("Savepoint '" + savepointPath + "' disposed.");
     }
 
+    /**
+     * Executes the CHECKPOINT action.
+     *
+     * @param args Command line arguments for the checkpoint action.
+     */
+    protected void checkpoint(String[] args) throws Exception {
+        LOG.info("Running 'checkpoint' command.");
+
+        final Options commandOptions = 
CliFrontendParser.getCheckpointCommandOptions();
+
+        final CommandLine commandLine = getCommandLine(commandOptions, args, 
false);
+
+        final CheckpointOptions checkpointOptions = new 
CheckpointOptions(commandLine);
+
+        // evaluate help flag
+        if (checkpointOptions.isPrintHelp()) {
+            CliFrontendParser.printHelpForCheckpoint(customCommandLines);
+            return;
+        }
+
+        final CustomCommandLine activeCommandLine = 
validateAndGetActiveCommandLine(commandLine);
+
+        String[] cleanedArgs = checkpointOptions.getArgs();
+
+        final JobID jobId;
+
+        if (cleanedArgs.length >= 1) {
+            String jobIdString = cleanedArgs[0];
+
+            jobId = parseJobId(jobIdString);
+        } else {
+            throw new CliArgsException(
+                    "Missing JobID. " + "Specify a Job ID to manipulate a 
checkpoint.");
+        }
+        runClusterAction(
+                activeCommandLine,
+                commandLine,
+                (clusterClient, effectiveConfiguration) ->
+                        triggerCheckpoint(
+                                clusterClient,
+                                jobId,
+                                checkpointOptions.getCheckpointType(),
+                                getClientTimeout(effectiveConfiguration)));
+    }
+
+    /** Sends a CheckpointTriggerMessage to the job manager. */
+    private void triggerCheckpoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            CheckpointType checkpointType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering checkpoint for job " + jobId + '.');
+
+        CompletableFuture<Long> checkpointFuture =
+                clusterClient.triggerCheckpoint(jobId, checkpointType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final long checkpointId =
+                    checkpointFuture.get(clientTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+
+            logAndSysout(
+                    "Checkpoint("
+                            + checkpointType
+                            + ") "
+                            + checkpointId
+                            + " for job "
+                            + jobId
+                            + " completed.");
+            logAndSysout("You can resume your program from this checkpoint 
with the run command.");
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a checkpoint for the job " + jobId + " 
failed.", cause);

Review Comment:
   👍  I also changed the same message in `savepoint` command.



##########
flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java:
##########
@@ -842,6 +844,85 @@ private void disposeSavepoint(
         logAndSysout("Savepoint '" + savepointPath + "' disposed.");
     }
 
+    /**
+     * Executes the CHECKPOINT action.
+     *
+     * @param args Command line arguments for the checkpoint action.
+     */
+    protected void checkpoint(String[] args) throws Exception {
+        LOG.info("Running 'checkpoint' command.");
+
+        final Options commandOptions = 
CliFrontendParser.getCheckpointCommandOptions();
+
+        final CommandLine commandLine = getCommandLine(commandOptions, args, 
false);
+
+        final CheckpointOptions checkpointOptions = new 
CheckpointOptions(commandLine);
+
+        // evaluate help flag
+        if (checkpointOptions.isPrintHelp()) {
+            CliFrontendParser.printHelpForCheckpoint(customCommandLines);
+            return;
+        }
+
+        final CustomCommandLine activeCommandLine = 
validateAndGetActiveCommandLine(commandLine);
+
+        String[] cleanedArgs = checkpointOptions.getArgs();
+
+        final JobID jobId;
+
+        if (cleanedArgs.length >= 1) {
+            String jobIdString = cleanedArgs[0];
+
+            jobId = parseJobId(jobIdString);
+        } else {
+            throw new CliArgsException(
+                    "Missing JobID. " + "Specify a Job ID to manipulate a 
checkpoint.");
+        }
+        runClusterAction(
+                activeCommandLine,
+                commandLine,
+                (clusterClient, effectiveConfiguration) ->
+                        triggerCheckpoint(
+                                clusterClient,
+                                jobId,
+                                checkpointOptions.getCheckpointType(),
+                                getClientTimeout(effectiveConfiguration)));
+    }
+
+    /** Sends a CheckpointTriggerMessage to the job manager. */
+    private void triggerCheckpoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            CheckpointType checkpointType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering checkpoint for job " + jobId + '.');
+
+        CompletableFuture<Long> checkpointFuture =
+                clusterClient.triggerCheckpoint(jobId, checkpointType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final long checkpointId =
+                    checkpointFuture.get(clientTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+
+            logAndSysout(
+                    "Checkpoint("
+                            + checkpointType
+                            + ") "
+                            + checkpointId
+                            + " for job "
+                            + jobId
+                            + " completed.");
+            logAndSysout("You can resume your program from this checkpoint 
with the run command.");
+        } catch (Exception e) {
+            Throwable cause = ExceptionUtils.stripExecutionException(e);
+            throw new FlinkException(
+                    "Triggering a checkpoint for the job " + jobId + " 
failed.", cause);

Review Comment:
   👍  I also changed the same message in `savepoint` command.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to