This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new cac931ae2 [sql gateway]Implement flink native (#2737)
cac931ae2 is described below

commit cac931ae289e0753892279336e1c4e70e5f7d7c6
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Jul 2 18:28:02 2023 +0800

    [sql gateway]Implement flink native (#2737)
    
    * Implement flink native and kyuubi sql gateway
    
    * fix exception when fetching results with row type
    
    * polish code
    
    * add gateway management
    
    * add check method to check whether the flink version is supported
    
    * complete k8s session config
    
    * complete gateway session module
    
    * Add gateway manage frontend
    
    * polish gateway
    
    * address comments, remove mysql driver dependency
    
    * move sql to 2.2.0
    
    * frontend code format
    
    ---------
    
    Co-authored-by: gongzhongqiang <[email protected]>
---
 .../streampark-console-service/pom.xml             |  13 +-
 .../main/assembly/script/upgrade/mysql/2.2.0.sql   |  33 ++
 .../main/assembly/script/upgrade/pgsql/2.2.0.sql   |  40 ++
 .../core/controller/FlinkGateWayController.java    | 102 ++++
 .../core/controller/SqlWorkBenchController.java    | 195 +++++++
 .../console/core/entity/FlinkGateWay.java          |  51 ++
 .../console/core/enums/GatewayTypeEnum.java        |  55 +-
 .../console/core/mapper/FlinkGateWayMapper.java}   |  34 +-
 .../console/core/service/FlinkGateWayService.java} |  44 +-
 .../console/core/service/SqlWorkBenchService.java  | 137 +++++
 .../core/service/impl/FlinkGateWayServiceImpl.java |  91 ++++
 .../core/service/impl/SqlWorkBenchServiceImpl.java | 220 ++++++++
 .../resources/mapper/core/FlinkGateWayMapper.xml   |  27 +-
 .../src/api/flink/setting/flinkGateway.ts          |  78 +++
 .../src/locales/lang/en/menu.ts                    |   1 +
 .../{zh-CN/menu.ts => en/setting/flinkGateway.ts}  |  41 +-
 .../src/locales/lang/zh-CN/menu.ts                 |   5 +-
 .../zh-CN/{menu.ts => setting/flinkGateway.ts}     |  41 +-
 .../FlinkGateway/components/FlinkGatewayDrawer.vue |  87 ++++
 .../src/views/setting/FlinkGateway/index.data.ts   |  87 ++++
 .../src/views/setting/FlinkGateway/index.vue       | 136 +++++
 .../streampark-flink-sql-gateway/pom.xml           |   2 +
 .../apache/streampark/gateway/OperationHandle.java |   7 +-
 .../gateway/service/SqlGatewayService.java         |  12 +
 .../streampark/gateway/session/SessionHandle.java  |  11 +-
 .../gateway/utils/FakeSqlGatewayService.java       |   5 +
 .../gateway/utils/MockedSqlGatewayService.java     |   5 +
 .../streampark-flink-sql-gateway-flink-v1/pom.xml  | 196 +++++++
 .../gateway/flink/FlinkSqlGatewayImpl.java         | 239 +++++++++
 .../flink/FlinkSqlGatewayServiceFactory.java       |  63 +++
 ...org.apache.streampark.gateway.factories.Factory |  19 +
 .../main/resources/flink_sql_gateway_rest_v1.yml   | 573 +++++++++++++++++++++
 .../streampark/gateway/flink/FlinkSqlGateway.java  |  46 ++
 .../gateway/flink/FlinkSqlGatewayExample.java      | 229 ++++++++
 .../pom.xml                                        |  18 +-
 .../streampark/gateway/kyuubi/package-info.java    |  28 +-
 36 files changed, 2787 insertions(+), 184 deletions(-)

diff --git a/streampark-console/streampark-console-service/pom.xml 
b/streampark-console/streampark-console-service/pom.xml
index 5705bded4..bbc761000 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -282,7 +282,6 @@
             <version>${postgresql.version}</version>
             <optional>true</optional>
         </dependency>
-
         <dependency>
             <groupId>org.freemarker</groupId>
             <artifactId>freemarker</artifactId>
@@ -372,6 +371,18 @@
             <version>${project.version}</version>
         </dependency>
 
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-sql-gateway-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
         <dependency>
             <groupId>com.fasterxml.jackson.module</groupId>
             
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 81db6ac9b..16404952d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -71,4 +71,37 @@ insert into `t_role_menu` (role_id, menu_id) values (100002, 
120403);
 alter table `t_user` modify column `password` varchar(64) collate 
utf8mb4_general_ci default null comment 'password';
 alter table `t_user` modify column `login_type` tinyint default 0 comment 
'login type 0:password 1:ldap 2:sso';
 
+-- ----------------------------
+-- Table of t_flink_gateway
+-- ----------------------------
+drop table if exists `t_flink_gateway`;
+create table `t_flink_gateway` (
+                                `id` bigint not null auto_increment,
+                                `gateway_name` varchar(128) collate 
utf8mb4_general_ci not null comment 'The name of the gateway',
+                                `description` text collate utf8mb4_general_ci 
default null comment 'More detailed description of resource',
+                                `gateway_type` int not null comment 'The type 
of the gateway',
+                                `address` varchar(150) default null comment 
'url address of gateway endpoint',
+                                `create_time` datetime not null default 
current_timestamp comment 'create time',
+                                `modify_time` datetime not null default 
current_timestamp on update current_timestamp comment 'modify time',
+                                primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4 
collate=utf8mb4_general_ci;
+
+-- menu level 2
+insert into `t_menu` values (120500, 130000, 'setting.flinkGateway', 
'/setting/FlinkGateway', 'setting/FlinkGateway/index', null, 'apartment', '0', 
1, 3, now(), now());
+-- menu level 3
+insert into `t_menu` values (120501, 120500, 'add', NULL, NULL, 'gateway:add', 
NULL, '1', 1, NULL, now(), now());
+insert into `t_menu` values (120502, 120500, 'update', NULL, NULL, 
'gateway:update', NULL, '1', 1, NULL, now(), now());
+insert into `t_menu` values (120503, 120500, 'delete', NULL, NULL, 
'gateway:delete', NULL, '1', 1, NULL, now(), now());
+
+-- role menu script
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120500);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120501);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120502);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120503);
+
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120500);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120501);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120502);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120503);
+
 set foreign_key_checks = 1;
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 9352cd563..56b055ce8 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -80,3 +80,43 @@ insert into "public"."t_role_menu" (role_id, menu_id) values 
(100002, 120403);
 -- add sso as login type
 alter table "public"."t_user" alter column "password" TYPE varchar(64) collate 
"pg_catalog"."default";
 comment on column "public"."t_user"."login_type" is 'login type 0:password 
1:ldap 2:sso';
+
+-- ----------------------------
+-- Table of t_flink_gateway
+-- ----------------------------
+create sequence "public"."streampark_t_flink_gateway_id_seq"
+    increment 1 start 10000 cache 1 minvalue 10000 maxvalue 
9223372036854775807;
+
+create table "public"."t_flink_gateway" (
+                                            "id" int8 not null default nextval('streampark_t_flink_gateway_id_seq'::regclass),
+                                            "gateway_name" varchar(128) 
collate "pg_catalog"."default" not null,
+                                            "description" text collate 
"pg_catalog"."default" default null,
+                                            "gateway_type" int4,
+                                            "address" varchar(150) collate 
"pg_catalog"."default",
+                                            "create_time" timestamp(6) not 
null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
+                                            "modify_time" timestamp(6) not 
null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
+);
+comment on column "public"."t_flink_gateway"."id" is 'The id of the gateway';
+comment on column "public"."t_flink_gateway"."gateway_name" is 'The name of 
the gateway';
+comment on column "public"."t_flink_gateway"."description" is 'More detailed 
description of resource';
+comment on column "public"."t_flink_gateway"."gateway_type" is 'The type of 
the gateway';
+comment on column "public"."t_flink_gateway"."address" is 'url address of 
gateway endpoint';
+comment on column "public"."t_flink_gateway"."create_time" is 'create time';
+comment on column "public"."t_flink_gateway"."modify_time" is 'modify time';
+
+alter table "public"."t_flink_gateway" add constraint "t_flink_gateway_pkey" 
primary key ("id");
+
+insert into "public"."t_menu" values (120500, 130000, 'setting.flinkGateway', 
'/setting/FlinkGateway', 'setting/FlinkGateway/index', null, 'apartment', '0', 
'1', 3, now(), now());
+insert into "public"."t_menu" values (110501, 110500, 'add', null, null, 
'gateway:add', null, '1', '1', null, now(), now());
+insert into "public"."t_menu" values (110502, 110500, 'update', null, null, 
'gateway:update', null, '1', '1', null, now(), now());
+insert into "public"."t_menu" values (110503, 110500, 'delete', null, null, 
'gateway:delete', null, '1', '1', null, now(), now());
+
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120500);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120501);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120502);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120503);
+
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120500);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120501);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120502);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120503);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
new file mode 100644
index 000000000..6f0e0ec4f
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.constraints.NotNull;
+
+@Tag(name = "FLINK_GATEWAY_TAG")
+@Slf4j
+@Validated
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("flink/gateway")
+public class FlinkGateWayController {
+
+  private final FlinkGateWayService flinkGatewayService;
+
+  @Operation(summary = "List flink gateways")
+  @GetMapping("list")
+  public RestResponse list() {
+    return RestResponse.success(flinkGatewayService.list());
+  }
+
+  @Operation(summary = "Create flink gateway")
+  @PostMapping("create")
+  public RestResponse create(@RequestBody FlinkGateWay flinkGateWay) {
+    flinkGatewayService.create(flinkGateWay);
+    return RestResponse.success();
+  }
+
+  @Operation(summary = "Check flink gateway name")
+  @GetMapping("check/name")
+  public RestResponse checkName(
+      @NotNull(message = "The Gateway name cannot be null") 
@RequestParam("name") String name) {
+    return RestResponse.success(flinkGatewayService.existsByGatewayName(name));
+  }
+
+  @Operation(summary = "Check flink gateway address")
+  @GetMapping("check/address")
+  public RestResponse checkAddress(
+      @NotNull(message = "The Gateway address cannot be null") 
@RequestParam("address")
+          String address)
+      throws Exception {
+    GatewayTypeEnum gatewayVersion = 
flinkGatewayService.getGatewayVersion(address);
+    return RestResponse.success(gatewayVersion);
+  }
+
+  @Operation(summary = "Update flink gateway")
+  @PutMapping("update")
+  public RestResponse update(@RequestBody FlinkGateWay flinkGateWay) {
+    flinkGatewayService.update(flinkGateWay);
+    return RestResponse.success();
+  }
+
+  @Operation(summary = "Get flink gateway by id")
+  @GetMapping("get/{id}")
+  public RestResponse get(@PathVariable Long id) {
+    return RestResponse.success(flinkGatewayService.getById(id));
+  }
+
+  @Operation(summary = "Delete flink gateway by id")
+  @DeleteMapping("delete")
+  public RestResponse delete(
+      @NotNull(message = "The Gateway id cannot be null") @RequestParam("id") 
Long id) {
+    flinkGatewayService.removeById(id);
+    return RestResponse.success();
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
new file mode 100644
index 000000000..c0d7aa77b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.annotation.ApiAccess;
+import org.apache.streampark.console.core.service.SqlWorkBenchService;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@Api(tags = {"FLINK_GATEWAY_TAG"})
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("flink/sqlWorkBench/{flinkGatewayId}")
+public class SqlWorkBenchController {
+  private final SqlWorkBenchService sqlWorkBenchService;
+
+  public SqlWorkBenchController(SqlWorkBenchService sqlWorkBenchService) {
+    this.sqlWorkBenchService = sqlWorkBenchService;
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Validation API
+  // 
-------------------------------------------------------------------------------------------
+  @ApiAccess
+  @ApiOperation(value = "Check Support", notes = "Check Support", tags = 
"FLINK_GATEWAY_TAG")
+  @GetMapping("{flinkClusterId}/check")
+  public RestResponse check(@PathVariable Long flinkGatewayId, @PathVariable 
Long flinkClusterId) {
+    return RestResponse.success(sqlWorkBenchService.check(flinkGatewayId, 
flinkClusterId));
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Info API
+  // 
-------------------------------------------------------------------------------------------
+
+  @ApiAccess
+  @ApiOperation(value = "Get gateway info", notes = "Get gateway info", tags = 
"FLINK_GATEWAY_TAG")
+  @GetMapping("getGatewayInfo")
+  public RestResponse getGatewayInfo(@PathVariable Long flinkGatewayId) {
+    return 
RestResponse.success(sqlWorkBenchService.getGatewayInfo(flinkGatewayId));
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Session Management
+  // 
-------------------------------------------------------------------------------------------
+
+  @ApiAccess
+  @ApiOperation(value = "Open sessions", notes = "Open sessions", tags = 
"FLINK_GATEWAY_TAG")
+  @PostMapping("/{flinkClusterId}/sessions")
+  public RestResponse openSession(
+      @PathVariable Long flinkGatewayId, @PathVariable Long flinkClusterId) {
+    SessionHandle sessionHandle = 
sqlWorkBenchService.openSession(flinkGatewayId, flinkClusterId);
+    return RestResponse.success(sessionHandle);
+  }
+
+  @ApiAccess
+  @ApiOperation(value = "Heartbeat", notes = "Heartbeat", tags = 
"FLINK_GATEWAY_TAG")
+  @PostMapping("sessions/{sessionHandle}/heartbeat")
+  public RestResponse heartbeat(
+      @PathVariable Long flinkGatewayId, @PathVariable String sessionHandle) {
+    sqlWorkBenchService.heartbeat(flinkGatewayId, sessionHandle);
+    return RestResponse.success();
+  }
+
+  @ApiAccess
+  @ApiOperation(value = "Close session", notes = "Close session", tags = 
"FLINK_GATEWAY_TAG")
+  @DeleteMapping("sessions/{sessionHandle}")
+  public RestResponse closeSession(
+      @PathVariable Long flinkGatewayId, @PathVariable String sessionHandle) {
+    sqlWorkBenchService.closeSession(flinkGatewayId, sessionHandle);
+    return RestResponse.success();
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Operation Management
+  // 
-------------------------------------------------------------------------------------------
+
+  @ApiAccess
+  @ApiOperation(value = "Cancel operation", notes = "Cancel operation", tags = 
"FLINK_GATEWAY_TAG")
+  @PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/cancel")
+  public RestResponse cancelOperation(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @PathVariable String operationHandle) {
+    sqlWorkBenchService.cancelOperation(flinkGatewayId, sessionHandle, 
operationHandle);
+    return RestResponse.success();
+  }
+
+  @ApiAccess
+  @ApiOperation(value = "Close operation", notes = "Close operation", tags = 
"FLINK_GATEWAY_TAG")
+  @DeleteMapping("sessions/{sessionHandle}/operations/{operationHandle}/close")
+  public RestResponse closeOperation(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @PathVariable String operationHandle) {
+    sqlWorkBenchService.closeOperation(flinkGatewayId, sessionHandle, 
operationHandle);
+    return RestResponse.success();
+  }
+
+  @ApiAccess
+  @ApiOperation(
+      value = "Get operation info",
+      notes = "Get operation info",
+      tags = "FLINK_GATEWAY_TAG")
+  @PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/info")
+  public RestResponse getOperationInfo(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @PathVariable String operationHandle) {
+    return RestResponse.success(
+        sqlWorkBenchService.getOperationInfo(flinkGatewayId, sessionHandle, 
operationHandle));
+  }
+
+  @ApiAccess
+  @ApiOperation(
+      value = "Get operation result schema",
+      notes = "Get operation result schema",
+      tags = "FLINK_GATEWAY_TAG")
+  
@PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/resultSchema")
+  public RestResponse getOperationResultSchema(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @PathVariable String operationHandle) {
+    return RestResponse.success(
+        sqlWorkBenchService.getOperationResultSchema(
+            flinkGatewayId, sessionHandle, operationHandle));
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Statements API
+  // 
-------------------------------------------------------------------------------------------
+
+  @ApiAccess
+  @ApiOperation(
+      value = "Execute statement",
+      notes = "Execute statement",
+      tags = "FLINK_GATEWAY_TAG")
+  @PostMapping("sessions/{sessionHandle}/statements")
+  public RestResponse executeStatement(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @RequestParam String statement) {
+    return RestResponse.success(
+        sqlWorkBenchService.executeStatement(flinkGatewayId, sessionHandle, 
statement));
+  }
+
+  @ApiAccess
+  @ApiOperation(value = "Fetch results", notes = "Fetch results", tags = 
"FLINK_GATEWAY_TAG")
+  @PostMapping("sessions/{sessionHandle}/statements/{operationHandle}/info")
+  public RestResponse fetchResults(
+      @PathVariable Long flinkGatewayId,
+      @PathVariable String sessionHandle,
+      @PathVariable String operationHandle,
+      @RequestBody ResultQueryCondition resultQueryCondition) {
+    return RestResponse.success(
+        sqlWorkBenchService.fetchResults(
+            flinkGatewayId, sessionHandle, operationHandle, 
resultQueryCondition));
+  }
+
+  // 
-------------------------------------------------------------------------------------------
+  // Catalog API
+  // 
-------------------------------------------------------------------------------------------
+  // TODO: 2023/5/5 catalog queries are issued as fixed statements, so the frontend
+  // can use the statement methods above to get catalog info
+
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
new file mode 100644
index 000000000..bcfc4cb9a
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import javax.validation.constraints.NotBlank;
+
+import java.util.Date;
+
+@Data
+@TableName("t_flink_gateway")
+public class FlinkGateWay {
+
+  @TableId(type = IdType.AUTO)
+  private Long id;
+
+  @NotBlank(message = "{required}")
+  private String gatewayName;
+
+  private String description;
+
+  private GatewayTypeEnum gatewayType;
+
+  @NotBlank(message = "{required}")
+  private String address;
+
+  private Date createTime;
+
+  private Date modifyTime;
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
similarity index 50%
copy from 
streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
index b57d1ac3e..60bdf81d3 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
@@ -15,47 +15,38 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.gateway.session;
+package org.apache.streampark.console.core.enums;
 
-import java.util.Objects;
-import java.util.UUID;
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import lombok.Getter;
 
-/** Session Handle that used to identify the Session. */
-public class SessionHandle {
+import java.util.Arrays;
 
-  private final UUID identifier;
+@Getter
+public enum GatewayTypeEnum {
 
-  public static SessionHandle create() {
-    return new SessionHandle(UUID.randomUUID());
-  }
+  /** After flink 1.16 (including 1.16) */
+  FLINK_V1(1, "flink-v1"),
 
-  public SessionHandle(UUID identifier) {
-    this.identifier = identifier;
-  }
+  /** After flink 1.17 (including 1.17) */
+  FLINK_V2(2, "flink-v2"),
 
-  public UUID getIdentifier() {
-    return identifier;
-  }
+  /** After kyuubi 1.7.0 (including 1.7.0) */
+  KYUUBI(10, "kyuubi"),
+  ;
+  @EnumValue private final int value;
 
-  @Override
-  public boolean equals(Object o) {
-    if (this == o) {
-      return true;
-    }
-    if (!(o instanceof SessionHandle)) {
-      return false;
-    }
-    SessionHandle that = (SessionHandle) o;
-    return Objects.equals(identifier, that.identifier);
-  }
+  private final String identifier;
 
-  @Override
-  public int hashCode() {
-    return Objects.hash(identifier);
+  GatewayTypeEnum(int value, String identifier) {
+    this.value = value;
+    this.identifier = identifier;
   }
 
-  @Override
-  public String toString() {
-    return identifier.toString();
+  public static GatewayTypeEnum of(int value) {
+    return Arrays.stream(values())
+        .filter(x -> x.value == value)
+        .findFirst()
+        .orElseThrow(() -> new IllegalArgumentException("Unknown 
GatewayTypeEnum value: " + value));
   }
 }
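
For reference, a minimal usage sketch of the new enum (illustrative only; the enum, of() and the Lombok @Getter come from the diff above, while the demo class itself is hypothetical):

import org.apache.streampark.console.core.enums.GatewayTypeEnum;

public class GatewayTypeEnumSketch {
  public static void main(String[] args) {
    // of() resolves the integer stored in t_flink_gateway.gateway_type back to the enum
    GatewayTypeEnum type = GatewayTypeEnum.of(1);
    System.out.println(type);                  // FLINK_V1
    System.out.println(type.getIdentifier());  // "flink-v1", via the Lombok @Getter
    // GatewayTypeEnum.of(99) would throw IllegalArgumentException: Unknown GatewayTypeEnum value: 99
  }
}
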
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
similarity index 52%
copy from 
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
index 1a359ee49..2838d98ef 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    https://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-export default {
-  menu: {
-    system: '系统管理',
-    userManagement: '用户管理',
-    roleManagement: '角色管理',
-    menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
-    teamManagement: '团队管理',
-    memberManagement: '成员管理',
-    project: '项目管理',
-    application: '作业管理',
-    variable: '变量管理',
-    resource: '资源管理',
-    setting: '设置中心',
-  },
-  setting: {
-    system: '系统设置',
-    alarm: '告警设置',
-    flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
-    externalLink: '扩展链接',
-    yarnQueue: 'Yarn 队列',
-  },
-};
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface FlinkGateWayMapper extends BaseMapper<FlinkGateWay> {}
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
similarity index 52%
copy from 
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to 
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
index 1a359ee49..20289e502 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    https://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,21 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-export default {
-  menu: {
-    system: '系统管理',
-    userManagement: '用户管理',
-    roleManagement: '角色管理',
-    menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
-    teamManagement: '团队管理',
-    memberManagement: '成员管理',
-    project: '项目管理',
-    application: '作业管理',
-    variable: '变量管理',
-    resource: '资源管理',
-    setting: '设置中心',
-  },
-  setting: {
-    system: '系统设置',
-    alarm: '告警设置',
-    flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
-    externalLink: '扩展链接',
-    yarnQueue: 'Yarn 队列',
-  },
-};
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+public interface FlinkGateWayService extends IService<FlinkGateWay> {
+  void create(FlinkGateWay flinkGateWay);
+
+  void update(FlinkGateWay flinkGateWay);
+
+  boolean existsByGatewayName(String name);
+
+  GatewayTypeEnum getGatewayVersion(String address) throws 
JsonProcessingException;
+}
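
As an illustration of the new service (not part of this commit), registering a gateway could look roughly like the sketch below; the setters come from the Lombok @Data entity added above, while the surrounding class and the example values are assumptions:

import org.apache.streampark.console.core.entity.FlinkGateWay;
import org.apache.streampark.console.core.service.FlinkGateWayService;

public class RegisterGatewaySketch {

  // flinkGateWayService would normally be injected by Spring; it is passed in
  // here only to keep the sketch self-contained.
  public static void register(FlinkGateWayService flinkGateWayService) {
    FlinkGateWay gateway = new FlinkGateWay();           // Lombok @Data generates the setters
    gateway.setGatewayName("local-sql-gateway");         // must be unique, see existsByGatewayName
    gateway.setAddress("http://localhost:8083");         // endpoint; the gateway type is detected from it
    gateway.setDescription("Flink SQL Gateway used for local development");
    // create() validates the name, probes the address to resolve the
    // GatewayTypeEnum, and then persists the record.
    flinkGateWayService.create(gateway);
  }
}
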
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
new file mode 100644
index 000000000..40ded03f2
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+public interface SqlWorkBenchService {
+
+  /**
+   * Open a session for a flink cluster
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param flinkClusterId flink cluster id
+   * @return session handle
+   */
+  SessionHandle openSession(Long flinkGatewayId, Long flinkClusterId);
+
+  /**
+   * Close a session
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   */
+  void closeSession(Long flinkGatewayId, String sessionHandleUUIDStr);
+
+  /**
+   * Get the gateway info
+   *
+   * @param flinkGatewayId flink gateway id
+   * @return gateway info
+   */
+  GatewayInfo getGatewayInfo(Long flinkGatewayId);
+
+  /**
+   * Cancel the operation
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param operationId operation id
+   */
+  void cancelOperation(Long flinkGatewayId, String sessionHandleUUIDStr, 
String operationId);
+
+  /**
+   * Close the operation
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param operationId operation id
+   */
+  void closeOperation(Long flinkGatewayId, String sessionHandleUUIDStr, String 
operationId);
+
+  /**
+   * Get operation info
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param operationId operation id
+   * @return operation info
+   */
+  OperationInfo getOperationInfo(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId);
+
+  /**
+   * Get operation result schema
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param operationId operation id
+   * @return operation result schema
+   */
+  Column getOperationResultSchema(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId);
+
+  /**
+   * Execute statement
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param statement statement
+   * @return operation handle
+   */
+  OperationHandle executeStatement(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String statement);
+
+  /**
+   * Fetch results
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandleUUIDStr session handle uuid string
+   * @param operationId operation id
+   * @param resultQueryCondition result query condition
+   * @return result set
+   */
+  ResultSet fetchResults(
+      Long flinkGatewayId,
+      String sessionHandleUUIDStr,
+      String operationId,
+      ResultQueryCondition resultQueryCondition);
+
+  /**
+   * Send heartbeat
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param sessionHandle session handle
+   */
+  void heartbeat(Long flinkGatewayId, String sessionHandle);
+
+  /**
+   * Check whether the flink cluster version is supported
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param flinkClusterId flink cluster id
+   * @return true if supported
+   */
+  boolean check(Long flinkGatewayId, Long flinkClusterId);
+}
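
Taken together, the interface describes a simple session lifecycle: open a session, execute statements, fetch results, close the session. A hypothetical caller might drive it as in the sketch below; the handle-to-string conversions and the externally supplied ResultQueryCondition are assumptions, since their construction is not shown in this commit:

import org.apache.streampark.console.core.service.SqlWorkBenchService;
import org.apache.streampark.gateway.OperationHandle;
import org.apache.streampark.gateway.results.ResultQueryCondition;
import org.apache.streampark.gateway.results.ResultSet;
import org.apache.streampark.gateway.session.SessionHandle;

public class SqlWorkBenchLifecycleSketch {

  // workBench would normally be injected by Spring; the ResultQueryCondition is
  // passed in because its construction is not shown in this commit.
  public static ResultSet runOnce(
      SqlWorkBenchService workBench, Long gatewayId, Long clusterId, ResultQueryCondition condition) {
    // 1. open a session against the chosen gateway and cluster
    SessionHandle session = workBench.openSession(gatewayId, clusterId);
    // the *UUIDStr parameters are assumed to accept the handle's string form
    String sessionId = session.toString();
    try {
      // 2. submit a statement and fetch its results
      OperationHandle operation = workBench.executeStatement(gatewayId, sessionId, "SHOW TABLES");
      return workBench.fetchResults(gatewayId, sessionId, operation.toString(), condition);
    } finally {
      // 3. always release the gateway session
      workBench.closeSession(gatewayId, sessionId);
    }
  }
}
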
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
new file mode 100644
index 000000000..f14280b84
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+import org.apache.streampark.console.core.mapper.FlinkGateWayMapper;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+import 
org.apache.streampark.gateway.flink.client.dto.GetApiVersionResponseBody;
+
+import org.apache.hc.client5.http.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.concurrent.TimeUnit;
+
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, 
rollbackFor = Exception.class)
+public class FlinkGateWayServiceImpl extends ServiceImpl<FlinkGateWayMapper, 
FlinkGateWay>
+    implements FlinkGateWayService {
+
+  private void preHandleGatewayInfo(FlinkGateWay flinkGateWay) {
+    // validate gateway name
+    if (existsByGatewayName(flinkGateWay.getGatewayName())) {
+      throw new ApiAlertException("gateway name already exists");
+    }
+    // validate gateway address and set gateway type
+    flinkGateWay.setGatewayType(getGatewayVersion(flinkGateWay.getAddress()));
+  }
+
+  @Override
+  public void create(FlinkGateWay flinkGateWay) {
+    preHandleGatewayInfo(flinkGateWay);
+    this.save(flinkGateWay);
+  }
+
+  @Override
+  public void update(FlinkGateWay flinkGateWay) {
+    preHandleGatewayInfo(flinkGateWay);
+    this.saveOrUpdate(flinkGateWay);
+  }
+
+  @Override
+  public boolean existsByGatewayName(String name) {
+    return getBaseMapper()
+        .exists(new 
LambdaQueryWrapper<FlinkGateWay>().eq(FlinkGateWay::getGatewayName, name));
+  }
+
+  @Override
+  public GatewayTypeEnum getGatewayVersion(String address) {
+    String restUrl = address + "/api_versions";
+    try {
+      String result =
+          HttpClientUtils.httpGetRequest(
+              restUrl,
+              RequestConfig.custom().setConnectTimeout(2000, 
TimeUnit.MILLISECONDS).build());
+      if (result != null) {
+        String versionStr =
+            JacksonUtils.read(result, 
GetApiVersionResponseBody.class).getVersions().get(0);
+        return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : 
GatewayTypeEnum.FLINK_V2;
+      }
+    } catch (Exception e) {
+      log.error("get gateway version failed", e);
+    }
+    throw new ApiAlertException("get gateway version failed");
+  }
+}
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
new file mode 100644
index 000000000..3b7be04ae
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HadoopConfigUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+import org.apache.streampark.console.core.service.SqlWorkBenchService;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.factories.FactoryUtil;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import org.apache.flink.client.program.ClusterClient;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static 
org.apache.streampark.common.enums.ExecutionMode.KUBERNETES_NATIVE_SESSION;
+import static org.apache.streampark.common.enums.ExecutionMode.REMOTE;
+import static org.apache.streampark.common.enums.ExecutionMode.YARN_SESSION;
+
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class SqlWorkBenchServiceImpl implements SqlWorkBenchService {
+
+  private final FlinkClusterService flinkClusterService;
+  private final FlinkGateWayService flinkGateWayService;
+  private final FlinkEnvService flinkEnvService;
+
+  /** Get SqlGatewayService instance by flinkGatewayId */
+  private SqlGatewayService getSqlGateWayService(Long flinkGatewayId) {
+    FlinkGateWay flinkGateWay = flinkGateWayService.getById(flinkGatewayId);
+    if (flinkGateWay == null) {
+      throw new IllegalArgumentException(
+          "flinkGateWay does not exist, please check your config, id: " + flinkGatewayId);
+    }
+    Map<String, String> config = new HashMap<>(2);
+    config.put(
+        FactoryUtil.SQL_GATEWAY_SERVICE_TYPE.getKey(),
+        flinkGateWay.getGatewayType().getIdentifier());
+    config.put(FlinkSqlGatewayServiceFactory.BASE_URI.getKey(), 
flinkGateWay.getAddress());
+    List<SqlGatewayService> actual = 
SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
+    if (actual.size() > 1) {
+      log.warn("There are more than one SqlGatewayService instance, please 
check your config");
+    }
+    return actual.get(0);
+  }
+
+  @Override
+  public GatewayInfo getGatewayInfo(Long flinkGatewayId) {
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    return sqlGateWayService.getGatewayInfo();
+  }
+
+  @Override
+  public SessionHandle openSession(Long flinkGatewayId, Long flinkClusterId) {
+    Map<String, String> streamParkConf = new HashMap<>();
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
+    URI remoteURI = flinkCluster.getRemoteURI();
+    String host = remoteURI.getHost();
+    String port = String.valueOf(remoteURI.getPort());
+    String clusterId = flinkCluster.getClusterId();
+
+    ExecutionMode executionMode = 
ExecutionMode.of(flinkCluster.getExecutionMode());
+    if (executionMode == null) {
+      throw new IllegalArgumentException("executionMode is null");
+    }
+
+    streamParkConf.put("execution.target", executionMode.getName());
+    switch (Objects.requireNonNull(executionMode)) {
+      case REMOTE:
+        streamParkConf.put("rest.address", host);
+        streamParkConf.put("rest.port", port);
+        break;
+      case YARN_SESSION:
+        streamParkConf.put("yarn.application.id", clusterId);
+        HadoopConfigUtils.readSystemHadoopConf()
+            .forEach((k, v) -> streamParkConf.put("flink.hadoop." + k, v));
+        break;
+      case KUBERNETES_NATIVE_SESSION:
+        String k8sNamespace = flinkCluster.getK8sNamespace();
+        String restAddress;
+        try (ClusterClient<?> clusterClient =
+            (ClusterClient<?>)
+                KubernetesRetriever.newFinkClusterClient(
+                    clusterId, k8sNamespace, 
FlinkK8sExecuteMode.of(executionMode))) {
+          restAddress = IngressController.ingressUrlAddress(k8sNamespace, 
clusterId, clusterClient);
+        } catch (Exception e) {
+          throw new IllegalArgumentException("get k8s rest address error", e);
+        }
+        streamParkConf.put("kubernetes.cluster-id", clusterId);
+        streamParkConf.put(
+            "kubernetes.jobmanager.service-account", 
flinkCluster.getServiceAccount());
+        streamParkConf.put("kubernetes.namespace", k8sNamespace);
+        streamParkConf.put("rest.address", restAddress);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported execution mode: " + 
executionMode);
+    }
+
+    return sqlGateWayService.openSession(
+        new SessionEnvironment(
+            flinkGatewayId + flinkClusterId + UUID.randomUUID().toString(), 
null, streamParkConf));
+  }
+
+  @Override
+  public void closeSession(Long flinkGatewayId, String sessionHandleUUIDStr) {
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    sqlGateWayService.closeSession(new SessionHandle(sessionHandleUUIDStr));
+  }
+
+  @Override
+  public void cancelOperation(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    getSqlGateWayService(flinkGatewayId)
+        .cancelOperation(new SessionHandle(sessionHandleUUIDStr), new 
OperationHandle(operationId));
+  }
+
+  @Override
+  public void closeOperation(Long flinkGatewayId, String sessionHandleUUIDStr, 
String operationId) {
+    getSqlGateWayService(flinkGatewayId)
+        .closeOperation(new SessionHandle(sessionHandleUUIDStr), new 
OperationHandle(operationId));
+  }
+
+  @Override
+  public OperationInfo getOperationInfo(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    return getSqlGateWayService(flinkGatewayId)
+        .getOperationInfo(
+            new SessionHandle(sessionHandleUUIDStr), new 
OperationHandle(operationId));
+  }
+
+  @Override
+  public Column getOperationResultSchema(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    return getSqlGateWayService(flinkGatewayId)
+        .getOperationResultSchema(
+            new SessionHandle(sessionHandleUUIDStr), new 
OperationHandle(operationId));
+  }
+
+  @Override
+  public OperationHandle executeStatement(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String statement) {
+    return getSqlGateWayService(flinkGatewayId)
+        .executeStatement(new SessionHandle(sessionHandleUUIDStr), statement, 
10000L, null);
+  }
+
+  @Override
+  public ResultSet fetchResults(
+      Long flinkGatewayId,
+      String sessionHandleUUIDStr,
+      String operationId,
+      ResultQueryCondition resultQueryCondition) {
+    return getSqlGateWayService(flinkGatewayId)
+        .fetchResults(
+            new SessionHandle(sessionHandleUUIDStr),
+            new OperationHandle(operationId),
+            resultQueryCondition);
+  }
+
+  @Override
+  public void heartbeat(Long flinkGatewayId, String sessionHandle) {
+    getSqlGateWayService(flinkGatewayId).heartbeat(new 
SessionHandle(sessionHandle));
+  }
+
+  @Override
+  public boolean check(Long flinkGatewayId, Long flinkClusterId) {
+    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
+    if (flinkCluster == null) {
+      throw new IllegalArgumentException("FlinkCluster not found");
+    }
+    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
+    if (flinkEnv == null) {
+      throw new IllegalArgumentException("FlinkEnv not found");
+    }
+    return 
getSqlGateWayService(flinkGatewayId).check(flinkEnv.getFlinkVersion().majorVersion());
+  }
+}
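
To make the mode-specific branch in openSession() concrete, the sketch below reproduces the configuration it assembles for a plain REMOTE session cluster; host and port are placeholders, and the "remote" target value is assumed from ExecutionMode#getName():

import java.util.HashMap;
import java.util.Map;

public class RemoteSessionConfSketch {

  // Mirrors the ExecutionMode.REMOTE branch of openSession(): only the execution
  // target plus the REST endpoint of the standalone session cluster are set
  // before the map is handed to SqlGatewayService.openSession().
  public static Map<String, String> remoteConf(String host, int port) {
    Map<String, String> streamParkConf = new HashMap<>();
    streamParkConf.put("execution.target", "remote");       // executionMode.getName(), assumed "remote"
    streamParkConf.put("rest.address", host);               // host of FlinkCluster#getRemoteURI()
    streamParkConf.put("rest.port", String.valueOf(port));  // port of the same URI
    return streamParkConf;
  }
}
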
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
similarity index 50%
copy from streampark-flink/streampark-flink-sql-gateway/pom.xml
copy to 
streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
index fe1824b59..78a238092 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
@@ -15,20 +15,17 @@
   ~ See the License for the specific language governing permissions and
   ~ limitations under the License.
   -->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"; 
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark-flink</artifactId>
-        <version>2.2.0-SNAPSHOT</version>
-    </parent>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd";>
+<mapper 
namespace="org.apache.streampark.console.core.mapper.FlinkGateWayMapper">
 
-    <artifactId>streampark-flink-sql-gateway</artifactId>
-    <name>StreamPark : SQL Gateway Parent</name>
-    <packaging>pom</packaging>
+    <resultMap id="BaseResultMap" 
type="org.apache.streampark.console.core.entity.FlinkGateWay">
+        <id column="id" jdbcType="BIGINT" property="id"/>
+        <result column="name" jdbcType="VARCHAR" property="name"/>
+        <result column="description" jdbcType="VARCHAR" 
property="description"/>
+        <result column="gateway_type" jdbcType="BIGINT" 
property="gatewayType"/>
+        <result column="address" jdbcType="VARCHAR" property="address"/>
+        <result column="create_time" jdbcType="TIMESTAMP" 
property="createTime"/>
+        <result column="modify_time" jdbcType="TIMESTAMP" 
property="modifyTime"/>
+    </resultMap>
 
-    <modules>
-        <module>streampark-flink-sql-gateway-base</module>
-    </modules>
-
-</project>
+</mapper>
diff --git 
a/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
 
b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
new file mode 100644
index 000000000..08336796d
--- /dev/null
+++ 
b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defHttp } from '/@/utils/http/axios';
+import { Result } from '/#/axios';
+import { AxiosResponse } from 'axios';
+
+enum GATEWAY_API {
+  CREATE = '/flink/gateway/create',
+  UPDATE = '/flink/gateway/update',
+  LIST = '/flink/gateway/list',
+  DELETE = '/flink/gateway/delete',
+  GET = '/flink/gateway/get',
+  CHECK_NAME = '/flink/gateway/check/name',
+  CHECK_ADDRESS = '/flink/gateway/check/address',
+}
+
+/**
+ * fetch gateway list.
+ */
+export function fetchGatewayList() {
+  return defHttp.get({
+    url: GATEWAY_API.LIST,
+  });
+}
+
+/**
+ * fetch gateway remove result.
+ * @returns {Promise<AxiosResponse<Result>>}
+ */
+export function fetchGatewayDelete(id: string): Promise<AxiosResponse<Result>> 
{
+  return defHttp.delete(
+    { url: GATEWAY_API.DELETE, data: { id } },
+    { isReturnNativeResponse: true },
+  );
+}
+
+export function fetchGatewayCreate(data: Recordable) {
+  return defHttp.postJson({
+    url: GATEWAY_API.CREATE,
+    data,
+  });
+}
+
+export function fetchGatewayUpdate(data: Recordable) {
+  return defHttp.postJson({
+    url: GATEWAY_API.UPDATE,
+    data,
+  });
+}
+
+export function fetchGatewayCheckName(name: string) {
+  return defHttp.get({
+    url: GATEWAY_API.CHECK_NAME,
+    params: { name },
+  });
+}
+
+export function fetchGatewayCheckAddress(address: string) {
+  return defHttp.get({
+    url: GATEWAY_API.CHECK_ADDRESS,
+    params: { address },
+  });
+}
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
index 8d93eed0c..b161b7145 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
@@ -36,5 +36,6 @@ export default {
     flinkCluster: 'Flink Cluster',
     externalLink: 'External Link',
     yarnQueue: 'Yarn Queue',
+    flinkGateway: 'Flink Gateway',
   },
 };
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
similarity index 50%
copy from 
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to 
streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
index 1a359ee49..f7da8c073 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
@@ -15,26 +15,27 @@
  * limitations under the License.
  */
 export default {
-  menu: {
-    system: '系统管理',
-    userManagement: '用户管理',
-    roleManagement: '角色管理',
-    menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
-    teamManagement: '团队管理',
-    memberManagement: '成员管理',
-    project: '项目管理',
-    application: '作业管理',
-    variable: '变量管理',
-    resource: '资源管理',
-    setting: '设置中心',
+  tableTitle: 'Flink Gateway List',
+  createGateway: 'Create Flink Gateway',
+  success: 'Success',
+  gatewayType: 'Gateway Type',
+  gatewayAddress: 'Gateway Address',
+  modifyGateway: 'Edit Flink Gateway',
+  deleteGateway: 'Delete Flink Gateway',
+  deleteConfirm: 'Are you sure you want to delete this Flink Gateway?',
+  name: 'Gateway Name',
+  placeholder: {
+    gatewayType: 'Please select gateway type',
   },
-  setting: {
-    system: '系统设置',
-    alarm: '告警设置',
-    flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
-    externalLink: '扩展链接',
-    yarnQueue: 'Yarn 队列',
+  checkResult: {
+    emptyHint: 'Flink gateway name cannot be empty.',
+    emptyType: 'Flink gateway type cannot be empty.',
+    emptyAddress: 'Flink gateway address cannot be empty.',
+  },
+  operation: {
+    updateSuccess: 'Update Flink gateway successfully.',
+    deleteSuccess: 'Delete Flink gateway successfully.',
+    createSuccess: 'Create Flink gateway successfully.',
+    deleteFailed: 'Delete Flink gateway failed.',
   },
 };
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
index 1a359ee49..330fe9f7d 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
@@ -20,7 +20,7 @@ export default {
     userManagement: '用户管理',
     roleManagement: '角色管理',
     menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
+    tokenManagement: 'Token 管理',
     teamManagement: '团队管理',
     memberManagement: '成员管理',
     project: '项目管理',
@@ -33,8 +33,9 @@ export default {
     system: '系统设置',
     alarm: '告警设置',
     flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
+    flinkCluster: 'Flink 集群',
     externalLink: '扩展链接',
     yarnQueue: 'Yarn 队列',
+    flinkGateway: 'Flink 网关',
   },
 };
diff --git 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
similarity index 51%
copy from 
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to 
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
index 1a359ee49..996ce72d1 100644
--- 
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ 
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
@@ -15,26 +15,27 @@
  * limitations under the License.
  */
 export default {
-  menu: {
-    system: '系统管理',
-    userManagement: '用户管理',
-    roleManagement: '角色管理',
-    menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
-    teamManagement: '团队管理',
-    memberManagement: '成员管理',
-    project: '项目管理',
-    application: '作业管理',
-    variable: '变量管理',
-    resource: '资源管理',
-    setting: '设置中心',
+  tableTitle: 'Flink 网关列表',
+  createGateway: '创建 Flink 网关',
+  success: '成功',
+  gatewayType: '网关类型',
+  gatewayAddress: '网关地址',
+  modifyGateway: '编辑 Flink 网关',
+  deleteGateway: '删除 Flink 网关',
+  deleteConfirm: '你确定要删除此 Flink 网关吗?',
+  name: 'Flink 网关名称',
+  placeholder: {
+    gatewayType: '请选择 Flink 网关类型',
   },
-  setting: {
-    system: '系统设置',
-    alarm: '告警设置',
-    flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
-    externalLink: '扩展链接',
-    yarnQueue: 'Yarn 队列',
+  checkResult: {
+    emptyHint: 'Flink 网关名称不得为空',
+    emptyType: 'Flink 网关类型不得为空',
+    emptyAddress: 'Flink 网关地址不得为空',
+  },
+  operation: {
+    updateSuccess: '更新 Flink 网关成功',
+    deleteSuccess: '删除 Flink 网关成功',
+    createSuccess: '创建 Flink 网关成功',
+    deleteFailed: '删除 Flink 网关失败',
   },
 };
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
new file mode 100644
index 000000000..6f55aa540
--- /dev/null
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
@@ -0,0 +1,87 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template>
+  <BasicDrawer
+    :okText="t('common.submitText')"
+    @register="registerDrawer"
+    showFooter
+    :title="getTitle"
+    width="40%"
+    @ok="handleSubmit"
+  >
+    <BasicForm @register="registerForm" />
+  </BasicDrawer>
+</template>
+<script lang="ts">
+  import { defineComponent, ref, computed, unref } from 'vue';
+  import { BasicForm, useForm } from '/@/components/Form';
+  import { formSchema } from '../index.data';
+  import { BasicDrawer, useDrawerInner } from '/@/components/Drawer';
+
+  import { fetchGatewayCreate } from '/@/api/flink/setting/flinkGateway';
+  import { useI18n } from '/@/hooks/web/useI18n';
+
+  export default defineComponent({
+    name: 'FlinkGatewayDrawer',
+    components: { BasicDrawer, BasicForm },
+    emits: ['success', 'register'],
+    setup(_, { emit }) {
+      const isUpdate = ref(true);
+      const { t } = useI18n();
+
+      const [registerForm, { resetFields, setFieldsValue, validate }] = 
useForm({
+        labelWidth: 120,
+        colon: true,
+        schemas: formSchema,
+        showActionButtonGroup: false,
+        baseColProps: { lg: 22, md: 22 },
+      });
+
+      const [registerDrawer, { setDrawerProps, closeDrawer }] = 
useDrawerInner(async (data) => {
+        resetFields();
+        setDrawerProps({ confirmLoading: false });
+        isUpdate.value = !!data?.isUpdate;
+
+        if (unref(isUpdate)) {
+          setFieldsValue({
+            ...data.record,
+          });
+        }
+      });
+
+      const getTitle = computed(() =>
+        !unref(isUpdate)
+          ? t('setting.flinkGateway.createGateway')
+          : t('setting.flinkGateway.modifyGateway'),
+      );
+
+      async function handleSubmit() {
+        try {
+          const values = await validate();
+          setDrawerProps({ confirmLoading: true });
+          const res = await fetchGatewayCreate(values);
+          closeDrawer();
+          emit('success', { isUpdate: unref(isUpdate), values: res });
+        } finally {
+          setDrawerProps({ confirmLoading: false });
+        }
+      }
+
+      return { t, registerDrawer, registerForm, getTitle, handleSubmit };
+    },
+  });
+</script>
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
new file mode 100644
index 000000000..6c70a9241
--- /dev/null
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { BasicColumn, FormSchema } from '/@/components/Table';
+import { useI18n } from '/@/hooks/web/useI18n';
+const { t } = useI18n();
+
+export const searchFormSchema: FormSchema[] = [
+  {
+    field: 'user',
+    label: t('system.token.table.userName'),
+    component: 'Input',
+    colProps: { span: 8 },
+  },
+];
+
+export const formSchema: FormSchema[] = [
+  {
+    field: 'gatewayName',
+    label: t('setting.flinkGateway.name'),
+    component: 'Input',
+    rules: [{ required: true, message: 
t('setting.flinkGateway.checkResult.emptyHint') }],
+  },
+  {
+    field: 'address',
+    label: t('setting.flinkGateway.gatewayAddress'),
+    component: 'Input',
+    rules: [{ required: true, message: 
t('setting.flinkGateway.checkResult.emptyAddress') }],
+  },
+  {
+    field: 'description',
+    label: t('common.description'),
+    component: 'InputTextArea',
+  },
+];
+
+export const columns: BasicColumn[] = [
+  {
+    title: 'id',
+    dataIndex: 'id',
+    ifShow: false,
+  },
+  {
+    title: t('setting.flinkGateway.name'),
+    dataIndex: 'gatewayName',
+    sorter: true,
+  },
+  {
+    title: t('setting.flinkGateway.gatewayType'),
+    dataIndex: 'gatewayType',
+    sorter: true,
+  },
+  {
+    title: t('setting.flinkGateway.gatewayAddress'),
+    dataIndex: 'address',
+    sorter: true,
+  },
+  {
+    title: t('common.description'),
+    dataIndex: 'description',
+    ellipsis: true,
+    width: 350,
+  },
+  {
+    title: t('common.createTime'),
+    dataIndex: 'createTime',
+    sorter: true,
+  },
+  {
+    title: t('common.modifyTime'),
+    dataIndex: 'modifyTime',
+    sorter: true,
+  },
+];
diff --git 
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
new file mode 100644
index 000000000..f0b582ccf
--- /dev/null
+++ 
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
@@ -0,0 +1,136 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<template>
+  <PageWrapper>
+    <BasicTable @register="registerTable">
+      <template #toolbar>
+        <a-button type="primary" @click="handleCreate" v-auth="'gateway:add'">
+          <Icon icon="ant-design:plus-outlined" />
+          {{ t('common.add') }}
+        </a-button>
+      </template>
+      <template #bodyCell="{ column, record }">
+        <template v-if="column.dataIndex === 'action'">
+          <TableAction
+            :actions="[
+              {
+                icon: 'ant-design:delete-outlined',
+                color: 'error',
+                auth: 'gateway:delete',
+                tooltip: t('setting.flinkGateway.deleteGateway'),
+                popConfirm: {
+                  title: t('setting.flinkGateway.deleteConfirm'),
+                  confirm: handleDelete.bind(null, record),
+                },
+              },
+            ]"
+          />
+        </template>
+      </template>
+    </BasicTable>
+    <FlinkGatewayDrawer @register="registerDrawer" @success="handleSuccess" />
+  </PageWrapper>
+</template>
+<script lang="ts">
+  import { defineComponent, unref } from 'vue';
+
+  import { BasicTable, useTable, TableAction } from '/@/components/Table';
+  import FlinkGatewayDrawer from './components/FlinkGatewayDrawer.vue';
+  import { useDrawer } from '/@/components/Drawer';
+  import { fetchGatewayDelete, fetchGatewayList } from 
'/@/api/flink/setting/flinkGateway';
+  import { columns, searchFormSchema } from './index.data';
+  import { useMessage } from '/@/hooks/web/useMessage';
+  import { useI18n } from '/@/hooks/web/useI18n';
+  import Icon from '/@/components/Icon';
+  import { PageWrapper } from '/@/components/Page';
+  export default defineComponent({
+    name: 'FlinkGateway',
+    components: {
+      BasicTable,
+      FlinkGatewayDrawer: FlinkGatewayDrawer,
+      TableAction,
+      Icon,
+      PageWrapper,
+    },
+    setup() {
+      const { t } = useI18n();
+      const { createMessage } = useMessage();
+      const [registerDrawer, { openDrawer }] = useDrawer();
+      const [registerTable, { reload, updateTableDataRecord }] = useTable({
+        title: t('setting.flinkGateway.tableTitle'),
+        api: fetchGatewayList,
+        columns,
+        formConfig: {
+          baseColProps: { style: { paddingRight: '30px' } },
+          schemas: searchFormSchema,
+        },
+        useSearchForm: false,
+        showTableSetting: true,
+        rowKey: 'id',
+        showIndexColumn: false,
+        canResize: false,
+        actionColumn: {
+          width: 200,
+          title: t('component.table.operation'),
+          dataIndex: 'action',
+        },
+      });
+
+      function handleCreate() {
+        openDrawer(true, {
+          isUpdate: false,
+        });
+      }
+
+      async function handleDelete(record: Recordable) {
+        const res = await fetchGatewayDelete(record.id);
+        if (res) {
+          
createMessage.success(t('setting.flinkGateway.operation.deleteSuccess'));
+          reload();
+        } else {
+          
createMessage.error(t('setting.flinkGateway.operation.deleteFailed'));
+        }
+      }
+
+      function handleSuccess({ isUpdate, values }) {
+        if (isUpdate) {
+          
createMessage.success(t('setting.flinkGateway.operation.updateSuccess'));
+          updateTableDataRecord(values.id, values);
+        } else {
+          
createMessage.success(t('setting.flinkGateway.operation.createSuccess'));
+          reload();
+        }
+      }
+
+      return {
+        t,
+        registerTable,
+        registerDrawer,
+        handleCreate,
+        handleDelete,
+        handleSuccess,
+      };
+    },
+  });
+</script>
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml 
b/streampark-flink/streampark-flink-sql-gateway/pom.xml
index fe1824b59..20c14fd7b 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml
@@ -29,6 +29,8 @@
 
     <modules>
         <module>streampark-flink-sql-gateway-base</module>
+        <module>streampark-flink-sql-gateway-flink-v1</module>
+        <module>streampark-flink-sql-gateway-kyuubi</module>
     </modules>
 
 </project>
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
index a8be0eaf4..90b77c723 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
@@ -19,18 +19,17 @@ package org.apache.streampark.gateway;
 
 import java.io.Serializable;
 import java.util.Objects;
-import java.util.UUID;
 
 /** {@link OperationHandle} to index the {@code Operation}. */
 public class OperationHandle implements Serializable {
 
-  private final UUID identifier;
+  private final String identifier;
 
-  public OperationHandle(UUID identifier) {
+  public OperationHandle(String identifier) {
     this.identifier = identifier;
   }
 
-  public UUID getIdentifier() {
+  public String getIdentifier() {
     return identifier;
   }
 
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
index d737bbb0a..b355ac9dc 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
@@ -32,6 +32,18 @@ import org.apache.streampark.gateway.session.SessionHandle;
 /** A service of SQL gateway is responsible for handling requests from 
streampark console. */
 public interface SqlGatewayService {
 
+  // 
-------------------------------------------------------------------------------------------
+  // Validate API
+  // 
-------------------------------------------------------------------------------------------
+
+  /**
+   * Check if the SQL gateway is available with the given flink major version.
+   *
+   * @param flinkMajorVersion flink major version
+   * @return true if the SQL gateway is available with the given flink major 
version.
+   */
+  boolean check(String flinkMajorVersion);
+
   // 
-------------------------------------------------------------------------------------------
   // Info API
   // 
-------------------------------------------------------------------------------------------
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
index b57d1ac3e..d530d68c3 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
@@ -18,22 +18,17 @@
 package org.apache.streampark.gateway.session;
 
 import java.util.Objects;
-import java.util.UUID;
 
 /** Session Handle that used to identify the Session. */
 public class SessionHandle {
 
-  private final UUID identifier;
+  private final String identifier;
 
-  public static SessionHandle create() {
-    return new SessionHandle(UUID.randomUUID());
-  }
-
-  public SessionHandle(UUID identifier) {
+  public SessionHandle(String identifier) {
     this.identifier = identifier;
   }
 
-  public UUID getIdentifier() {
+  public String getIdentifier() {
     return identifier;
   }
 
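
Both SessionHandle and OperationHandle now wrap a plain String rather than a java.util.UUID, so
identifiers issued by gateways that do not use UUIDs (Kyuubi, for instance) can be carried through
unchanged. A minimal illustration (both identifier values below are made up):

    // Flink's native gateway still hands back UUID-shaped identifiers, now kept as strings:
    SessionHandle flinkSession = new SessionHandle("1a2b3c4d-5e6f-4a81-92a3-b4c5d6e7f809");
    // a non-Flink gateway may return any opaque token:
    OperationHandle kyuubiOperation = new OperationHandle("kyuubi-operation-42");
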
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
index fcc6303f5..a9242597d 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
@@ -36,6 +36,11 @@ public class FakeSqlGatewayService implements 
SqlGatewayService {
 
   private FakeSqlGatewayService() {}
 
+  @Override
+  public boolean check(String flinkMajorVersion) {
+    return true;
+  }
+
   @Override
   public GatewayInfo getGatewayInfo() throws SqlGatewayException {
     throw new UnsupportedOperationException();
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
index e1f7be03f..defdea367 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
@@ -45,6 +45,11 @@ public class MockedSqlGatewayService implements 
SqlGatewayService {
     this.description = description;
   }
 
+  @Override
+  public boolean check(String flinkMajorVersion) {
+    return !Objects.equals(flinkMajorVersion, "1.11");
+  }
+
   @Override
   public GatewayInfo getGatewayInfo() throws SqlGatewayException {
     throw new UnsupportedOperationException();
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
new file mode 100644
index 000000000..b437f577d
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <modelVersion>4.0.0</modelVersion>
+
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-sql-gateway</artifactId>
+        <version>2.2.0-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
+    <name>StreamPark : Flink SQL Gateway 1.16</name>
+
+    <properties>
+        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+        <gson-fire-version>1.8.5</gson-fire-version>
+        <swagger-core-version>1.6.5</swagger-core-version>
+        <okhttp-version>4.10.0</okhttp-version>
+        <gson-version>2.9.1</gson-version>
+        <commons-lang3-version>3.12.0</commons-lang3-version>
+        
<jackson-databind-nullable-version>0.2.4</jackson-databind-nullable-version>
+        <jakarta-annotation-version>1.3.5</jakarta-annotation-version>
+        <junit-version>5.9.1</junit-version>
+        <junit-platform-runner.version>1.9.1</junit-platform-runner.version>
+        <mockito-core-version>3.12.4</mockito-core-version>
+        <javax.ws.rs-api-version>2.1.1</javax.ws.rs-api-version>
+        <jsr311-api-version>1.1.1</jsr311-api-version>
+    </properties>
+
+    <dependencies>
+        <!-- Gateway base -->
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-sql-gateway-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <!-- Flink API -->
+        <dependency>
+            <groupId>io.swagger</groupId>
+            <artifactId>swagger-annotations</artifactId>
+            <version>${swagger-core-version}</version>
+        </dependency>
+        <!-- @Nullable annotation -->
+        <dependency>
+            <groupId>com.google.code.findbugs</groupId>
+            <artifactId>jsr305</artifactId>
+            <version>3.0.2</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>okhttp</artifactId>
+            <version>${okhttp-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.squareup.okhttp3</groupId>
+            <artifactId>logging-interceptor</artifactId>
+            <version>${okhttp-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>com.google.code.gson</groupId>
+            <artifactId>gson</artifactId>
+            <version>${gson-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>io.gsonfire</groupId>
+            <artifactId>gson-fire</artifactId>
+            <version>${gson-fire-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.commons</groupId>
+            <artifactId>commons-lang3</artifactId>
+            <version>${commons-lang3-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>jakarta.annotation</groupId>
+            <artifactId>jakarta.annotation-api</artifactId>
+            <version>${jakarta-annotation-version}</version>
+            <scope>provided</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.openapitools</groupId>
+            <artifactId>jackson-databind-nullable</artifactId>
+            <version>${jackson-databind-nullable-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>jsr311-api</artifactId>
+            <version>${jsr311-api-version}</version>
+        </dependency>
+        <dependency>
+            <groupId>javax.ws.rs</groupId>
+            <artifactId>javax.ws.rs-api</artifactId>
+            <version>${javax.ws.rs-api-version}</version>
+        </dependency>
+        <!-- test dependencies -->
+        <dependency>
+            <groupId>org.junit.platform</groupId>
+            <artifactId>junit-platform-runner</artifactId>
+            <version>${junit-platform-runner.version}</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>${mockito-core-version}</version>
+            <scope>test</scope>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.openapitools</groupId>
+                <artifactId>openapi-generator-maven-plugin</artifactId>
+                <version>6.5.0</version>
+                <executions>
+                    <execution>
+                        <goals>
+                            <goal>generate</goal>
+                        </goals>
+                        <configuration>
+                            
<inputSpec>${project.basedir}/src/main/resources/flink_sql_gateway_rest_v1.yml</inputSpec>
+                            <generatorName>java</generatorName>
+                            <!--
+                            When use jdk11+, use the following line to 
generate native java code.
+                            <library>native</library>
+                            -->
+                            
<apiPackage>org.apache.streampark.gateway.flink.client.rest.v1</apiPackage>
+                            
<invokerPackage>org.apache.streampark.gateway.flink.client.rest</invokerPackage>
+                            
<modelPackage>org.apache.streampark.gateway.flink.client.dto</modelPackage>
+                            <generateApiTests>false</generateApiTests>
+                            <generateModelTests>false</generateModelTests>
+                            <configOptions>
+                                <sourceFolder>src/gen/java/main</sourceFolder>
+                            </configOptions>
+                        </configuration>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
new file mode 100644
index 000000000..263cb73ab
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.OperationStatus;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import 
org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.GetInfoResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import 
org.apache.streampark.gateway.flink.client.dto.OperationStatusResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.ResultSetColumnsInner;
+import org.apache.streampark.gateway.flink.client.dto.ResultSetDataInner;
+import org.apache.streampark.gateway.flink.client.rest.ApiClient;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultKind;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.results.RowData;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+/** Implement {@link SqlGatewayService} with Flink native SqlGateway. */
+public class FlinkSqlGatewayImpl implements SqlGatewayService {
+
+  private final DefaultApi defaultApi;
+
+  public FlinkSqlGatewayImpl(String baseUri) {
+    ApiClient client = new ApiClient();
+    client.setBasePath(baseUri);
+    defaultApi = new DefaultApi(client);
+  }
+
+  @Override
+  public boolean check(String flinkMajorVersion) {
+    // flink gateway v1 api is supported since flink 1.16; compare the version
+    // numerically so that e.g. "1.9" is not treated as >= "1.16"
+    String[] parts = flinkMajorVersion.split("\\.");
+    int major = Integer.parseInt(parts[0].trim());
+    int minor = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : 0;
+    return major > 1 || (major == 1 && minor >= 16);
+  }
+
+  @Override
+  public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+    GetInfoResponseBody info = null;
+    try {
+      info = defaultApi.getInfo();
+      return new GatewayInfo(info.getProductName(), info.getVersion());
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay getGatewayInfo 
failed!", e);
+    }
+  }
+
+  @Override
+  public SessionHandle openSession(SessionEnvironment environment) throws 
SqlGatewayException {
+    try {
+      return new SessionHandle(
+          Objects.requireNonNull(
+              defaultApi
+                  .openSession(
+                      new OpenSessionRequestBody()
+                          .sessionName(environment.getSessionName())
+                          .properties(environment.getSessionConfig()))
+                  .getSessionHandle()));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay openSession 
failed!", e);
+    }
+  }
+
+  @Override
+  public void heartbeat(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    try {
+      defaultApi.triggerSession(
+          new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
+              .identifier(UUID.fromString(sessionHandle.getIdentifier())));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay heartbeat 
failed!", e);
+    }
+  }
+
+  @Override
+  public void closeSession(SessionHandle sessionHandle) throws 
SqlGatewayException {
+    try {
+      defaultApi.closeSession(UUID.fromString(sessionHandle.getIdentifier()));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay closeSession 
failed!", e);
+    }
+  }
+
+  @Override
+  public void cancelOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    try {
+      defaultApi.cancelOperation(
+          new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
+              .identifier(UUID.fromString(sessionHandle.getIdentifier())),
+          new org.apache.streampark.gateway.flink.client.dto.OperationHandle()
+              .identifier(UUID.fromString(operationHandle.getIdentifier())));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay cancelOperation 
failed!", e);
+    }
+  }
+
+  @Override
+  public void closeOperation(SessionHandle sessionHandle, OperationHandle 
operationHandle)
+      throws SqlGatewayException {
+    try {
+      defaultApi.closeOperation(
+          UUID.fromString(sessionHandle.getIdentifier()),
+          UUID.fromString(operationHandle.getIdentifier()));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay closeOperation 
failed!", e);
+    }
+  }
+
+  @Override
+  public OperationInfo getOperationInfo(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+
+    try {
+      OperationStatusResponseBody operationStatus =
+          defaultApi.getOperationStatus(
+              UUID.fromString(sessionHandle.getIdentifier()),
+              UUID.fromString(operationHandle.getIdentifier()));
+      return new 
OperationInfo(OperationStatus.valueOf(operationStatus.getStatus()), null);
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay closeOperation 
failed!", e);
+    }
+  }
+
+  @Override
+  public Column getOperationResultSchema(
+      SessionHandle sessionHandle, OperationHandle operationHandle) throws 
SqlGatewayException {
+    throw new SqlGatewayException(
+        "Flink native SqlGateWay don`t support 
operation:getOperationResultSchema!");
+  }
+
+  @Override
+  public OperationHandle executeStatement(
+      SessionHandle sessionHandle,
+      String statement,
+      long executionTimeoutMs,
+      ExecutionConfiguration executionConfig)
+      throws SqlGatewayException {
+    try {
+      return new OperationHandle(
+          Objects.requireNonNull(
+              defaultApi
+                  .executeStatement(
+                      UUID.fromString(sessionHandle.getIdentifier()),
+                      new ExecuteStatementRequestBody()
+                          .statement(statement)
+                          // currently, sql gateway doesn't support execution 
timeout
+                          //              .executionTimeout(executionTimeoutMs)
+                          .executionConfig(null))
+                  .getOperationHandle()));
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay executeStatement 
failed!", e);
+    }
+  }
+
+  @Override
+  public ResultSet fetchResults(
+      SessionHandle sessionHandle,
+      OperationHandle operationHandle,
+      ResultQueryCondition resultQueryCondition)
+      throws SqlGatewayException {
+    try {
+
+      List<RowData> data = new ArrayList<>();
+      List<Column> columns = new ArrayList<>();
+      FetchResultsResponseBody fetchResultsResponseBody =
+          defaultApi.fetchResults(
+              UUID.fromString(sessionHandle.getIdentifier()),
+              UUID.fromString(operationHandle.getIdentifier()),
+              resultQueryCondition.getToken());
+      String resultTypeStr = fetchResultsResponseBody.getResultType();
+      Long nextToken = null;
+      if (fetchResultsResponseBody.getNextResultUri() != null) {
+        String nextResultUri = fetchResultsResponseBody.getNextResultUri();
+        nextToken = 
Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1));
+      }
+
+      org.apache.streampark.gateway.flink.client.dto.ResultSet results =
+          fetchResultsResponseBody.getResults();
+
+      List<ResultSetColumnsInner> resultsColumns = results.getColumns();
+      List<ResultSetDataInner> resultsData = results.getData();
+
+      resultsColumns.forEach(
+          column ->
+              columns.add(
+                  new Column(
+                      column.getName(), column.getLogicalType().toJson(), 
column.getComment())));
+
+      resultsData.forEach(row -> data.add(new 
RowData(row.getKind().getValue(), row.getFields())));
+
+      ResultKind resultKind =
+          columns.size() == 1 && columns.get(0).getName().equals("result")
+              ? ResultKind.SUCCESS
+              : ResultKind.SUCCESS_WITH_CONTENT;
+
+      return new ResultSet(
+          ResultSet.ResultType.valueOf(resultTypeStr),
+          nextToken,
+          columns,
+          data,
+          true,
+          null,
+          resultKind);
+    } catch (ApiException e) {
+      throw new SqlGatewayException("Flink native SqlGateWay fetchResults 
failed!", e);
+    }
+  }
+}
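
For orientation, a hedged sketch of one full round trip against the generated REST client that
FlinkSqlGatewayImpl wraps: open a session, run a statement, fetch the first result page, close the
session. The gateway address, session name and result token are illustrative assumptions; the
calls themselves are the same ones used above and throw the generated ApiException.

    import java.util.UUID;

    import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
    import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
    import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
    import org.apache.streampark.gateway.flink.client.rest.ApiClient;
    import org.apache.streampark.gateway.flink.client.rest.ApiException;
    import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;

    static void roundTrip() throws ApiException {
      // point the generated client at a running Flink SQL Gateway (address is a placeholder)
      ApiClient client = new ApiClient();
      client.setBasePath("http://localhost:8083");
      DefaultApi api = new DefaultApi(client);

      // open a session, execute one statement, fetch the first batch of results, clean up
      String session =
          api.openSession(new OpenSessionRequestBody().sessionName("streampark-demo"))
              .getSessionHandle();
      String operation =
          api.executeStatement(
                  UUID.fromString(session),
                  new ExecuteStatementRequestBody().statement("SELECT 1"))
              .getOperationHandle();
      FetchResultsResponseBody firstPage =
          api.fetchResults(UUID.fromString(session), UUID.fromString(operation), 0L);
      System.out.println(firstPage.getResults());
      api.closeSession(UUID.fromString(session));
    }
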
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..348c73845
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/** Flink sql gateway's Factory for {@link SqlGatewayService}. */
+public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory 
{
+
+  public static final ConfigOption<String> BASE_URI =
+      ConfigOption.key("base-uri")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The base uri of the flink cluster.");
+
+  @Override
+  public String factoryIdentifier() {
+    return "flink-v1";
+  }
+
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    Set<ConfigOption<?>> options = new HashSet<>();
+    options.add(BASE_URI);
+    return options;
+  }
+
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+
+  @Override
+  public SqlGatewayService createSqlGatewayService(Context context) {
+    SqlGatewayServiceFactoryUtils.EndpointFactoryHelper helper =
+        SqlGatewayServiceFactoryUtils.createEndpointFactoryHelper(this, 
context);
+    helper.validate();
+    String baseUri = context.getGateWayServiceOptions().get(BASE_URI.getKey());
+    return new FlinkSqlGatewayImpl(baseUri);
+  }
+}
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
new file mode 100644
index 000000000..bbefc2c25
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
@@ -0,0 +1,19 @@
+#
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+#  limitations under the License.
+#
+
+org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory
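
The service file above registers the factory for discovery through Java's standard ServiceLoader
SPI. A minimal lookup sketch, assuming the Factory interface (like its SqlGatewayServiceFactory
subtype shown earlier) exposes factoryIdentifier(); the factory utilities used by the console in
this commit presumably perform an equivalent scan:

    import java.util.ServiceLoader;

    import org.apache.streampark.gateway.factories.Factory;

    // scan every Factory registered under META-INF/services and return the Flink v1 one
    static Factory findFlinkV1Factory() {
      for (Factory factory : ServiceLoader.load(Factory.class)) {
        if ("flink-v1".equals(factory.factoryIdentifier())) {
          return factory;  // org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory
        }
      }
      throw new IllegalStateException("flink-v1 gateway factory not found on the classpath");
    }
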
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
new file mode 100644
index 000000000..3b9614a43
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
@@ -0,0 +1,573 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+openapi: 3.0.1
+info:
+  title: Flink SQL Gateway REST API
+  contact:
+    email: [email protected]
+  license:
+    name: Apache 2.0
+    url: https://www.apache.org/licenses/LICENSE-2.0.html
+  version: v1/1.16
+paths:
+  /api_versions:
+    get:
+      description: Get the current available versions for the Rest Endpoint. 
The client
+        can choose one of the returned versions as the protocol for later 
communication.
+      operationId: getApiVersion
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetApiVersionResponseBody'
+  /info:
+    get:
+      description: Get meta data for this cluster.
+      operationId: getInfo
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetInfoResponseBody'
+  /sessions:
+    post:
+      description: Opens a new session with specific properties. Specific 
properties
+        can be given for current session which will override the default 
properties
+        of gateway.
+      operationId: openSession
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/OpenSessionRequestBody'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/OpenSessionResponseBody'
+  /sessions/{session_handle}:
+    get:
+      description: Get the session configuration.
+      operationId: getSessionConfig
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/GetSessionConfigResponseBody'
+    delete:
+      description: Closes the specific session.
+      operationId: closeSession
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/CloseSessionResponseBody'
+  /sessions/{session_handle}/heartbeat:
+    post:
+      description: "Trigger heartbeat to tell the server that the client is 
active,\
+        \ and to keep the session alive as long as configured timeout value."
+      operationId: triggerSession
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            $ref: '#/components/schemas/SessionHandle'
+      responses:
+        "200":
+          description: The request was successful.
+  /sessions/{session_handle}/operations/{operation_handle}/cancel:
+    post:
+      description: Cancel the operation.
+      operationId: cancelOperation
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            $ref: '#/components/schemas/SessionHandle'
+        - name: operation_handle
+          in: path
+          description: The OperationHandle that identifies an operation.
+          required: true
+          schema:
+            $ref: '#/components/schemas/OperationHandle'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/OperationStatusResponseBody'
+  /sessions/{session_handle}/operations/{operation_handle}/close:
+    delete:
+      description: Close the operation.
+      operationId: closeOperation
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+        - name: operation_handle
+          in: path
+          description: The OperationHandle that identifies an operation.
+          required: true
+          schema:
+            type: string
+            format: uuid
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/OperationStatusResponseBody'
+  /sessions/{session_handle}/operations/{operation_handle}/result/{token}:
+    get:
+      description: Fetch results of Operation.
+      operationId: fetchResults
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+        - name: operation_handle
+          in: path
+          description: The OperationHandle that identifies an operation.
+          required: true
+          schema:
+            type: string
+            format: uuid
+        - name: token
+          in: path
+          description: The token that identifies which batch of results to fetch.
+          required: true
+          schema:
+            type: integer
+            format: int64
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/FetchResultsResponseBody'
+  /sessions/{session_handle}/operations/{operation_handle}/status:
+    get:
+      description: Get the status of the operation.
+      operationId: getOperationStatus
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+        - name: operation_handle
+          in: path
+          description: The OperationHandle that identifies an operation.
+          required: true
+          schema:
+            type: string
+            format: uuid
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/OperationStatusResponseBody'
+  /sessions/{session_handle}/statements:
+    post:
+      description: Execute a statement.
+      operationId: executeStatement
+      parameters:
+        - name: session_handle
+          in: path
+          description: The SessionHandle that identifies a session.
+          required: true
+          schema:
+            type: string
+            format: uuid
+      requestBody:
+        content:
+          application/json:
+            schema:
+              $ref: '#/components/schemas/ExecuteStatementRequestBody'
+      responses:
+        "200":
+          description: The request was successful.
+          content:
+            application/json:
+              schema:
+                $ref: '#/components/schemas/ExecuteStatementResponseBody'
+components:
+  schemas:
+    CloseSessionResponseBody:
+      type: object
+      properties:
+        status:
+          type: string
+    WatermarkSpec:
+      type: object
+      properties:
+        rowtimeAttribute:
+          type: string
+        watermarkExpression:
+          $ref: '#/components/schemas/ResolvedExpression'
+    GetInfoResponseBody:
+      type: object
+      properties:
+        productName:
+          type: string
+        version:
+          type: string
+    JobVertexID:
+      pattern: "[0-9a-f]{32}"
+      type: string
+    ResolvedExpression:
+      type: object
+      properties:
+        outputDataType:
+          $ref: '#/components/schemas/DataType'
+        resolvedChildren:
+          type: array
+          items:
+            $ref: '#/components/schemas/ResolvedExpression'
+        children:
+          type: array
+          items:
+            $ref: '#/components/schemas/Expression'
+    LogicalType:
+      type: object
+      properties:
+        typeRoot:
+          $ref: '#/components/schemas/LogicalTypeRoot'
+        children:
+          type: array
+          items:
+            $ref: '#/components/schemas/LogicalType'
+        nullable:
+          type: boolean
+    UniqueConstraint:
+      type: object
+      properties:
+        name:
+          type: string
+        enforced:
+          type: boolean
+        columns:
+          type: array
+          items:
+            type: string
+        type:
+          $ref: '#/components/schemas/ConstraintType'
+    ExecuteStatementRequestBody:
+      type: object
+      properties:
+        statement:
+          type: string
+        executionTimeout:
+          type: integer
+          format: int64
+        executionConfig:
+          type: object
+          additionalProperties:
+            type: string
+    LogicalTypeRoot:
+      type: string
+      enum:
+        - CHAR
+        - VARCHAR
+        - BOOLEAN
+        - BINARY
+        - VARBINARY
+        - DECIMAL
+        - TINYINT
+        - SMALLINT
+        - INTEGER
+        - BIGINT
+        - FLOAT
+        - DOUBLE
+        - DATE
+        - TIME_WITHOUT_TIME_ZONE
+        - TIMESTAMP_WITHOUT_TIME_ZONE
+        - TIMESTAMP_WITH_TIME_ZONE
+        - TIMESTAMP_WITH_LOCAL_TIME_ZONE
+        - INTERVAL_YEAR_MONTH
+        - INTERVAL_DAY_TIME
+        - ARRAY
+        - MULTISET
+        - MAP
+        - ROW
+        - DISTINCT_TYPE
+        - STRUCTURED_TYPE
+        - "NULL"
+        - RAW
+        - SYMBOL
+        - UNRESOLVED
+    OperationHandle:
+      type: object
+      properties:
+        identifier:
+          type: string
+          format: uuid
+    ExecuteStatementResponseBody:
+      type: object
+      properties:
+        operationHandle:
+          type: string
+    FetchResultsResponseBody:
+      type: object
+      properties:
+        results:
+          $ref: '#/components/schemas/ResultSet'
+        resultType:
+          type: string
+        nextResultUri:
+          type: string
+    RowData:
+      type: object
+      properties:
+        arity:
+          type: integer
+          format: int32
+        rowKind:
+          $ref: '#/components/schemas/RowKind'
+    Column:
+      type: object
+      properties:
+        name:
+          type: string
+        dataType:
+          $ref: '#/components/schemas/DataType'
+        comment:
+          type: string
+        physical:
+          type: boolean
+        persisted:
+          type: boolean
+    TriggerId:
+      pattern: "[0-9a-f]{32}"
+      type: string
+    ResourceID:
+      pattern: "[0-9a-f]{32}"
+      type: string
+    OpenSessionRequestBody:
+      type: object
+      properties:
+        sessionName:
+          type: string
+        properties:
+          type: object
+          additionalProperties:
+            type: string
+    ResultSet:
+      type: object
+      properties:
+        columns:
+          type: array
+          items:
+            properties:
+              name:
+                type: string
+              logicalType:
+                type: object
+                properties:
+                  type:
+                    $ref: '#/components/schemas/LogicalTypeRoot'
+                  fields:
+                    type: array
+                    items:
+                      properties:
+                        name:
+                          type: string
+                        fieldType:
+                          type: object
+                          properties:
+                            type:
+                              $ref: '#/components/schemas/LogicalTypeRoot'
+                            nullable:
+                              type: boolean
+                            length:
+                              type: integer
+                              format: int32
+                            precision:
+                              type: integer
+                              format: int32
+                            scale:
+                              type: integer
+                              format: int32
+                  nullable:
+                    type: boolean
+                  length:
+                    type: integer
+                    format: int32
+                  precision:
+                    type: integer
+                    format: int32
+                  scale:
+                    type: integer
+                    format: int32
+              comment:
+                type: string
+        data:
+          type: array
+          items:
+            type: object
+            properties:
+              kind:
+                $ref: '#/components/schemas/RowKind'
+              fields:
+                type: array
+                items:
+                  type: object
+    DataType:
+      type: object
+      properties:
+        logicalType:
+          $ref: '#/components/schemas/LogicalType'
+        children:
+          type: array
+          items:
+            $ref: '#/components/schemas/DataType'
+    ResolvedSchema:
+      type: object
+      properties:
+        columns:
+          type: array
+          items:
+            $ref: '#/components/schemas/Column'
+        watermarkSpecs:
+          type: array
+          items:
+            $ref: '#/components/schemas/WatermarkSpec'
+        primaryKey:
+          $ref: '#/components/schemas/UniqueConstraint'
+        primaryKeyIndexes:
+          type: array
+          items:
+            type: integer
+            format: int32
+        columnCount:
+          type: integer
+          format: int32
+        columnDataTypes:
+          type: array
+          items:
+            $ref: '#/components/schemas/DataType'
+        columnNames:
+          type: array
+          items:
+            type: string
+    GetSessionConfigResponseBody:
+      type: object
+      properties:
+        properties:
+          type: object
+          additionalProperties:
+            type: string
+    SerializedThrowable:
+      type: object
+      properties:
+        serialized-throwable:
+          type: string
+          format: binary
+    GetApiVersionResponseBody:
+      type: object
+      properties:
+        versions:
+          type: array
+          items:
+            type: string
+    OperationStatusResponseBody:
+      type: object
+      properties:
+        status:
+          type: string
+    SessionHandle:
+      type: object
+      properties:
+        identifier:
+          type: string
+          format: uuid
+    ResultType:
+      type: string
+      enum:
+        - NOT_READY
+        - PAYLOAD
+        - EOS
+    Expression:
+      type: object
+    RowKind:
+      type: string
+      enum:
+        - INSERT
+        - UPDATE_BEFORE
+        - UPDATE_AFTER
+        - DELETE
+    OpenSessionResponseBody:
+      type: object
+      properties:
+        sessionHandle:
+          type: string
+    ConstraintType:
+      type: string
+      enum:
+        - PRIMARY_KEY
+        - UNIQUE_KEY
+    IntermediateDataSetID:
+      pattern: "[0-9a-f]{32}"
+      type: string
+    JobID:
+      pattern: "[0-9a-f]{32}"
+      type: string
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java
new file mode 100644
index 000000000..a18f59335
--- /dev/null
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody;
+import org.apache.streampark.gateway.flink.client.rest.ApiClient;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+
+import java.util.Collections;
+
+public class FlinkSqlGateway {
+
+  private FlinkSqlGateway() {}
+
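+  /** Builds a generated REST client ({@link DefaultApi}) bound to the given SQL Gateway base path, e.g. http://host:8083. */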
+  public static DefaultApi sqlGatewayApi(String basePath) {
+    ApiClient client = new ApiClient();
+    client.setBasePath(basePath);
+    return new DefaultApi(client);
+  }
+
+  public static void main(String[] args) throws ApiException {
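+    // Minimal smoke test: open a session against a locally running SQL Gateway using the client's default base path.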
+    DefaultApi api = new DefaultApi(new ApiClient());
+    OpenSessionResponseBody openSessionResponseBody =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .sessionName("example")
+                .properties(Collections.singletonMap("foo", "bar")));
+  }
+}
diff --git a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java
new file mode 100644
index 000000000..51943270b
--- /dev/null
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.ExecuteStatementResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+
+import java.util.UUID;
+
+public class FlinkSqlGatewayExample {
+
+  private FlinkSqlGatewayExample() {}
+
+  public static void main(String[] args) throws Exception {
+    DefaultApi api = FlinkSqlGateway.sqlGatewayApi("http://192.168.20.144:8083");
+    runOnRemote(api);
+    //    runOnYarn(api);
+    //    runOnKubernetes(api);
+  }
+
+  public static void runOnRemote(DefaultApi api) throws ApiException, InterruptedException {
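+    // Open a gateway session that submits statements to an existing standalone cluster
+    // reachable at rest.address:rest.port (execution.target = remote).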
+    OpenSessionResponseBody response =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .putPropertiesItem("rest.address", "192.168.20.239")
+                .putPropertiesItem("rest.port", "8081")
+                .putPropertiesItem("execution.target", "remote"));
+    String sessionHandle = response.getSessionHandle();
+    System.out.println("SessionHandle: " + sessionHandle);
+
+    ExecuteStatementResponseBody statement1 =
+        api.executeStatement(
+            UUID.fromString(sessionHandle),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "CREATE TABLE Orders (\n"
+                        + "    order_number BIGINT,\n"
+                        + "    price        DECIMAL(32,2),\n"
+                        + "    buyer        ROW<first_name STRING, last_name 
STRING>,\n"
+                        + "    order_time   TIMESTAMP(3)\n"
+                        + ") WITH (\n"
+                        + "  'connector' = 'datagen',\n"
+                        + "  'number-of-rows' = '2'\n"
+                        + ")")
+                .putExecutionConfigItem(
+                    "pipeline.name", "Flink SQL Gateway SDK on flink cluster 
Example"));
+
+    System.out.println("create table: " + statement1.getOperationHandle());
+
+    ExecuteStatementResponseBody statement2 =
+        api.executeStatement(
+            UUID.fromString(sessionHandle),
+            new ExecuteStatementRequestBody()
+                .statement("select * from Orders;")
+                .putExecutionConfigItem(
+                    "pipeline.name", "Flink SQL Gateway SDK on flink cluster 
Example"));
+
+    System.out.println("select * from Orders: " + 
statement2.getOperationHandle());
+
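+    // Give the bounded datagen job a few seconds to finish before fetching the first result batch (token 0).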
+    Thread.sleep(1000 * 10);
+
+    FetchResultsResponseBody fetchResultsResponseBody =
+        api.fetchResults(
+            UUID.fromString(sessionHandle), UUID.fromString(statement2.getOperationHandle()), 0L);
+    fetchResultsResponseBody.getResults().getColumns().forEach(System.out::println);
+  }
+
+  private static void runOnKubernetes(DefaultApi api) throws ApiException {
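+    // Attach to an existing Kubernetes session cluster identified by kubernetes.cluster-id
+    // (execution.target = kubernetes-session); the gateway only submits statements, it does not create the cluster.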
+    OpenSessionResponseBody response =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .putPropertiesItem("kubernetes.cluster-id", 
"custom-flink-cluster")
+                .putPropertiesItem("kubernetes.jobmanager.service-account", 
"flink")
+                .putPropertiesItem("kubernetes.namespace", "flink-cluster")
+                .putPropertiesItem("rest.address", "127.0.0.1")
+                .putPropertiesItem("rest.port", "8081")
+                .putPropertiesItem("execution.target", "kubernetes-session"));
+    System.out.println(response.getSessionHandle());
+
+    ExecuteStatementResponseBody statement1 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "CREATE TABLE datagen (\n"
+                        + " f_sequence INT,\n"
+                        + " f_random INT,\n"
+                        + " f_random_str STRING\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'datagen',\n"
+                        + " 'rows-per-second'='10',\n"
+                        + " 'fields.f_sequence.kind'='sequence',\n"
+                        + " 'fields.f_sequence.start'='1',\n"
+                        + " 'fields.f_sequence.end'='1000',\n"
+                        + " 'fields.f_random.min'='1',\n"
+                        + " 'fields.f_random.max'='1000',\n"
+                        + " 'fields.f_random_str.length'='10'\n"
+                        + ")")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
SDK on K8S Example"));
+
+    System.out.println(statement1.getOperationHandle());
+
+    ExecuteStatementResponseBody statement2 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "CREATE TABLE blackhole_table  (\n"
+                        + " f_sequence INT,\n"
+                        + " f_random INT,\n"
+                        + " f_random_str STRING\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'blackhole'\n"
+                        + ")")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
SDK on K8S Example"));
+
+    System.out.println(statement2.getOperationHandle());
+
+    ExecuteStatementResponseBody statement3 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "CREATE TABLE print_table  (\n"
+                        + " f_sequence INT,\n"
+                        + " f_random INT,\n"
+                        + " f_random_str STRING\n"
+                        + ") WITH (\n"
+                        + " 'connector' = 'print'\n"
+                        + ")")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
SDK on K8S Example"));
+
+    System.out.println(statement3.getOperationHandle());
+
+    ExecuteStatementResponseBody statement4 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "EXECUTE STATEMENT SET\n"
+                        + "BEGIN\n"
+                        + "    insert into blackhole_table select * from 
datagen;\n"
+                        + "    insert into print_table select * from 
datagen;\n"
+                        + "END;")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
SDK on K8S Example"));
+
+    System.out.println(statement4.getOperationHandle());
+  }
+
+  public static void runOnYarn(DefaultApi api) throws ApiException {
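+    // Attach to a running yarn-session cluster: YARN ResourceManager HA settings are passed as
+    // flink.hadoop.* properties and the target application is selected via yarn.application.id.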
+    OpenSessionResponseBody response =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .putPropertiesItem("execution.target", "yarn-session")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.enabled", "true")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm1", "yarn01")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm2", "yarn01")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.cluster-id", "yarn-cluster")
+                .putPropertiesItem(
+                    "flink.hadoop.yarn.client.failover-proxy-provider",
+                    "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider")
+                .putPropertiesItem("yarn.application.id", "application_1667789375191_XXXX"));
+    System.out.println(response.getSessionHandle());
+    ExecuteStatementResponseBody executeStatementResponseBody =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement("select 1")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
SDK on YARN Example"));
+    System.out.println(executeStatementResponseBody.getOperationHandle());
+  }
+
+  public static void runOnYarnWithUDF(DefaultApi api) throws ApiException {
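+    // Registers a temporary UDF from a jar on HDFS and then invokes it in a query; note this session
+    // sets execution.target to "local" while still passing the same YARN HA properties as runOnYarn.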
+    OpenSessionResponseBody response =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .putPropertiesItem("execution.target", "local")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.enabled", "true")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm1", "yarn01")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm2", "yarn01")
+                .putPropertiesItem("flink.hadoop.yarn.resourcemanager.cluster-id", "yarn-cluster")
+                .putPropertiesItem(
+                    "flink.hadoop.yarn.client.failover-proxy-provider",
+                    "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider")
+                .putPropertiesItem("yarn.application.id", "application_1667789375191_XXXX"));
+
+    ExecuteStatementResponseBody statement1 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement(
+                    "create TEMPORARY FUNCTION \n"
+                        + "    FakeFunction as 
'com.fortycoderplus.flink.udf.FakeFunction'\n"
+                        + "using JAR 
'hdfs://MyHdfsService/udf-test/fake-func.jar'")
+                .putExecutionConfigItem("pipeline.name", "Flink SQL Gateway 
UDF on YARN Example"));
+    System.out.println(statment1.getOperationHandle());
+
+    ExecuteStatementResponseBody statement2 =
+        api.executeStatement(
+            UUID.fromString(response.getSessionHandle()),
+            new ExecuteStatementRequestBody()
+                .statement("select FakeFunction('Flink SQL Gateway UDF on YARN 
Example')")
+                .putExecutionConfigItem(
+                    "pipeline.name", "Flink SQL Gateway UDF on YARN Example-" 
+ UUID.randomUUID()));
+    System.out.println(statment2.getOperationHandle());
+  }
+}
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
similarity index 74%
copy from streampark-flink/streampark-flink-sql-gateway/pom.xml
copy to streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
index fe1824b59..7cc7b1750 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
@@ -17,18 +17,22 @@
   -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <modelVersion>4.0.0</modelVersion>
+
     <parent>
         <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark-flink</artifactId>
+        <artifactId>streampark-flink-sql-gateway</artifactId>
         <version>2.2.0-SNAPSHOT</version>
     </parent>
 
-    <artifactId>streampark-flink-sql-gateway</artifactId>
-    <name>StreamPark : SQL Gateway Parent</name>
-    <packaging>pom</packaging>
+    <artifactId>streampark-flink-sql-gateway-kyuubi</artifactId>
+    <name>StreamPark : Kyuubi SQL Gateway</name>
 
-    <modules>
-        <module>streampark-flink-sql-gateway-base</module>
-    </modules>
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-flink-sql-gateway-base</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+    </dependencies>
 
 </project>
diff --git a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
similarity index 52%
copy from streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
index 1a359ee49..e5e040f6a 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++ b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
@@ -6,7 +6,7 @@
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
  *
- *    https://www.apache.org/licenses/LICENSE-2.0
+ *    http://www.apache.org/licenses/LICENSE-2.0
  *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,5 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-export default {
-  menu: {
-    system: '系统管理',
-    userManagement: '用户管理',
-    roleManagement: '角色管理',
-    menuManagement: '菜单管理',
-    tokenManagement: 'Token管理',
-    teamManagement: '团队管理',
-    memberManagement: '成员管理',
-    project: '项目管理',
-    application: '作业管理',
-    variable: '变量管理',
-    resource: '资源管理',
-    setting: '设置中心',
-  },
-  setting: {
-    system: '系统设置',
-    alarm: '告警设置',
-    flinkHome: 'Flink 版本',
-    flinkCluster: 'Flink集群',
-    externalLink: '扩展链接',
-    yarnQueue: 'Yarn 队列',
-  },
-};
+
+package org.apache.streampark.gateway.kyuubi;

