This is an automated email from the ASF dual-hosted git repository.
shengkai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7bce4854fac [FLINK-28360][sql-client] Support stop job statement in
SQL client (#20159)
7bce4854fac is described below
commit 7bce4854faccc0914951832d1dda101afce0105a
Author: Paul Lin <[email protected]>
AuthorDate: Mon Oct 10 19:21:09 2022 +0800
[FLINK-28360][sql-client] Support stop job statement in SQL client (#20159)
---
.../apache/flink/table/client/cli/CliClient.java | 21 ++++
.../apache/flink/table/client/cli/CliStrings.java | 5 +
.../flink/table/client/gateway/Executor.java | 6 ++
.../client/gateway/context/ExecutionContext.java | 11 ++
.../table/client/gateway/local/LocalExecutor.java | 112 +++++++++++++++++++
.../flink/table/client/cli/CliClientTest.java | 76 +++++++++++++
.../flink/table/client/cli/CliResultViewTest.java | 8 ++
.../flink/table/client/cli/TestingExecutor.java | 8 ++
.../client/gateway/local/LocalExecutorITCase.java | 118 ++++++++++++++++-----
.../src/main/codegen/data/Parser.tdd | 8 ++
.../src/main/codegen/includes/parserImpls.ftl | 45 ++++++++
.../apache/flink/sql/parser/ddl/SqlStopJob.java | 95 +++++++++++++++++
.../flink/sql/parser/utils/ParserResource.java | 3 +
.../flink/sql/parser/FlinkSqlParserImplTest.java | 12 +++
.../table/operations/command/StopJobOperation.java | 62 +++++++++++
.../operations/SqlToOperationConverter.java | 9 ++
16 files changed, 573 insertions(+), 26 deletions(-)
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
index f6ade74ae70..fb4b3ffdafd 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliClient.java
@@ -48,6 +48,7 @@ import
org.apache.flink.table.operations.command.QuitOperation;
import org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AlterOperation;
import org.apache.flink.table.operations.ddl.CreateOperation;
import org.apache.flink.table.operations.ddl.DropOperation;
@@ -468,6 +469,9 @@ public class CliClient implements AutoCloseable {
} else if (operation instanceof CreateTableASOperation) {
// CTAS
callInsert((CreateTableASOperation) operation);
+ } else if (operation instanceof StopJobOperation) {
+ // STOP JOB
+ callStopJob((StopJobOperation) operation);
} else {
// fallback to default implementation
executeOperation(operation);
@@ -635,6 +639,23 @@ public class CliClient implements AutoCloseable {
}
}
+ private void callStopJob(StopJobOperation stopJobOperation) {
+ Optional<String> savepoint =
+ executor.stopJob(
+ sessionId,
+ stopJobOperation.getJobId(),
+ stopJobOperation.isWithSavepoint(),
+ stopJobOperation.isWithDrain());
+ if (stopJobOperation.isWithSavepoint()) {
+ Preconditions.checkState(savepoint.isPresent());
+ printInfo(
+ String.format(
+
CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT, savepoint.get()));
+ } else {
+ printInfo(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
+ }
+ }
+
private void executeOperation(Operation operation) {
TableResultInternal result = executor.executeOperation(sessionId,
operation);
if (TABLE_RESULT_OK == result) {
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
index a8160711aac..43a8782e69f 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/cli/CliStrings.java
@@ -246,6 +246,11 @@ public final class CliStrings {
public static final String MESSAGE_REMOVE_JAR_STATEMENT =
"The specified jar is removed from session classloader.";
+ public static final String MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT =
+ "The specified job is stopped with savepoint %s.";
+
+ public static final String MESSAGE_STOP_JOB_STATEMENT = "The specified job
is stopped.";
+
//
--------------------------------------------------------------------------------------------
public static final String RESULT_TITLE = "SQL Query Result";
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
index c4a022e1d0b..0dbe999fd45 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/Executor.java
@@ -30,6 +30,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/** A gateway for communicating with Flink and other external systems. */
public interface Executor {
@@ -143,4 +144,9 @@ public interface Executor {
/** Remove the JAR resource from the classloader with specified session. */
void removeJar(String sessionId, String jarPath);
+
+ /** Stops a job in the specified session. */
+ Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint, boolean
isWithDrain)
+ throws SqlExecutionException;
}
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
index c2bf0b1d725..d172de81f2a 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/context/ExecutionContext.java
@@ -36,8 +36,10 @@ import org.apache.flink.table.factories.PlannerFactoryUtil;
import org.apache.flink.table.module.ModuleManager;
import org.apache.flink.table.resource.ResourceManager;
import org.apache.flink.util.MutableURLClassLoader;
+import org.apache.flink.util.TemporaryClassLoaderContext;
import java.lang.reflect.Method;
+import java.util.function.Supplier;
import static
org.apache.flink.table.client.gateway.context.SessionContext.SessionState;
@@ -167,4 +169,13 @@ public class ExecutionContext {
e);
}
}
+
+ /**
+ * Executes the given supplier using the execution context's classloader
as thread classloader.
+ */
+ public <R> R wrapClassLoader(Supplier<R> supplier) {
+ try (TemporaryClassLoaderContext ignored =
TemporaryClassLoaderContext.of(classLoader)) {
+ return supplier.get();
+ }
+ }
}
diff --git
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
index f5e1c630863..125d12667aa 100644
---
a/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
+++
b/flink-table/flink-sql-client/src/main/java/org/apache/flink/table/client/gateway/local/LocalExecutor.java
@@ -19,7 +19,16 @@
package org.apache.flink.table.client.gateway.local;
import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.client.cli.ClientOptions;
+import org.apache.flink.client.deployment.ClusterClientFactory;
+import org.apache.flink.client.deployment.ClusterClientServiceLoader;
+import org.apache.flink.client.deployment.ClusterDescriptor;
+import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
+import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.core.execution.SavepointFormatType;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentInternal;
import org.apache.flink.table.api.internal.TableResultInternal;
@@ -38,17 +47,22 @@ import org.apache.flink.table.delegation.Parser;
import org.apache.flink.table.operations.ModifyOperation;
import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
+import org.apache.flink.util.FlinkException;
+import org.apache.flink.util.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
+import java.time.Duration;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
import static
org.apache.flink.table.client.cli.CliStrings.MESSAGE_SQL_EXECUTION_ERROR;
import static org.apache.flink.util.Preconditions.checkArgument;
@@ -69,11 +83,14 @@ public class LocalExecutor implements Executor {
private final ResultStore resultStore;
private final DefaultContext defaultContext;
+ private final ClusterClientServiceLoader clusterClientServiceLoader;
+
/** Creates a local executor for submitting table programs and retrieving
results. */
public LocalExecutor(DefaultContext defaultContext) {
this.contextMap = new ConcurrentHashMap<>();
this.resultStore = new ResultStore();
this.defaultContext = defaultContext;
+ this.clusterClientServiceLoader = new
DefaultClusterClientServiceLoader();
}
@Override
@@ -307,4 +324,99 @@ public class LocalExecutor implements Executor {
final SessionContext context = getSessionContext(sessionId);
context.removeJar(jarUrl);
}
+
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint, boolean
isWithDrain)
+ throws SqlExecutionException {
+ Duration clientTimeout =
getSessionConfig(sessionId).get(ClientOptions.CLIENT_TIMEOUT);
+ try {
+ return runClusterAction(
+ sessionId,
+ clusterClient -> {
+ if (isWithSavepoint) {
+ // blocking get savepoint path
+ try {
+ String savepoint =
+ clusterClient
+ .stopWithSavepoint(
+
JobID.fromHexString(jobId),
+ isWithDrain,
+ null,
+
SavepointFormatType.DEFAULT)
+ .get(
+
clientTimeout.toMillis(),
+ TimeUnit.MILLISECONDS);
+ return Optional.of(savepoint);
+ } catch (Exception e) {
+ throw new FlinkException(
+ "Could not stop job "
+ + jobId
+ + " in session "
+ + sessionId
+ + ".",
+ e);
+ }
+ } else {
+ clusterClient.cancel(JobID.fromHexString(jobId));
+ return Optional.empty();
+ }
+ });
+ } catch (Exception e) {
+ throw new SqlExecutionException(
+ "Could not stop job " + jobId + " in session " + sessionId
+ ".", e);
+ }
+ }
+
+ /**
+ * Retrieves the {@link ClusterClient} from the session and runs the given
{@link ClusterAction}
+ * against it.
+ *
+ * @param sessionId the specified session ID
+ * @param clusterAction the cluster action to run against the retrieved
{@link ClusterClient}.
+ * @param <ClusterID> type of the cluster id
+ * @param <Result> type of the result
+ * @throws FlinkException if something goes wrong
+ */
+ private <ClusterID, Result> Result runClusterAction(
+ String sessionId, ClusterAction<ClusterID, Result> clusterAction)
+ throws FlinkException {
+ final SessionContext context = getSessionContext(sessionId);
+ final Configuration configuration = (Configuration)
context.getReadableConfig();
+ final ClusterClientFactory<ClusterID> clusterClientFactory =
+ context.getExecutionContext()
+ .wrapClassLoader(
+ () ->
+
clusterClientServiceLoader.getClusterClientFactory(
+ configuration));
+
+ final ClusterID clusterId =
clusterClientFactory.getClusterId(configuration);
+ Preconditions.checkNotNull(clusterId, "No cluster ID found for session
" + sessionId);
+
+ try (final ClusterDescriptor<ClusterID> clusterDescriptor =
+
clusterClientFactory.createClusterDescriptor(configuration);
+ final ClusterClient<ClusterID> clusterClient =
+
clusterDescriptor.retrieve(clusterId).getClusterClient()) {
+ return clusterAction.runAction(clusterClient);
+ }
+ }
+
+ /**
+ * Internal interface to encapsulate cluster actions which are executed
via the {@link
+ * ClusterClient}.
+ *
+ * @param <ClusterID> type of the cluster id
+ * @param <Result> type of the result
+ */
+ @FunctionalInterface
+ private interface ClusterAction<ClusterID, Result> {
+
+ /**
+ * Run the cluster action with the given {@link ClusterClient}.
+ *
+ * @param clusterClient to run the cluster action against
+ * @throws FlinkException if something goes wrong
+ */
+ Result runAction(ClusterClient<ClusterID> clusterClient) throws
FlinkException;
+ }
}
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
index a1b6e0069f6..b4c4e595fe8 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliClientTest.java
@@ -70,6 +70,9 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
import static
org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC;
import static
org.apache.flink.table.client.cli.CliClient.DEFAULT_TERMINAL_FACTORY;
@@ -356,6 +359,58 @@ public class CliClientTest extends TestLogger {
}
}
+ @Test(timeout = 10000)
+ public void testStopJob() throws Exception {
+ final MockExecutor mockExecutor = new MockExecutor();
+ mockExecutor.isSync = false;
+
+ String sessionId = mockExecutor.openSession("test-session");
+ OutputStream outputStream = new ByteArrayOutputStream(256);
+ try (CliClient client =
+ new CliClient(
+ () -> TerminalUtils.createDumbTerminal(outputStream),
+ sessionId,
+ mockExecutor,
+ historyTempFile(),
+ null)) {
+ client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
+ String dmlResult = outputStream.toString();
+ String jobId = extractJobId(dmlResult);
+ client.executeInNonInteractiveMode("STOP JOB '" + jobId + "'");
+ String stopResult = outputStream.toString();
+
assertThat(stopResult).contains(CliStrings.MESSAGE_STOP_JOB_STATEMENT);
+ }
+ }
+
+ @Test(timeout = 10000)
+ public void testStopJobWithSavepoint() throws Exception {
+ final MockExecutor mockExecutor = new MockExecutor();
+ mockExecutor.isSync = false;
+ final String mockSavepoint = "/my/savepoint/path";
+ mockExecutor.savepoint = mockSavepoint;
+
+ String sessionId = mockExecutor.openSession("test-session");
+ OutputStream outputStream = new ByteArrayOutputStream(256);
+ try (CliClient client =
+ new CliClient(
+ () -> TerminalUtils.createDumbTerminal(outputStream),
+ sessionId,
+ mockExecutor,
+ historyTempFile(),
+ null)) {
+ client.executeInNonInteractiveMode(INSERT_INTO_STATEMENT);
+ String dmlResult = outputStream.toString();
+ String jobId = extractJobId(dmlResult);
+ client.executeInNonInteractiveMode("STOP JOB '" + jobId + "' WITH
SAVEPOINT");
+ String stopResult = outputStream.toString();
+ assertThat(stopResult)
+ .contains(
+ String.format(
+
CliStrings.MESSAGE_STOP_JOB_WITH_SAVEPOINT_STATEMENT,
+ mockSavepoint));
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
private void verifyUpdateSubmission(
@@ -419,11 +474,21 @@ public class CliClientTest extends TestLogger {
return outputStream.toString();
}
+ private String extractJobId(String result) {
+ Pattern pattern = Pattern.compile("[\\s\\S]*Job ID: (.*)[\\s\\S]*");
+ Matcher matcher = pattern.matcher(result);
+ if (!matcher.matches()) {
+ throw new IllegalStateException("No job ID found in string: " +
result);
+ }
+ return matcher.group(1);
+ }
+
//
--------------------------------------------------------------------------------------------
private static class MockExecutor implements Executor {
public boolean failExecution;
+ public String savepoint;
public volatile boolean isSync = false;
public volatile boolean isAwait = false;
@@ -595,5 +660,16 @@ public class CliClientTest extends TestLogger {
public void removeJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}
+
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint,
boolean isWithDrain)
+ throws SqlExecutionException {
+ if (isWithSavepoint) {
+ return Optional.of(savepoint);
+ } else {
+ return Optional.empty();
+ }
+ }
}
}
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
index e2b7bb8e25e..bba5f9cb5c2 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/CliResultViewTest.java
@@ -49,6 +49,7 @@ import java.nio.file.Path;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -245,6 +246,13 @@ public class CliResultViewTest {
public void removeJar(String sessionId, String jarUrl) {
throw new UnsupportedOperationException("Not implemented.");
}
+
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint,
boolean isWithDrain)
+ throws SqlExecutionException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
}
private static final class TestingCliResultView implements Runnable {
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
index b4c68f6951c..55c6413a976 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/cli/TestingExecutor.java
@@ -35,6 +35,7 @@ import javax.annotation.Nullable;
import java.util.List;
import java.util.Map;
+import java.util.Optional;
/** A customizable {@link Executor} for testing purposes. */
class TestingExecutor implements Executor {
@@ -71,6 +72,13 @@ class TestingExecutor implements Executor {
throw new UnsupportedOperationException("Not implemented.");
}
+ @Override
+ public Optional<String> stopJob(
+ String sessionId, String jobId, boolean isWithSavepoint, boolean
isWithDrain)
+ throws SqlExecutionException {
+ throw new UnsupportedOperationException("Not implemented.");
+ }
+
@Override
public TypedResult<List<RowData>> retrieveResultChanges(String sessionId,
String resultId)
throws SqlExecutionException {
diff --git
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
index 7c7a5062136..73b03112fcf 100644
---
a/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
+++
b/flink-table/flink-sql-client/src/test/java/org/apache/flink/table/client/gateway/local/LocalExecutorITCase.java
@@ -19,16 +19,22 @@
package org.apache.flink.table.client.gateway.local;
+import org.apache.flink.api.common.JobID;
+import org.apache.flink.api.common.JobStatus;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.client.cli.DefaultCLI;
import org.apache.flink.client.program.ClusterClient;
+import org.apache.flink.configuration.CheckpointingOptions;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.StateBackendOptions;
import org.apache.flink.configuration.TaskManagerOptions;
import org.apache.flink.configuration.WebOptions;
+import org.apache.flink.core.execution.JobClient;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.api.config.TableConfigOptions;
import org.apache.flink.table.client.config.ResultMode;
import org.apache.flink.table.client.gateway.Executor;
import org.apache.flink.table.client.gateway.ResultDescriptor;
@@ -41,16 +47,19 @@ import org.apache.flink.table.operations.Operation;
import org.apache.flink.table.operations.QueryOperation;
import org.apache.flink.table.utils.UserDefinedFunctions;
import org.apache.flink.table.utils.print.RowDataToStringConverter;
-import org.apache.flink.test.util.MiniClusterWithClientResource;
+import org.apache.flink.test.junit5.InjectClusterClient;
+import org.apache.flink.test.junit5.MiniClusterExtension;
import org.apache.flink.test.util.TestBaseUtils;
import org.apache.flink.util.StringUtils;
import org.apache.flink.util.TestLogger;
import org.apache.flink.util.UserClassLoaderJarTestUtils;
-import org.junit.BeforeClass;
-import org.junit.ClassRule;
-import org.junit.Test;
-import org.junit.rules.TemporaryFolder;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.Order;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+import org.junit.jupiter.api.extension.RegisterExtension;
+import org.junit.jupiter.api.io.TempDir;
import java.io.File;
import java.io.IOException;
@@ -63,6 +72,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
+import java.util.Optional;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import java.util.stream.Stream;
@@ -81,28 +91,33 @@ public class LocalExecutorITCase extends TestLogger {
private static final int NUM_TMS = 2;
private static final int NUM_SLOTS_PER_TM = 2;
- @ClassRule public static TemporaryFolder tempFolder = new
TemporaryFolder();
-
- @ClassRule
- public static final MiniClusterWithClientResource MINI_CLUSTER_RESOURCE =
- new MiniClusterWithClientResource(
- new MiniClusterResourceConfiguration.Builder()
- .setConfiguration(getConfig())
- .setNumberTaskManagers(NUM_TMS)
- .setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
- .build());
+ @TempDir
+ @Order(1)
+ public static File tempFolder;
+
+ @RegisterExtension
+ @Order(2)
+ public static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
+ new MiniClusterExtension(
+ () ->
+ new MiniClusterResourceConfiguration.Builder()
+ .setConfiguration(getConfig())
+ .setNumberTaskManagers(NUM_TMS)
+
.setNumberSlotsPerTaskManager(NUM_SLOTS_PER_TM)
+ .build());
private static ClusterClient<?> clusterClient;
// a generated UDF jar used for testing classloading of dependencies
private static URL udfDependency;
- @BeforeClass
- public static void setup() throws IOException {
- clusterClient = MINI_CLUSTER_RESOURCE.getClusterClient();
+ @BeforeAll
+ public static void setup(@InjectClusterClient ClusterClient<?>
injectedClusterClient)
+ throws Exception {
+ clusterClient = injectedClusterClient;
File udfJar =
UserClassLoaderJarTestUtils.createJarFile(
- tempFolder.newFolder("test-jar"),
+ tempFolder,
"test-classloader-udf.jar",
GENERATED_LOWER_UDF_CLASS,
String.format(GENERATED_LOWER_UDF_CODE,
GENERATED_LOWER_UDF_CLASS));
@@ -115,6 +130,10 @@ public class LocalExecutorITCase extends TestLogger {
config.setInteger(ConfigConstants.LOCAL_NUMBER_TASK_MANAGER, NUM_TMS);
config.setInteger(TaskManagerOptions.NUM_TASK_SLOTS, NUM_SLOTS_PER_TM);
config.setBoolean(WebOptions.SUBMIT_ENABLE, false);
+ config.set(StateBackendOptions.STATE_BACKEND, "hashmap");
+ config.set(CheckpointingOptions.CHECKPOINT_STORAGE, "filesystem");
+ config.set(CheckpointingOptions.CHECKPOINTS_DIRECTORY,
tempFolder.toURI().toString());
+ config.set(CheckpointingOptions.SAVEPOINT_DIRECTORY,
tempFolder.toURI().toString());
return config;
}
@@ -144,7 +163,8 @@ public class LocalExecutorITCase extends TestLogger {
executor.closeSession(sessionId);
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testStreamQueryExecutionChangelog() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -191,7 +211,8 @@ public class LocalExecutorITCase extends TestLogger {
}
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testStreamQueryExecutionChangelogMultipleTimes() throws
Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -240,7 +261,8 @@ public class LocalExecutorITCase extends TestLogger {
}
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testStreamQueryExecutionTable() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -265,7 +287,8 @@ public class LocalExecutorITCase extends TestLogger {
executeStreamQueryTable(replaceVars, configMap, query,
expectedResults);
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testStreamQueryExecutionTableMultipleTimes() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -291,7 +314,8 @@ public class LocalExecutorITCase extends TestLogger {
}
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testStreamQueryExecutionLimitedTable() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -312,7 +336,8 @@ public class LocalExecutorITCase extends TestLogger {
executeStreamQueryTable(replaceVars, configMap, query,
expectedResults);
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testBatchQueryExecution() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -358,7 +383,8 @@ public class LocalExecutorITCase extends TestLogger {
}
}
- @Test(timeout = 90_000L)
+ @Test
+ @Timeout(value = 90)
public void testBatchQueryExecutionMultipleTimes() throws Exception {
final URL url =
getClass().getClassLoader().getResource("test-data.csv");
Objects.requireNonNull(url);
@@ -406,6 +432,46 @@ public class LocalExecutorITCase extends TestLogger {
}
}
+ @Test
+ @Timeout(value = 90)
+ public void testStopJob() throws Exception {
+ final Map<String, String> configMap = new HashMap<>();
+ configMap.put(EXECUTION_RESULT_MODE.key(), ResultMode.TABLE.name());
+ configMap.put(RUNTIME_MODE.key(),
RuntimeExecutionMode.STREAMING.name());
+ configMap.put(TableConfigOptions.TABLE_DML_SYNC.key(), "false");
+
+ final LocalExecutor executor =
+ createLocalExecutor(
+ Collections.singletonList(udfDependency),
Configuration.fromMap(configMap));
+ String sessionId = executor.openSession("test-session");
+
+ final String srcDdl = "CREATE TABLE src (a STRING) WITH ('connector' =
'datagen')";
+ final String snkDdl = "CREATE TABLE snk (a STRING) WITH ('connector' =
'blackhole')";
+ final String insert = "INSERT INTO snk SELECT a FROM src;";
+
+ try {
+ executor.executeOperation(sessionId,
executor.parseStatement(sessionId, srcDdl));
+ executor.executeOperation(sessionId,
executor.parseStatement(sessionId, snkDdl));
+ TableResult result =
+ executor.executeOperation(
+ sessionId, executor.parseStatement(sessionId,
insert));
+ JobClient jobClient = result.getJobClient().get();
+ JobID jobId = jobClient.getJobID();
+
+ // wait till the job turns into running status or the test times
out
+ JobStatus jobStatus;
+ do {
+ Thread.sleep(2_000L);
+ jobStatus = jobClient.getJobStatus().get();
+ } while (jobStatus != JobStatus.RUNNING);
+
+ Optional<String> savepoint = executor.stopJob(sessionId,
jobId.toString(), true, true);
+ assertThat(savepoint.isPresent()).isTrue();
+ } finally {
+ executor.closeSession(sessionId);
+ }
+ }
+
//
--------------------------------------------------------------------------------------------
// Helper method
//
--------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 8c87f645d02..a775b699480 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -102,6 +102,7 @@
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
"org.apache.flink.sql.parser.dql.SqlUnloadModule"
"org.apache.flink.sql.parser.expr.SqlUnresolvedTryCastFunction"
+ "org.apache.flink.sql.parser.ddl.SqlStopJob"
"org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec"
"org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec"
"org.apache.flink.sql.parser.type.SqlMapTypeNameSpec"
@@ -132,6 +133,7 @@
"COMPILE"
"COLUMNS"
"DATABASES"
+ "DRAIN"
"ENFORCED"
"ESTIMATED_COST"
"EXTENDED"
@@ -140,6 +142,7 @@
"JSON_EXECUTION_PLAN"
"JAR"
"JARS"
+ "JOB"
"LOAD"
"METADATA"
"MODIFY"
@@ -153,6 +156,7 @@
"REMOVE"
"RENAME"
"SCALA"
+ "STOP"
"STRING"
"TABLES"
"UNLOAD"
@@ -244,6 +248,7 @@
"DOMAIN"
"DOW"
"DOY"
+ "DRAIN"
"DYNAMIC_FUNCTION"
"DYNAMIC_FUNCTION_CODE"
"ENCODING"
@@ -285,6 +290,7 @@
"ISOYEAR"
"JAR"
"JARS"
+ "JOB"
"JAVA"
"JSON"
"K"
@@ -444,6 +450,7 @@
"SQL_VARBINARY"
"SQL_VARCHAR"
"STATE"
+ "STOP"
"STRUCTURE"
"STYLE"
"SUBCLASS_ORIGIN"
@@ -557,6 +564,7 @@
"SqlSet()"
"SqlReset()"
"SqlAnalyzeTable()"
+ "SqlStopJob()"
]
# List of methods for parsing custom literals.
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 19677305cd2..797248abf79 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -2325,3 +2325,48 @@ SqlNode SqlAnalyzeTable():
return new SqlAnalyzeTable(s.end(this), tableName, partitionSpec,
columns, allColumns);
}
}
+
+/**
+* Parses a STOP JOB statement:
+* STOP JOB <JOB_ID> [<WITH SAVEPOINT>] [<WITH DRAIN>];
+*/
+SqlStopJob SqlStopJob() :
+{
+ SqlCharStringLiteral jobId;
+ boolean isWithSavepoint = false;
+ boolean isWithDrain = false;
+ final Span span;
+}
+{
+ <STOP> <JOB> <QUOTED_STRING>
+ {
+ String id = SqlParserUtil.parseString(token.image);
+ jobId = SqlLiteral.createCharString(id, getPos());
+ }
+ [
+ LOOKAHEAD(2)
+ <WITH> <SAVEPOINT>
+ {
+ isWithSavepoint = true;
+ }
+ ]
+ [
+ LOOKAHEAD(2)
+ <WITH>
+ {
+ span = span();
+ }
+ <DRAIN>
+ {
+ span.end(this);
+ if (!isWithSavepoint) {
+ throw SqlUtil.newContextException(span.pos(),
+ ParserResource.RESOURCE.withDrainOnlyUsedWithSavepoint());
+ }
+ isWithDrain = true;
+ }
+ ]
+ {
+ return new SqlStopJob(getPos(), jobId, isWithSavepoint, isWithDrain);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java
new file mode 100644
index 00000000000..a50a03d8154
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlStopJob.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.NlsString;
+
+import javax.annotation.Nonnull;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Parse-tree node for the {@code STOP JOB '<job_id>' [WITH SAVEPOINT] [WITH DRAIN]} statement.
+ *
+ * <p>The job id literal is the node's only operand; the two clause flags are exposed through
+ * dedicated getters.
+ */
+public class SqlStopJob extends SqlCall {
+
+    /** Operator shared by every {@code STOP JOB} call. */
+    public static final SqlOperator OPERATOR =
+            new SqlSpecialOperator("STOP JOB", SqlKind.OTHER_DDL);
+
+    /** Literal holding the id of the job to stop. */
+    private final SqlCharStringLiteral jobId;
+
+    /** When true, {@code WITH SAVEPOINT} is emitted by {@link #unparse}. */
+    private final boolean isWithSavepoint;
+
+    /** When true, {@code WITH DRAIN} is emitted by {@link #unparse}. */
+    private final boolean isWithDrain;
+
+    /**
+     * Creates the node.
+     *
+     * @param pos parser position handed to {@link SqlCall}
+     * @param jobId literal holding the job id
+     * @param isWithSavepoint whether the {@code WITH SAVEPOINT} clause is present
+     * @param isWithDrain whether the {@code WITH DRAIN} clause is present
+     */
+    public SqlStopJob(
+            SqlParserPos pos,
+            SqlCharStringLiteral jobId,
+            boolean isWithSavepoint,
+            boolean isWithDrain) {
+        super(pos);
+        this.isWithDrain = isWithDrain;
+        this.isWithSavepoint = isWithSavepoint;
+        this.jobId = jobId;
+    }
+
+    /** Returns the shared {@link #OPERATOR}. */
+    @Nonnull
+    @Override
+    public SqlOperator getOperator() {
+        return OPERATOR;
+    }
+
+    /** Returns a single-element list containing the job id literal. */
+    @Nonnull
+    @Override
+    public List<SqlNode> getOperandList() {
+        return Collections.singletonList(jobId);
+    }
+
+    /**
+     * Writes the statement back as SQL: the {@code STOP JOB} keywords, the job id literal, then
+     * the optional clauses with the savepoint clause first.
+     */
+    @Override
+    public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+        writer.keyword("STOP");
+        writer.keyword("JOB");
+        jobId.unparse(writer, leftPrec, rightPrec);
+
+        if (isWithSavepoint) {
+            writer.keyword("WITH SAVEPOINT");
+        }
+
+        if (isWithDrain) {
+            writer.keyword("WITH DRAIN");
+        }
+    }
+
+    /** Returns the job id as a plain string taken from the literal's {@link NlsString} value. */
+    public String getId() {
+        final NlsString literalValue = jobId.getValueAs(NlsString.class);
+        return literalValue.getValue();
+    }
+
+    /** Returns whether the {@code WITH SAVEPOINT} clause is present. */
+    public boolean isWithSavepoint() {
+        return isWithSavepoint;
+    }
+
+    /** Returns whether the {@code WITH DRAIN} clause is present. */
+    public boolean isWithDrain() {
+        return isWithDrain;
+    }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 8fa96b90ae2..8a63777f543 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -43,4 +43,7 @@ public interface ParserResource {
@Resources.BaseMessage("CREATE FUNCTION USING JAR syntax is not applicable
to {0} language.")
Resources.ExInst<ParseException> createFunctionUsingJar(String language);
+
+ /** Parse error for a WITH DRAIN clause that is not preceded by WITH SAVEPOINT. */
+ @Resources.BaseMessage("WITH DRAIN could only be used after WITH
SAVEPOINT.")
+ Resources.ExInst<ParseException> withDrainOnlyUsedWithSavepoint();
}
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index b6dd3fdf74e..27d57acc88f 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1950,6 +1950,18 @@ class FlinkSqlParserImplTest extends SqlParserTest {
"CREATE TABLE AS SELECT syntax does
not support to create partitioned table yet."));
}
+ /**
+  * Checks that STOP JOB parses and unparses with and without the WITH SAVEPOINT / WITH DRAIN
+  * clauses, and that WITH DRAIN without a preceding WITH SAVEPOINT is rejected.
+  */
+ @Test
+ void testStopJob() {
+ // Accepted forms: bare, with savepoint, and with savepoint followed by drain.
+ sql("STOP JOB 'myjob'").ok("STOP JOB 'myjob'");
+ sql("STOP JOB 'myjob' WITH SAVEPOINT").ok("STOP JOB 'myjob' WITH
SAVEPOINT");
+ sql("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN")
+ .ok("STOP JOB 'myjob' WITH SAVEPOINT WITH DRAIN");
+ // Rejected forms: WITH DRAIN alone, or placed before WITH SAVEPOINT.
+ // The carets presumably mark the span the error is expected to be reported on
+ // (test-harness convention - confirm against the parser test base class).
+ sql("STOP JOB 'myjob' ^WITH DRAIN^")
+ .fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
+ sql("STOP JOB 'myjob' ^WITH DRAIN^ WITH SAVEPOINT")
+ .fails("WITH DRAIN could only be used after WITH SAVEPOINT.");
+ }
+
public static BaseMatcher<SqlNode> validated(String validatedSql) {
return new TypeSafeDiagnosingMatcher<SqlNode>() {
@Override
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java
new file mode 100644
index 00000000000..dcb56c20ab6
--- /dev/null
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/operations/command/StopJobOperation.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.operations.command;
+
+import org.apache.flink.table.operations.Operation;
+
+/**
+ * Operation describing a {@code STOP JOB} command: the id of the job plus the optional {@code
+ * WITH SAVEPOINT} and {@code WITH DRAIN} flags.
+ */
+public class StopJobOperation implements Operation {
+
+    /** Id of the job this operation refers to. */
+    private final String jobId;
+
+    /** Whether {@code WITH SAVEPOINT} was requested. */
+    private final boolean isWithSavepoint;
+
+    /** Whether {@code WITH DRAIN} was requested. */
+    private final boolean isWithDrain;
+
+    /**
+     * Creates the operation.
+     *
+     * @param jobId id of the job to stop
+     * @param isWithSavepoint whether {@code WITH SAVEPOINT} was requested
+     * @param isWithDrain whether {@code WITH DRAIN} was requested
+     */
+    public StopJobOperation(String jobId, boolean isWithSavepoint, boolean isWithDrain) {
+        this.jobId = jobId;
+        this.isWithSavepoint = isWithSavepoint;
+        this.isWithDrain = isWithDrain;
+    }
+
+    /** Returns the id of the job to stop. */
+    public String getJobId() {
+        return jobId;
+    }
+
+    /** Returns whether {@code WITH SAVEPOINT} was requested. */
+    public boolean isWithSavepoint() {
+        return isWithSavepoint;
+    }
+
+    /** Returns whether {@code WITH DRAIN} was requested. */
+    public boolean isWithDrain() {
+        return isWithDrain;
+    }
+
+    /**
+     * Renders the operation as {@code STOP JOB '<jobId>'}, followed by {@code WITH SAVEPOINT}
+     * and/or {@code WITH DRAIN} when the corresponding flag is set.
+     */
+    @Override
+    public String asSummaryString() {
+        final String savepointClause = isWithSavepoint ? " WITH SAVEPOINT" : "";
+        final String drainClause = isWithDrain ? " WITH DRAIN" : "";
+        return "STOP JOB " + "'" + jobId + "'" + savepointClause + drainClause;
+    }
+}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 2e137b3ebeb..c1860e5e9c1 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -52,6 +52,7 @@ import org.apache.flink.sql.parser.ddl.SqlDropView;
import org.apache.flink.sql.parser.ddl.SqlRemoveJar;
import org.apache.flink.sql.parser.ddl.SqlReset;
import org.apache.flink.sql.parser.ddl.SqlSet;
+import org.apache.flink.sql.parser.ddl.SqlStopJob;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
import org.apache.flink.sql.parser.ddl.SqlUseCatalog;
import org.apache.flink.sql.parser.ddl.SqlUseDatabase;
@@ -154,6 +155,7 @@ import
org.apache.flink.table.operations.command.RemoveJarOperation;
import org.apache.flink.table.operations.command.ResetOperation;
import org.apache.flink.table.operations.command.SetOperation;
import org.apache.flink.table.operations.command.ShowJarsOperation;
+import org.apache.flink.table.operations.command.StopJobOperation;
import org.apache.flink.table.operations.ddl.AddPartitionsOperation;
import org.apache.flink.table.operations.ddl.AlterCatalogFunctionOperation;
import org.apache.flink.table.operations.ddl.AlterDatabaseOperation;
@@ -374,6 +376,8 @@ public class SqlToOperationConverter {
return Optional.of(converter.convertSqlQuery(validated));
} else if (validated instanceof SqlAnalyzeTable) {
return Optional.of(converter.convertAnalyzeTable((SqlAnalyzeTable)
validated));
+ } else if (validated instanceof SqlStopJob) {
+ return Optional.of(converter.convertStopJob((SqlStopJob)
validated));
} else {
return Optional.empty();
}
@@ -1467,6 +1471,11 @@ public class SqlToOperationConverter {
return new ValueLiteralExpression(value, dataType.notNull());
}
+ private Operation convertStopJob(SqlStopJob sqlStopJob) {
+ return new StopJobOperation(
+ sqlStopJob.getId(), sqlStopJob.isWithSavepoint(),
sqlStopJob.isWithDrain());
+ }
+
private void validateTableConstraint(SqlTableConstraint constraint) {
if (constraint.isUnique()) {
throw new UnsupportedOperationException("UNIQUE constraint is not
supported yet");