This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 805bc718a init base sql gateway module (#2267)
805bc718a is described below
commit 805bc718a5e1f5a2720c4c91de846d06fe90b83a
Author: gongzhongqiang <[email protected]>
AuthorDate: Wed May 10 12:57:05 2023 +0800
init base sql gateway module (#2267)
Co-authored-by: gongzhongqiang <[email protected]>
---
pom.xml | 1 +
streampark-flink/pom.xml | 1 +
.../streampark-flink-sql-gateway/pom.xml | 34 +++
.../streampark-flink-sql-gateway-base/pom.xml | 101 ++++++++
.../apache/streampark/gateway/ConfigOption.java | 256 +++++++++++++++++++++
.../streampark/gateway/ExecutionConfiguration.java | 22 ++
.../apache/streampark/gateway/OperationHandle.java | 58 +++++
.../apache/streampark/gateway/OperationStatus.java | 85 +++++++
.../gateway/exception/SqlGatewayException.java | 35 +++
.../gateway/exception/ValidationException.java | 35 +++
.../streampark/gateway/factories/Factory.java | 31 +++
.../streampark/gateway/factories/FactoryUtil.java | 117 ++++++++++
.../gateway/factories/ServiceLoaderUtil.java | 82 +++++++
.../factories/SqlGatewayServiceFactory.java | 38 +++
.../factories/SqlGatewayServiceFactoryUtils.java | 153 ++++++++++++
.../apache/streampark/gateway/results/Column.java | 86 +++++++
.../gateway/results/FetchOrientation.java | 28 +++
.../streampark/gateway/results/FunctionInfo.java | 22 ++
.../streampark/gateway/results/GatewayInfo.java | 75 ++++++
.../apache/streampark/gateway/results/JobID.java | 59 +++++
.../gateway/results/ObjectIdentifier.java | 87 +++++++
.../streampark/gateway/results/OperationInfo.java | 68 ++++++
.../streampark/gateway/results/ResultKind.java | 33 +++
.../gateway/results/ResultQueryCondition.java | 90 ++++++++
.../streampark/gateway/results/ResultSet.java | 172 ++++++++++++++
.../apache/streampark/gateway/results/RowData.java | 64 ++++++
.../streampark/gateway/results/TableInfo.java | 62 +++++
.../streampark/gateway/results/TableKind.java | 23 ++
.../gateway/service/SqlGatewayService.java | 147 ++++++++++++
.../gateway/session/SessionEnvironment.java | 87 +++++++
.../streampark/gateway/session/SessionHandle.java | 61 +++++
.../service/SqlGatewayServiceFactoryUtilsTest.java | 189 +++++++++++++++
.../gateway/utils/FakeSqlGatewayService.java | 101 ++++++++
.../utils/FakeSqlGatewayServiceFactory.java | 71 ++++++
.../gateway/utils/MockedSqlGatewayService.java | 129 +++++++++++
.../utils/MockedSqlGatewayServiceFactory.java | 71 ++++++
...org.apache.streampark.gateway.factories.Factory | 20 ++
37 files changed, 2794 insertions(+)
diff --git a/pom.xml b/pom.xml
index b17a6820a..4fa03bd2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
<commons-net.version>3.9.0</commons-net.version>
<commons-lang3.version>3.8.1</commons-lang3.version>
<enumeratum.version>1.6.1</enumeratum.version>
+ <assertj.version>3.23.1</assertj.version>
<maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
<maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 0ae28211d..0445b9c81 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -38,6 +38,7 @@
<module>streampark-flink-proxy</module>
<module>streampark-flink-packer</module>
<module>streampark-flink-kubernetes</module>
+ <module>streampark-flink-sql-gateway</module>
</modules>
<dependencies>
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/pom.xml
new file mode 100644
index 000000000..fe1824b59
--- /dev/null
+++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-flink-sql-gateway</artifactId>
+ <name>StreamPark : SQL Gateway Parent</name>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>streampark-flink-sql-gateway-base</module>
+ </modules>
+
+</project>
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
new file mode 100644
index 000000000..b2f5a6eb7
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-flink-sql-gateway-base</artifactId>
+ <name>StreamPark : Sql Gateway Base</name>
+
+ <dependencies>
+
+ <!-- test -->
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-engine</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.junit.jupiter</groupId>
+ <artifactId>junit-jupiter-params</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
+
+ <!-- This manages the 'javax.annotation' annotations (JSR305) -->
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.2</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+
+ </dependencies>
+
+
+</project>
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
new file mode 100644
index 000000000..c6e89a643
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** @param <T> */
+public class ConfigOption<T> {
+
+ private static final String EMPTY_DESCRIPTION = "";
+
+ // ------------------------------------------------------------------------
+
+ /** The current key for that config option. */
+ private final String key;
+
+ /** The default value for this option. */
+ private final T defaultValue;
+
+ /** The description for this option. */
+ private final String description;
+
+ /**
+ * Type of the value that this ConfigOption describes.
+ *
+ * <ul>
+ * <li>typeClass == atomic class (e.g. {@code Integer.class}) -> {@code
ConfigOption<Integer>}
+ * <li>typeClass == {@code Map.class} -> {@code ConfigOption<Map<String,
String>>}
+ * <li>typeClass == atomic class and isList == true for {@code
ConfigOption<List<Integer>>}
+ * </ul>
+ */
+ private final Class<?> clazz;
+
+ /**
+ * Creates a new config option.
+ *
+ * @param key The current key for that config option
+ * @param defaultValue The default value for this option
+ * @param description Description for that option
+ * @param clazz describes type of the ConfigOption, see description of the
clazz field
+ */
+ private ConfigOption(String key, T defaultValue, String description,
Class<?> clazz) {
+ this.key = key;
+ this.defaultValue = defaultValue;
+ this.description = description;
+ this.clazz = clazz;
+ }
+
+ /**
+ * Creates a new config option, using this option's key and default value,
and adding the given
+ * description. The given description is used when generation the
configuration documentation.
+ *
+ * @param description The description for this option.
+ * @return A new config option, with given description.
+ */
+ public ConfigOption<T> withDescription(final String description) {
+ return new ConfigOption<>(key, defaultValue, description, clazz);
+ }
+
+ /**
+ * Starts building a new {@link ConfigOption}.
+ *
+ * @param key The key for the config option.
+ * @return The builder for the config option with the given key.
+ */
+ public static OptionBuilder key(String key) {
+ checkNotNull(key);
+ return new OptionBuilder(key);
+ }
+
+ public String getKey() {
+ return key;
+ }
+
+ public boolean hasDefaultValue() {
+ return defaultValue != null;
+ }
+
+ public T getDefaultValue() {
+ return defaultValue;
+ }
+
+ public String getDescription() {
+ return description;
+ }
+
+ public Class<?> getClazz() {
+ return clazz;
+ }
+
+ /**
+ * The option builder is used to create a {@link ConfigOption}. It is
instantiated via {@link
+ * ConfigOption#key(String)}.
+ */
+ public static final class OptionBuilder {
+ /**
+ * Workaround to reuse the {@link TypedConfigOptionBuilder} for a {@link
Map Map<String,
+ * String>}.
+ */
+ @SuppressWarnings("unchecked")
+ private static final Class<Map<String, String>> PROPERTIES_MAP_CLASS =
+ (Class<Map<String, String>>) (Class<?>) Map.class;
+
+ /** The key for the config option. */
+ private final String key;
+
+ /**
+ * Creates a new OptionBuilder.
+ *
+ * @param key The key for the config option
+ */
+ OptionBuilder(String key) {
+ this.key = key;
+ }
+
+ /** Defines that the value of the option should be of {@link Boolean}
type. */
+ public TypedConfigOptionBuilder<Boolean> booleanType() {
+ return new TypedConfigOptionBuilder<>(key, Boolean.class);
+ }
+
+ /** Defines that the value of the option should be of {@link Integer}
type. */
+ public TypedConfigOptionBuilder<Integer> intType() {
+ return new TypedConfigOptionBuilder<>(key, Integer.class);
+ }
+
+ /** Defines that the value of the option should be of {@link Long} type. */
+ public TypedConfigOptionBuilder<Long> longType() {
+ return new TypedConfigOptionBuilder<>(key, Long.class);
+ }
+
+ /** Defines that the value of the option should be of {@link Float} type.
*/
+ public TypedConfigOptionBuilder<Float> floatType() {
+ return new TypedConfigOptionBuilder<>(key, Float.class);
+ }
+
+ /** Defines that the value of the option should be of {@link Double} type.
*/
+ public TypedConfigOptionBuilder<Double> doubleType() {
+ return new TypedConfigOptionBuilder<>(key, Double.class);
+ }
+
+ /** Defines that the value of the option should be of {@link String} type.
*/
+ public TypedConfigOptionBuilder<String> stringType() {
+ return new TypedConfigOptionBuilder<>(key, String.class);
+ }
+
+ /** Defines that the value of the option should be of {@link Duration}
type. */
+ public TypedConfigOptionBuilder<Duration> durationType() {
+ return new TypedConfigOptionBuilder<>(key, Duration.class);
+ }
+
+ /**
+ * Defines that the value of the option should be of {@link Enum} type.
+ *
+ * @param enumClass Concrete type of the expected enum.
+ */
+ public <T extends Enum<T>> TypedConfigOptionBuilder<T> enumType(Class<T>
enumClass) {
+ return new TypedConfigOptionBuilder<>(key, enumClass);
+ }
+
+ /**
+ * Defines that the value of the option should be a set of properties,
which can be represented
+ * as {@code Map<String, String>}.
+ */
+ public TypedConfigOptionBuilder<Map<String, String>> mapType() {
+ return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
+ }
+
+ /**
+ * Creates a ConfigOption with the given default value.
+ *
+ * <p>This method does not accept "null". For options with no default
value, choose one of the
+ * {@code noDefaultValue} methods.
+ *
+ * @param value The default value for the config option
+ * @param <T> The type of the default value.
+ * @return The config option with the default value.
+ * @deprecated define the type explicitly first with one of the intType(),
stringType(), etc.
+ */
+ @Deprecated
+ public <T> ConfigOption<T> defaultValue(T value) {
+ checkNotNull(value);
+ return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION,
value.getClass());
+ }
+
+ /**
+ * Creates a string-valued option with no default value. String-valued
options are the only ones
+ * that can have no default value.
+ *
+ * @return The created ConfigOption.
+ * @deprecated define the type explicitly first with one of the intType(),
stringType(), etc.
+ */
+ @Deprecated
+ public ConfigOption<String> noDefaultValue() {
+ return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION,
String.class);
+ }
+ }
+
+ /**
+ * Builder for {@link ConfigOption} with a defined atomic type.
+ *
+ * @param <T> atomic type of the option
+ */
+ public static class TypedConfigOptionBuilder<T> {
+ private final String key;
+ private final Class<T> clazz;
+
+ TypedConfigOptionBuilder(String key, Class<T> clazz) {
+ this.key = key;
+ this.clazz = clazz;
+ }
+
+ /**
+ * Creates a ConfigOption with the given default value.
+ *
+ * @param value The default value for the config option
+ * @return The config option with the default value.
+ */
+ public ConfigOption<T> defaultValue(T value) {
+ return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION,
clazz);
+ }
+
+ /**
+ * Creates a ConfigOption without a default value.
+ *
+ * @return The config option without a default value.
+ */
+ public ConfigOption<T> noDefaultValue() {
+ return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION,
clazz);
+ }
+ }
+
+ public static <T> T checkNotNull(@Nullable T reference) {
+ if (reference == null) {
+ throw new NullPointerException();
+ }
+ return reference;
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
new file mode 100644
index 000000000..92dc198c2
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.io.Serializable;
+
+public class ExecutionConfiguration implements Serializable {}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
new file mode 100644
index 000000000..a8be0eaf4
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/** {@link OperationHandle} to index the {@code Operation}. */
+public class OperationHandle implements Serializable {
+
+ private final UUID identifier;
+
+ public OperationHandle(UUID identifier) {
+ this.identifier = identifier;
+ }
+
+ public UUID getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OperationHandle that = (OperationHandle) o;
+ return Objects.equals(identifier, that.identifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier);
+ }
+
+ @Override
+ public String toString() {
+ return "OperationHandle{" + "identifier=" + identifier + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
new file mode 100644
index 000000000..21c9c0e4f
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */
+public enum OperationStatus {
+ /** The operation is newly created. */
+ INITIALIZED(false),
+
+ /** Prepare the resources for the operation. */
+ PENDING(false),
+
+ /** The operation is running. */
+ RUNNING(false),
+
+ /** All the work is finished and ready for the client to fetch the results.
*/
+ FINISHED(true),
+
+ /** Operation has been cancelled. */
+ CANCELED(true),
+
+ /** Operation has been closed and all related resources are collected. */
+ CLOSED(true),
+
+ /** Some error happens. */
+ ERROR(true),
+
+ /** The execution of the operation timeout. */
+ TIMEOUT(true);
+
+ private final boolean isTerminalStatus;
+
+ OperationStatus(boolean isTerminalStatus) {
+ this.isTerminalStatus = isTerminalStatus;
+ }
+
+ public static boolean isValidStatusTransition(
+ OperationStatus fromStatus, OperationStatus toStatus) {
+ return toOperationStatusSet(fromStatus).contains(toStatus);
+ }
+
+ public boolean isTerminalStatus() {
+ return isTerminalStatus;
+ }
+
+ private static Set<OperationStatus> toOperationStatusSet(OperationStatus
fromStatus) {
+ switch (fromStatus) {
+ case INITIALIZED:
+ return new HashSet<>(Arrays.asList(PENDING, CANCELED, CLOSED, TIMEOUT,
ERROR));
+ case PENDING:
+ return new HashSet<>(Arrays.asList(RUNNING, CANCELED, CLOSED, TIMEOUT,
ERROR));
+ case RUNNING:
+ return new HashSet<>(Arrays.asList(FINISHED, CANCELED, CLOSED,
TIMEOUT, ERROR));
+ case FINISHED:
+ case CANCELED:
+ case TIMEOUT:
+ case ERROR:
+ return Collections.singleton(CLOSED);
+ case CLOSED:
+ return Collections.emptySet();
+ default:
+ throw new IllegalArgumentException("Unknown from status: " +
fromStatus);
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
new file mode 100644
index 000000000..2b3268f7b
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.exception;
+
+/** General exception for SQL gateway related errors. */
+public class SqlGatewayException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public SqlGatewayException(String message) {
+ super(message);
+ }
+
+ public SqlGatewayException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ public SqlGatewayException(Throwable e) {
+ super(e);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
new file mode 100644
index 000000000..f9250ac2e
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.exception;
+
+/** General exception for SQL gateway related errors. */
+public class ValidationException extends RuntimeException {
+ private static final long serialVersionUID = 1L;
+
+ public ValidationException(String message) {
+ super(message);
+ }
+
+ public ValidationException(String message, Throwable e) {
+ super(message, e);
+ }
+
+ public ValidationException(Throwable e) {
+ super(e);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
new file mode 100644
index 000000000..c0409e7f3
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+
+import java.util.Set;
+
+/** F */
+public interface Factory {
+ String factoryIdentifier();
+
+ Set<ConfigOption<?>> requiredOptions();
+
+ Set<ConfigOption<?>> optionalOptions();
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
new file mode 100644
index 000000000..a5fb8e263
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.exception.ValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Factory utils for {@link Factory}. */
+public class FactoryUtil {
+
+ private static final String DEFAULT_IDENTIFIER = "default";
+ private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+ public static final ConfigOption<String> SQL_GATEWAY_SERVICE_TYPE =
+ ConfigOption.key("streampark.sql-gateway.service")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The service to execute the request.");
+
+ public static <T extends Factory> T discoverFactory(
+ ClassLoader classLoader, Class<T> factoryClass, String
factoryIdentifier) {
+ final List<Factory> factories = discoverFactories(classLoader);
+
+ final List<Factory> foundFactories =
+ factories.stream()
+ .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+ .collect(Collectors.toList());
+
+ if (foundFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factories that implement '%s' in the
classpath.",
+ factoryClass.getName()));
+ }
+
+ final List<Factory> matchingFactories =
+ foundFactories.stream()
+ .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+ .collect(Collectors.toList());
+
+ if (matchingFactories.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Could not find any factory for identifier '%s' that implements
'%s' in the classpath.\n\n"
+ + "Available factory identifiers are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ foundFactories.stream()
+ .map(Factory::factoryIdentifier)
+ .filter(identifier -> !DEFAULT_IDENTIFIER.equals(identifier))
+ .distinct()
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+ if (matchingFactories.size() > 1) {
+ throw new ValidationException(
+ String.format(
+ "Multiple factories for identifier '%s' that implement '%s'
found in the classpath.\n\n"
+ + "Ambiguous factory classes are:\n\n"
+ + "%s",
+ factoryIdentifier,
+ factoryClass.getName(),
+ matchingFactories.stream()
+ .map(f -> f.getClass().getName())
+ .sorted()
+ .collect(Collectors.joining("\n"))));
+ }
+
+ return (T) matchingFactories.get(0);
+ }
+
+ static List<Factory> discoverFactories(ClassLoader classLoader) {
+ final List<Factory> result = new LinkedList<>();
+ ServiceLoaderUtil.load(Factory.class, classLoader)
+ .forEach(
+ loadResult -> {
+ if (loadResult.hasFailed()) {
+ if (loadResult.getError() instanceof NoClassDefFoundError) {
+ LOG.debug(
+ "NoClassDefFoundError when loading a "
+ + Factory.class
+ + ". This is expected when trying to load a format
dependency but load failed.",
+ loadResult.getError());
+ // After logging, we just ignore this failure
+ return;
+ }
+ throw new RuntimeException(
+ "Unexpected error when trying to load service provider for
factories.",
+ loadResult.getError());
+ }
+ result.add(loadResult.getService());
+ });
+ return result;
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
new file mode 100644
index 000000000..7f770afd8
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.ServiceLoader;
+
+/** This class contains utilities to deal with {@link ServiceLoader}. */
+class ServiceLoaderUtil {
+
+ /**
+ * This method behaves similarly to {@link ServiceLoader#load(Class,
ClassLoader)}, but it returns
+ * a list with the results of the iteration, wrapping the iteration failures
such as {@link
+ * NoClassDefFoundError}.
+ */
+ static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader)
{
+ List<LoadResult<T>> loadResults = new ArrayList<>();
+
+ Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz,
classLoader).iterator();
+
+ while (serviceLoaderIterator.hasNext()) {
+ try {
+ T next = serviceLoaderIterator.next();
+ loadResults.add(new LoadResult<>(next));
+ } catch (NoSuchElementException e) {
+ break;
+ } catch (Throwable t) {
+ loadResults.add(new LoadResult<>(t));
+ }
+ }
+
+ return loadResults;
+ }
+
+ static class LoadResult<T> {
+ private final T service;
+ private final Throwable error;
+
+ private LoadResult(T service, Throwable error) {
+ this.service = service;
+ this.error = error;
+ }
+
+ private LoadResult(T service) {
+ this(service, null);
+ }
+
+ private LoadResult(Throwable error) {
+ this(null, error);
+ }
+
+ public boolean hasFailed() {
+ return error != null;
+ }
+
+ public Throwable getError() {
+ return error;
+ }
+
+ public T getService() {
+ return service;
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
new file mode 100644
index 000000000..0986864a9
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Map;
+
+/**
+ * A factory for creating {@link SqlGatewayService} from Configuration. This
factory is used with
+ * Java's Service Provider Interfaces (SPI) for discovery.
+ */
+public interface SqlGatewayServiceFactory extends Factory {
+
+ /** Creates a {@link SqlGatewayService} from the given {@link Context}. */
+ SqlGatewayService createSqlGatewayService(Context context);
+
+ interface Context {
+
+ /** Gives read-only access to the configuration of the current session. */
+ Map<String, String> getGateWayServiceOptions();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
new file mode 100644
index 000000000..a219739bb
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.exception.ValidationException;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static
org.apache.streampark.gateway.factories.FactoryUtil.SQL_GATEWAY_SERVICE_TYPE;
+
+/** Util to discover the {@link SqlGatewayService}. */
+public class SqlGatewayServiceFactoryUtils {
+
+ /**
+ * Attempts to discover the appropriate service factory and creates the
instance of the services.
+ */
+ public static List<SqlGatewayService> createSqlGatewayService(Map<String,
String> configuration) {
+
+ String identifiersStr =
+
Optional.ofNullable(configuration.get(SQL_GATEWAY_SERVICE_TYPE.getKey()))
+ .map(
+ idStr -> {
+ if (idStr.trim().isEmpty()) {
+ return null;
+ }
+ return idStr.trim();
+ })
+ .orElseThrow(
+ () ->
+ new ValidationException(
+ String.format(
+ "Service options do not contain an option key '%s'
for discovering an service.",
+ SQL_GATEWAY_SERVICE_TYPE.getKey())));
+
+ List<String> identifiers = Arrays.asList(identifiersStr.split(";"));
+
+ if (identifiers.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "Service options do not contain an option key '%s' for
discovering an service.",
+ SQL_GATEWAY_SERVICE_TYPE.getKey()));
+ }
+ validateSpecifiedServicesAreUnique(identifiers);
+
+ List<SqlGatewayService> services = new ArrayList<>();
+ for (String identifier : identifiers) {
+ final SqlGatewayServiceFactory factory =
+ FactoryUtil.discoverFactory(
+ Thread.currentThread().getContextClassLoader(),
+ SqlGatewayServiceFactory.class,
+ identifier);
+
+ services.add(
+ factory.createSqlGatewayService(new
DefaultServiceFactoryContext(configuration)));
+ }
+ return services;
+ }
+
+ public static EndpointFactoryHelper createEndpointFactoryHelper(
+ SqlGatewayServiceFactory endpointFactory,
SqlGatewayServiceFactory.Context context) {
+ return new EndpointFactoryHelper(endpointFactory,
context.getGateWayServiceOptions());
+ }
+
+ public static class EndpointFactoryHelper {
+
+ public final SqlGatewayServiceFactory factory;
+ public final Map<String, String> configOptions;
+
+ public EndpointFactoryHelper(
+ SqlGatewayServiceFactory factory, Map<String, String> configOptions) {
+ this.factory = factory;
+ this.configOptions = configOptions;
+ }
+
+ public void validate() {
+ validateFactoryOptions(factory.requiredOptions(), configOptions);
+ }
+
+ public static void validateFactoryOptions(
+ Set<ConfigOption<?>> requiredOptions, Map<String, String> options) {
+
+ final List<String> missingRequiredOptions =
+ requiredOptions.stream()
+ .map(ConfigOption::getKey)
+ .filter(key -> options.get(key) == null)
+ .sorted()
+ .collect(Collectors.toList());
+
+ if (!missingRequiredOptions.isEmpty()) {
+ throw new ValidationException(
+ String.format(
+ "One or more required options are missing.\n\n"
+ + "Missing required options are:\n\n"
+ + "%s",
+ String.join("\n", missingRequiredOptions)));
+ }
+ }
+ }
+
+ /** The default context of {@link SqlGatewayServiceFactory}. */
+ public static class DefaultServiceFactoryContext implements
SqlGatewayServiceFactory.Context {
+
+ private final Map<String, String> gateWayServiceOptions;
+
+ public DefaultServiceFactoryContext(Map<String, String> endpointConfig) {
+ this.gateWayServiceOptions = endpointConfig;
+ }
+
+ @Override
+ public Map<String, String> getGateWayServiceOptions() {
+ return gateWayServiceOptions;
+ }
+ }
+
+ private static void validateSpecifiedServicesAreUnique(List<String>
identifiers) {
+ Set<String> uniqueIdentifiers = new HashSet<>();
+
+ for (String identifier : identifiers) {
+ if (uniqueIdentifiers.contains(identifier)) {
+ throw new ValidationException(
+ String.format(
+ "Get the duplicate service identifier '%s' for the option
'%s'. "
+ + "Please keep the specified service identifier unique.",
+ identifier, SQL_GATEWAY_SERVICE_TYPE.getKey()));
+ }
+ uniqueIdentifiers.add(identifier);
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
new file mode 100644
index 000000000..f99264814
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Column information. */
+public class Column implements Serializable {
+
+ private final String name;
+
+ private final String type;
+
+ private final @Nullable String comment;
+
+ public Column(String name, String type, String comment) {
+ this.name = name;
+ this.type = type;
+ this.comment = comment;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public String getType() {
+ return type;
+ }
+
+ @Nullable
+ public String getComment() {
+ return comment;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ Column column = (Column) o;
+ return Objects.equals(name, column.name)
+ && Objects.equals(type, column.type)
+ && Objects.equals(comment, column.comment);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(name, type, comment);
+ }
+
+ @Override
+ public String toString() {
+ return "Column{"
+ + "name='"
+ + name
+ + '\''
+ + ", type='"
+ + type
+ + '\''
+ + ", comment='"
+ + comment
+ + '\''
+ + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
new file mode 100644
index 000000000..b6439a504
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+/** Orientation to fetch results. */
+public enum FetchOrientation {
+
+ /** Fetch the next results. */
+ FETCH_NEXT,
+
+ /** Fetch the prior results. */
+ FETCH_PRIOR
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
new file mode 100644
index 000000000..8b88d1bb2
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+
+public class FunctionInfo implements Serializable {}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
new file mode 100644
index 000000000..0b9d23b88
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Info to describe the {@link SqlGatewayService}. */
+public class GatewayInfo implements Serializable {
+
+ /** Gateway service type. */
+ public final String serviceType;
+
+ /** Gateway service version. */
+ public final String version;
+
+ public GatewayInfo(String serviceType, String version) {
+ this.serviceType = serviceType;
+ this.version = version;
+ }
+
+ public String getServiceType() {
+ return serviceType;
+ }
+
+ public String getVersion() {
+ return version;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ GatewayInfo that = (GatewayInfo) o;
+ return Objects.equals(serviceType, that.serviceType) &&
Objects.equals(version, that.version);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(serviceType, version);
+ }
+
+ @Override
+ public String toString() {
+ return "GatewayInfo{"
+ + "serviceType='"
+ + serviceType
+ + '\''
+ + ", version='"
+ + version
+ + '\''
+ + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
new file mode 100644
index 000000000..095e6c029
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class JobID implements Serializable {
+ private String jobID;
+
+ public String getJobID() {
+ return jobID;
+ }
+
+ public void setJobID(String jobID) {
+ this.jobID = jobID;
+ }
+
+ public JobID(String jobID) {
+ this.jobID = jobID;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobID jobID1 = (JobID) o;
+ return Objects.equals(jobID, jobID1.jobID);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(jobID);
+ }
+
+ @Override
+ public String toString() {
+ return "JobID{" + "jobID='" + jobID + '\'' + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
new file mode 100644
index 000000000..7bfe8650a
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class ObjectIdentifier implements Serializable {
+
+ static final String UNKNOWN = "<UNKNOWN>";
+
+ private final @Nullable String catalogName;
+ private final @Nullable String databaseName;
+ private final String objectName;
+
+ public ObjectIdentifier(
+ @Nullable String catalogName, @Nullable String databaseName, String
objectName) {
+ this.catalogName = catalogName;
+ this.databaseName = databaseName;
+ this.objectName = objectName;
+ }
+
+ @Nullable
+ public String getCatalogName() {
+ return catalogName;
+ }
+
+ @Nullable
+ public String getDatabaseName() {
+ return databaseName;
+ }
+
+ public String getObjectName() {
+ return objectName;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ObjectIdentifier that = (ObjectIdentifier) o;
+ return Objects.equals(catalogName, that.catalogName)
+ && Objects.equals(databaseName, that.databaseName)
+ && Objects.equals(objectName, that.objectName);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(catalogName, databaseName, objectName);
+ }
+
+ @Override
+ public String toString() {
+ return "ObjectIdentifier{"
+ + "catalogName='"
+ + catalogName
+ + '\''
+ + ", databaseName='"
+ + databaseName
+ + '\''
+ + ", objectName='"
+ + objectName
+ + '\''
+ + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
new file mode 100644
index 000000000..7699f210b
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import org.apache.streampark.gateway.OperationStatus;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Information of the {@code Operation}. */
+public class OperationInfo implements Serializable {
+
+ private final OperationStatus status;
+ @Nullable private final Exception exception;
+
+ public OperationInfo(OperationStatus status, @Nullable Exception exception) {
+ this.status = status;
+ this.exception = exception;
+ }
+
+ public OperationStatus getStatus() {
+ return status;
+ }
+
+ @Nullable
+ public Exception getException() {
+ return exception;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ OperationInfo that = (OperationInfo) o;
+ return status == that.status && Objects.equals(exception, that.exception);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(status, exception);
+ }
+
+ @Override
+ public String toString() {
+ return "OperationInfo{" + "status=" + status + ", exception=" + exception
+ '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
new file mode 100644
index 000000000..f2fd7e82e
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+/** ResultKind defines the types of the result. */
+public enum ResultKind {
+ /**
+ * The statement (e.g. DDL, USE) executes successfully, and the result only
contains a simple
+ * "OK".
+ */
+ SUCCESS,
+
+ /**
+ * The statement (e.g. DML, DQL, SHOW) executes successfully, and the result
contains important
+ * content.
+ */
+ SUCCESS_WITH_CONTENT
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
new file mode 100644
index 000000000..cc740a950
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Condition of result query. */
+public class ResultQueryCondition implements Serializable {
+ public FetchOrientation orientation;
+
+ public long token;
+ public int maxRows;
+
+ public ResultQueryCondition() {}
+
+ public ResultQueryCondition(FetchOrientation orientation, long token, int
maxRows) {
+ this.orientation = orientation;
+ this.token = token;
+ this.maxRows = maxRows;
+ }
+
+ public FetchOrientation getOrientation() {
+ return orientation;
+ }
+
+ public void setOrientation(FetchOrientation orientation) {
+ this.orientation = orientation;
+ }
+
+ public long getToken() {
+ return token;
+ }
+
+ public void setToken(long token) {
+ this.token = token;
+ }
+
+ public int getMaxRows() {
+ return maxRows;
+ }
+
+ public void setMaxRows(int maxRows) {
+ this.maxRows = maxRows;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ResultQueryCondition that = (ResultQueryCondition) o;
+ return token == that.token && maxRows == that.maxRows && orientation ==
that.orientation;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(orientation, token, maxRows);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultQueryCondition{"
+ + "orientation="
+ + orientation
+ + ", token="
+ + token
+ + ", maxRows="
+ + maxRows
+ + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
new file mode 100644
index 000000000..fafc0a800
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** An implementation of {@link ResultSet}. */
+public class ResultSet implements Serializable {
+
+ /** The type of the results, which may indicate the result is EOS or has
data. */
+ private final ResultType resultType;
+
+ /**
+ * The token indicates the next batch of the data.
+ *
+ * <p>When the token is null, it means all the data has been fetched.
+ */
+ @Nullable private final Long nextToken;
+
+ /**
+ * The schema of the data.
+ *
+ * <p>The schema of the DDL, USE, EXPLAIN, SHOW and DESCRIBE align with the
schema of the {@link
+ * TableResult#getResolvedSchema()}. The only differences is the schema of
the `INSERT` statement.
+ *
+ * <p>The schema of INSERT:
+ *
+ * <pre>
+ * +-------------+-------------+----------+
+ * | column name | column type | comments |
+ * +-------------+-------------+----------+
+ * | job id | string | |
+ * +- -----------+-------------+----------+
+ * </pre>
+ */
+ private final List<Column> columns;
+
+ /** All the data in the current results. */
+ private final List<RowData> data;
+
+ /** Indicates that whether the result is for a query. */
+ private final boolean isQueryResult;
+ /**
+ * If the statement was submitted to a client, returns the JobID which
uniquely identifies the
+ * job. Otherwise, returns null.
+ */
+ @Nullable private final JobID jobID;
+
+ /** Gets the result kind of the result. */
+ private final ResultKind resultKind;
+
+ public ResultSet(
+ ResultType resultType,
+ @Nullable Long nextToken,
+ List<Column> columns,
+ List<RowData> data,
+ boolean isQueryResult,
+ @Nullable JobID jobID,
+ ResultKind resultKind) {
+ this.resultType = resultType;
+ this.nextToken = nextToken;
+ this.columns = columns;
+ this.data = data;
+ this.isQueryResult = isQueryResult;
+ this.jobID = jobID;
+ this.resultKind = resultKind;
+ }
+
+ public ResultType getResultType() {
+ return resultType;
+ }
+
+ @Nullable
+ public Long getNextToken() {
+ return nextToken;
+ }
+
+ public List<Column> getColumns() {
+ return columns;
+ }
+
+ public List<RowData> getData() {
+ return data;
+ }
+
+ public boolean isQueryResult() {
+ return isQueryResult;
+ }
+
+ @Nullable
+ public JobID getJobID() {
+ return jobID;
+ }
+
+ public ResultKind getResultKind() {
+ return resultKind;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ ResultSet resultSet = (ResultSet) o;
+ return isQueryResult == resultSet.isQueryResult
+ && resultType == resultSet.resultType
+ && Objects.equals(nextToken, resultSet.nextToken)
+ && Objects.equals(columns, resultSet.columns)
+ && Objects.equals(data, resultSet.data)
+ && Objects.equals(jobID, resultSet.jobID)
+ && resultKind == resultSet.resultKind;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(resultType, nextToken, columns, data, isQueryResult,
jobID, resultKind);
+ }
+
+ @Override
+ public String toString() {
+ return "ResultSet{"
+ + "resultType="
+ + resultType
+ + ", nextToken="
+ + nextToken
+ + ", resultSchema="
+ + columns
+ + ", data="
+ + data
+ + ", isQueryResult="
+ + isQueryResult
+ + ", jobID="
+ + jobID
+ + ", resultKind="
+ + resultKind
+ + '}';
+ }
+
+ /** Describe the kind of the result. */
+ public enum ResultType {
+ /** Indicate the result is not ready. */
+ NOT_READY,
+
+ /** Indicate the result has data. */
+ PAYLOAD,
+
+ /** Indicate all results have been fetched. */
+ EOS
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
new file mode 100644
index 000000000..891d0b80a
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** Row data. */
+public class RowData implements Serializable {
+ private final String rowKind;
+
+ private final List<Object> fields;
+
+ public RowData(String rowKind, List<Object> fields) {
+ this.rowKind = rowKind;
+ this.fields = fields;
+ }
+
+ public String getRowKind() {
+ return rowKind;
+ }
+
+ public List<Object> getFields() {
+ return fields;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ RowData rowData = (RowData) o;
+ return Objects.equals(rowKind, rowData.rowKind) && Objects.equals(fields,
rowData.fields);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(rowKind, fields);
+ }
+
+ @Override
+ public String toString() {
+ return "RowData{" + "rowKind='" + rowKind + '\'' + ", fields=" + fields +
'}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
new file mode 100644
index 000000000..b096edb57
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Information of the table or view. */
+public class TableInfo implements Serializable {
+ private final ObjectIdentifier identifier;
+ private final TableKind tableKind;
+
+ public TableInfo(ObjectIdentifier identifier, TableKind tableKind) {
+ this.identifier = identifier;
+ this.tableKind = tableKind;
+ }
+
+ public ObjectIdentifier getIdentifier() {
+ return identifier;
+ }
+
+ public TableKind getTableKind() {
+ return tableKind;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TableInfo tableInfo = (TableInfo) o;
+ return Objects.equals(identifier, tableInfo.identifier) && tableKind ==
tableInfo.tableKind;
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier, tableKind);
+ }
+
+ @Override
+ public String toString() {
+ return "TableInfo{" + "identifier=" + identifier + ", tableKind=" +
tableKind + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
new file mode 100644
index 000000000..cd76a613f
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+public enum TableKind {
+ TABLE,
+ VIEW
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
new file mode 100644
index 000000000..d737bbb0a
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.service;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.OperationStatus;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+/** A service of SQL gateway is responsible for handling requests from
streampark console. */
+public interface SqlGatewayService {
+
+ //
-------------------------------------------------------------------------------------------
+ // Info API
+ //
-------------------------------------------------------------------------------------------
+
+ GatewayInfo getGatewayInfo() throws SqlGatewayException;
+
+ //
-------------------------------------------------------------------------------------------
+ // Session Management
+ //
-------------------------------------------------------------------------------------------
+
+ /**
+ * Open the {@code Session}.
+ *
+ * @param environment Environment to initialize the Session.
+ * @return Returns a handle that used to identify the Session.
+ */
+ SessionHandle openSession(SessionEnvironment environment) throws
SqlGatewayException;
+
+ /**
+ * Heartbeat for session
+ *
+ * @param sessionHandle handle to identify the Session.
+ */
+ void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
+
+ /**
+ * Close the {@code Session}.
+ *
+ * @param sessionHandle handle to identify the Session needs to be closed.
+ */
+ void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
+
+ //
-------------------------------------------------------------------------------------------
+ // Operation Management
+ //
-------------------------------------------------------------------------------------------
+
+ /**
+ * Cancel the operation when it is not in terminal status.
+ *
+ * <p>It can't cancel an Operation if it is terminated.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.JarURLConnection
+ */
+ void cancelOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException;
+
+ /**
+ * Close the operation and release all used resource by the operation.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.
+ */
+ void closeOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException;
+
+ /**
+ * Get the {@link OperationInfo} of the operation.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.
+ */
+ OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException;
+
+ /**
+ * Get the result schema for the specified Operation.
+ *
+ * <p>Note: The result schema is available when the Operation is in the
{@link
+ * OperationStatus#FINISHED}.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.
+ */
+ Column getOperationResultSchema(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException;
+
+ //
-------------------------------------------------------------------------------------------
+ // Statements API
+ //
-------------------------------------------------------------------------------------------
+
+ /**
+ * Execute the submitted statement.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param statement the SQL to execute.
+ * @param executionTimeoutMs the execution timeout. Please use non-positive
value to forbid the
+ * timeout mechanism.
+ * @param executionConfig execution config for the statement.
+ * @return handle to identify the operation.
+ */
+ OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ ExecutionConfiguration executionConfig)
+ throws SqlGatewayException;
+
+ /**
+ * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE,
it means to fetch all
+ * available data.
+ *
+ * @param sessionHandle handle to identify the session.
+ * @param operationHandle handle to identify the operation.
+ * @param resultQueryCondition condition of result query.
+ * @return Returns the results.
+ */
+ ResultSet fetchResults(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ ResultQueryCondition resultQueryCondition)
+ throws SqlGatewayException;
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
new file mode 100644
index 000000000..3baebd360
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.session;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** Environment to initialize the {@code Session}. */
+public class SessionEnvironment implements Serializable {
+ private final @Nullable String sessionName;
+ private final @Nullable String defaultCatalog;
+ private final Map<String, String> sessionConfig;
+
+ public SessionEnvironment(
+ @Nullable String sessionName,
+ @Nullable String defaultCatalog,
+ Map<String, String> sessionConfig) {
+ this.sessionName = sessionName;
+ this.defaultCatalog = defaultCatalog;
+ this.sessionConfig = sessionConfig;
+ }
+
+ @Nullable
+ public String getSessionName() {
+ return sessionName;
+ }
+
+ @Nullable
+ public String getDefaultCatalog() {
+ return defaultCatalog;
+ }
+
+ public Map<String, String> getSessionConfig() {
+ return sessionConfig;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ SessionEnvironment that = (SessionEnvironment) o;
+ return Objects.equals(sessionName, that.sessionName)
+ && Objects.equals(defaultCatalog, that.defaultCatalog)
+ && Objects.equals(sessionConfig, that.sessionConfig);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(sessionName, defaultCatalog, sessionConfig);
+ }
+
+ @Override
+ public String toString() {
+ return "SessionEnvironment{"
+ + "sessionName='"
+ + sessionName
+ + '\''
+ + ", defaultCatalog='"
+ + defaultCatalog
+ + '\''
+ + ", sessionConfig="
+ + sessionConfig
+ + '}';
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
new file mode 100644
index 000000000..b57d1ac3e
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.session;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Session Handle that used to identify the Session. */
+public class SessionHandle {
+
+ private final UUID identifier;
+
+ public static SessionHandle create() {
+ return new SessionHandle(UUID.randomUUID());
+ }
+
+ public SessionHandle(UUID identifier) {
+ this.identifier = identifier;
+ }
+
+ public UUID getIdentifier() {
+ return identifier;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (!(o instanceof SessionHandle)) {
+ return false;
+ }
+ SessionHandle that = (SessionHandle) o;
+ return Objects.equals(identifier, that.identifier);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(identifier);
+ }
+
+ @Override
+ public String toString() {
+ return identifier.toString();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
new file mode 100644
index 000000000..475676c0f
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.service;
+
+import org.apache.streampark.gateway.exception.ValidationException;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.utils.FakeSqlGatewayService;
+import org.apache.streampark.gateway.utils.MockedSqlGatewayService;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.AssertFactory;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactory;
+import org.assertj.core.api.ListAssert;
+import org.assertj.core.api.ThrowingConsumer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static
org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils.createSqlGatewayService;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test {@link SqlGatewayServiceFactoryUtils}. */
+public class SqlGatewayServiceFactoryUtilsTest {
+ public static final InstanceOfAssertFactory<Stream, ListAssert<Throwable>>
STREAM_THROWABLE =
+ new InstanceOfAssertFactory<>(Stream.class,
Assertions::<Throwable>assertThat);
+
+ @Test
+ public void testCreateServices() {
+ String id = UUID.randomUUID().toString();
+ Map<String, String> config = getDefaultConfig(id);
+ config.put("streampark.sql-gateway.service", "mocked;fake");
+ List<SqlGatewayService> actual =
SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
+ MockedSqlGatewayService expectedMocked =
+ new MockedSqlGatewayService("localhost", 8080, "The Mocked SQL gateway
service");
+ assertThat(actual).isEqualTo(Arrays.asList(expectedMocked,
FakeSqlGatewayService.INSTANCE));
+ }
+
+ @Test
+ public void testCreateServiceWithDuplicateIdentifier() {
+ Map<String, String> config = getDefaultConfig();
+ config.put("streampark.sql-gateway.service", "mocked;mocked");
+ validateException(
+ config,
+ "Get the duplicate service identifier 'mocked' for the option
'streampark.sql-gateway.service'. "
+ + "Please keep the specified service identifier unique.");
+ }
+
+ @Test
+ public void testCreateServiceWithoutType() {
+ Map<String, String> config = getDefaultConfig();
+ config.remove("streampark.sql-gateway.service");
+ validateException(
+ config,
+ "Service options do not contain an option key
'streampark.sql-gateway.service' for discovering an service.");
+ }
+
+ @Test
+ public void testCreateUnknownService() {
+ Map<String, String> config = getDefaultConfig();
+ config.put("streampark.sql-gateway.service", "mocked;unknown");
+ validateException(
+ config,
+ String.format(
+ "Could not find any factory for identifier 'unknown' "
+ + "that implements '%s' in the classpath.",
+ SqlGatewayServiceFactory.class.getCanonicalName()));
+ }
+
+ /* @Test
+ public void testCreateServiceWithMissingOptions() {
+ Map<String, String> config = getDefaultConfig();
+ config.remove("sql-gateway.Service.mocked.host");
+
+ validateException(
+ config,
+ "One or more required options are missing.\n\n"
+ + "Missing required options are:\n\n"
+ + "host");
+ }*/
+
+ /* @Test
+ public void testCreateServiceWithUnconsumedOptions() {
+ Map<String, String> config = getDefaultConfig();
+ config.put("sql-gateway.Service.mocked.unconsumed-option", "error");
+
+ validateException(
+ config,
+ "Unsupported options found for 'mocked'.\n\n"
+ + "Unsupported options:\n\n"
+ + "unconsumed-option\n\n"
+ + "Supported options:\n\n"
+ + "description\n"
+ + "host\n"
+ + "id\n"
+ + "port");
+ }*/
+
+ //
--------------------------------------------------------------------------------------------
+
+ private void validateException(Map<String, String> config, String
errorMessage) {
+ assertThatThrownBy(() -> createSqlGatewayService(config))
+ .satisfies(anyCauseMatches(ValidationException.class, errorMessage));
+ }
+
+ private Map<String, String> getDefaultConfig() {
+ return getDefaultConfig(UUID.randomUUID().toString());
+ }
+
+ private Map<String, String> getDefaultConfig(String id) {
+ Map<String, String> config = new HashMap<>();
+ config.put("sql-gateway.Service.mocked.id", id);
+ config.put("streampark.sql-gateway.service", "mocked");
+ config.put("sql-gateway.Service.mocked.host", "localhost");
+ config.put("sql-gateway.Service.mocked.port", "9999");
+ return config;
+ }
+
+ /**
+ * Shorthand to assert chain of causes. Same as:
+ *
+ * <pre>{@code
+ * assertThat(throwable)
+ * .extracting(FlinkAssertions::chainOfCauses,
FlinkAssertions.STREAM_THROWABLE)
+ * }</pre>
+ */
+ public static ListAssert<Throwable> assertThatChainOfCauses(Throwable root) {
+ return Assertions.assertThat(root)
+ .extracting(SqlGatewayServiceFactoryUtilsTest::chainOfCauses,
STREAM_THROWABLE);
+ }
+
+ public static ThrowingConsumer<? super Throwable> anyCauseMatches(
+ Class<? extends Throwable> clazz, String containsMessage) {
+ return t ->
+ assertThatChainOfCauses(t)
+ .as(
+ "Any cause is instance of class '%s' and contains message
'%s'",
+ clazz, containsMessage)
+ .anySatisfy(
+ cause ->
+ Assertions.assertThat(cause)
+ .isInstanceOf(clazz)
+ .hasMessageContaining(containsMessage));
+ }
+
+ /**
+ * You can use this method in combination with {@link
AbstractThrowableAssert#extracting(Function,
+ * AssertFactory)} to perform assertions on a chain of causes. For example:
+ *
+ * <pre>{@code
+ * assertThat(throwable)
+ * .extracting(FlinkAssertions::chainOfCauses,
FlinkAssertions.STREAM_THROWABLE)
+ * }</pre>
+ *
+ * @return the list is ordered from the current {@link Throwable} up to the
root cause.
+ */
+ public static Stream<Throwable> chainOfCauses(Throwable throwable) {
+ if (throwable == null) {
+ return Stream.empty();
+ }
+ if (throwable.getCause() == null) {
+ return Stream.of(throwable);
+ }
+ return Stream.concat(Stream.of(throwable),
chainOfCauses(throwable.getCause()));
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
new file mode 100644
index 000000000..fcc6303f5
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+/** Mocked implementation of {@link SqlGatewayService}. */
+public class FakeSqlGatewayService implements SqlGatewayService {
+
+ public static final FakeSqlGatewayService INSTANCE = new
FakeSqlGatewayService();
+
+ private FakeSqlGatewayService() {}
+
+ @Override
+ public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SessionHandle openSession(SessionEnvironment environment) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void heartbeat(SessionHandle sessionHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancelOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperationInfo getOperationInfo(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Column getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ ExecutionConfiguration executionConfig)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet fetchResults(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ ResultQueryCondition resultQueryCondition)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..054e5553d
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Factory for {@link SqlGatewayService}. */
+public class FakeSqlGatewayServiceFactory implements SqlGatewayServiceFactory {
+
+ public static final ConfigOption<String> HOST =
+ ConfigOption.key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The host of the Fake SQL gateway service.");
+
+ public static final ConfigOption<Integer> PORT =
+ ConfigOption.key("port")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The port of the Fake SQL gateway service.");
+
+ public static final ConfigOption<Integer> DESCRIPTION =
+ ConfigOption.key("description")
+ .intType()
+ .defaultValue(8080)
+ .withDescription("The Fake SQL gateway service.");
+
+ @Override
+ public String factoryIdentifier() {
+ return "fake";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOST);
+ options.add(PORT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.singleton(DESCRIPTION);
+ }
+
+ @Override
+ public SqlGatewayService createSqlGatewayService(Context context) {
+ return FakeSqlGatewayService.INSTANCE;
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
new file mode 100644
index 000000000..e1f7be03f
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import java.util.Objects;
+
+/** Mocked implementation of {@link SqlGatewayService}. */
+public class MockedSqlGatewayService implements SqlGatewayService {
+
+ public final String host;
+ public final int port;
+
+ public final String description;
+
+ public MockedSqlGatewayService(String host, int port, String description) {
+ this.host = host;
+ this.port = port;
+ this.description = description;
+ }
+
+ @Override
+ public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public SessionHandle openSession(SessionEnvironment environment) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void heartbeat(SessionHandle sessionHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void cancelOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public void closeOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperationInfo getOperationInfo(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public Column getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ ExecutionConfiguration executionConfig)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public ResultSet fetchResults(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ ResultQueryCondition resultQueryCondition)
+ throws SqlGatewayException {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ MockedSqlGatewayService that = (MockedSqlGatewayService) o;
+ return port == that.port
+ && Objects.equals(host, that.host)
+ && Objects.equals(description, that.description);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(host, port, description);
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..76ac79f32
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Factory for {@link SqlGatewayService}. */
+public class MockedSqlGatewayServiceFactory implements
SqlGatewayServiceFactory {
+
+ public static final ConfigOption<String> HOST =
+ ConfigOption.key("host")
+ .stringType()
+ .noDefaultValue()
+ .withDescription("The host of the Mocked SQL gateway service.");
+
+ public static final ConfigOption<Integer> PORT =
+ ConfigOption.key("port")
+ .intType()
+ .noDefaultValue()
+ .withDescription("The port of the Mocked SQL gateway service.");
+
+ public static final ConfigOption<Integer> DESCRIPTION =
+ ConfigOption.key("description")
+ .intType()
+ .defaultValue(8080)
+ .withDescription("The Mocked SQL gateway service.");
+
+ @Override
+ public String factoryIdentifier() {
+ return "mocked";
+ }
+
+ @Override
+ public Set<ConfigOption<?>> requiredOptions() {
+ Set<ConfigOption<?>> options = new HashSet<>();
+ options.add(HOST);
+ options.add(PORT);
+ return options;
+ }
+
+ @Override
+ public Set<ConfigOption<?>> optionalOptions() {
+ return Collections.singleton(DESCRIPTION);
+ }
+
+ @Override
+ public SqlGatewayService createSqlGatewayService(Context context) {
+ return new MockedSqlGatewayService("localhost", 8080, "The Mocked SQL
gateway service");
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
new file mode 100644
index 000000000..3c317041c
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
@@ -0,0 +1,20 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampark.gateway.utils.MockedSqlGatewayServiceFactory
+org.apache.streampark.gateway.utils.FakeSqlGatewayServiceFactory