This is an automated email from the ASF dual-hosted git repository. hangxiang pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 421f50e38a21974d043278d6161fab50fdb8c3e6 Author: 周仁祥 <[email protected]> AuthorDate: Tue Dec 12 16:17:38 2023 +0800 [FLINK-32881][checkpoint] add detached mode for stop-with-savepoint --- .../org/apache/flink/client/cli/CliFrontend.java | 87 ++++++++++++++++++---- .../apache/flink/client/cli/CliFrontendParser.java | 6 +- .../org/apache/flink/client/cli/StopOptions.java | 9 +++ .../apache/flink/client/program/ClusterClient.java | 19 +++++ .../flink/client/program/MiniClusterClient.java | 10 +++ .../client/program/rest/RestClusterClient.java | 68 ++++++++++------- .../cli/CliFrontendStopWithSavepointTest.java | 30 ++++++++ .../flink/client/program/TestingClusterClient.java | 21 ++++++ .../flink/runtime/minicluster/MiniCluster.java | 20 +++++ .../environment/RemoteStreamEnvironmentTest.java | 9 +++ 10 files changed, 233 insertions(+), 46 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index 9aa1450b46f..60b34f3db9b 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -581,23 +581,26 @@ public class CliFrontend { activeCommandLine, commandLine, (clusterClient, effectiveConfiguration) -> { - final String savepointPath; - try { - savepointPath = - clusterClient - .stopWithSavepoint( - jobId, - advanceToEndOfEventTime, - targetDirectory, - formatType) - .get( - getClientTimeout(effectiveConfiguration).toMillis(), - TimeUnit.MILLISECONDS); - } catch (Exception e) { - throw new FlinkException( - "Could not stop with a savepoint job \"" + jobId + "\".", e); + // Trigger savepoint in detached mode + if (stopOptions.isDetached()) { + // trigger stop-with-savepoint in detached mode and + // return the trigger id immediately + stopWithDetachedSavepoint( + clusterClient, + jobId, + advanceToEndOfEventTime, + targetDirectory, + 
formatType, + getClientTimeout(effectiveConfiguration)); + } else { + stopWithSavepoint( + clusterClient, + jobId, + advanceToEndOfEventTime, + targetDirectory, + formatType, + getClientTimeout(effectiveConfiguration)); } - logAndSysout("Savepoint completed. Path: " + savepointPath); }); } @@ -804,6 +807,58 @@ public class CliFrontend { } } + /** Sends a SavepointTriggerMessage to the job manager. */ + private void stopWithSavepoint( + ClusterClient<?> clusterClient, + JobID jobId, + boolean advanceToEndOfEventTime, + String targetDirectory, + SavepointFormatType formatType, + Duration clientTimeout) + throws FlinkException { + logAndSysout("Triggering stop-with-savepoint for job " + jobId + '.'); + + CompletableFuture<String> savepointPathFuture = + clusterClient.stopWithSavepoint( + jobId, advanceToEndOfEventTime, targetDirectory, formatType); + + logAndSysout("Waiting for response..."); + + try { + final String savepointPath = + savepointPathFuture.get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS); + + logAndSysout("Savepoint completed. Path: " + savepointPath); + } catch (Exception e) { + throw new FlinkException("Could not stop with a savepoint job \"" + jobId + "\".", e); + } + } + + /** Sends a SavepointTriggerMessage to the job manager in detached mode. 
*/ + private void stopWithDetachedSavepoint( + ClusterClient<?> clusterClient, + JobID jobId, + boolean advanceToEndOfEventTime, + String targetDirectory, + SavepointFormatType formatType, + Duration clientTimeout) + throws FlinkException { + logAndSysout("Triggering stop-with-savepoint in detached mode for job " + jobId + '.'); + try { + final String triggerId = + clusterClient + .stopWithDetachedSavepoint( + jobId, advanceToEndOfEventTime, targetDirectory, formatType) + .get(clientTimeout.toMillis(), TimeUnit.MILLISECONDS); + logAndSysout( + "Successfully trigger stop-with-savepoint in detached mode, triggerId: " + + triggerId); + } catch (Exception e) { + throw new FlinkException( + "Could not stop with a detached savepoint job \"" + jobId + "\".", e); + } + } + /** Sends a SavepointTriggerMessage to the job manager. */ private void triggerSavepoint( ClusterClient<?> clusterClient, diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java index 43491a97ee1..ceca4968d21 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontendParser.java @@ -447,7 +447,8 @@ public class CliFrontendParser { return buildGeneralOptions(new Options()) .addOption(STOP_WITH_SAVEPOINT_PATH) .addOption(STOP_AND_DRAIN) - .addOption(SAVEPOINT_FORMAT_OPTION); + .addOption(SAVEPOINT_FORMAT_OPTION) + .addOption(SAVEPOINT_DETACHED_OPTION); } static Options getSavepointCommandOptions() { @@ -493,7 +494,8 @@ public class CliFrontendParser { private static Options getStopOptionsWithoutDeprecatedOptions(Options options) { return options.addOption(STOP_WITH_SAVEPOINT_PATH) .addOption(STOP_AND_DRAIN) - .addOption(SAVEPOINT_FORMAT_OPTION); + .addOption(SAVEPOINT_FORMAT_OPTION) + .addOption(SAVEPOINT_DETACHED_OPTION); } private static Options 
getSavepointOptionsWithoutDeprecatedOptions(Options options) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java index 3363e1a9090..7c981fb53b2 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/StopOptions.java @@ -23,6 +23,7 @@ import org.apache.flink.core.execution.SavepointFormatType; import org.apache.commons.cli.CommandLine; +import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_DETACHED_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.SAVEPOINT_FORMAT_OPTION; import static org.apache.flink.client.cli.CliFrontendParser.STOP_AND_DRAIN; import static org.apache.flink.client.cli.CliFrontendParser.STOP_WITH_SAVEPOINT_PATH; @@ -41,6 +42,8 @@ class StopOptions extends CommandLineOptions { private final SavepointFormatType formatType; + private boolean isDetached; + StopOptions(CommandLine line) { super(line); this.args = line.getArgs(); @@ -58,6 +61,8 @@ class StopOptions extends CommandLineOptions { } else { formatType = SavepointFormatType.DEFAULT; } + + this.isDetached = line.hasOption(SAVEPOINT_DETACHED_OPTION.getOpt()); } String[] getArgs() { @@ -79,4 +84,8 @@ class StopOptions extends CommandLineOptions { public SavepointFormatType getFormatType() { return formatType; } + + public boolean isDetached() { + return isDetached; + } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java index f25946971c3..30c126d1645 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/ClusterClient.java @@ -166,6 +166,25 @@ public interface ClusterClient<T> extends AutoCloseable { @Nullable final String 
savepointDirectory, final SavepointFormatType formatType); + /** + * Stops a program on Flink cluster whose job-manager is configured in this client's + * configuration. Stopping works only for streaming programs. Be aware that the program might + * continue to run for a while after sending the stop command, because after sources stop + * emitting data, all operators need to finish processing. + * + * @param jobId the job ID of the streaming program to stop + * @param advanceToEndOfEventTime flag indicating if the source should inject a {@code + * MAX_WATERMARK} in the pipeline + * @param savepointDirectory directory the savepoint should be written to + * @param formatType a binary format of the savepoint + * @return the savepoint trigger id + */ + CompletableFuture<String> stopWithDetachedSavepoint( + final JobID jobId, + final boolean advanceToEndOfEventTime, + @Nullable final String savepointDirectory, + final SavepointFormatType formatType); + /** * Triggers a savepoint for the job identified by the job id. 
The savepoint will be written to * the given savepoint directory, or {@link diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java index 889fa8b8202..d8c64dd9eb3 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/MiniClusterClient.java @@ -104,6 +104,16 @@ public class MiniClusterClient implements ClusterClient<MiniClusterClient.MiniCl jobId, savepointDirectory, advanceToEndOfEventTime, formatType); } + @Override + public CompletableFuture<String> stopWithDetachedSavepoint( + JobID jobId, + boolean advanceToEndOfEventTime, + @Nullable String savepointDirectory, + SavepointFormatType formatType) { + return miniCluster.stopWithDetachedSavepoint( + jobId, savepointDirectory, advanceToEndOfEventTime, formatType); + } + @Override public CompletableFuture<String> triggerSavepoint( JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) { diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java index 06587021bcb..6fa83259671 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/rest/RestClusterClient.java @@ -505,35 +505,16 @@ public class RestClusterClient<T> implements ClusterClient<T> { final boolean advanceToEndOfTime, @Nullable final String savepointDirectory, final SavepointFormatType formatType) { + return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, false); + } - final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders = - StopWithSavepointTriggerHeaders.getInstance(); - - final SavepointTriggerMessageParameters 
stopWithSavepointTriggerMessageParameters = - stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters(); - stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId); - - final CompletableFuture<TriggerResponse> responseFuture = - sendRequest( - stopWithSavepointTriggerHeaders, - stopWithSavepointTriggerMessageParameters, - new StopWithSavepointRequestBody( - savepointDirectory, advanceToEndOfTime, formatType, null)); - - return responseFuture - .thenCompose( - savepointTriggerResponseBody -> { - final TriggerId savepointTriggerId = - savepointTriggerResponseBody.getTriggerId(); - return pollSavepointAsync(jobId, savepointTriggerId); - }) - .thenApply( - savepointInfo -> { - if (savepointInfo.getFailureCause() != null) { - throw new CompletionException(savepointInfo.getFailureCause()); - } - return savepointInfo.getLocation(); - }); + @Override + public CompletableFuture<String> stopWithDetachedSavepoint( + final JobID jobId, + final boolean advanceToEndOfTime, + @Nullable final String savepointDirectory, + final SavepointFormatType formatType) { + return stopWithSavepoint(jobId, advanceToEndOfTime, savepointDirectory, formatType, true); } @Override @@ -619,6 +600,30 @@ public class RestClusterClient<T> implements ClusterClient<T> { }); } + public CompletableFuture<String> stopWithSavepoint( + final JobID jobId, + final boolean advanceToEndOfTime, + @Nullable final String savepointDirectory, + final SavepointFormatType formatType, + final boolean isDetachedMode) { + + final StopWithSavepointTriggerHeaders stopWithSavepointTriggerHeaders = + StopWithSavepointTriggerHeaders.getInstance(); + + final SavepointTriggerMessageParameters stopWithSavepointTriggerMessageParameters = + stopWithSavepointTriggerHeaders.getUnresolvedMessageParameters(); + stopWithSavepointTriggerMessageParameters.jobID.resolve(jobId); + + final CompletableFuture<TriggerResponse> responseFuture = + sendRequest( + stopWithSavepointTriggerHeaders, + 
stopWithSavepointTriggerMessageParameters, + new StopWithSavepointRequestBody( + savepointDirectory, advanceToEndOfTime, formatType, null)); + + return getSavepointTriggerFuture(jobId, isDetachedMode, responseFuture); + } + private CompletableFuture<String> triggerSavepoint( final JobID jobId, final @Nullable String savepointDirectory, @@ -638,6 +643,13 @@ public class RestClusterClient<T> implements ClusterClient<T> { new SavepointTriggerRequestBody( savepointDirectory, cancelJob, formatType, null)); + return getSavepointTriggerFuture(jobId, isDetachedMode, responseFuture); + } + + private CompletableFuture<String> getSavepointTriggerFuture( + JobID jobId, + boolean isDetachedMode, + CompletableFuture<TriggerResponse> responseFuture) { CompletableFuture<String> futureResult; if (isDetachedMode) { // we just return the savepoint trigger id in detached savepoint, diff --git a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java index 927b9123519..66fa420e662 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java +++ b/flink-clients/src/test/java/org/apache/flink/client/cli/CliFrontendStopWithSavepointTest.java @@ -24,6 +24,7 @@ import org.apache.flink.client.program.TestingClusterClient; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.execution.SavepointFormatType; import org.apache.flink.core.testutils.OneShotLatch; +import org.apache.flink.runtime.rest.messages.TriggerId; import org.apache.flink.util.FlinkException; import org.apache.flink.util.concurrent.FutureUtils; @@ -33,6 +34,8 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.CsvSource; +import java.io.ByteArrayOutputStream; +import java.io.PrintStream; import java.util.Collections; import 
java.util.concurrent.CompletableFuture; @@ -142,6 +145,33 @@ class CliFrontendStopWithSavepointTest extends CliFrontendTestBase { stopWithSavepointLatch.await(); } + @Test + void testStopWithDetachedSavepoint() throws Exception { + ByteArrayOutputStream buffer = new ByteArrayOutputStream(); + System.setOut(new PrintStream(buffer)); + + JobID jid = new JobID(); + + String[] parameters = {"-detached", jid.toString()}; + OneShotLatch stopWithSavepointLatch = new OneShotLatch(); + TestingClusterClient<String> clusterClient = new TestingClusterClient<>(); + String savepointTriggerId = new TriggerId().toString(); + clusterClient.setStopWithDetachedSavepointFunction( + (jobID, advanceToEndOfEventTime, savepointDirectory, formatType) -> { + assertThat(jobID).isEqualTo(jid); + assertThat(advanceToEndOfEventTime).isFalse(); + assertThat(savepointDirectory).isNull(); + stopWithSavepointLatch.trigger(); + return CompletableFuture.completedFuture(savepointTriggerId); + }); + MockedCliFrontend testFrontend = new MockedCliFrontend(clusterClient); + testFrontend.stop(parameters); + + stopWithSavepointLatch.await(); + // the savepoint trigger id is printed to stdout in detached mode. 
+ assertThat(buffer.toString()).contains(savepointTriggerId); + } + @Test void testStopOnlyWithMaxWM() throws Exception { JobID jid = new JobID(); diff --git a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java index 0649ff3e777..261b62b0564 100644 --- a/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java +++ b/flink-clients/src/test/java/org/apache/flink/client/program/TestingClusterClient.java @@ -56,6 +56,11 @@ public class TestingClusterClient<T> implements ClusterClient<T> { stopWithSavepointFunction = (ignore1, ignore2, savepointPath, formatType) -> CompletableFuture.completedFuture(savepointPath); + private QuadFunction<JobID, Boolean, String, SavepointFormatType, CompletableFuture<String>> + stopWithDetachedSavepointFunction = + (ignore1, ignore2, savepointPath, formatType) -> + CompletableFuture.completedFuture(new TriggerId().toString()); + private TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction = (ignore, savepointPath, formatType) -> @@ -84,6 +89,12 @@ public class TestingClusterClient<T> implements ClusterClient<T> { this.stopWithSavepointFunction = stopWithSavepointFunction; } + public void setStopWithDetachedSavepointFunction( + QuadFunction<JobID, Boolean, String, SavepointFormatType, CompletableFuture<String>> + stopWithDetachedSavepointFunction) { + this.stopWithDetachedSavepointFunction = stopWithDetachedSavepointFunction; + } + public void setTriggerSavepointFunction( TriFunction<JobID, String, SavepointFormatType, CompletableFuture<String>> triggerSavepointFunction) { @@ -172,6 +183,16 @@ public class TestingClusterClient<T> implements ClusterClient<T> { jobId, advanceToEndOfEventTime, savepointDirectory, formatType); } + @Override + public CompletableFuture<String> stopWithDetachedSavepoint( + JobID jobId, + boolean 
advanceToEndOfEventTime, + @org.jetbrains.annotations.Nullable String savepointDirectory, + SavepointFormatType formatType) { + return stopWithDetachedSavepointFunction.apply( + jobId, advanceToEndOfEventTime, savepointDirectory, formatType); + } + @Override public CompletableFuture<String> triggerSavepoint( JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) { diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index cc8b6bb91e8..740d7e5c701 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -940,6 +940,26 @@ public class MiniCluster implements AutoCloseableAsync { rpcTimeout)); } + public CompletableFuture<String> stopWithDetachedSavepoint( + JobID jobId, + String targetDirectory, + boolean terminate, + SavepointFormatType formatType) { + return runDispatcherCommand( + dispatcherGateway -> { + dispatcherGateway.stopWithSavepointAndGetLocation( + jobId, + targetDirectory, + formatType, + terminate + ? 
TriggerSavepointMode.TERMINATE_WITH_SAVEPOINT + : TriggerSavepointMode.SUSPEND_WITH_SAVEPOINT, + rpcTimeout); + // return immediately, no need to wait for the future savepoint path + return CompletableFuture.completedFuture(""); + }); + } + public CompletableFuture<Acknowledge> disposeSavepoint(String savepointPath) { return runDispatcherCommand( dispatcherGateway -> dispatcherGateway.disposeSavepoint(savepointPath, rpcTimeout)); diff --git a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java index 2f89d3cafbf..22a7b027bc3 100644 --- a/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java +++ b/flink-tests/src/test/java/org/apache/flink/test/streaming/api/environment/RemoteStreamEnvironmentTest.java @@ -269,6 +269,15 @@ public class RemoteStreamEnvironmentTest extends TestLogger { return null; } + @Override + public CompletableFuture<String> stopWithDetachedSavepoint( + JobID jobId, + boolean advanceToEndOfEventTime, + @org.jetbrains.annotations.Nullable String savepointDirectory, + SavepointFormatType formatType) { + return null; + } + @Override public CompletableFuture<String> triggerSavepoint( JobID jobId, @Nullable String savepointDirectory, SavepointFormatType formatType) {
