This is an automated email from the ASF dual-hosted git repository.

hangxiang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 421f50e38a21974d043278d6161fab50fdb8c3e6
Author: 周仁祥 <[email protected]>
AuthorDate: Tue Dec 12 16:17:38 2023 +0800

    [FLINK-32881][checkpoint] add detached mode for stop-with-savepoint
---
 .../org/apache/flink/client/cli/CliFrontend.java   | 87 ++++++++++++++++++----
 .../apache/flink/client/cli/CliFrontendParser.java |  6 +-
 .../org/apache/flink/client/cli/StopOptions.java   |  9 +++
 .../apache/flink/client/program/ClusterClient.java | 19 +++++
 .../flink/client/program/MiniClusterClient.java    | 10 +++
 .../client/program/rest/RestClusterClient.java     | 68 ++++++++++-------
 .../cli/CliFrontendStopWithSavepointTest.java      | 30 ++++++++
 .../flink/client/program/TestingClusterClient.java | 21 ++++++
 .../flink/runtime/minicluster/MiniCluster.java     | 20 +++++
 .../environment/RemoteStreamEnvironmentTest.java   |  9 +++
 10 files changed, 233 insertions(+), 46 deletions(-)

diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
index 9aa1450b46f..60b34f3db9b 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java
@@ -581,23 +581,26 @@ public class CliFrontend {
                 activeCommandLine,
                 commandLine,
                 (clusterClient, effectiveConfiguration) -> {
-                    final String savepointPath;
-                    try {
-                        savepointPath =
-                                clusterClient
-                                        .stopWithSavepoint(
-                                                jobId,
-                                                advanceToEndOfEventTime,
-                                                targetDirectory,
-                                                formatType)
-                                        .get(
-                                                
getClientTimeout(effectiveConfiguration).toMillis(),
-                                                TimeUnit.MILLISECONDS);
-                    } catch (Exception e) {
-                        throw new FlinkException(
-                                "Could not stop with a savepoint job \"" + 
jobId + "\".", e);
+                    // Trigger savepoint in detached mode
+                    if (stopOptions.isDetached()) {
+                        // trigger stop-with-savepoint in detached mode and
+                        // return the trigger id immediately
+                        stopWithDetachedSavepoint(
+                                clusterClient,
+                                jobId,
+                                advanceToEndOfEventTime,
+                                targetDirectory,
+                                formatType,
+                                getClientTimeout(effectiveConfiguration));
+                    } else {
+                        stopWithSavepoint(
+                                clusterClient,
+                                jobId,
+                                advanceToEndOfEventTime,
+                                targetDirectory,
+                                formatType,
+                                getClientTimeout(effectiveConfiguration));
                     }
-                    logAndSysout("Savepoint completed. Path: " + 
savepointPath);
                 });
     }
 
@@ -804,6 +807,58 @@ public class CliFrontend {
         }
     }
 
