This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 805bc718a init base sql gateway module (#2267)
805bc718a is described below

commit 805bc718a5e1f5a2720c4c91de846d06fe90b83a
Author: gongzhongqiang <[email protected]>
AuthorDate: Wed May 10 12:57:05 2023 +0800

    init base sql gateway module (#2267)
    
    Co-authored-by: gongzhongqiang <[email protected]>
---
 pom.xml                                            |   1 +
 streampark-flink/pom.xml                           |   1 +
 .../streampark-flink-sql-gateway/pom.xml           |  34 +++
 .../streampark-flink-sql-gateway-base/pom.xml      | 101 ++++++++
 .../apache/streampark/gateway/ConfigOption.java    | 256 +++++++++++++++++++++
 .../streampark/gateway/ExecutionConfiguration.java |  22 ++
 .../apache/streampark/gateway/OperationHandle.java |  58 +++++
 .../apache/streampark/gateway/OperationStatus.java |  85 +++++++
 .../gateway/exception/SqlGatewayException.java     |  35 +++
 .../gateway/exception/ValidationException.java     |  35 +++
 .../streampark/gateway/factories/Factory.java      |  31 +++
 .../streampark/gateway/factories/FactoryUtil.java  | 117 ++++++++++
 .../gateway/factories/ServiceLoaderUtil.java       |  82 +++++++
 .../factories/SqlGatewayServiceFactory.java        |  38 +++
 .../factories/SqlGatewayServiceFactoryUtils.java   | 153 ++++++++++++
 .../apache/streampark/gateway/results/Column.java  |  86 +++++++
 .../gateway/results/FetchOrientation.java          |  28 +++
 .../streampark/gateway/results/FunctionInfo.java   |  22 ++
 .../streampark/gateway/results/GatewayInfo.java    |  75 ++++++
 .../apache/streampark/gateway/results/JobID.java   |  59 +++++
 .../gateway/results/ObjectIdentifier.java          |  87 +++++++
 .../streampark/gateway/results/OperationInfo.java  |  68 ++++++
 .../streampark/gateway/results/ResultKind.java     |  33 +++
 .../gateway/results/ResultQueryCondition.java      |  90 ++++++++
 .../streampark/gateway/results/ResultSet.java      | 172 ++++++++++++++
 .../apache/streampark/gateway/results/RowData.java |  64 ++++++
 .../streampark/gateway/results/TableInfo.java      |  62 +++++
 .../streampark/gateway/results/TableKind.java      |  23 ++
 .../gateway/service/SqlGatewayService.java         | 147 ++++++++++++
 .../gateway/session/SessionEnvironment.java        |  87 +++++++
 .../streampark/gateway/session/SessionHandle.java  |  61 +++++
 .../service/SqlGatewayServiceFactoryUtilsTest.java | 189 +++++++++++++++
 .../gateway/utils/FakeSqlGatewayService.java       | 101 ++++++++
 .../utils/FakeSqlGatewayServiceFactory.java        |  71 ++++++
 .../gateway/utils/MockedSqlGatewayService.java     | 129 +++++++++++
 .../utils/MockedSqlGatewayServiceFactory.java      |  71 ++++++
 ...org.apache.streampark.gateway.factories.Factory |  20 ++
 37 files changed, 2794 insertions(+)

diff --git a/pom.xml b/pom.xml
index b17a6820a..4fa03bd2b 100644
--- a/pom.xml
+++ b/pom.xml
@@ -121,6 +121,7 @@
         <commons-net.version>3.9.0</commons-net.version>
         <commons-lang3.version>3.8.1</commons-lang3.version>
         <enumeratum.version>1.6.1</enumeratum.version>
+        <assertj.version>3.23.1</assertj.version>
 
         <maven-compiler-plugin.version>3.10.1</maven-compiler-plugin.version>
         <maven-shade-plugin.version>3.2.4</maven-shade-plugin.version>
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 0ae28211d..0445b9c81 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -38,6 +38,7 @@
         <module>streampark-flink-proxy</module>
         <module>streampark-flink-packer</module>
         <module>streampark-flink-kubernetes</module>
+        <module>streampark-flink-sql-gateway</module>
     </modules>
 
     <dependencies>
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml 
b/streampark-flink/streampark-flink-sql-gateway/pom.xml
new file mode 100644
index 000000000..fe1824b59
--- /dev/null
+++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml
@@ -0,0 +1,34 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-flink-sql-gateway</artifactId>
+    <name>StreamPark : SQL Gateway Parent</name>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>streampark-flink-sql-gateway-base</module>
+    </modules>
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
new file mode 100644
index 000000000..b2f5a6eb7
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
@@ -0,0 +1,101 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~     http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-sql-gateway</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-flink-sql-gateway-base</artifactId>
+    <name>StreamPark : Sql Gateway Base</name>
+
+    <dependencies>
+
+        <!-- test -->
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-engine</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.junit.jupiter</groupId>
+            <artifactId>junit-jupiter-params</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
+
+        <!-- This manages the 'javax.annotation' annotations (JSR305) -->
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>3.0.2</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.slf4j</groupId>
+            <artifactId>slf4j-api</artifactId>
+        </dependency>
+
+    </dependencies>
+
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
new file mode 100644
index 000000000..c6e89a643
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ConfigOption.java
@@ -0,0 +1,256 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import javax.annotation.Nullable;
+
+import java.time.Duration;
+import java.util.Map;
+
+/** @param <T> */
+public class ConfigOption<T> {
+
+  private static final String EMPTY_DESCRIPTION = "";
+
+  // ------------------------------------------------------------------------
+
+  /** The current key for that config option. */
+  private final String key;
+
+  /** The default value for this option. */
+  private final T defaultValue;
+
+  /** The description for this option. */
+  private final String description;
+
+  /**
+   * Type of the value that this ConfigOption describes.
+   *
+   * <ul>
+   *   <li>typeClass == atomic class (e.g. {@code Integer.class}) -> {@code 
ConfigOption<Integer>}
+   *   <li>typeClass == {@code Map.class} -> {@code ConfigOption<Map<String, 
String>>}
+   *   <li>typeClass == atomic class and isList == true for {@code 
ConfigOption<List<Integer>>}
+   * </ul>
+   */
+  private final Class<?> clazz;
+
+  /**
+   * Creates a new config option.
+   *
+   * @param key The current key for that config option
+   * @param defaultValue The default value for this option
+   * @param description Description for that option
+   * @param clazz describes type of the ConfigOption, see description of the 
clazz field
+   */
+  private ConfigOption(String key, T defaultValue, String description, 
Class<?> clazz) {
+    this.key = key;
+    this.defaultValue = defaultValue;
+    this.description = description;
+    this.clazz = clazz;
+  }
+
+  /**
+   * Creates a new config option, using this option's key and default value, 
and adding the given
+   * description. The given description is used when generation the 
configuration documentation.
+   *
+   * @param description The description for this option.
+   * @return A new config option, with given description.
+   */
+  public ConfigOption<T> withDescription(final String description) {
+    return new ConfigOption<>(key, defaultValue, description, clazz);
+  }
+
+  /**
+   * Starts building a new {@link ConfigOption}.
+   *
+   * @param key The key for the config option.
+   * @return The builder for the config option with the given key.
+   */
+  public static OptionBuilder key(String key) {
+    checkNotNull(key);
+    return new OptionBuilder(key);
+  }
+
+  public String getKey() {
+    return key;
+  }
+
+  public boolean hasDefaultValue() {
+    return defaultValue != null;
+  }
+
+  public T getDefaultValue() {
+    return defaultValue;
+  }
+
+  public String getDescription() {
+    return description;
+  }
+
+  public Class<?> getClazz() {
+    return clazz;
+  }
+
+  /**
+   * The option builder is used to create a {@link ConfigOption}. It is 
instantiated via {@link
+   * ConfigOption#key(String)}.
+   */
+  public static final class OptionBuilder {
+    /**
+     * Workaround to reuse the {@link TypedConfigOptionBuilder} for a {@link 
Map Map&lt;String,
+     * String&gt;}.
+     */
+    @SuppressWarnings("unchecked")
+    private static final Class<Map<String, String>> PROPERTIES_MAP_CLASS =
+        (Class<Map<String, String>>) (Class<?>) Map.class;
+
+    /** The key for the config option. */
+    private final String key;
+
+    /**
+     * Creates a new OptionBuilder.
+     *
+     * @param key The key for the config option
+     */
+    OptionBuilder(String key) {
+      this.key = key;
+    }
+
+    /** Defines that the value of the option should be of {@link Boolean} 
type. */
+    public TypedConfigOptionBuilder<Boolean> booleanType() {
+      return new TypedConfigOptionBuilder<>(key, Boolean.class);
+    }
+
+    /** Defines that the value of the option should be of {@link Integer} 
type. */
+    public TypedConfigOptionBuilder<Integer> intType() {
+      return new TypedConfigOptionBuilder<>(key, Integer.class);
+    }
+
+    /** Defines that the value of the option should be of {@link Long} type. */
+    public TypedConfigOptionBuilder<Long> longType() {
+      return new TypedConfigOptionBuilder<>(key, Long.class);
+    }
+
+    /** Defines that the value of the option should be of {@link Float} type. 
*/
+    public TypedConfigOptionBuilder<Float> floatType() {
+      return new TypedConfigOptionBuilder<>(key, Float.class);
+    }
+
+    /** Defines that the value of the option should be of {@link Double} type. 
*/
+    public TypedConfigOptionBuilder<Double> doubleType() {
+      return new TypedConfigOptionBuilder<>(key, Double.class);
+    }
+
+    /** Defines that the value of the option should be of {@link String} type. 
*/
+    public TypedConfigOptionBuilder<String> stringType() {
+      return new TypedConfigOptionBuilder<>(key, String.class);
+    }
+
+    /** Defines that the value of the option should be of {@link Duration} 
type. */
+    public TypedConfigOptionBuilder<Duration> durationType() {
+      return new TypedConfigOptionBuilder<>(key, Duration.class);
+    }
+
+    /**
+     * Defines that the value of the option should be of {@link Enum} type.
+     *
+     * @param enumClass Concrete type of the expected enum.
+     */
+    public <T extends Enum<T>> TypedConfigOptionBuilder<T> enumType(Class<T> 
enumClass) {
+      return new TypedConfigOptionBuilder<>(key, enumClass);
+    }
+
+    /**
+     * Defines that the value of the option should be a set of properties, 
which can be represented
+     * as {@code Map<String, String>}.
+     */
+    public TypedConfigOptionBuilder<Map<String, String>> mapType() {
+      return new TypedConfigOptionBuilder<>(key, PROPERTIES_MAP_CLASS);
+    }
+
+    /**
+     * Creates a ConfigOption with the given default value.
+     *
+     * <p>This method does not accept "null". For options with no default 
value, choose one of the
+     * {@code noDefaultValue} methods.
+     *
+     * @param value The default value for the config option
+     * @param <T> The type of the default value.
+     * @return The config option with the default value.
+     * @deprecated define the type explicitly first with one of the intType(), 
stringType(), etc.
+     */
+    @Deprecated
+    public <T> ConfigOption<T> defaultValue(T value) {
+      checkNotNull(value);
+      return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION, 
value.getClass());
+    }
+
+    /**
+     * Creates a string-valued option with no default value. String-valued 
options are the only ones
+     * that can have no default value.
+     *
+     * @return The created ConfigOption.
+     * @deprecated define the type explicitly first with one of the intType(), 
stringType(), etc.
+     */
+    @Deprecated
+    public ConfigOption<String> noDefaultValue() {
+      return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION, 
String.class);
+    }
+  }
+
+  /**
+   * Builder for {@link ConfigOption} with a defined atomic type.
+   *
+   * @param <T> atomic type of the option
+   */
+  public static class TypedConfigOptionBuilder<T> {
+    private final String key;
+    private final Class<T> clazz;
+
+    TypedConfigOptionBuilder(String key, Class<T> clazz) {
+      this.key = key;
+      this.clazz = clazz;
+    }
+
+    /**
+     * Creates a ConfigOption with the given default value.
+     *
+     * @param value The default value for the config option
+     * @return The config option with the default value.
+     */
+    public ConfigOption<T> defaultValue(T value) {
+      return new ConfigOption<>(key, value, ConfigOption.EMPTY_DESCRIPTION, 
clazz);
+    }
+
+    /**
+     * Creates a ConfigOption without a default value.
+     *
+     * @return The config option without a default value.
+     */
+    public ConfigOption<T> noDefaultValue() {
+      return new ConfigOption<>(key, null, ConfigOption.EMPTY_DESCRIPTION, 
clazz);
+    }
+  }
+
+  public static <T> T checkNotNull(@Nullable T reference) {
+    if (reference == null) {
+      throw new NullPointerException();
+    }
+    return reference;
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
new file mode 100644
index 000000000..92dc198c2
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/ExecutionConfiguration.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.io.Serializable;
+
+public class ExecutionConfiguration implements Serializable {}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
new file mode 100644
index 000000000..a8be0eaf4
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.io.Serializable;
+import java.util.Objects;
+import java.util.UUID;
+
+/** {@link OperationHandle} to index the {@code Operation}. */
+public class OperationHandle implements Serializable {
+
+  private final UUID identifier;
+
+  public OperationHandle(UUID identifier) {
+    this.identifier = identifier;
+  }
+
+  public UUID getIdentifier() {
+    return identifier;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OperationHandle that = (OperationHandle) o;
+    return Objects.equals(identifier, that.identifier);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(identifier);
+  }
+
+  @Override
+  public String toString() {
+    return "OperationHandle{" + "identifier=" + identifier + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
new file mode 100644
index 000000000..21c9c0e4f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationStatus.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway;
+
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Status to describe the {@code Operation}. */
+public enum OperationStatus {
+  /** The operation is newly created. */
+  INITIALIZED(false),
+
+  /** Prepare the resources for the operation. */
+  PENDING(false),
+
+  /** The operation is running. */
+  RUNNING(false),
+
+  /** All the work is finished and ready for the client to fetch the results. 
*/
+  FINISHED(true),
+
+  /** Operation has been cancelled. */
+  CANCELED(true),
+
+  /** Operation has been closed and all related resources are collected. */
+  CLOSED(true),
+
+  /** Some error happens. */
+  ERROR(true),
+
+  /** The execution of the operation timeout. */
+  TIMEOUT(true);
+
+  private final boolean isTerminalStatus;
+
+  OperationStatus(boolean isTerminalStatus) {
+    this.isTerminalStatus = isTerminalStatus;
+  }
+
+  public static boolean isValidStatusTransition(
+      OperationStatus fromStatus, OperationStatus toStatus) {
+    return toOperationStatusSet(fromStatus).contains(toStatus);
+  }
+
+  public boolean isTerminalStatus() {
+    return isTerminalStatus;
+  }
+
+  private static Set<OperationStatus> toOperationStatusSet(OperationStatus 
fromStatus) {
+    switch (fromStatus) {
+      case INITIALIZED:
+        return new HashSet<>(Arrays.asList(PENDING, CANCELED, CLOSED, TIMEOUT, 
ERROR));
+      case PENDING:
+        return new HashSet<>(Arrays.asList(RUNNING, CANCELED, CLOSED, TIMEOUT, 
ERROR));
+      case RUNNING:
+        return new HashSet<>(Arrays.asList(FINISHED, CANCELED, CLOSED, 
TIMEOUT, ERROR));
+      case FINISHED:
+      case CANCELED:
+      case TIMEOUT:
+      case ERROR:
+        return Collections.singleton(CLOSED);
+      case CLOSED:
+        return Collections.emptySet();
+      default:
+        throw new IllegalArgumentException("Unknown from status: " + 
fromStatus);
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
new file mode 100644
index 000000000..2b3268f7b
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/SqlGatewayException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.exception;
+
+/** General exception for SQL gateway related errors. */
+public class SqlGatewayException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public SqlGatewayException(String message) {
+    super(message);
+  }
+
+  public SqlGatewayException(String message, Throwable e) {
+    super(message, e);
+  }
+
+  public SqlGatewayException(Throwable e) {
+    super(e);
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
new file mode 100644
index 000000000..f9250ac2e
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/exception/ValidationException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.exception;
+
+/** General exception for SQL gateway related errors. */
+public class ValidationException extends RuntimeException {
+  private static final long serialVersionUID = 1L;
+
+  public ValidationException(String message) {
+    super(message);
+  }
+
+  public ValidationException(String message, Throwable e) {
+    super(message, e);
+  }
+
+  public ValidationException(Throwable e) {
+    super(e);
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
new file mode 100644
index 000000000..c0409e7f3
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/Factory.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+
+import java.util.Set;
+
+/** F */
+public interface Factory {
+  String factoryIdentifier();
+
+  Set<ConfigOption<?>> requiredOptions();
+
+  Set<ConfigOption<?>> optionalOptions();
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
new file mode 100644
index 000000000..a5fb8e263
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
@@ -0,0 +1,117 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.exception.ValidationException;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Factory utils for {@link Factory}. */
+public class FactoryUtil {
+
+  private static final String DEFAULT_IDENTIFIER = "default";
+  private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
+  public static final ConfigOption<String> SQL_GATEWAY_SERVICE_TYPE =
+      ConfigOption.key("streampark.sql-gateway.service")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The service to execute the request.");
+
+  public static <T extends Factory> T discoverFactory(
+      ClassLoader classLoader, Class<T> factoryClass, String 
factoryIdentifier) {
+    final List<Factory> factories = discoverFactories(classLoader);
+
+    final List<Factory> foundFactories =
+        factories.stream()
+            .filter(f -> factoryClass.isAssignableFrom(f.getClass()))
+            .collect(Collectors.toList());
+
+    if (foundFactories.isEmpty()) {
+      throw new ValidationException(
+          String.format(
+              "Could not find any factories that implement '%s' in the 
classpath.",
+              factoryClass.getName()));
+    }
+
+    final List<Factory> matchingFactories =
+        foundFactories.stream()
+            .filter(f -> f.factoryIdentifier().equals(factoryIdentifier))
+            .collect(Collectors.toList());
+
+    if (matchingFactories.isEmpty()) {
+      throw new ValidationException(
+          String.format(
+              "Could not find any factory for identifier '%s' that implements 
'%s' in the classpath.\n\n"
+                  + "Available factory identifiers are:\n\n"
+                  + "%s",
+              factoryIdentifier,
+              factoryClass.getName(),
+              foundFactories.stream()
+                  .map(Factory::factoryIdentifier)
+                  .filter(identifier -> !DEFAULT_IDENTIFIER.equals(identifier))
+                  .distinct()
+                  .sorted()
+                  .collect(Collectors.joining("\n"))));
+    }
+    if (matchingFactories.size() > 1) {
+      throw new ValidationException(
+          String.format(
+              "Multiple factories for identifier '%s' that implement '%s' 
found in the classpath.\n\n"
+                  + "Ambiguous factory classes are:\n\n"
+                  + "%s",
+              factoryIdentifier,
+              factoryClass.getName(),
+              matchingFactories.stream()
+                  .map(f -> f.getClass().getName())
+                  .sorted()
+                  .collect(Collectors.joining("\n"))));
+    }
+
+    return (T) matchingFactories.get(0);
+  }
+
+  static List<Factory> discoverFactories(ClassLoader classLoader) {
+    final List<Factory> result = new LinkedList<>();
+    ServiceLoaderUtil.load(Factory.class, classLoader)
+        .forEach(
+            loadResult -> {
+              if (loadResult.hasFailed()) {
+                if (loadResult.getError() instanceof NoClassDefFoundError) {
+                  LOG.debug(
+                      "NoClassDefFoundError when loading a "
+                          + Factory.class
+                          + ". This is expected when trying to load a format 
dependency but load failed.",
+                      loadResult.getError());
+                  // After logging, we just ignore this failure
+                  return;
+                }
+                throw new RuntimeException(
+                    "Unexpected error when trying to load service provider for 
factories.",
+                    loadResult.getError());
+              }
+              result.add(loadResult.getService());
+            });
+    return result;
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
new file mode 100644
index 000000000..7f770afd8
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/ServiceLoaderUtil.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.ServiceLoader;
+
+/** This class contains utilities to deal with {@link ServiceLoader}. */
+class ServiceLoaderUtil {
+
+  /**
+   * This method behaves similarly to {@link ServiceLoader#load(Class, 
ClassLoader)}, but it returns
+   * a list with the results of the iteration, wrapping the iteration failures 
such as {@link
+   * NoClassDefFoundError}.
+   */
+  static <T> List<LoadResult<T>> load(Class<T> clazz, ClassLoader classLoader) 
{
+    List<LoadResult<T>> loadResults = new ArrayList<>();
+
+    Iterator<T> serviceLoaderIterator = ServiceLoader.load(clazz, 
classLoader).iterator();
+
+    while (serviceLoaderIterator.hasNext()) {
+      try {
+        T next = serviceLoaderIterator.next();
+        loadResults.add(new LoadResult<>(next));
+      } catch (NoSuchElementException e) {
+        break;
+      } catch (Throwable t) {
+        loadResults.add(new LoadResult<>(t));
+      }
+    }
+
+    return loadResults;
+  }
+
+  static class LoadResult<T> {
+    private final T service;
+    private final Throwable error;
+
+    private LoadResult(T service, Throwable error) {
+      this.service = service;
+      this.error = error;
+    }
+
+    private LoadResult(T service) {
+      this(service, null);
+    }
+
+    private LoadResult(Throwable error) {
+      this(null, error);
+    }
+
+    public boolean hasFailed() {
+      return error != null;
+    }
+
+    public Throwable getError() {
+      return error;
+    }
+
+    public T getService() {
+      return service;
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
new file mode 100644
index 000000000..0986864a9
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactory.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Map;
+
+/**
+ * A factory for creating {@link SqlGatewayService} from Configuration. This 
factory is used with
+ * Java's Service Provider Interfaces (SPI) for discovery.
+ */
+public interface SqlGatewayServiceFactory extends Factory {
+
+  /** Creates a {@link SqlGatewayService} from the given {@link Context}. */
+  SqlGatewayService createSqlGatewayService(Context context);
+
+  interface Context {
+
+    /** Gives read-only access to the configuration of the current session. */
+    Map<String, String> getGateWayServiceOptions();
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
new file mode 100644
index 000000000..a219739bb
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.factories;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.exception.ValidationException;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static 
org.apache.streampark.gateway.factories.FactoryUtil.SQL_GATEWAY_SERVICE_TYPE;
+
+/** Util to discover the {@link SqlGatewayService}. */
+public class SqlGatewayServiceFactoryUtils {
+
+  /**
+   * Attempts to discover the appropriate service factory and creates the 
instance of the services.
+   */
+  public static List<SqlGatewayService> createSqlGatewayService(Map<String, 
String> configuration) {
+
+    String identifiersStr =
+        
Optional.ofNullable(configuration.get(SQL_GATEWAY_SERVICE_TYPE.getKey()))
+            .map(
+                idStr -> {
+                  if (idStr.trim().isEmpty()) {
+                    return null;
+                  }
+                  return idStr.trim();
+                })
+            .orElseThrow(
+                () ->
+                    new ValidationException(
+                        String.format(
+                            "Service options do not contain an option key '%s' 
for discovering an service.",
+                            SQL_GATEWAY_SERVICE_TYPE.getKey())));
+
+    List<String> identifiers = Arrays.asList(identifiersStr.split(";"));
+
+    if (identifiers.isEmpty()) {
+      throw new ValidationException(
+          String.format(
+              "Service options do not contain an option key '%s' for 
discovering an service.",
+              SQL_GATEWAY_SERVICE_TYPE.getKey()));
+    }
+    validateSpecifiedServicesAreUnique(identifiers);
+
+    List<SqlGatewayService> services = new ArrayList<>();
+    for (String identifier : identifiers) {
+      final SqlGatewayServiceFactory factory =
+          FactoryUtil.discoverFactory(
+              Thread.currentThread().getContextClassLoader(),
+              SqlGatewayServiceFactory.class,
+              identifier);
+
+      services.add(
+          factory.createSqlGatewayService(new 
DefaultServiceFactoryContext(configuration)));
+    }
+    return services;
+  }
+
+  public static EndpointFactoryHelper createEndpointFactoryHelper(
+      SqlGatewayServiceFactory endpointFactory, 
SqlGatewayServiceFactory.Context context) {
+    return new EndpointFactoryHelper(endpointFactory, 
context.getGateWayServiceOptions());
+  }
+
+  public static class EndpointFactoryHelper {
+
+    public final SqlGatewayServiceFactory factory;
+    public final Map<String, String> configOptions;
+
+    public EndpointFactoryHelper(
+        SqlGatewayServiceFactory factory, Map<String, String> configOptions) {
+      this.factory = factory;
+      this.configOptions = configOptions;
+    }
+
+    public void validate() {
+      validateFactoryOptions(factory.requiredOptions(), configOptions);
+    }
+
+    public static void validateFactoryOptions(
+        Set<ConfigOption<?>> requiredOptions, Map<String, String> options) {
+
+      final List<String> missingRequiredOptions =
+          requiredOptions.stream()
+              .map(ConfigOption::getKey)
+              .filter(key -> options.get(key) == null)
+              .sorted()
+              .collect(Collectors.toList());
+
+      if (!missingRequiredOptions.isEmpty()) {
+        throw new ValidationException(
+            String.format(
+                "One or more required options are missing.\n\n"
+                    + "Missing required options are:\n\n"
+                    + "%s",
+                String.join("\n", missingRequiredOptions)));
+      }
+    }
+  }
+
+  /** The default context of {@link SqlGatewayServiceFactory}. */
+  public static class DefaultServiceFactoryContext implements 
SqlGatewayServiceFactory.Context {
+
+    private final Map<String, String> gateWayServiceOptions;
+
+    public DefaultServiceFactoryContext(Map<String, String> endpointConfig) {
+      this.gateWayServiceOptions = endpointConfig;
+    }
+
+    @Override
+    public Map<String, String> getGateWayServiceOptions() {
+      return gateWayServiceOptions;
+    }
+  }
+
+  private static void validateSpecifiedServicesAreUnique(List<String> 
identifiers) {
+    Set<String> uniqueIdentifiers = new HashSet<>();
+
+    for (String identifier : identifiers) {
+      if (uniqueIdentifiers.contains(identifier)) {
+        throw new ValidationException(
+            String.format(
+                "Get the duplicate service identifier '%s' for the option 
'%s'. "
+                    + "Please keep the specified service identifier unique.",
+                identifier, SQL_GATEWAY_SERVICE_TYPE.getKey()));
+      }
+      uniqueIdentifiers.add(identifier);
+    }
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
new file mode 100644
index 000000000..f99264814
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/Column.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Column information. */
+public class Column implements Serializable {
+
+  private final String name;
+
+  private final String type;
+
+  private final @Nullable String comment;
+
+  public Column(String name, String type, String comment) {
+    this.name = name;
+    this.type = type;
+    this.comment = comment;
+  }
+
+  public String getName() {
+    return name;
+  }
+
+  public String getType() {
+    return type;
+  }
+
+  @Nullable
+  public String getComment() {
+    return comment;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    Column column = (Column) o;
+    return Objects.equals(name, column.name)
+        && Objects.equals(type, column.type)
+        && Objects.equals(comment, column.comment);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(name, type, comment);
+  }
+
+  @Override
+  public String toString() {
+    return "Column{"
+        + "name='"
+        + name
+        + '\''
+        + ", type='"
+        + type
+        + '\''
+        + ", comment='"
+        + comment
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
new file mode 100644
index 000000000..b6439a504
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FetchOrientation.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+/** Orientation to fetch results. */
+public enum FetchOrientation {
+
+  /** Fetch the next results. */
+  FETCH_NEXT,
+
+  /** Fetch the prior results. */
+  FETCH_PRIOR
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
new file mode 100644
index 000000000..8b88d1bb2
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/FunctionInfo.java
@@ -0,0 +1,22 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+
+public class FunctionInfo implements Serializable {}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
new file mode 100644
index 000000000..0b9d23b88
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/GatewayInfo.java
@@ -0,0 +1,75 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Info to describe the {@link SqlGatewayService}. */
+public class GatewayInfo implements Serializable {
+
+  /** Gateway service type. */
+  public final String serviceType;
+
+  /** Gateway service version. */
+  public final String version;
+
+  public GatewayInfo(String serviceType, String version) {
+    this.serviceType = serviceType;
+    this.version = version;
+  }
+
+  public String getServiceType() {
+    return serviceType;
+  }
+
+  public String getVersion() {
+    return version;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    GatewayInfo that = (GatewayInfo) o;
+    return Objects.equals(serviceType, that.serviceType) && 
Objects.equals(version, that.version);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(serviceType, version);
+  }
+
+  @Override
+  public String toString() {
+    return "GatewayInfo{"
+        + "serviceType='"
+        + serviceType
+        + '\''
+        + ", version='"
+        + version
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
new file mode 100644
index 000000000..095e6c029
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/JobID.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public class JobID implements Serializable {
+  private String jobID;
+
+  public String getJobID() {
+    return jobID;
+  }
+
+  public void setJobID(String jobID) {
+    this.jobID = jobID;
+  }
+
+  public JobID(String jobID) {
+    this.jobID = jobID;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    JobID jobID1 = (JobID) o;
+    return Objects.equals(jobID, jobID1.jobID);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(jobID);
+  }
+
+  @Override
+  public String toString() {
+    return "JobID{" + "jobID='" + jobID + '\'' + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
new file mode 100644
index 000000000..7bfe8650a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ObjectIdentifier.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+public final class ObjectIdentifier implements Serializable {
+
+  static final String UNKNOWN = "<UNKNOWN>";
+
+  private final @Nullable String catalogName;
+  private final @Nullable String databaseName;
+  private final String objectName;
+
+  public ObjectIdentifier(
+      @Nullable String catalogName, @Nullable String databaseName, String 
objectName) {
+    this.catalogName = catalogName;
+    this.databaseName = databaseName;
+    this.objectName = objectName;
+  }
+
+  @Nullable
+  public String getCatalogName() {
+    return catalogName;
+  }
+
+  @Nullable
+  public String getDatabaseName() {
+    return databaseName;
+  }
+
+  public String getObjectName() {
+    return objectName;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ObjectIdentifier that = (ObjectIdentifier) o;
+    return Objects.equals(catalogName, that.catalogName)
+        && Objects.equals(databaseName, that.databaseName)
+        && Objects.equals(objectName, that.objectName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(catalogName, databaseName, objectName);
+  }
+
+  @Override
+  public String toString() {
+    return "ObjectIdentifier{"
+        + "catalogName='"
+        + catalogName
+        + '\''
+        + ", databaseName='"
+        + databaseName
+        + '\''
+        + ", objectName='"
+        + objectName
+        + '\''
+        + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
new file mode 100644
index 000000000..7699f210b
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/OperationInfo.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import org.apache.streampark.gateway.OperationStatus;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Information of the {@code Operation}. */
+public class OperationInfo implements Serializable {
+
+  private final OperationStatus status;
+  @Nullable private final Exception exception;
+
+  public OperationInfo(OperationStatus status, @Nullable Exception exception) {
+    this.status = status;
+    this.exception = exception;
+  }
+
+  public OperationStatus getStatus() {
+    return status;
+  }
+
+  @Nullable
+  public Exception getException() {
+    return exception;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    OperationInfo that = (OperationInfo) o;
+    return status == that.status && Objects.equals(exception, that.exception);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(status, exception);
+  }
+
+  @Override
+  public String toString() {
+    return "OperationInfo{" + "status=" + status + ", exception=" + exception 
+ '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
new file mode 100644
index 000000000..f2fd7e82e
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultKind.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+/** ResultKind defines the types of the result. */
+public enum ResultKind {
+  /**
+   * The statement (e.g. DDL, USE) executes successfully, and the result only 
contains a simple
+   * "OK".
+   */
+  SUCCESS,
+
+  /**
+   * The statement (e.g. DML, DQL, SHOW) executes successfully, and the result 
contains important
+   * content.
+   */
+  SUCCESS_WITH_CONTENT
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
new file mode 100644
index 000000000..cc740a950
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultQueryCondition.java
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Condition of result query. */
+public class ResultQueryCondition implements Serializable {
+  public FetchOrientation orientation;
+
+  public long token;
+  public int maxRows;
+
+  public ResultQueryCondition() {}
+
+  public ResultQueryCondition(FetchOrientation orientation, long token, int 
maxRows) {
+    this.orientation = orientation;
+    this.token = token;
+    this.maxRows = maxRows;
+  }
+
+  public FetchOrientation getOrientation() {
+    return orientation;
+  }
+
+  public void setOrientation(FetchOrientation orientation) {
+    this.orientation = orientation;
+  }
+
+  public long getToken() {
+    return token;
+  }
+
+  public void setToken(long token) {
+    this.token = token;
+  }
+
+  public int getMaxRows() {
+    return maxRows;
+  }
+
+  public void setMaxRows(int maxRows) {
+    this.maxRows = maxRows;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ResultQueryCondition that = (ResultQueryCondition) o;
+    return token == that.token && maxRows == that.maxRows && orientation == 
that.orientation;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(orientation, token, maxRows);
+  }
+
+  @Override
+  public String toString() {
+    return "ResultQueryCondition{"
+        + "orientation="
+        + orientation
+        + ", token="
+        + token
+        + ", maxRows="
+        + maxRows
+        + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
new file mode 100644
index 000000000..fafc0a800
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/ResultSet.java
@@ -0,0 +1,172 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** An implementation of {@link ResultSet}. */
+public class ResultSet implements Serializable {
+
+  /** The type of the results, which may indicate the result is EOS or has 
data. */
+  private final ResultType resultType;
+
+  /**
+   * The token indicates the next batch of the data.
+   *
+   * <p>When the token is null, it means all the data has been fetched.
+   */
+  @Nullable private final Long nextToken;
+
+  /**
+   * The schema of the data.
+   *
+   * <p>The schema of the DDL, USE, EXPLAIN, SHOW and DESCRIBE align with the 
schema of the {@link
+   * TableResult#getResolvedSchema()}. The only differences is the schema of 
the `INSERT` statement.
+   *
+   * <p>The schema of INSERT:
+   *
+   * <pre>
+   * +-------------+-------------+----------+
+   * | column name | column type | comments |
+   * +-------------+-------------+----------+
+   * |   job id    |    string   |          |
+   * +- -----------+-------------+----------+
+   * </pre>
+   */
+  private final List<Column> columns;
+
+  /** All the data in the current results. */
+  private final List<RowData> data;
+
+  /** Indicates that whether the result is for a query. */
+  private final boolean isQueryResult;
+  /**
+   * If the statement was submitted to a client, returns the JobID which 
uniquely identifies the
+   * job. Otherwise, returns null.
+   */
+  @Nullable private final JobID jobID;
+
+  /** Gets the result kind of the result. */
+  private final ResultKind resultKind;
+
+  public ResultSet(
+      ResultType resultType,
+      @Nullable Long nextToken,
+      List<Column> columns,
+      List<RowData> data,
+      boolean isQueryResult,
+      @Nullable JobID jobID,
+      ResultKind resultKind) {
+    this.resultType = resultType;
+    this.nextToken = nextToken;
+    this.columns = columns;
+    this.data = data;
+    this.isQueryResult = isQueryResult;
+    this.jobID = jobID;
+    this.resultKind = resultKind;
+  }
+
+  public ResultType getResultType() {
+    return resultType;
+  }
+
+  @Nullable
+  public Long getNextToken() {
+    return nextToken;
+  }
+
+  public List<Column> getColumns() {
+    return columns;
+  }
+
+  public List<RowData> getData() {
+    return data;
+  }
+
+  public boolean isQueryResult() {
+    return isQueryResult;
+  }
+
+  @Nullable
+  public JobID getJobID() {
+    return jobID;
+  }
+
+  public ResultKind getResultKind() {
+    return resultKind;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    ResultSet resultSet = (ResultSet) o;
+    return isQueryResult == resultSet.isQueryResult
+        && resultType == resultSet.resultType
+        && Objects.equals(nextToken, resultSet.nextToken)
+        && Objects.equals(columns, resultSet.columns)
+        && Objects.equals(data, resultSet.data)
+        && Objects.equals(jobID, resultSet.jobID)
+        && resultKind == resultSet.resultKind;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(resultType, nextToken, columns, data, isQueryResult, 
jobID, resultKind);
+  }
+
+  @Override
+  public String toString() {
+    return "ResultSet{"
+        + "resultType="
+        + resultType
+        + ", nextToken="
+        + nextToken
+        + ", resultSchema="
+        + columns
+        + ", data="
+        + data
+        + ", isQueryResult="
+        + isQueryResult
+        + ", jobID="
+        + jobID
+        + ", resultKind="
+        + resultKind
+        + '}';
+  }
+
+  /** Describe the kind of the result. */
+  public enum ResultType {
+    /** Indicate the result is not ready. */
+    NOT_READY,
+
+    /** Indicate the result has data. */
+    PAYLOAD,
+
+    /** Indicate all results have been fetched. */
+    EOS
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
new file mode 100644
index 000000000..891d0b80a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/RowData.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Objects;
+
+/** Row data. */
+public class RowData implements Serializable {
+  private final String rowKind;
+
+  private final List<Object> fields;
+
+  public RowData(String rowKind, List<Object> fields) {
+    this.rowKind = rowKind;
+    this.fields = fields;
+  }
+
+  public String getRowKind() {
+    return rowKind;
+  }
+
+  public List<Object> getFields() {
+    return fields;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    RowData rowData = (RowData) o;
+    return Objects.equals(rowKind, rowData.rowKind) && Objects.equals(fields, 
rowData.fields);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(rowKind, fields);
+  }
+
+  @Override
+  public String toString() {
+    return "RowData{" + "rowKind='" + rowKind + '\'' + ", fields=" + fields + 
'}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
new file mode 100644
index 000000000..b096edb57
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableInfo.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+import java.io.Serializable;
+import java.util.Objects;
+
+/** Information of the table or view. */
+public class TableInfo implements Serializable {
+  private final ObjectIdentifier identifier;
+  private final TableKind tableKind;
+
+  public TableInfo(ObjectIdentifier identifier, TableKind tableKind) {
+    this.identifier = identifier;
+    this.tableKind = tableKind;
+  }
+
+  public ObjectIdentifier getIdentifier() {
+    return identifier;
+  }
+
+  public TableKind getTableKind() {
+    return tableKind;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    TableInfo tableInfo = (TableInfo) o;
+    return Objects.equals(identifier, tableInfo.identifier) && tableKind == 
tableInfo.tableKind;
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(identifier, tableKind);
+  }
+
+  @Override
+  public String toString() {
+    return "TableInfo{" + "identifier=" + identifier + ", tableKind=" + 
tableKind + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
new file mode 100644
index 000000000..cd76a613f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/results/TableKind.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.results;
+
+public enum TableKind {
+  TABLE,
+  VIEW
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
new file mode 100644
index 000000000..d737bbb0a
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.service;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.OperationStatus;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+/** A service of SQL gateway is responsible for handling requests from 
streampark console. */
+public interface SqlGatewayService {
+
+  // 
-------------------------------------------------------------------------------------------
+  // Info API
+  // 
-------------------------------------------------------------------------------------------
+
+  GatewayInfo getGatewayInfo() throws SqlGatewayException;
+
+  // 
-------------------------------------------------------------------------------------------
+  // Session Management
+  // 
-------------------------------------------------------------------------------------------
+
+  /**
+   * Open the {@code Session}.
+   *
+   * @param environment Environment to initialize the Session.
+   * @return Returns a handle that used to identify the Session.
+   */
+  SessionHandle openSession(SessionEnvironment environment) throws 
SqlGatewayException;
+
+  /**
+   * Heartbeat for session
+   *
+   * @param sessionHandle handle to identify the Session.
+   */
+  void heartbeat(SessionHandle sessionHandle) throws SqlGatewayException;
+
+  /**
+   * Close the {@code Session}.
+   *
+   * @param sessionHandle handle to identify the Session needs to be closed.
+   */
+  void closeSession(SessionHandle sessionHandle) throws SqlGatewayException;
+
+  // 
-------------------------------------------------------------------------------------------
+  // Operation Management
+  // 
-------------------------------------------------------------------------------------------
+
+  /**
+   * Cancel the operation when it is not in terminal status.
+   *
+   * <p>It can't cancel an Operation if it is terminated.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param operationHandle handle to identify the operation.JarURLConnection
+   */
+  void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException;
+
+  /**
+   * Close the operation and release all used resource by the operation.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param operationHandle handle to identify the operation.
+   */
+  void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException;
+
+  /**
+   * Get the {@link OperationInfo} of the operation.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param operationHandle handle to identify the operation.
+   */
+  OperationInfo getOperationInfo(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException;
+
+  /**
+   * Get the result schema for the specified Operation.
+   *
+   * <p>Note: The result schema is available when the Operation is in the 
{@link
+   * OperationStatus#FINISHED}.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param operationHandle handle to identify the operation.
+   */
+  Column getOperationResultSchema(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException;
+
+  // 
-------------------------------------------------------------------------------------------
+  // Statements API
+  // 
-------------------------------------------------------------------------------------------
+
+  /**
+   * Execute the submitted statement.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param statement the SQL to execute.
+   * @param executionTimeoutMs the execution timeout. Please use non-positive 
value to forbid the
+   *     timeout mechanism.
+   * @param executionConfig execution config for the statement.
+   * @return handle to identify the operation.
+   */
+  OperationHandle executeStatement(
+      SessionHandle sessionHandle,
+      String statement,
+      long executionTimeoutMs,
+      ExecutionConfiguration executionConfig)
+      throws SqlGatewayException;
+
+  /**
+   * Fetch the results from the operation. When maxRows is Integer.MAX_VALUE, 
it means to fetch all
+   * available data.
+   *
+   * @param sessionHandle handle to identify the session.
+   * @param operationHandle handle to identify the operation.
+   * @param resultQueryCondition condition of result query.
+   * @return Returns the results.
+   */
+  ResultSet fetchResults(
+      SessionHandle sessionHandle,
+      OperationHandle operationHandle,
+      ResultQueryCondition resultQueryCondition)
+      throws SqlGatewayException;
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
new file mode 100644
index 000000000..3baebd360
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionEnvironment.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.session;
+
+import javax.annotation.Nullable;
+
+import java.io.Serializable;
+import java.util.Map;
+import java.util.Objects;
+
+/** Environment to initialize the {@code Session}. */
+public class SessionEnvironment implements Serializable {
+  private final @Nullable String sessionName;
+  private final @Nullable String defaultCatalog;
+  private final Map<String, String> sessionConfig;
+
+  public SessionEnvironment(
+      @Nullable String sessionName,
+      @Nullable String defaultCatalog,
+      Map<String, String> sessionConfig) {
+    this.sessionName = sessionName;
+    this.defaultCatalog = defaultCatalog;
+    this.sessionConfig = sessionConfig;
+  }
+
+  @Nullable
+  public String getSessionName() {
+    return sessionName;
+  }
+
+  @Nullable
+  public String getDefaultCatalog() {
+    return defaultCatalog;
+  }
+
+  public Map<String, String> getSessionConfig() {
+    return sessionConfig;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    SessionEnvironment that = (SessionEnvironment) o;
+    return Objects.equals(sessionName, that.sessionName)
+        && Objects.equals(defaultCatalog, that.defaultCatalog)
+        && Objects.equals(sessionConfig, that.sessionConfig);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(sessionName, defaultCatalog, sessionConfig);
+  }
+
+  @Override
+  public String toString() {
+    return "SessionEnvironment{"
+        + "sessionName='"
+        + sessionName
+        + '\''
+        + ", defaultCatalog='"
+        + defaultCatalog
+        + '\''
+        + ", sessionConfig="
+        + sessionConfig
+        + '}';
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
new file mode 100644
index 000000000..b57d1ac3e
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.session;
+
+import java.util.Objects;
+import java.util.UUID;
+
+/** Session Handle that used to identify the Session. */
+public class SessionHandle {
+
+  private final UUID identifier;
+
+  public static SessionHandle create() {
+    return new SessionHandle(UUID.randomUUID());
+  }
+
+  public SessionHandle(UUID identifier) {
+    this.identifier = identifier;
+  }
+
+  public UUID getIdentifier() {
+    return identifier;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (!(o instanceof SessionHandle)) {
+      return false;
+    }
+    SessionHandle that = (SessionHandle) o;
+    return Objects.equals(identifier, that.identifier);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(identifier);
+  }
+
+  @Override
+  public String toString() {
+    return identifier.toString();
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
new file mode 100644
index 000000000..475676c0f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/service/SqlGatewayServiceFactoryUtilsTest.java
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.service;
+
+import org.apache.streampark.gateway.exception.ValidationException;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.utils.FakeSqlGatewayService;
+import org.apache.streampark.gateway.utils.MockedSqlGatewayService;
+
+import org.assertj.core.api.AbstractThrowableAssert;
+import org.assertj.core.api.AssertFactory;
+import org.assertj.core.api.Assertions;
+import org.assertj.core.api.InstanceOfAssertFactory;
+import org.assertj.core.api.ListAssert;
+import org.assertj.core.api.ThrowingConsumer;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.UUID;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+import static 
org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils.createSqlGatewayService;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThat;
+
+/** Test {@link SqlGatewayServiceFactoryUtils}. */
+public class SqlGatewayServiceFactoryUtilsTest {
+  public static final InstanceOfAssertFactory<Stream, ListAssert<Throwable>> 
STREAM_THROWABLE =
+      new InstanceOfAssertFactory<>(Stream.class, 
Assertions::<Throwable>assertThat);
+
+  @Test
+  public void testCreateServices() {
+    String id = UUID.randomUUID().toString();
+    Map<String, String> config = getDefaultConfig(id);
+    config.put("streampark.sql-gateway.service", "mocked;fake");
+    List<SqlGatewayService> actual = 
SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
+    MockedSqlGatewayService expectedMocked =
+        new MockedSqlGatewayService("localhost", 8080, "The Mocked SQL gateway 
service");
+    assertThat(actual).isEqualTo(Arrays.asList(expectedMocked, 
FakeSqlGatewayService.INSTANCE));
+  }
+
+  @Test
+  public void testCreateServiceWithDuplicateIdentifier() {
+    Map<String, String> config = getDefaultConfig();
+    config.put("streampark.sql-gateway.service", "mocked;mocked");
+    validateException(
+        config,
+        "Get the duplicate service identifier 'mocked' for the option 
'streampark.sql-gateway.service'. "
+            + "Please keep the specified service identifier unique.");
+  }
+
+  @Test
+  public void testCreateServiceWithoutType() {
+    Map<String, String> config = getDefaultConfig();
+    config.remove("streampark.sql-gateway.service");
+    validateException(
+        config,
+        "Service options do not contain an option key 
'streampark.sql-gateway.service' for discovering an service.");
+  }
+
+  @Test
+  public void testCreateUnknownService() {
+    Map<String, String> config = getDefaultConfig();
+    config.put("streampark.sql-gateway.service", "mocked;unknown");
+    validateException(
+        config,
+        String.format(
+            "Could not find any factory for identifier 'unknown' "
+                + "that implements '%s' in the classpath.",
+            SqlGatewayServiceFactory.class.getCanonicalName()));
+  }
+
+  /*  @Test
+  public void testCreateServiceWithMissingOptions() {
+    Map<String, String> config = getDefaultConfig();
+    config.remove("sql-gateway.Service.mocked.host");
+
+    validateException(
+        config,
+        "One or more required options are missing.\n\n"
+            + "Missing required options are:\n\n"
+            + "host");
+  }*/
+
+  /*  @Test
+  public void testCreateServiceWithUnconsumedOptions() {
+    Map<String, String> config = getDefaultConfig();
+    config.put("sql-gateway.Service.mocked.unconsumed-option", "error");
+
+    validateException(
+        config,
+        "Unsupported options found for 'mocked'.\n\n"
+            + "Unsupported options:\n\n"
+            + "unconsumed-option\n\n"
+            + "Supported options:\n\n"
+            + "description\n"
+            + "host\n"
+            + "id\n"
+            + "port");
+  }*/
+
+  // 
--------------------------------------------------------------------------------------------
+
+  private void validateException(Map<String, String> config, String 
errorMessage) {
+    assertThatThrownBy(() -> createSqlGatewayService(config))
+        .satisfies(anyCauseMatches(ValidationException.class, errorMessage));
+  }
+
+  private Map<String, String> getDefaultConfig() {
+    return getDefaultConfig(UUID.randomUUID().toString());
+  }
+
+  private Map<String, String> getDefaultConfig(String id) {
+    Map<String, String> config = new HashMap<>();
+    config.put("sql-gateway.Service.mocked.id", id);
+    config.put("streampark.sql-gateway.service", "mocked");
+    config.put("sql-gateway.Service.mocked.host", "localhost");
+    config.put("sql-gateway.Service.mocked.port", "9999");
+    return config;
+  }
+
+  /**
+   * Shorthand to assert chain of causes. Same as:
+   *
+   * <pre>{@code
+   * assertThat(throwable)
+   *     .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
+   * }</pre>
+   */
+  public static ListAssert<Throwable> assertThatChainOfCauses(Throwable root) {
+    return Assertions.assertThat(root)
+        .extracting(SqlGatewayServiceFactoryUtilsTest::chainOfCauses, 
STREAM_THROWABLE);
+  }
+
+  public static ThrowingConsumer<? super Throwable> anyCauseMatches(
+      Class<? extends Throwable> clazz, String containsMessage) {
+    return t ->
+        assertThatChainOfCauses(t)
+            .as(
+                "Any cause is instance of class '%s' and contains message 
'%s'",
+                clazz, containsMessage)
+            .anySatisfy(
+                cause ->
+                    Assertions.assertThat(cause)
+                        .isInstanceOf(clazz)
+                        .hasMessageContaining(containsMessage));
+  }
+
+  /**
+   * You can use this method in combination with {@link 
AbstractThrowableAssert#extracting(Function,
+   * AssertFactory)} to perform assertions on a chain of causes. For example:
+   *
+   * <pre>{@code
+   * assertThat(throwable)
+   *     .extracting(FlinkAssertions::chainOfCauses, 
FlinkAssertions.STREAM_THROWABLE)
+   * }</pre>
+   *
+   * @return the list is ordered from the current {@link Throwable} up to the 
root cause.
+   */
+  public static Stream<Throwable> chainOfCauses(Throwable throwable) {
+    if (throwable == null) {
+      return Stream.empty();
+    }
+    if (throwable.getCause() == null) {
+      return Stream.of(throwable);
+    }
+    return Stream.concat(Stream.of(throwable), 
chainOfCauses(throwable.getCause()));
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
new file mode 100644
index 000000000..fcc6303f5
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+/** Mocked implementation of {@link SqlGatewayService}. */
+public class FakeSqlGatewayService implements SqlGatewayService {
+
+  public static final FakeSqlGatewayService INSTANCE = new 
FakeSqlGatewayService();
+
+  private FakeSqlGatewayService() {}
+
+  @Override
+  public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SessionHandle openSession(SessionEnvironment environment) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void heartbeat(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OperationInfo getOperationInfo(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Column getOperationResultSchema(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OperationHandle executeStatement(
+      SessionHandle sessionHandle,
+      String statement,
+      long executionTimeoutMs,
+      ExecutionConfiguration executionConfig)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultSet fetchResults(
+      SessionHandle sessionHandle,
+      OperationHandle operationHandle,
+      ResultQueryCondition resultQueryCondition)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..054e5553d
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayServiceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Factory for {@link SqlGatewayService}. */
+public class FakeSqlGatewayServiceFactory implements SqlGatewayServiceFactory {
+
+  public static final ConfigOption<String> HOST =
+      ConfigOption.key("host")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The host of the Fake SQL gateway service.");
+
+  public static final ConfigOption<Integer> PORT =
+      ConfigOption.key("port")
+          .intType()
+          .noDefaultValue()
+          .withDescription("The port of the Fake SQL gateway service.");
+
+  public static final ConfigOption<Integer> DESCRIPTION =
+      ConfigOption.key("description")
+          .intType()
+          .defaultValue(8080)
+          .withDescription("The Fake SQL gateway service.");
+
+  @Override
+  public String factoryIdentifier() {
+    return "fake";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(HOST);
+    options.add(PORT);
+    return options;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.singleton(DESCRIPTION);
+  }
+
+  @Override
+  public SqlGatewayService createSqlGatewayService(Context context) {
+    return FakeSqlGatewayService.INSTANCE;
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
new file mode 100644
index 000000000..e1f7be03f
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import java.util.Objects;
+
+/** Mocked implementation of {@link SqlGatewayService}. */
+public class MockedSqlGatewayService implements SqlGatewayService {
+
+  public final String host;
+  public final int port;
+
+  public final String description;
+
+  public MockedSqlGatewayService(String host, int port, String description) {
+    this.host = host;
+    this.port = port;
+    this.description = description;
+  }
+
+  @Override
+  public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public SessionHandle openSession(SessionEnvironment environment) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void heartbeat(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OperationInfo getOperationInfo(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public Column getOperationResultSchema(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public OperationHandle executeStatement(
+      SessionHandle sessionHandle,
+      String statement,
+      long executionTimeoutMs,
+      ExecutionConfiguration executionConfig)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public ResultSet fetchResults(
+      SessionHandle sessionHandle,
+      OperationHandle operationHandle,
+      ResultQueryCondition resultQueryCondition)
+      throws SqlGatewayException {
+    throw new UnsupportedOperationException();
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+    MockedSqlGatewayService that = (MockedSqlGatewayService) o;
+    return port == that.port
+        && Objects.equals(host, that.host)
+        && Objects.equals(description, that.description);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(host, port, description);
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..76ac79f32
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayServiceFactory.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.utils;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Factory for {@link SqlGatewayService}. */
+public class MockedSqlGatewayServiceFactory implements 
SqlGatewayServiceFactory {
+
+  public static final ConfigOption<String> HOST =
+      ConfigOption.key("host")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The host of the Mocked SQL gateway service.");
+
+  public static final ConfigOption<Integer> PORT =
+      ConfigOption.key("port")
+          .intType()
+          .noDefaultValue()
+          .withDescription("The port of the Mocked SQL gateway service.");
+
+  public static final ConfigOption<Integer> DESCRIPTION =
+      ConfigOption.key("description")
+          .intType()
+          .defaultValue(8080)
+          .withDescription("The Mocked SQL gateway service.");
+
+  @Override
+  public String factoryIdentifier() {
+    return "mocked";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(HOST);
+    options.add(PORT);
+    return options;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.singleton(DESCRIPTION);
+  }
+
+  @Override
+  public SqlGatewayService createSqlGatewayService(Context context) {
+    return new MockedSqlGatewayService("localhost", 8080, "The Mocked SQL 
gateway service");
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
new file mode 100644
index 000000000..3c317041c
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
@@ -0,0 +1,20 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+org.apache.streampark.gateway.utils.MockedSqlGatewayServiceFactory
+org.apache.streampark.gateway.utils.FakeSqlGatewayServiceFactory

Reply via email to