+    /** Sends a SavepointTriggerMessage to the job manager. */
+    private void stopWithSavepoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            String targetDirectory,
+            SavepointFormatType formatType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering stop-with-savepoint for job " + jobId + '.');
+
+        CompletableFuture<String> savepointPathFuture =
+                clusterClient.stopWithSavepoint(
+                        jobId, advanceToEndOfEventTime, targetDirectory, 
formatType);
+
+        logAndSysout("Waiting for response...");
+
+        try {
+            final String savepointPath =
+                    savepointPathFuture.get(clientTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+
+            logAndSysout("Savepoint completed. Path: " + savepointPath);
+        } catch (Exception e) {
+            throw new FlinkException("Could not stop with a savepoint job \"" 
+ jobId + "\".", e);
+        }
+    }
+
+    /** Sends a SavepointTriggerMessage to the job manager in detached mode. */
+    private void stopWithDetachedSavepoint(
+            ClusterClient<?> clusterClient,
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            String targetDirectory,
+            SavepointFormatType formatType,
+            Duration clientTimeout)
+            throws FlinkException {
+        logAndSysout("Triggering stop-with-savepoint in detached mode for job 
" + jobId + '.');
+        try {
+            final String triggerId =
+                    clusterClient
+                            .stopWithDetachedSavepoint(
+                                    jobId, advanceToEndOfEventTime, 
targetDirectory, formatType)
+                            .get(clientTimeout.toMillis(), 
TimeUnit.MILLISECONDS);
+            logAndSysout(
+                    "Successfully trigger stop-with-savepoint in detached 
mode, triggerId: "
+                            + triggerId);
+        } catch (Exception e) {
+            throw new FlinkException(
+                    "Could not stop with a detached savepoint job \"" + jobId 
+ "\".", e);
+        }
+    }
+
     /** Sends a SavepointTriggerMessage to the job manager. */
     private void triggerSavepoint(
             ClusterClient<?> clusterClient,
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
index 43491a97ee1..ceca4968d21 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java
@@ -447,7 +447,8 @@ public class CliFrontendParser {
         return buildGeneralOptions(new Options())
                 .addOption(STOP_WITH_SAVEPOINT_PATH)
                 .addOption(STOP_AND_DRAIN)
-                .addOption(SAVEPOINT_FORMAT_OPTION);
+                .addOption(SAVEPOINT_FORMAT_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     static Options getSavepointCommandOptions() {
@@ -493,7 +494,8 @@ public class CliFrontendParser {
     private static Options getStopOptionsWithoutDeprecatedOptions(Options 
options) {
         return options.addOption(STOP_WITH_SAVEPOINT_PATH)
                 .addOption(STOP_AND_DRAIN)
-                .addOption(SAVEPOINT_FORMAT_OPTION);
+                .addOption(SAVEPOINT_FORMAT_OPTION)
+                .addOption(SAVEPOINT_DETACHED_OPTION);
     }
 
     private static Options getSavepointOptionsWithoutDeprecatedOptions(Options 
options) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
index 3363e1a9090..7c981fb53b2 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java
@@ -23,6 +23,7 @@ import org.apache.flink.core.execution.SavepointFormatType;
 
 import org.apache.commons.cli.CommandLine;
 
+import static 
org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DETACHED_OPTION;
 import static 
org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_FORMAT_OPTION;
 import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN;
 import static 
org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH;
@@ -41,6 +42,8 @@ class StopOptions extends CommandLineOptions {
 
     private final SavepointFormatType formatType;
 
+    private boolean isDetached;
+
     StopOptions(CommandLine line) {
         super(line);
         this.args = line.getArgs();
@@ -58,6 +61,8 @@ class StopOptions extends CommandLineOptions {
         } else {
             formatType = SavepointFormatType.DEFAULT;
         }
+
+        this.isDetached = line.hasOption(SAVEPOINT_DETACHED_OPTION.getOpt());
     }
 
     String[] getArgs() {
@@ -79,4 +84,8 @@ class StopOptions extends CommandLineOptions {
     public SavepointFormatType getFormatType() {
         return formatType;
     }
+
+    public boolean isDetached() {
+        return isDetached;
+    }
 }
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
index f25946971c3..30c126d1645 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java
@@ -166,6 +166,25 @@ public interface ClusterClient<T> extends AutoCloseable {
             @Nullable final String savepointDirectory,
             final SavepointFormatType formatType);
 
+    /**
+     * Stops a program on Flink cluster whose job-manager is configured in 
this client's
+     * configuration. Stopping works only for streaming programs. Be aware, 
that the program might
+     * continue to run for a while after sending the stop command, because 
after sources stopped to
+     * emit data all operators need to finish processing.
+     *
+     * @param jobId the job ID of the streaming program to stop
+     * @param advanceToEndOfEventTime flag indicating if the source should 
inject a {@code
+     *     MAX_WATERMARK} in the pipeline
+     * @param savepointDirectory directory the savepoint should be written to
+     * @param formatType a binary format of the savepoint
+     * @return the savepoint trigger id
+     */
+    CompletableFuture<String> stopWithDetachedSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfEventTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType);
+
     /**
      * Triggers a savepoint for the job identified by the job id. The 
savepoint will be written to
      * the given savepoint directory, or {@link
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
index 889fa8b8202..d8c64dd9eb3 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java
@@ -104,6 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl
                 jobId, savepointDirectory, advanceToEndOfEventTime, 
formatType);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            @Nullable String savepointDirectory,
+            SavepointFormatType formatType) {
+        return miniCluster.stopWithDetachedSavepoint(
+                jobId, savepointDirectory, advanceToEndOfEventTime, 
formatType);
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {
diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
index 06587021bcb..6fa83259671 100644
--- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
+++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java
@@ -505,35 +505,16 @@ public class RestClusterClient<T> implements ClusterClient<T> {
             final boolean advanceToEndOfTime,
             @Nullable final String savepointDirectory,
             final SavepointFormatType formatType) {
+        return stopWithSavepoint(jobId, advanceToEndOfTime, 
savepointDirectory, formatType, false);
+    }
 
-        final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders =
-                StopWithSavepointTriggerHeaders.getInstance();
-
-        final SavepointTriggerMessageParameters 
stopWithSavepointTriggerMessageParameters =
-                
stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
-        stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId);
-
-        final CompletableFuture<TriggerResponse> responseFuture =
-                sendRequest(
-                        stopWithSavepointTriggerHeaders,
-                        stopWithSavepointTriggerMessageParameters,
-                        new StopWithSavepointRequestBody(
-                                savepointDirectory, advanceToEndOfTime, 
formatType, null));
-
-        return responseFuture
-                .thenCompose(
-                        savepointTriggerResponseBody -> {
-                            final TriggerId savepointTriggerId =
-                                    
savepointTriggerResponseBody.getTriggerId();
-                            return pollSavepointAsync(jobId, 
savepointTriggerId);
-                        })
-                .thenApply(
-                        savepointInfo -> {
-                            if (savepointInfo.getFailureCause() != null) {
-                                throw new 
CompletionException(savepointInfo.getFailureCause());
-                            }
-                            return savepointInfo.getLocation();
-                        });
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType) {
+        return stopWithSavepoint(jobId, advanceToEndOfTime, 
savepointDirectory, formatType, true);
     }
 
     @Override
@@ -619,6 +600,30 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         });
     }
 
+    public CompletableFuture<String> stopWithSavepoint(
+            final JobID jobId,
+            final boolean advanceToEndOfTime,
+            @Nullable final String savepointDirectory,
+            final SavepointFormatType formatType,
+            final boolean isDetachedMode) {
+
+        final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders =
+                StopWithSavepointTriggerHeaders.getInstance();
+
+        final SavepointTriggerMessageParameters 
stopWithSavepointTriggerMessageParameters =
+                
stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters();
+        stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId);
+
+        final CompletableFuture<TriggerResponse> responseFuture =
+                sendRequest(
+                        stopWithSavepointTriggerHeaders,
+                        stopWithSavepointTriggerMessageParameters,
+                        new StopWithSavepointRequestBody(
+                                savepointDirectory, advanceToEndOfTime, 
formatType, null));
+
+        return getSavepointTriggerFuture(jobId, isDetachedMode, 
responseFuture);
+    }
+
     private CompletableFuture<String> triggerSavepoint(
             final JobID jobId,
             final @Nullable String savepointDirectory,
@@ -638,6 +643,13 @@ public class RestClusterClient<T> implements ClusterClient<T> {
                         new SavepointTriggerRequestBody(
                                 savepointDirectory, cancelJob, formatType, 
null));
 
+        return getSavepointTriggerFuture(jobId, isDetachedMode, 
responseFuture);
+    }
+
+    private CompletableFuture<String> getSavepointTriggerFuture(
+            JobID jobId,
+            boolean isDetachedMode,
+            CompletableFuture<TriggerResponse> responseFuture) {
         CompletableFuture<String> futureResult;
         if (isDetachedMode) {
             // we just return the savepoint trigger id in detached savepoint,
diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
index 927b9123519..66fa420e662 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java
@@ -24,6 +24,7 @@ import org.apache.flink.client.program.TestingClusterClient;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.core.execution.SavepointFormatType;
 import org.apache.flink.core.testutils.OneShotLatch;
+import org.apache.flink.runtime.rest.messages.TriggerId;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.concurrent.FutureUtils;
 
@@ -33,6 +34,8 @@ import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.CsvSource;
 
+import java.io.ByteArrayOutputStream;
+import java.io.PrintStream;
 import java.util.Collections;
 import java.util.concurrent.CompletableFuture;
 
@@ -142,6 +145,33 @@ class CliFrontendStopWithSavepointTest extends CliFrontendTestBase {
         stopWithSavepointLatch.await();
     }
 
+    @Test
+    void testStopWithDetachedSavepoint() throws Exception {
+        ByteArrayOutputStream buffer = new ByteArrayOutputStream();
+        System.setOut(new PrintStream(buffer));
+
+        JobID jid = new JobID();
+
+        String[] parameters = {"-detached", jid.toString()};
+        OneShotLatch stopWithSavepointLatch = new OneShotLatch();
+        TestingClusterClient<String> clusterClient = new 
TestingClusterClient<>();
+        String savepointTriggerId = new TriggerId().toString();
+        clusterClient.setStopWithDetachedSavepointFunction(
+                (jobID, advanceToEndOfEventTime, savepointDirectory, 
formatType) -> {
+                    assertThat(jobID).isEqualTo(jid);
+                    assertThat(advanceToEndOfEventTime).isFalse();
+                    assertThat(savepointDirectory).isNull();
+                    stopWithSavepointLatch.trigger();
+                    return 
CompletableFuture.completedFuture(savepointTriggerId);
+                });
+        MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient);
+        testFrontend.stop(parameters);
+
+        stopWithSavepointLatch.await();
+        // the savepoint trigger id will output to the stdout in detached mode.
+        assertThat(buffer.toString()).contains(savepointTriggerId);
+    }
+
     @Test
     void testStopOnlyWithMaxWM() throws Exception {
         JobID jid = new JobID();
diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
index 0649ff3e777..261b62b0564 100644
--- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
+++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java
@@ -56,6 +56,11 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
             stopWithSavepointFunction =
                     (ignore1, ignore2, savepointPath, formatType) ->
                             CompletableFuture.completedFuture(savepointPath);
+    private QuadFunction<JobID, Boolean, String, SavepointFormatType, 
CompletableFuture<String>>
+            stopWithDetachedSavepointFunction =
+                    (ignore1, ignore2, savepointPath, formatType) ->
+                            CompletableFuture.completedFuture(new 
TriggerId().toString());
+
     private TriFunction<JobID, String, SavepointFormatType, 
CompletableFuture<String>>
             triggerSavepointFunction =
                     (ignore, savepointPath, formatType) ->
@@ -84,6 +89,12 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
         this.stopWithSavepointFunction = stopWithSavepointFunction;
     }
 
+    public void setStopWithDetachedSavepointFunction(
+            QuadFunction<JobID, Boolean, String, SavepointFormatType, 
CompletableFuture<String>>
+                    stopWithDetachedSavepointFunction) {
+        this.stopWithDetachedSavepointFunction = 
stopWithDetachedSavepointFunction;
+    }
+
     public void setTriggerSavepointFunction(
             TriFunction<JobID, String, SavepointFormatType, 
CompletableFuture<String>>
                     triggerSavepointFunction) {
@@ -172,6 +183,16 @@ public class TestingClusterClient<T> implements ClusterClient<T> {
                 jobId, advanceToEndOfEventTime, savepointDirectory, 
formatType);
     }
 
+    @Override
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            boolean advanceToEndOfEventTime,
+            @org.jetbrains.annotations.Nullable String savepointDirectory,
+            SavepointFormatType formatType) {
+        return stopWithDetachedSavepointFunction.apply(
+                jobId, advanceToEndOfEventTime, savepointDirectory, 
formatType);
+    }
+
     @Override
     public CompletableFuture<String> triggerSavepoint(
             JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
index cc8b6bb91e8..740d7e5c701 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java
@@ -940,6 +940,26 @@ public class MiniCluster implements AutoCloseableAsync {
                                 rpcTimeout));
     }
 
+    public CompletableFuture<String> stopWithDetachedSavepoint(
+            JobID jobId,
+            String targetDirectory,
+            boolean terminate,
+            SavepointFormatType formatType) {
+        return runDispatcherCommand(
+                dispatcherGateway -> {
+                    dispatcherGateway.stopWithSavepointAndGetLocation(
+                            jobId,
+                            targetDirectory,
+                            formatType,
+                            terminate
+                                    ? 
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT
+                                    : 
TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT,
+                            rpcTimeout);
+                    // return immediately, no need to wait for the future 
savepoint path
+                    return CompletableFuture.completedFuture("");
+                });
+    }
+
     public CompletableFuture<Acknowledge> disposeSavepoint(String 
savepointPath) {
         return runDispatcherCommand(
                 dispatcherGateway -> 
dispatcherGateway.disposeSavepoint(savepointPath, rpcTimeout));
diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
index 2f89d3cafbf..22a7b027bc3 100644
--- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
+++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java
@@ -269,6 +269,15 @@ public class RemoteStreamEnvironmentTest extends TestLogger {
             return null;
         }
 
+        @Override
+        public CompletableFuture<String> stopWithDetachedSavepoint(
+                JobID jobId,
+                boolean advanceToEndOfEventTime,
+                @org.jetbrains.annotations.Nullable String savepointDirectory,
+                SavepointFormatType formatType) {
+            return null;
+        }
+
         @Override
         public CompletableFuture<String> triggerSavepoint(
                 JobID jobId, @Nullable String savepointDirectory, 
SavepointFormatType formatType) {

Reply via email to