This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new cac931ae2 [sql gateway]Implement flink native (#2737)
cac931ae2 is described below
commit cac931ae289e0753892279336e1c4e70e5f7d7c6
Author: gongzhongqiang <[email protected]>
AuthorDate: Sun Jul 2 18:28:02 2023 +0800
[sql gateway]Implement flink native (#2737)
* Implement flink native and kyuubi sql gateway
* fix exception when fetch result with row type
* polish code
* add gateway management
* add check method to check whether flink version is supported
* complete k8s session config
* complete gateway session module
* Add gateway manage frontend
* polish gateway
* address comment, remove mysql driver dependency
* move sql to 2.2.0
* frontend code format
---------
Co-authored-by: gongzhongqiang <[email protected]>
---
.../streampark-console-service/pom.xml | 13 +-
.../main/assembly/script/upgrade/mysql/2.2.0.sql | 33 ++
.../main/assembly/script/upgrade/pgsql/2.2.0.sql | 40 ++
.../core/controller/FlinkGateWayController.java | 102 ++++
.../core/controller/SqlWorkBenchController.java | 195 +++++++
.../console/core/entity/FlinkGateWay.java | 51 ++
.../console/core/enums/GatewayTypeEnum.java | 55 +-
.../console/core/mapper/FlinkGateWayMapper.java} | 34 +-
.../console/core/service/FlinkGateWayService.java} | 44 +-
.../console/core/service/SqlWorkBenchService.java | 137 +++++
.../core/service/impl/FlinkGateWayServiceImpl.java | 91 ++++
.../core/service/impl/SqlWorkBenchServiceImpl.java | 220 ++++++++
.../resources/mapper/core/FlinkGateWayMapper.xml | 27 +-
.../src/api/flink/setting/flinkGateway.ts | 78 +++
.../src/locales/lang/en/menu.ts | 1 +
.../{zh-CN/menu.ts => en/setting/flinkGateway.ts} | 41 +-
.../src/locales/lang/zh-CN/menu.ts | 5 +-
.../zh-CN/{menu.ts => setting/flinkGateway.ts} | 41 +-
.../FlinkGateway/components/FlinkGatewayDrawer.vue | 87 ++++
.../src/views/setting/FlinkGateway/index.data.ts | 87 ++++
.../src/views/setting/FlinkGateway/index.vue | 136 +++++
.../streampark-flink-sql-gateway/pom.xml | 2 +
.../apache/streampark/gateway/OperationHandle.java | 7 +-
.../gateway/service/SqlGatewayService.java | 12 +
.../streampark/gateway/session/SessionHandle.java | 11 +-
.../gateway/utils/FakeSqlGatewayService.java | 5 +
.../gateway/utils/MockedSqlGatewayService.java | 5 +
.../streampark-flink-sql-gateway-flink-v1/pom.xml | 196 +++++++
.../gateway/flink/FlinkSqlGatewayImpl.java | 239 +++++++++
.../flink/FlinkSqlGatewayServiceFactory.java | 63 +++
...org.apache.streampark.gateway.factories.Factory | 19 +
.../main/resources/flink_sql_gateway_rest_v1.yml | 573 +++++++++++++++++++++
.../streampark/gateway/flink/FlinkSqlGateway.java | 46 ++
.../gateway/flink/FlinkSqlGatewayExample.java | 229 ++++++++
.../pom.xml | 18 +-
.../streampark/gateway/kyuubi/package-info.java | 28 +-
36 files changed, 2787 insertions(+), 184 deletions(-)
diff --git a/streampark-console/streampark-console-service/pom.xml
b/streampark-console/streampark-console-service/pom.xml
index 5705bded4..bbc761000 100644
--- a/streampark-console/streampark-console-service/pom.xml
+++ b/streampark-console/streampark-console-service/pom.xml
@@ -282,7 +282,6 @@
<version>${postgresql.version}</version>
<optional>true</optional>
</dependency>
-
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
@@ -372,6 +371,18 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
<dependency>
<groupId>com.fasterxml.jackson.module</groupId>
<artifactId>jackson-module-scala_${scala.binary.version}</artifactId>
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index 81db6ac9b..16404952d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -71,4 +71,37 @@ insert into `t_role_menu` (role_id, menu_id) values (100002,
120403);
alter table `t_user` modify column `password` varchar(64) collate
utf8mb4_general_ci default null comment 'password';
alter table `t_user` modify column `login_type` tinyint default 0 comment
'login type 0:password 1:ldap 2:sso';
+-- ----------------------------
+-- Table of t_flink_gateway
+-- ----------------------------
+drop table if exists `t_flink_gateway`;
+create table `t_flink_gateway` (
+ `id` bigint not null auto_increment,
+ `gateway_name` varchar(128) collate
utf8mb4_general_ci not null comment 'The name of the gateway',
+ `description` text collate utf8mb4_general_ci
default null comment 'More detailed description of resource',
+ `gateway_type` int not null comment 'The type
of the gateway',
+ `address` varchar(150) default null comment
'url address of gateway endpoint',
+ `create_time` datetime not null default
current_timestamp comment 'create time',
+ `modify_time` datetime not null default
current_timestamp on update current_timestamp comment 'modify time',
+ primary key (`id`) using btree
+) engine=innodb auto_increment=100000 default charset=utf8mb4
collate=utf8mb4_general_ci;
+
+-- menu level 2
+insert into `t_menu` values (120500, 130000, 'setting.flinkGateway',
'/setting/FlinkGateway', 'setting/FlinkGateway/index', null, 'apartment', '0',
1, 3, now(), now());
+-- menu level 3
+insert into `t_menu` values (120501, 120500, 'add', NULL, NULL, 'gateway:add',
NULL, '1', 1, NULL, now(), now());
+insert into `t_menu` values (120502, 120500, 'update', NULL, NULL,
'gateway:update', NULL, '1', 1, NULL, now(), now());
+insert into `t_menu` values (120503, 120500, 'delete', NULL, NULL,
'gateway:delete', NULL, '1', 1, NULL, now(), now());
+
+-- role menu script
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120500);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120501);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120502);
+insert into `t_role_menu` (role_id, menu_id) values (100001, 120503);
+
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120500);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120501);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120502);
+insert into `t_role_menu` (role_id, menu_id) values (100002, 120503);
+
set foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index 9352cd563..56b055ce8 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -80,3 +80,43 @@ insert into "public"."t_role_menu" (role_id, menu_id) values
(100002, 120403);
-- add sso as login type
alter table "public"."t_user" alter column "password" TYPE varchar(64) collate
"pg_catalog"."default";
comment on column "public"."t_user"."login_type" is 'login type 0:password
1:ldap 2:sso';
+
+-- ----------------------------
+-- Table of t_flink_gateway
+-- ----------------------------
+create sequence "public"."streampark_t_flink_gateway_id_seq"
+ increment 1 start 10000 cache 1 minvalue 10000 maxvalue
9223372036854775807;
+
+create table "public"."t_flink_gateway" (
+ "id" int8 not null default
nextval('streampark_t_flink_gateway_id_seq'::regclass),
+ "gateway_name" varchar(128)
collate "pg_catalog"."default" not null,
+ "description" text collate
"pg_catalog"."default" default null,
+ "gateway_type" int4,
+ "address" varchar(150) collate
"pg_catalog"."default",
+ "create_time" timestamp(6) not
null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone),
+ "modify_time" timestamp(6) not
null default timezone('UTC-8'::text, (now())::timestamp(0) without time zone)
+);
+comment on column "public"."t_flink_gateway"."id" is 'The id of the gateway';
+comment on column "public"."t_flink_gateway"."gateway_name" is 'The name of
the gateway';
+comment on column "public"."t_flink_gateway"."description" is 'More detailed
description of resource';
+comment on column "public"."t_flink_gateway"."gateway_type" is 'The type of
the gateway';
+comment on column "public"."t_flink_gateway"."address" is 'url address of
gateway endpoint';
+comment on column "public"."t_flink_gateway"."create_time" is 'create time';
+comment on column "public"."t_flink_gateway"."modify_time" is 'modify time';
+
+alter table "public"."t_flink_gateway" add constraint "t_flink_gateway_pkey"
primary key ("id");
+
+insert into "public"."t_menu" values (120500, 130000, 'setting.flinkGateway',
'/setting/FlinkGateway', 'setting/FlinkGateway/index', null, 'apartment', '0',
'1', 3, now(), now());
+insert into "public"."t_menu" values (120501, 120500, 'add', null, null,
'gateway:add', null, '1', '1', null, now(), now());
+insert into "public"."t_menu" values (120502, 120500, 'update', null, null,
'gateway:update', null, '1', '1', null, now(), now());
+insert into "public"."t_menu" values (120503, 120500, 'delete', null, null,
'gateway:delete', null, '1', '1', null, now(), now());
+
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120500);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120501);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120502);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100001, 120503);
+
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120500);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120501);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120502);
+insert into "public"."t_role_menu" (role_id, menu_id) values (100002, 120503);
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
new file mode 100644
index 000000000..6f0e0ec4f
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkGateWayController.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+
+import io.swagger.v3.oas.annotations.Operation;
+import io.swagger.v3.oas.annotations.tags.Tag;
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.PutMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+import javax.validation.constraints.NotNull;
+
+@Tag(name = "FLINK_GATEWAY_TAG")
+@Slf4j
+@Validated
+@RestController
+@RequiredArgsConstructor
+@RequestMapping("flink/gateway")
+public class FlinkGateWayController {
+
+ private final FlinkGateWayService flinkGatewayService;
+
+ @Operation(summary = "List flink gateways")
+ @GetMapping("list")
+ public RestResponse list() {
+ return RestResponse.success(flinkGatewayService.list());
+ }
+
+ @Operation(summary = "Create flink gateway")
+ @PostMapping("create")
+ public RestResponse create(@RequestBody FlinkGateWay flinkGateWay) {
+ flinkGatewayService.create(flinkGateWay);
+ return RestResponse.success();
+ }
+
+ @Operation(summary = "Check flink gateway name")
+ @GetMapping("check/name")
+ public RestResponse checkName(
+ @NotNull(message = "The Gateway name cannot be null")
@RequestParam("name") String name) {
+ return RestResponse.success(flinkGatewayService.existsByGatewayName(name));
+ }
+
+ @Operation(summary = "Check flink gateway address")
+ @GetMapping("check/address")
+ public RestResponse checkAddress(
+ @NotNull(message = "The Gateway address cannot be null")
@RequestParam("address")
+ String address)
+ throws Exception {
+ GatewayTypeEnum gatewayVersion =
flinkGatewayService.getGatewayVersion(address);
+ return RestResponse.success(gatewayVersion);
+ }
+
+ @Operation(summary = "Update flink gateway")
+ @PutMapping("update")
+ public RestResponse update(@RequestBody FlinkGateWay flinkGateWay) {
+ flinkGatewayService.update(flinkGateWay);
+ return RestResponse.success();
+ }
+
+ @Operation(summary = "Get flink gateway by id")
+ @GetMapping("get/{id}")
+ public RestResponse get(@PathVariable Long id) {
+ return RestResponse.success(flinkGatewayService.getById(id));
+ }
+
+ @Operation(summary = "Delete flink gateway by id")
+ @DeleteMapping("delete")
+ public RestResponse delete(
+ @NotNull(message = "The Gateway id cannot be null") @RequestParam("id")
Long id) {
+ flinkGatewayService.removeById(id);
+ return RestResponse.success();
+ }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
new file mode 100644
index 000000000..c0d7aa77b
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/SqlWorkBenchController.java
@@ -0,0 +1,195 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.controller;
+
+import org.apache.streampark.console.base.domain.RestResponse;
+import org.apache.streampark.console.core.annotation.ApiAccess;
+import org.apache.streampark.console.core.service.SqlWorkBenchService;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import io.swagger.annotations.Api;
+import io.swagger.annotations.ApiOperation;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.validation.annotation.Validated;
+import org.springframework.web.bind.annotation.DeleteMapping;
+import org.springframework.web.bind.annotation.GetMapping;
+import org.springframework.web.bind.annotation.PathVariable;
+import org.springframework.web.bind.annotation.PostMapping;
+import org.springframework.web.bind.annotation.RequestBody;
+import org.springframework.web.bind.annotation.RequestMapping;
+import org.springframework.web.bind.annotation.RequestParam;
+import org.springframework.web.bind.annotation.RestController;
+
+@Api(tags = {"FLINK_GATEWAY_TAG"})
+@Slf4j
+@Validated
+@RestController
+@RequestMapping("flink/sqlWorkBench/{flinkGatewayId}")
+public class SqlWorkBenchController {
+ private final SqlWorkBenchService sqlWorkBenchService;
+
+ public SqlWorkBenchController(SqlWorkBenchService sqlWorkBenchService) {
+ this.sqlWorkBenchService = sqlWorkBenchService;
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Validation API
+ //
-------------------------------------------------------------------------------------------
+ @ApiAccess
+ @ApiOperation(value = "Check Support", notes = "Check Support", tags =
"FLINK_GATEWAY_TAG")
+ @GetMapping("{flinkClusterId}/check")
+ public RestResponse check(@PathVariable Long flinkGatewayId, @PathVariable
Long flinkClusterId) {
+ return RestResponse.success(sqlWorkBenchService.check(flinkGatewayId,
flinkClusterId));
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Info API
+ //
-------------------------------------------------------------------------------------------
+
+ @ApiAccess
+ @ApiOperation(value = "Get gateway info", notes = "Get gateway info", tags =
"FLINK_GATEWAY_TAG")
+ @GetMapping("getGatewayInfo")
+ public RestResponse getGatewayInfo(@PathVariable Long flinkGatewayId) {
+ return
RestResponse.success(sqlWorkBenchService.getGatewayInfo(flinkGatewayId));
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Session Management
+ //
-------------------------------------------------------------------------------------------
+
+ @ApiAccess
+ @ApiOperation(value = "Open sessions", notes = "Open sessions", tags =
"FLINK_GATEWAY_TAG")
+ @PostMapping("/{flinkClusterId}/sessions")
+ public RestResponse openSession(
+ @PathVariable Long flinkGatewayId, @PathVariable Long flinkClusterId) {
+ SessionHandle sessionHandle =
sqlWorkBenchService.openSession(flinkGatewayId, flinkClusterId);
+ return RestResponse.success(sessionHandle);
+ }
+
+ @ApiAccess
+ @ApiOperation(value = "Heartbeat", notes = "Heartbeat", tags =
"FLINK_GATEWAY_TAG")
+ @PostMapping("sessions/{sessionHandle}/heartbeat")
+ public RestResponse heartbeat(
+ @PathVariable Long flinkGatewayId, @PathVariable String sessionHandle) {
+ sqlWorkBenchService.heartbeat(flinkGatewayId, sessionHandle);
+ return RestResponse.success();
+ }
+
+ @ApiAccess
+ @ApiOperation(value = "Close session", notes = "Close session", tags =
"FLINK_GATEWAY_TAG")
+ @DeleteMapping("sessions/{sessionHandle}")
+ public RestResponse closeSession(
+ @PathVariable Long flinkGatewayId, @PathVariable String sessionHandle) {
+ sqlWorkBenchService.closeSession(flinkGatewayId, sessionHandle);
+ return RestResponse.success();
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Operation Management
+ //
-------------------------------------------------------------------------------------------
+
+ @ApiAccess
+ @ApiOperation(value = "Cancel operation", notes = "Cancel operation", tags =
"FLINK_GATEWAY_TAG")
+ @PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/cancel")
+ public RestResponse cancelOperation(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @PathVariable String operationHandle) {
+ sqlWorkBenchService.cancelOperation(flinkGatewayId, sessionHandle,
operationHandle);
+ return RestResponse.success();
+ }
+
+ @ApiAccess
+ @ApiOperation(value = "Close operation", notes = "Close operation", tags =
"FLINK_GATEWAY_TAG")
+ @DeleteMapping("sessions/{sessionHandle}/operations/{operationHandle}/close")
+ public RestResponse closeOperation(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @PathVariable String operationHandle) {
+ sqlWorkBenchService.closeOperation(flinkGatewayId, sessionHandle,
operationHandle);
+ return RestResponse.success();
+ }
+
+ @ApiAccess
+ @ApiOperation(
+ value = "Get operation info",
+ notes = "Get operation info",
+ tags = "FLINK_GATEWAY_TAG")
+ @PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/info")
+ public RestResponse getOperationInfo(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @PathVariable String operationHandle) {
+ return RestResponse.success(
+ sqlWorkBenchService.getOperationInfo(flinkGatewayId, sessionHandle,
operationHandle));
+ }
+
+ @ApiAccess
+ @ApiOperation(
+ value = "Get operation result schema",
+ notes = "Get operation result schema",
+ tags = "FLINK_GATEWAY_TAG")
+
@PostMapping("sessions/{sessionHandle}/operations/{operationHandle}/resultSchema")
+ public RestResponse getOperationResultSchema(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @PathVariable String operationHandle) {
+ return RestResponse.success(
+ sqlWorkBenchService.getOperationResultSchema(
+ flinkGatewayId, sessionHandle, operationHandle));
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Statements API
+ //
-------------------------------------------------------------------------------------------
+
+ @ApiAccess
+ @ApiOperation(
+ value = "Execute statement",
+ notes = "Execute statement",
+ tags = "FLINK_GATEWAY_TAG")
+ @PostMapping("sessions/{sessionHandle}/statements")
+ public RestResponse executeStatement(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @RequestParam String statement) {
+ return RestResponse.success(
+ sqlWorkBenchService.executeStatement(flinkGatewayId, sessionHandle,
statement));
+ }
+
+ @ApiAccess
+ @ApiOperation(value = "Fetch results", notes = "Fetch results", tags =
"FLINK_GATEWAY_TAG")
+ @PostMapping("sessions/{sessionHandle}/statements/{operationHandle}/info")
+ public RestResponse fetchResults(
+ @PathVariable Long flinkGatewayId,
+ @PathVariable String sessionHandle,
+ @PathVariable String operationHandle,
+ @RequestBody ResultQueryCondition resultQueryCondition) {
+ return RestResponse.success(
+ sqlWorkBenchService.fetchResults(
+ flinkGatewayId, sessionHandle, operationHandle,
resultQueryCondition));
+ }
+
+ //
-------------------------------------------------------------------------------------------
+ // Catalog API
+ //
-------------------------------------------------------------------------------------------
+ // TODO: 2023/5/5 because of catalog with fixed statement, so frontend can
use above methods to
+ // get catalog info
+
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
new file mode 100644
index 000000000..bcfc4cb9a
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkGateWay.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.entity;
+
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+
+import com.baomidou.mybatisplus.annotation.IdType;
+import com.baomidou.mybatisplus.annotation.TableId;
+import com.baomidou.mybatisplus.annotation.TableName;
+import lombok.Data;
+
+import javax.validation.constraints.NotBlank;
+
+import java.util.Date;
+
+@Data
+@TableName("t_flink_gateway")
+public class FlinkGateWay {
+
+ @TableId(type = IdType.AUTO)
+ private Long id;
+
+ @NotBlank(message = "{required}")
+ private String gatewayName;
+
+ private String description;
+
+ private GatewayTypeEnum gatewayType;
+
+ @NotBlank(message = "{required}")
+ private String address;
+
+ private Date createTime;
+
+ private Date modifyTime;
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
similarity index 50%
copy from
streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
index b57d1ac3e..60bdf81d3 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/GatewayTypeEnum.java
@@ -15,47 +15,38 @@
* limitations under the License.
*/
-package org.apache.streampark.gateway.session;
+package org.apache.streampark.console.core.enums;
-import java.util.Objects;
-import java.util.UUID;
+import com.baomidou.mybatisplus.annotation.EnumValue;
+import lombok.Getter;
-/** Session Handle that used to identify the Session. */
-public class SessionHandle {
+import java.util.Arrays;
- private final UUID identifier;
+@Getter
+public enum GatewayTypeEnum {
- public static SessionHandle create() {
- return new SessionHandle(UUID.randomUUID());
- }
+ /** After flink 1.16 (including 1.16) */
+ FLINK_V1(1, "flink-v1"),
- public SessionHandle(UUID identifier) {
- this.identifier = identifier;
- }
+ /** After flink 1.17 (including 1.17) */
+ FLINK_V2(2, "flink-v2"),
- public UUID getIdentifier() {
- return identifier;
- }
+ /** After kyuubi 1.7.0 (including 1.7.0) */
+ KYUUBI(10, "kyuubi"),
+ ;
+ @EnumValue private final int value;
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (!(o instanceof SessionHandle)) {
- return false;
- }
- SessionHandle that = (SessionHandle) o;
- return Objects.equals(identifier, that.identifier);
- }
+ private final String identifier;
- @Override
- public int hashCode() {
- return Objects.hash(identifier);
+ GatewayTypeEnum(int value, String identifier) {
+ this.value = value;
+ this.identifier = identifier;
}
- @Override
- public String toString() {
- return identifier.toString();
+ public static GatewayTypeEnum of(int value) {
+ return Arrays.stream(values())
+ .filter(x -> x.value == value)
+ .findFirst()
+ .orElseThrow(() -> new IllegalArgumentException("Unknown
GatewayTypeEnum value: " + value));
}
}
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
similarity index 52%
copy from
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
index 1a359ee49..2838d98ef 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/FlinkGateWayMapper.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-export default {
- menu: {
- system: '系统管理',
- userManagement: '用户管理',
- roleManagement: '角色管理',
- menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
- teamManagement: '团队管理',
- memberManagement: '成员管理',
- project: '项目管理',
- application: '作业管理',
- variable: '变量管理',
- resource: '资源管理',
- setting: '设置中心',
- },
- setting: {
- system: '系统设置',
- alarm: '告警设置',
- flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
- externalLink: '扩展链接',
- yarnQueue: 'Yarn 队列',
- },
-};
+
+package org.apache.streampark.console.core.mapper;
+
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+
+import com.baomidou.mybatisplus.core.mapper.BaseMapper;
+
+public interface FlinkGateWayMapper extends BaseMapper<FlinkGateWay> {}
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
similarity index 52%
copy from
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to
streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
index 1a359ee49..20289e502 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/FlinkGateWayService.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,21 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-export default {
- menu: {
- system: '系统管理',
- userManagement: '用户管理',
- roleManagement: '角色管理',
- menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
- teamManagement: '团队管理',
- memberManagement: '成员管理',
- project: '项目管理',
- application: '作业管理',
- variable: '变量管理',
- resource: '资源管理',
- setting: '设置中心',
- },
- setting: {
- system: '系统设置',
- alarm: '告警设置',
- flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
- externalLink: '扩展链接',
- yarnQueue: 'Yarn 队列',
- },
-};
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+
+import com.baomidou.mybatisplus.extension.service.IService;
+import com.fasterxml.jackson.core.JsonProcessingException;
+
+/** Service for managing {@link FlinkGateWay} records and probing SQL gateway endpoints. */
+public interface FlinkGateWayService extends IService<FlinkGateWay> {
+ /**
+  * Create a new flink gateway record.
+  *
+  * @param flinkGateWay gateway to persist
+  */
+ void create(FlinkGateWay flinkGateWay);
+
+ /**
+  * Update an existing flink gateway record.
+  *
+  * @param flinkGateWay gateway carrying the new values
+  */
+ void update(FlinkGateWay flinkGateWay);
+
+ /**
+  * Check whether a gateway with the given name is already stored.
+  *
+  * @param name gateway name
+  * @return true if a gateway with this name exists
+  */
+ boolean existsByGatewayName(String name);
+
+ /**
+  * Detect the gateway type served at the given address.
+  *
+  * @param address base address of the sql gateway
+  * @return detected gateway type
+  * @throws JsonProcessingException declared for implementations that surface JSON parsing errors
+  */
+ GatewayTypeEnum getGatewayVersion(String address) throws
JsonProcessingException;
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
new file mode 100644
index 000000000..40ded03f2
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/SqlWorkBenchService.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service;
+
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+public interface SqlWorkBenchService {
+
+ /**
+ * Open a session for a flink cluster
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param flinkClusterId flink cluster id
+ * @return session handle
+ */
+ SessionHandle openSession(Long flinkGatewayId, Long flinkClusterId);
+
+ /**
+ * Close a session
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ */
+ void closeSession(Long flinkGatewayId, String sessionHandleUUIDStr);
+
+ /**
+ * Get the gateway info
+ *
+ * @param flinkGatewayId flink gateway id
+ * @return gateway info
+ */
+ GatewayInfo getGatewayInfo(Long flinkGatewayId);
+
+ /**
+ * Cancel the operation
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param operationId operation id
+ */
+ void cancelOperation(Long flinkGatewayId, String sessionHandleUUIDStr,
String operationId);
+
+ /**
+ * Close the operation
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param operationId operation id
+ */
+ void closeOperation(Long flinkGatewayId, String sessionHandleUUIDStr, String
operationId);
+
+ /**
+ * Get operation info
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param operationId operation id
+ * @return operation info
+ */
+ OperationInfo getOperationInfo(
+ Long flinkGatewayId, String sessionHandleUUIDStr, String operationId);
+
+ /**
+ * Get operation result schema
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param operationId operation id
+ * @return operation result schema
+ */
+ Column getOperationResultSchema(
+ Long flinkGatewayId, String sessionHandleUUIDStr, String operationId);
+
+ /**
+ * Execute statement
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param statement statement
+ * @return operation handle
+ */
+ OperationHandle executeStatement(
+ Long flinkGatewayId, String sessionHandleUUIDStr, String statement);
+
+ /**
+ * Fetch results
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandleUUIDStr session handle uuid string
+ * @param operationId operation id
+ * @param resultQueryCondition result query condition
+ * @return result set
+ */
+ ResultSet fetchResults(
+ Long flinkGatewayId,
+ String sessionHandleUUIDStr,
+ String operationId,
+ ResultQueryCondition resultQueryCondition);
+
+ /**
+ * Send heartbeat
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param sessionHandle session handle
+ */
+ void heartbeat(Long flinkGatewayId, String sessionHandle);
+
+ /**
+ * Check whether the flink cluster version is supported by the gateway
+ *
+ * @param flinkGatewayId flink gateway id
+ * @param flinkClusterId flink cluster id
+ * @return true if supported
+ */
+ boolean check(Long flinkGatewayId, Long flinkClusterId);
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
new file mode 100644
index 000000000..f14280b84
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkGateWayServiceImpl.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.util.HttpClientUtils;
+import org.apache.streampark.console.base.exception.ApiAlertException;
+import org.apache.streampark.console.base.util.JacksonUtils;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.enums.GatewayTypeEnum;
+import org.apache.streampark.console.core.mapper.FlinkGateWayMapper;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+import
org.apache.streampark.gateway.flink.client.dto.GetApiVersionResponseBody;
+
+import org.apache.hc.client5.http.config.RequestConfig;
+
+import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
+import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+import org.springframework.transaction.annotation.Propagation;
+import org.springframework.transaction.annotation.Transactional;
+
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Default {@link FlinkGateWayService} implementation backed by {@link FlinkGateWayMapper}.
+ *
+ * <p>Before a gateway is saved, its name is checked for uniqueness and its type is detected by
+ * calling the gateway's {@code /api_versions} REST endpoint.
+ */
+@Slf4j
+@Service
+@Transactional(propagation = Propagation.SUPPORTS, readOnly = true, rollbackFor = Exception.class)
+public class FlinkGateWayServiceImpl extends ServiceImpl<FlinkGateWayMapper, FlinkGateWay>
+    implements FlinkGateWayService {
+
+  /**
+   * Validate the gateway and fill in its type before it is persisted.
+   *
+   * @param flinkGateWay gateway about to be saved
+   * @param excludeSelf when true (update path), the record's own id is left out of the
+   *     name-uniqueness check so that saving a gateway without renaming it does not fail
+   * @throws ApiAlertException if another gateway already uses the name, or if the gateway type
+   *     cannot be detected from the address
+   */
+  private void preHandleGatewayInfo(FlinkGateWay flinkGateWay, boolean excludeSelf) {
+    // validate gateway name
+    LambdaQueryWrapper<FlinkGateWay> sameName =
+        new LambdaQueryWrapper<FlinkGateWay>()
+            .eq(FlinkGateWay::getGatewayName, flinkGateWay.getGatewayName());
+    // NOTE(review): assumes the entity exposes getId() for its primary key - confirm
+    if (excludeSelf && flinkGateWay.getId() != null) {
+      sameName.ne(FlinkGateWay::getId, flinkGateWay.getId());
+    }
+    if (getBaseMapper().exists(sameName)) {
+      throw new ApiAlertException("gateway name already exists");
+    }
+    // validate gateway address and set gateway type
+    flinkGateWay.setGatewayType(getGatewayVersion(flinkGateWay.getAddress()));
+  }
+
+  /**
+   * Create a gateway after validating its name and detecting its type.
+   *
+   * @param flinkGateWay gateway to create
+   */
+  @Override
+  public void create(FlinkGateWay flinkGateWay) {
+    preHandleGatewayInfo(flinkGateWay, false);
+    this.save(flinkGateWay);
+  }
+
+  /**
+   * Update a gateway after validating its name against all other gateways and re-detecting its
+   * type.
+   *
+   * @param flinkGateWay gateway to update
+   */
+  @Override
+  public void update(FlinkGateWay flinkGateWay) {
+    preHandleGatewayInfo(flinkGateWay, true);
+    this.saveOrUpdate(flinkGateWay);
+  }
+
+  /**
+   * Check whether any stored gateway uses the given name.
+   *
+   * @param name gateway name
+   * @return true if a gateway with this name exists
+   */
+  @Override
+  public boolean existsByGatewayName(String name) {
+    return getBaseMapper()
+        .exists(new LambdaQueryWrapper<FlinkGateWay>().eq(FlinkGateWay::getGatewayName, name));
+  }
+
+  /**
+   * Detect the gateway type by requesting {@code address + "/api_versions"}.
+   *
+   * <p>Only the first reported version is inspected: {@code "V1"} maps to {@link
+   * GatewayTypeEnum#FLINK_V1}, any other value maps to {@link GatewayTypeEnum#FLINK_V2}.
+   *
+   * @param address base address of the sql gateway
+   * @return detected gateway type
+   * @throws ApiAlertException if the request fails, returns no body, or cannot be parsed
+   */
+  @Override
+  public GatewayTypeEnum getGatewayVersion(String address) {
+    String restUrl = address + "/api_versions";
+    try {
+      String result =
+          HttpClientUtils.httpGetRequest(
+              restUrl,
+              RequestConfig.custom().setConnectTimeout(2000, TimeUnit.MILLISECONDS).build());
+      if (result != null) {
+        String versionStr =
+            JacksonUtils.read(result, GetApiVersionResponseBody.class).getVersions().get(0);
+        return "V1".equals(versionStr) ? GatewayTypeEnum.FLINK_V1 : GatewayTypeEnum.FLINK_V2;
+      }
+    } catch (Exception e) {
+      // Any failure (connection, parsing, empty version list) is reported uniformly below.
+      log.error("get gateway version failed", e);
+    }
+    throw new ApiAlertException("get gateway version failed");
+  }
+}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
new file mode 100644
index 000000000..3b7be04ae
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SqlWorkBenchServiceImpl.java
@@ -0,0 +1,220 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.console.core.service.impl;
+
+import org.apache.streampark.common.enums.ExecutionMode;
+import org.apache.streampark.common.util.HadoopConfigUtils;
+import org.apache.streampark.console.core.entity.FlinkCluster;
+import org.apache.streampark.console.core.entity.FlinkEnv;
+import org.apache.streampark.console.core.entity.FlinkGateWay;
+import org.apache.streampark.console.core.service.FlinkClusterService;
+import org.apache.streampark.console.core.service.FlinkEnvService;
+import org.apache.streampark.console.core.service.FlinkGateWayService;
+import org.apache.streampark.console.core.service.SqlWorkBenchService;
+import org.apache.streampark.flink.kubernetes.KubernetesRetriever;
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode;
+import org.apache.streampark.flink.kubernetes.ingress.IngressController;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.factories.FactoryUtil;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import org.apache.flink.client.program.ClusterClient;
+
+import lombok.RequiredArgsConstructor;
+import lombok.extern.slf4j.Slf4j;
+import org.springframework.stereotype.Service;
+
+import java.net.URI;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.UUID;
+
+import static
org.apache.streampark.common.enums.ExecutionMode.KUBERNETES_NATIVE_SESSION;
+import static org.apache.streampark.common.enums.ExecutionMode.REMOTE;
+import static org.apache.streampark.common.enums.ExecutionMode.YARN_SESSION;
+
+/**
+ * Default {@link SqlWorkBenchService} implementation.
+ *
+ * <p>Every call resolves the configured {@link FlinkGateWay} by id, builds a {@link
+ * SqlGatewayService} for it and delegates the request to that service.
+ */
+@Slf4j
+@Service
+@RequiredArgsConstructor
+public class SqlWorkBenchServiceImpl implements SqlWorkBenchService {
+
+  private final FlinkClusterService flinkClusterService;
+  private final FlinkGateWayService flinkGateWayService;
+  private final FlinkEnvService flinkEnvService;
+
+  /**
+   * Get SqlGatewayService instance by flinkGatewayId
+   *
+   * @param flinkGatewayId flink gateway id
+   * @return the first SqlGatewayService created for the gateway's type and address
+   * @throws IllegalArgumentException if the gateway does not exist
+   * @throws IllegalStateException if no SqlGatewayService could be created for the gateway
+   */
+  private SqlGatewayService getSqlGateWayService(Long flinkGatewayId) {
+    FlinkGateWay flinkGateWay = flinkGateWayService.getById(flinkGatewayId);
+    if (flinkGateWay == null) {
+      throw new IllegalArgumentException(
+          "flinkGateWay is not exist, please check your config, id: " + flinkGatewayId);
+    }
+    Map<String, String> config = new HashMap<>(2);
+    config.put(
+        FactoryUtil.SQL_GATEWAY_SERVICE_TYPE.getKey(),
+        flinkGateWay.getGatewayType().getIdentifier());
+    config.put(FlinkSqlGatewayServiceFactory.BASE_URI.getKey(), flinkGateWay.getAddress());
+    List<SqlGatewayService> actual = SqlGatewayServiceFactoryUtils.createSqlGatewayService(config);
+    // Guard against an empty result instead of failing with IndexOutOfBoundsException below.
+    if (actual == null || actual.isEmpty()) {
+      throw new IllegalStateException(
+          "No SqlGatewayService instance available for flink gateway, id: " + flinkGatewayId);
+    }
+    if (actual.size() > 1) {
+      log.warn("There are more than one SqlGatewayService instance, please check your config");
+    }
+    return actual.get(0);
+  }
+
+  /** Return the gateway info reported by the gateway with the given id. */
+  @Override
+  public GatewayInfo getGatewayInfo(Long flinkGatewayId) {
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    return sqlGateWayService.getGatewayInfo();
+  }
+
+  /**
+   * Open a gateway session configured to target the given flink cluster.
+   *
+   * <p>The session configuration depends on the cluster's execution mode; only REMOTE,
+   * YARN_SESSION and KUBERNETES_NATIVE_SESSION are supported.
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param flinkClusterId flink cluster id
+   * @return session handle
+   * @throws IllegalArgumentException if the gateway or cluster does not exist, the execution mode
+   *     is missing or unsupported, or the cluster's rest address cannot be resolved
+   */
+  @Override
+  public SessionHandle openSession(Long flinkGatewayId, Long flinkClusterId) {
+    Map<String, String> streamParkConf = new HashMap<>();
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
+    // Same guard as check(): fail with a clear message instead of a NullPointerException.
+    if (flinkCluster == null) {
+      throw new IllegalArgumentException("FlinkCluster not found");
+    }
+    String clusterId = flinkCluster.getClusterId();
+
+    ExecutionMode executionMode = ExecutionMode.of(flinkCluster.getExecutionMode());
+    if (executionMode == null) {
+      throw new IllegalArgumentException("executionMode is null");
+    }
+
+    streamParkConf.put("execution.target", executionMode.getName());
+    switch (Objects.requireNonNull(executionMode)) {
+      case REMOTE:
+        // The remote URI is only needed here; resolving it for every mode made the other
+        // modes depend on a value they never use.
+        URI remoteURI = flinkCluster.getRemoteURI();
+        if (remoteURI == null) {
+          throw new IllegalArgumentException(
+              "remote URI of flink cluster is not available, id: " + flinkClusterId);
+        }
+        streamParkConf.put("rest.address", remoteURI.getHost());
+        streamParkConf.put("rest.port", String.valueOf(remoteURI.getPort()));
+        break;
+      case YARN_SESSION:
+        streamParkConf.put("yarn.application.id", clusterId);
+        HadoopConfigUtils.readSystemHadoopConf()
+            .forEach((k, v) -> streamParkConf.put("flink.hadoop." + k, v));
+        break;
+      case KUBERNETES_NATIVE_SESSION:
+        String k8sNamespace = flinkCluster.getK8sNamespace();
+        String restAddress;
+        try (ClusterClient<?> clusterClient =
+            (ClusterClient<?>)
+                KubernetesRetriever.newFinkClusterClient(
+                    clusterId, k8sNamespace, FlinkK8sExecuteMode.of(executionMode))) {
+          restAddress = IngressController.ingressUrlAddress(k8sNamespace, clusterId, clusterClient);
+        } catch (Exception e) {
+          throw new IllegalArgumentException("get k8s rest address error", e);
+        }
+        streamParkConf.put("kubernetes.cluster-id", clusterId);
+        streamParkConf.put(
+            "kubernetes.jobmanager.service-account", flinkCluster.getServiceAccount());
+        streamParkConf.put("kubernetes.namespace", k8sNamespace);
+        streamParkConf.put("rest.address", restAddress);
+        break;
+      default:
+        throw new IllegalArgumentException("Unsupported execution mode: " + executionMode);
+    }
+
+    // Separate the ids explicitly: `Long + Long + String` would add the two ids numerically
+    // before concatenating, producing a misleading session name.
+    String sessionName = flinkGatewayId + "-" + flinkClusterId + "-" + UUID.randomUUID();
+    return sqlGateWayService.openSession(new SessionEnvironment(sessionName, null, streamParkConf));
+  }
+
+  /** Close the session identified by the given handle string. */
+  @Override
+  public void closeSession(Long flinkGatewayId, String sessionHandleUUIDStr) {
+    SqlGatewayService sqlGateWayService = getSqlGateWayService(flinkGatewayId);
+    sqlGateWayService.closeSession(new SessionHandle(sessionHandleUUIDStr));
+  }
+
+  /** Cancel the given operation of the given session. */
+  @Override
+  public void cancelOperation(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    getSqlGateWayService(flinkGatewayId)
+        .cancelOperation(new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
+  }
+
+  /** Close the given operation of the given session. */
+  @Override
+  public void closeOperation(Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    getSqlGateWayService(flinkGatewayId)
+        .closeOperation(new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
+  }
+
+  /** Return the operation info of the given operation. */
+  @Override
+  public OperationInfo getOperationInfo(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    return getSqlGateWayService(flinkGatewayId)
+        .getOperationInfo(
+            new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
+  }
+
+  /** Return the result schema of the given operation. */
+  @Override
+  public Column getOperationResultSchema(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String operationId) {
+    return getSqlGateWayService(flinkGatewayId)
+        .getOperationResultSchema(
+            new SessionHandle(sessionHandleUUIDStr), new OperationHandle(operationId));
+  }
+
+  /**
+   * Execute a statement in the given session.
+   *
+   * <p>The call passes a fixed value of {@code 10000L} and no extra configuration to the gateway
+   * service. NOTE(review): the value is presumably a timeout in milliseconds - confirm against
+   * SqlGatewayService.
+   */
+  @Override
+  public OperationHandle executeStatement(
+      Long flinkGatewayId, String sessionHandleUUIDStr, String statement) {
+    return getSqlGateWayService(flinkGatewayId)
+        .executeStatement(new SessionHandle(sessionHandleUUIDStr), statement, 10000L, null);
+  }
+
+  /** Fetch results of the given operation according to the query condition. */
+  @Override
+  public ResultSet fetchResults(
+      Long flinkGatewayId,
+      String sessionHandleUUIDStr,
+      String operationId,
+      ResultQueryCondition resultQueryCondition) {
+    return getSqlGateWayService(flinkGatewayId)
+        .fetchResults(
+            new SessionHandle(sessionHandleUUIDStr),
+            new OperationHandle(operationId),
+            resultQueryCondition);
+  }
+
+  /** Send a heartbeat for the given session. */
+  @Override
+  public void heartbeat(Long flinkGatewayId, String sessionHandle) {
+    getSqlGateWayService(flinkGatewayId).heartbeat(new SessionHandle(sessionHandle));
+  }
+
+  /**
+   * Check whether the gateway supports the flink version of the given cluster.
+   *
+   * @param flinkGatewayId flink gateway id
+   * @param flinkClusterId flink cluster id
+   * @return true if supported
+   * @throws IllegalArgumentException if the cluster or its flink env cannot be found
+   */
+  @Override
+  public boolean check(Long flinkGatewayId, Long flinkClusterId) {
+    FlinkCluster flinkCluster = flinkClusterService.getById(flinkClusterId);
+    if (flinkCluster == null) {
+      throw new IllegalArgumentException("FlinkCluster not found");
+    }
+    FlinkEnv flinkEnv = flinkEnvService.getById(flinkCluster.getVersionId());
+    if (flinkEnv == null) {
+      throw new IllegalArgumentException("FlinkEnv not found");
+    }
+    return getSqlGateWayService(flinkGatewayId).check(flinkEnv.getFlinkVersion().majorVersion());
+  }
+}
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
similarity index 50%
copy from streampark-flink/streampark-flink-sql-gateway/pom.xml
copy to
streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
index fe1824b59..78a238092 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkGateWayMapper.xml
@@ -15,20 +15,17 @@
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.streampark</groupId>
- <artifactId>streampark-flink</artifactId>
- <version>2.2.0-SNAPSHOT</version>
- </parent>
+<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
+<mapper
namespace="org.apache.streampark.console.core.mapper.FlinkGateWayMapper">
- <artifactId>streampark-flink-sql-gateway</artifactId>
- <name>StreamPark : SQL Gateway Parent</name>
- <packaging>pom</packaging>
+ <resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.FlinkGateWay">
+ <id column="id" jdbcType="BIGINT" property="id"/>
+ <result column="name" jdbcType="VARCHAR" property="name"/>
+ <result column="description" jdbcType="VARCHAR"
property="description"/>
+ <result column="gateway_type" jdbcType="BIGINT"
property="gatewayType"/>
+ <result column="address" jdbcType="VARCHAR" property="address"/>
+ <result column="create_time" jdbcType="TIMESTAMP"
property="createTime"/>
+ <result column="modify_time" jdbcType="TIMESTAMP"
property="modifyTime"/>
+ </resultMap>
- <modules>
- <module>streampark-flink-sql-gateway-base</module>
- </modules>
-
-</project>
+</mapper>
diff --git
a/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
new file mode 100644
index 000000000..08336796d
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/api/flink/setting/flinkGateway.ts
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { defHttp } from '/@/utils/http/axios';
+import { Result } from '/#/axios';
+import { AxiosResponse } from 'axios';
+
+enum GATEWAY_API {
+ CREATE = '/flink/gateway/create',
+ UPDATE = '/flink/gateway/update',
+ LIST = '/flink/gateway/list',
+ DELETE = '/flink/gateway/delete',
+ GET = '/flink/gateway/get',
+ CHECK_NAME = '/flink/gateway/check/name',
+ CHECK_ADDRESS = '/flink/gateway/check/address',
+}
+
+/**
+ * fetch gateway list.
+ */
+export function fetchGatewayList() {
+ return defHttp.get({
+ url: GATEWAY_API.LIST,
+ });
+}
+
+/**
+ * fetch gateway remove result.
+ * @returns {Promise<AxiosResponse<Result>>}
+ */
+export function fetchGatewayDelete(id: string): Promise<AxiosResponse<Result>>
{
+ return defHttp.delete(
+ { url: GATEWAY_API.DELETE, data: { id } },
+ { isReturnNativeResponse: true },
+ );
+}
+
+export function fetchGatewayCreate(data: Recordable) {
+ return defHttp.postJson({
+ url: GATEWAY_API.CREATE,
+ data,
+ });
+}
+
+export function fetchGatewayUpdate(data: Recordable) {
+ return defHttp.postJson({
+ url: GATEWAY_API.UPDATE,
+ data,
+ });
+}
+
+export function fetchGatewayCheckName(name: string) {
+ return defHttp.get({
+ url: GATEWAY_API.CHECK_NAME,
+ params: { name },
+ });
+}
+
+export function fetchGatewayCheckAddress(address: string) {
+ return defHttp.get({
+ url: GATEWAY_API.CHECK_ADDRESS,
+ params: { address },
+ });
+}
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
index 8d93eed0c..b161b7145 100644
--- a/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
+++ b/streampark-console/streampark-console-webapp/src/locales/lang/en/menu.ts
@@ -36,5 +36,6 @@ export default {
flinkCluster: 'Flink Cluster',
externalLink: 'External Link',
yarnQueue: 'Yarn Queue',
+ flinkGateway: 'Flink Gateway',
},
};
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
similarity index 50%
copy from
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to
streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
index 1a359ee49..f7da8c073 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/en/setting/flinkGateway.ts
@@ -15,26 +15,27 @@
* limitations under the License.
*/
export default {
- menu: {
- system: '系统管理',
- userManagement: '用户管理',
- roleManagement: '角色管理',
- menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
- teamManagement: '团队管理',
- memberManagement: '成员管理',
- project: '项目管理',
- application: '作业管理',
- variable: '变量管理',
- resource: '资源管理',
- setting: '设置中心',
+ tableTitle: 'Flink Gateway List',
+ createGateway: 'Create Flink Gateway',
+ success: 'Success',
+ gatewayType: 'Gateway Type',
+ gatewayAddress: 'Gateway Address',
+ modifyGateway: 'Edit Flink Gateway',
+ deleteGateway: 'Delete Flink Gateway',
+ deleteConfirm: 'Are you sure you want to delete this Flink Gateway?',
+ name: 'Gateway Name',
+ placeholder: {
+ gatewayType: 'Please select gateway type',
},
- setting: {
- system: '系统设置',
- alarm: '告警设置',
- flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
- externalLink: '扩展链接',
- yarnQueue: 'Yarn 队列',
+ checkResult: {
+ emptyHint: 'Flink gateway name can not be empty.',
+ emptyType: 'Flink gateway type can not be empty.',
+ emptyAddress: 'Flink gateway address can not be empty.',
+ },
+ operation: {
+ updateSuccess: 'Update Flink gateway successfully.',
+ deleteSuccess: 'Delete Flink gateway successfully.',
+ createSuccess: 'Create Flink gateway successfully.',
+ deleteFailed: 'Delete Flink gateway failed.',
},
};
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
index 1a359ee49..330fe9f7d 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
@@ -20,7 +20,7 @@ export default {
userManagement: '用户管理',
roleManagement: '角色管理',
menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
+ tokenManagement: 'Token 管理',
teamManagement: '团队管理',
memberManagement: '成员管理',
project: '项目管理',
@@ -33,8 +33,9 @@ export default {
system: '系统设置',
alarm: '告警设置',
flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
+ flinkCluster: 'Flink 集群',
externalLink: '扩展链接',
yarnQueue: 'Yarn 队列',
+ flinkGateway: 'Flink 网关',
},
};
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
similarity index 51%
copy from
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
index 1a359ee49..996ce72d1 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/setting/flinkGateway.ts
@@ -15,26 +15,27 @@
* limitations under the License.
*/
export default {
- menu: {
- system: '系统管理',
- userManagement: '用户管理',
- roleManagement: '角色管理',
- menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
- teamManagement: '团队管理',
- memberManagement: '成员管理',
- project: '项目管理',
- application: '作业管理',
- variable: '变量管理',
- resource: '资源管理',
- setting: '设置中心',
+ tableTitle: 'Flink 网关列表',
+ createGateway: '创建 Flink 网关',
+ success: '成功',
+ gatewayType: '网关类型',
+ gatewayAddress: '网关地址',
+ modifyGateway: '编辑 Flink 网关',
+ deleteGateway: '删除 Flink 网关',
+ deleteConfirm: '你确定要删除此 Flink 网关 吗?',
+ name: 'Flink 网关名称',
+ placeholder: {
+ gatewayType: '请选择 Flink 网关类型',
},
- setting: {
- system: '系统设置',
- alarm: '告警设置',
- flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
- externalLink: '扩展链接',
- yarnQueue: 'Yarn 队列',
+ checkResult: {
+ emptyHint: 'Flink 网关名称不得为空',
+ emptyType: 'Flink 网关类型不得为空',
+ emptyAddress: 'Flink 网关地址不得为空',
+ },
+ operation: {
+ updateSuccess: '更新 Flink 网关 成功',
+ deleteSuccess: '删除 Flink 网关 成功',
+ createSuccess: '创建 Flink 网关 成功',
+ deleteFailed: '删除 Flink 网关 失败',
},
};
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
new file mode 100644
index 000000000..6f55aa540
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/components/FlinkGatewayDrawer.vue
@@ -0,0 +1,87 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<template>
+ <BasicDrawer
+ :okText="t('common.submitText')"
+ @register="registerDrawer"
+ showFooter
+ :title="getTitle"
+ width="40%"
+ @ok="handleSubmit"
+ >
+ <BasicForm @register="registerForm" />
+ </BasicDrawer>
+</template>
+<script lang="ts">
+  import { defineComponent, ref, computed, unref } from 'vue';
+  import { BasicForm, useForm } from '/@/components/Form';
+  import { formSchema } from '../index.data';
+  import { BasicDrawer, useDrawerInner } from '/@/components/Drawer';
+
+  import { fetchGatewayCreate, fetchGatewayUpdate } from '/@/api/flink/setting/flinkGateway';
+  import { useI18n } from '/@/hooks/web/useI18n';
+
+  /**
+   * Drawer used to create or edit a Flink gateway.
+   * Emits `success` with `{ isUpdate, values }` once the request has completed.
+   */
+  export default defineComponent({
+    name: 'FlinkGatewayDrawer',
+    components: { BasicDrawer, BasicForm },
+    emits: ['success', 'register'],
+    setup(_, { emit }) {
+      const isUpdate = ref(true);
+      // The form schema has no `id` field, so remember the id of the record being edited.
+      const recordId = ref<string | number | undefined>(undefined);
+      const { t } = useI18n();
+
+      const [registerForm, { resetFields, setFieldsValue, validate }] = useForm({
+        labelWidth: 120,
+        colon: true,
+        schemas: formSchema,
+        showActionButtonGroup: false,
+        baseColProps: { lg: 22, md: 22 },
+      });
+
+      // Runs every time the drawer is opened: reset the form, then prefill it in edit mode.
+      const [registerDrawer, { setDrawerProps, closeDrawer }] = useDrawerInner(async (data) => {
+        resetFields();
+        setDrawerProps({ confirmLoading: false });
+        isUpdate.value = !!data?.isUpdate;
+        recordId.value = unref(isUpdate) ? data.record?.id : undefined;
+
+        if (unref(isUpdate)) {
+          setFieldsValue({
+            ...data.record,
+          });
+        }
+      });
+
+      const getTitle = computed(() =>
+        !unref(isUpdate)
+          ? t('setting.flinkGateway.createGateway')
+          : t('setting.flinkGateway.modifyGateway'),
+      );
+
+      /** Validate the form, then create or update the gateway depending on the drawer mode. */
+      async function handleSubmit() {
+        try {
+          const values = await validate();
+          setDrawerProps({ confirmLoading: true });
+          // Edit mode must hit the update API; previously every submit created a new gateway.
+          // NOTE(review): confirm the backend update endpoint accepts the request sent by
+          // fetchGatewayUpdate (HTTP method and id in the body).
+          const res = unref(isUpdate)
+            ? await fetchGatewayUpdate({ ...values, id: unref(recordId) })
+            : await fetchGatewayCreate(values);
+          closeDrawer();
+          emit('success', { isUpdate: unref(isUpdate), values: res });
+        } finally {
+          setDrawerProps({ confirmLoading: false });
+        }
+      }
+
+      return { t, registerDrawer, registerForm, getTitle, handleSubmit };
+    },
+  });
+</script>
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
new file mode 100644
index 000000000..6c70a9241
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.data.ts
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import { BasicColumn, FormSchema } from '/@/components/Table';
+import { useI18n } from '/@/hooks/web/useI18n';
+const { t } = useI18n();
+
+// Schema with a single text input bound to the `user` field.
+// NOTE(review): the label reuses the `system.token.table.userName` i18n key, which
+// looks carried over from the token page — confirm the intended field and label.
+export const searchFormSchema: FormSchema[] = [
+ {
+ field: 'user',
+ label: t('system.token.table.userName'),
+ component: 'Input',
+ colProps: { span: 8 },
+ },
+];
+
+// Fields of the gateway form: `gatewayName` and `address` are required inputs,
+// `description` is a free-text area without validation rules.
+export const formSchema: FormSchema[] = [
+ {
+ field: 'gatewayName',
+ label: t('setting.flinkGateway.name'),
+ component: 'Input',
+ rules: [{ required: true, message:
t('setting.flinkGateway.checkResult.emptyHint') }],
+ },
+ {
+ field: 'address',
+ label: t('setting.flinkGateway.gatewayAddress'),
+ component: 'Input',
+ rules: [{ required: true, message:
t('setting.flinkGateway.checkResult.emptyAddress') }],
+ },
+ {
+ field: 'description',
+ label: t('common.description'),
+ component: 'InputTextArea',
+ },
+];
+
+// Column definitions for the gateway table. The `id` column is declared but
+// hidden (`ifShow: false`); `description` is truncated with an ellipsis.
+export const columns: BasicColumn[] = [
+ {
+ title: 'id',
+ dataIndex: 'id',
+ ifShow: false,
+ },
+ {
+ title: t('setting.flinkGateway.name'),
+ dataIndex: 'gatewayName',
+ sorter: true,
+ },
+ {
+ title: t('setting.flinkGateway.gatewayType'),
+ dataIndex: 'gatewayType',
+ sorter: true,
+ },
+ {
+ title: t('setting.flinkGateway.gatewayAddress'),
+ dataIndex: 'address',
+ sorter: true,
+ },
+ {
+ title: t('common.description'),
+ dataIndex: 'description',
+ ellipsis: true,
+ width: 350,
+ },
+ {
+ title: t('common.createTime'),
+ dataIndex: 'createTime',
+ sorter: true,
+ },
+ {
+ title: t('common.modifyTime'),
+ dataIndex: 'modifyTime',
+ sorter: true,
+ },
+];
diff --git
a/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
new file mode 100644
index 000000000..f0b582ccf
--- /dev/null
+++
b/streampark-console/streampark-console-webapp/src/views/setting/FlinkGateway/index.vue
@@ -0,0 +1,136 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<template>
+ <PageWrapper>
+ <BasicTable @register="registerTable">
+ <template #toolbar>
+ <a-button type="primary" @click="handleCreate" v-auth="'gateway:add'">
+ <Icon icon="ant-design:plus-outlined" />
+ {{ t('common.add') }}
+ </a-button>
+ </template>
+ <template #bodyCell="{ column, record }">
+ <template v-if="column.dataIndex === 'action'">
+ <TableAction
+ :actions="[
+ {
+ icon: 'ant-design:delete-outlined',
+ color: 'error',
+ auth: 'gateway:delete',
+ tooltip: t('setting.flinkGateway.deleteGateway'),
+ popConfirm: {
+ title: t('setting.flinkGateway.operation.deleteConfirm'),
+ confirm: handleDelete.bind(null, record),
+ },
+ },
+ ]"
+ />
+ </template>
+ </template>
+ </BasicTable>
+ <FlinkGatewayDrawer @register="registerDrawer" @success="handleSuccess" />
+ </PageWrapper>
+</template>
+<script lang="ts">
+ import { defineComponent, unref } from 'vue';
+
+ import { BasicTable, useTable, TableAction } from '/@/components/Table';
+ import FlinkGatewayDrawer from './components/FlinkGatewayDrawer.vue';
+ import { useDrawer } from '/@/components/Drawer';
+ import { fetchGatewayDelete, fetchGatewayList } from
'/@/api/flink/setting/flinkGateway';
+ import { columns, searchFormSchema } from './index.data';
+ import { useMessage } from '/@/hooks/web/useMessage';
+ import { useI18n } from '/@/hooks/web/useI18n';
+ import Icon from '/@/components/Icon';
+ import { PageWrapper } from '/@/components/Page';
+  /** Settings page listing Flink gateways, with create and delete actions. */
+  export default defineComponent({
+    name: 'FlinkGateway',
+    components: {
+      BasicTable,
+      FlinkGatewayDrawer: FlinkGatewayDrawer,
+      TableAction,
+      Icon,
+      PageWrapper,
+    },
+    setup() {
+      const { t } = useI18n();
+      const { createMessage } = useMessage();
+      const [registerDrawer, { openDrawer }] = useDrawer();
+      const [registerTable, { reload, updateTableDataRecord }] = useTable({
+        title: t('setting.flinkGateway.tableTitle'),
+        api: fetchGatewayList,
+        columns,
+        formConfig: {
+          baseColProps: { style: { paddingRight: '30px' } },
+          schemas: searchFormSchema,
+        },
+        useSearchForm: false,
+        showTableSetting: true,
+        rowKey: 'id',
+        showIndexColumn: false,
+        canResize: false,
+        actionColumn: {
+          width: 200,
+          title: t('component.table.operation'),
+          dataIndex: 'action',
+        },
+      });
+
+      /** Opens the drawer in create mode. */
+      function handleCreate() {
+        openDrawer(true, {
+          isUpdate: false,
+        });
+      }
+
+      /** Deletes the given row's gateway and reports the outcome to the user. */
+      async function handleDelete(record: Recordable) {
+        const res = await fetchGatewayDelete(record.id);
+        if (res) {
+          createMessage.success(t('setting.flinkGateway.operation.deleteSuccess'));
+          reload();
+        } else {
+          // A failed delete must be shown as an error, not as a success toast.
+          createMessage.error(t('setting.flinkGateway.operation.deleteFailed'));
+        }
+      }
+
+      /** Handles the drawer's `success` event: patch the row on update, reload on create. */
+      function handleSuccess({ isUpdate, values }) {
+        if (isUpdate) {
+          createMessage.success(t('setting.flinkGateway.operation.updateSuccess'));
+          updateTableDataRecord(values.id, values);
+        } else {
+          createMessage.success(t('setting.flinkGateway.operation.createSuccess'));
+          reload();
+        }
+      }
+
+      return {
+        t,
+        registerTable,
+        registerDrawer,
+        handleCreate,
+        handleDelete,
+        handleSuccess,
+      };
+    },
+  });
+</script>
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/pom.xml
index fe1824b59..20c14fd7b 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++ b/streampark-flink/streampark-flink-sql-gateway/pom.xml
@@ -29,6 +29,8 @@
<modules>
<module>streampark-flink-sql-gateway-base</module>
+ <module>streampark-flink-sql-gateway-flink-v1</module>
+ <module>streampark-flink-sql-gateway-kyuubi</module>
</modules>
</project>
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
index a8be0eaf4..90b77c723 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/OperationHandle.java
@@ -19,18 +19,17 @@ package org.apache.streampark.gateway;
import java.io.Serializable;
import java.util.Objects;
-import java.util.UUID;
/** {@link OperationHandle} to index the {@code Operation}. */
public class OperationHandle implements Serializable {
- private final UUID identifier;
+ private final String identifier;
- public OperationHandle(UUID identifier) {
+ public OperationHandle(String identifier) {
this.identifier = identifier;
}
- public UUID getIdentifier() {
+ public String getIdentifier() {
return identifier;
}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
index d737bbb0a..b355ac9dc 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/service/SqlGatewayService.java
@@ -32,6 +32,18 @@ import org.apache.streampark.gateway.session.SessionHandle;
/** A service of SQL gateway is responsible for handling requests from
streampark console. */
public interface SqlGatewayService {
+ //
-------------------------------------------------------------------------------------------
+ // Validate API
+ //
-------------------------------------------------------------------------------------------
+
+ /**
+ * Checks whether this SQL gateway implementation can be used with the given Flink version.
+ *
+ * @param flinkMajorVersion the Flink major version, e.g. {@code "1.16"}
+ * @return {@code true} if the gateway is available for that Flink version, {@code false}
+ * otherwise
+ */
+ boolean check(String flinkMajorVersion);
+
//
-------------------------------------------------------------------------------------------
// Info API
//
-------------------------------------------------------------------------------------------
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
index b57d1ac3e..d530d68c3 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/session/SessionHandle.java
@@ -18,22 +18,17 @@
package org.apache.streampark.gateway.session;
import java.util.Objects;
-import java.util.UUID;
/** Session Handle that used to identify the Session. */
public class SessionHandle {
- private final UUID identifier;
+ private final String identifier;
- public static SessionHandle create() {
- return new SessionHandle(UUID.randomUUID());
- }
-
- public SessionHandle(UUID identifier) {
+ public SessionHandle(String identifier) {
this.identifier = identifier;
}
- public UUID getIdentifier() {
+ public String getIdentifier() {
return identifier;
}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
index fcc6303f5..a9242597d 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/FakeSqlGatewayService.java
@@ -36,6 +36,11 @@ public class FakeSqlGatewayService implements
SqlGatewayService {
private FakeSqlGatewayService() {}
+ /** Reports every Flink version as supported. */
+ @Override
+ public boolean check(String flinkMajorVersion) {
+ return true;
+ }
+
@Override
public GatewayInfo getGatewayInfo() throws SqlGatewayException {
throw new UnsupportedOperationException();
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
index e1f7be03f..defdea367 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/test/java/org/apache/streampark/gateway/utils/MockedSqlGatewayService.java
@@ -45,6 +45,11 @@ public class MockedSqlGatewayService implements
SqlGatewayService {
this.description = description;
}
+ /** Reports every Flink version as supported except {@code "1.11"}. */
+ @Override
+ public boolean check(String flinkMajorVersion) {
+ return !Objects.equals(flinkMajorVersion, "1.11");
+ }
+
@Override
public GatewayInfo getGatewayInfo() throws SqlGatewayException {
throw new UnsupportedOperationException();
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
new file mode 100644
index 000000000..b437f577d
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/pom.xml
@@ -0,0 +1,196 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ ~ Licensed to the Apache Software Foundation (ASF) under one or more
+ ~ contributor license agreements. See the NOTICE file distributed with
+ ~ this work for additional information regarding copyright ownership.
+ ~ The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ (the "License"); you may not use this file except in compliance with
+ ~ the License. You may obtain a copy of the License at
+ ~
+ ~ http://www.apache.org/licenses/LICENSE-2.0
+ ~
+ ~ Unless required by applicable law or agreed to in writing, software
+ ~ distributed under the License is distributed on an "AS IS" BASIS,
+ ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ See the License for the specific language governing permissions and
+ ~ limitations under the License.
+ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway</artifactId>
+ <version>2.2.0-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>streampark-flink-sql-gateway-flink-v1</artifactId>
+ <name>StreamPark : Flink SQL Gateway 1.16</name>
+
+ <properties>
+ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
+ <gson-fire-version>1.8.5</gson-fire-version>
+ <swagger-core-version>1.6.5</swagger-core-version>
+ <okhttp-version>4.10.0</okhttp-version>
+ <gson-version>2.9.1</gson-version>
+ <commons-lang3-version>3.12.0</commons-lang3-version>
+
<jackson-databind-nullable-version>0.2.4</jackson-databind-nullable-version>
+ <jakarta-annotation-version>1.3.5</jakarta-annotation-version>
+ <junit-version>5.9.1</junit-version>
+ <junit-platform-runner.version>1.9.1</junit-platform-runner.version>
+ <mockito-core-version>3.12.4</mockito-core-version>
+ <javax.ws.rs-api-version>2.1.1</javax.ws.rs-api-version>
+ <jsr311-api-version>1.1.1</jsr311-api-version>
+ </properties>
+
+ <dependencies>
+ <!-- Gateway base -->
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- Flink API -->
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-annotations</artifactId>
+ <version>${swagger-core-version}</version>
+ </dependency>
+ <!-- @Nullable annotation -->
+ <dependency>
+ <groupId>com.google.code.findbugs</groupId>
+ <artifactId>jsr305</artifactId>
+ <version>3.0.2</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>okhttp</artifactId>
+ <version>${okhttp-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.squareup.okhttp3</groupId>
+ <artifactId>logging-interceptor</artifactId>
+ <version>${okhttp-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.code.gson</groupId>
+ <artifactId>gson</artifactId>
+ <version>${gson-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>io.gsonfire</groupId>
+ <artifactId>gson-fire</artifactId>
+ <version>${gson-fire-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.commons</groupId>
+ <artifactId>commons-lang3</artifactId>
+ <version>${commons-lang3-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>jakarta.annotation</groupId>
+ <artifactId>jakarta.annotation-api</artifactId>
+ <version>${jakarta-annotation-version}</version>
+ <scope>provided</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.openapitools</groupId>
+ <artifactId>jackson-databind-nullable</artifactId>
+ <version>${jackson-databind-nullable-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>jsr311-api</artifactId>
+ <version>${jsr311-api-version}</version>
+ </dependency>
+ <dependency>
+ <groupId>javax.ws.rs</groupId>
+ <artifactId>javax.ws.rs-api</artifactId>
+ <version>${javax.ws.rs-api-version}</version>
+ </dependency>
+ <!-- test dependencies -->
+ <dependency>
+ <groupId>org.junit.platform</groupId>
+ <artifactId>junit-platform-runner</artifactId>
+ <version>${junit-platform-runner.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.mockito</groupId>
+ <artifactId>mockito-core</artifactId>
+ <version>${mockito-core-version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.openapitools</groupId>
+ <artifactId>openapi-generator-maven-plugin</artifactId>
+ <version>6.5.0</version>
+ <executions>
+ <execution>
+ <goals>
+ <goal>generate</goal>
+ </goals>
+ <configuration>
+
<inputSpec>${project.basedir}/src/main/resources/flink_sql_gateway_rest_v1.yml</inputSpec>
+ <generatorName>java</generatorName>
+ <!--
+ When use jdk11+, use the following line to
generate native java code.
+ <library>native</library>
+ -->
+
<apiPackage>org.apache.streampark.gateway.flink.client.rest.v1</apiPackage>
+
<invokerPackage>org.apache.streampark.gateway.flink.client.rest</invokerPackage>
+
<modelPackage>org.apache.streampark.gateway.flink.client.dto</modelPackage>
+ <generateApiTests>false</generateApiTests>
+ <generateModelTests>false</generateModelTests>
+ <configOptions>
+ <sourceFolder>src/gen/java/main</sourceFolder>
+ </configOptions>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
new file mode 100644
index 000000000..263cb73ab
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayImpl.java
@@ -0,0 +1,239 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.ExecutionConfiguration;
+import org.apache.streampark.gateway.OperationHandle;
+import org.apache.streampark.gateway.OperationStatus;
+import org.apache.streampark.gateway.exception.SqlGatewayException;
+import
org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.GetInfoResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import
org.apache.streampark.gateway.flink.client.dto.OperationStatusResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.ResultSetColumnsInner;
+import org.apache.streampark.gateway.flink.client.dto.ResultSetDataInner;
+import org.apache.streampark.gateway.flink.client.rest.ApiClient;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+import org.apache.streampark.gateway.results.Column;
+import org.apache.streampark.gateway.results.GatewayInfo;
+import org.apache.streampark.gateway.results.OperationInfo;
+import org.apache.streampark.gateway.results.ResultKind;
+import org.apache.streampark.gateway.results.ResultQueryCondition;
+import org.apache.streampark.gateway.results.ResultSet;
+import org.apache.streampark.gateway.results.RowData;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+import org.apache.streampark.gateway.session.SessionEnvironment;
+import org.apache.streampark.gateway.session.SessionHandle;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Objects;
+import java.util.UUID;
+
+/** Implement {@link SqlGatewayService} with Flink native SqlGateway. */
+public class FlinkSqlGatewayImpl implements SqlGatewayService {
+
+ private final DefaultApi defaultApi;
+
+ public FlinkSqlGatewayImpl(String baseUri) {
+ ApiClient client = new ApiClient();
+ client.setBasePath(baseUri);
+ defaultApi = new DefaultApi(client);
+ }
+
+ @Override
+ public boolean check(String flinkMajorVersion) {
+ // flink gateway v1 api is supported from flink 1.16
+ return Double.parseDouble(flinkMajorVersion) >= 1.16;
+ }
+
+ @Override
+ public GatewayInfo getGatewayInfo() throws SqlGatewayException {
+ GetInfoResponseBody info = null;
+ try {
+ info = defaultApi.getInfo();
+ return new GatewayInfo(info.getProductName(), info.getVersion());
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay getGatewayInfo
failed!", e);
+ }
+ }
+
+ @Override
+ public SessionHandle openSession(SessionEnvironment environment) throws
SqlGatewayException {
+ try {
+ return new SessionHandle(
+ Objects.requireNonNull(
+ defaultApi
+ .openSession(
+ new OpenSessionRequestBody()
+ .sessionName(environment.getSessionName())
+ .properties(environment.getSessionConfig()))
+ .getSessionHandle()));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay openSession
failed!", e);
+ }
+ }
+
+ @Override
+ public void heartbeat(SessionHandle sessionHandle) throws
SqlGatewayException {
+ try {
+ defaultApi.triggerSession(
+ new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
+ .identifier(UUID.fromString(sessionHandle.getIdentifier())));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay heartbeat
failed!", e);
+ }
+ }
+
+ @Override
+ public void closeSession(SessionHandle sessionHandle) throws
SqlGatewayException {
+ try {
+ defaultApi.closeSession(UUID.fromString(sessionHandle.getIdentifier()));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay closeSession
failed!", e);
+ }
+ }
+
+ @Override
+ public void cancelOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ try {
+ defaultApi.cancelOperation(
+ new org.apache.streampark.gateway.flink.client.dto.SessionHandle()
+ .identifier(UUID.fromString(sessionHandle.getIdentifier())),
+ new org.apache.streampark.gateway.flink.client.dto.OperationHandle()
+ .identifier(UUID.fromString(operationHandle.getIdentifier())));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay cancelOperation
failed!", e);
+ }
+ }
+
+ @Override
+ public void closeOperation(SessionHandle sessionHandle, OperationHandle
operationHandle)
+ throws SqlGatewayException {
+ try {
+ defaultApi.closeOperation(
+ UUID.fromString(sessionHandle.getIdentifier()),
+ UUID.fromString(operationHandle.getIdentifier()));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay closeOperation
failed!", e);
+ }
+ }
+
+ @Override
+ public OperationInfo getOperationInfo(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+
+ try {
+ OperationStatusResponseBody operationStatus =
+ defaultApi.getOperationStatus(
+ UUID.fromString(sessionHandle.getIdentifier()),
+ UUID.fromString(operationHandle.getIdentifier()));
+ return new
OperationInfo(OperationStatus.valueOf(operationStatus.getStatus()), null);
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay closeOperation
failed!", e);
+ }
+ }
+
+ @Override
+ public Column getOperationResultSchema(
+ SessionHandle sessionHandle, OperationHandle operationHandle) throws
SqlGatewayException {
+ throw new SqlGatewayException(
+ "Flink native SqlGateWay don`t support
operation:getOperationResultSchema!");
+ }
+
+ @Override
+ public OperationHandle executeStatement(
+ SessionHandle sessionHandle,
+ String statement,
+ long executionTimeoutMs,
+ ExecutionConfiguration executionConfig)
+ throws SqlGatewayException {
+ try {
+ return new OperationHandle(
+ Objects.requireNonNull(
+ defaultApi
+ .executeStatement(
+ UUID.fromString(sessionHandle.getIdentifier()),
+ new ExecuteStatementRequestBody()
+ .statement(statement)
+ // currently, sql gateway don't support execution
timeout
+ // .executionTimeout(executionTimeoutMs)
+ .executionConfig(null))
+ .getOperationHandle()));
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay executeStatement
failed!", e);
+ }
+ }
+
+ @Override
+ public ResultSet fetchResults(
+ SessionHandle sessionHandle,
+ OperationHandle operationHandle,
+ ResultQueryCondition resultQueryCondition)
+ throws SqlGatewayException {
+ try {
+
+ List<RowData> data = new ArrayList<>();
+ List<Column> columns = new ArrayList<>();
+ FetchResultsResponseBody fetchResultsResponseBody =
+ defaultApi.fetchResults(
+ UUID.fromString(sessionHandle.getIdentifier()),
+ UUID.fromString(operationHandle.getIdentifier()),
+ resultQueryCondition.getToken());
+ String resultTypeStr = fetchResultsResponseBody.getResultType();
+ Long nextToken = null;
+ if (fetchResultsResponseBody.getNextResultUri() != null) {
+ String nextResultUri = fetchResultsResponseBody.getNextResultUri();
+ nextToken =
Long.valueOf(nextResultUri.substring(nextResultUri.lastIndexOf("/") + 1));
+ }
+
+ org.apache.streampark.gateway.flink.client.dto.ResultSet results =
+ fetchResultsResponseBody.getResults();
+
+ List<ResultSetColumnsInner> resultsColumns = results.getColumns();
+ List<ResultSetDataInner> resultsData = results.getData();
+
+ resultsColumns.forEach(
+ column ->
+ columns.add(
+ new Column(
+ column.getName(), column.getLogicalType().toJson(),
column.getComment())));
+
+ resultsData.forEach(row -> data.add(new
RowData(row.getKind().getValue(), row.getFields())));
+
+ ResultKind resultKind =
+ columns.size() == 1 && columns.get(0).getName().equals("result")
+ ? ResultKind.SUCCESS
+ : ResultKind.SUCCESS_WITH_CONTENT;
+
+ return new ResultSet(
+ ResultSet.ResultType.valueOf(resultTypeStr),
+ nextToken,
+ columns,
+ data,
+ true,
+ null,
+ resultKind);
+ } catch (ApiException e) {
+ throw new SqlGatewayException("Flink native SqlGateWay fetchResults
failed!", e);
+ }
+ }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
new file mode 100644
index 000000000..348c73845
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayServiceFactory.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.ConfigOption;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactory;
+import org.apache.streampark.gateway.factories.SqlGatewayServiceFactoryUtils;
+import org.apache.streampark.gateway.service.SqlGatewayService;
+
+import java.util.Collections;
+import java.util.HashSet;
+import java.util.Set;
+
+/**
+ * {@link SqlGatewayServiceFactory} registered under the {@code flink-v1} identifier.
+ *
+ * <p>It exposes a single required option, {@link #BASE_URI}, and uses its value to construct a
+ * {@link FlinkSqlGatewayImpl}.
+ */
+public class FlinkSqlGatewayServiceFactory implements SqlGatewayServiceFactory {
+
+  /** Required string option (no default) whose value is passed to {@link FlinkSqlGatewayImpl}. */
+  public static final ConfigOption<String> BASE_URI =
+      ConfigOption.key("base-uri")
+          .stringType()
+          .noDefaultValue()
+          .withDescription("The base uri of the flink cluster.");
+
+  /** @return the identifier this factory is looked up by, {@code "flink-v1"}. */
+  @Override
+  public String factoryIdentifier() {
+    return "flink-v1";
+  }
+
+  /** @return a fresh mutable set containing only {@link #BASE_URI}. */
+  @Override
+  public Set<ConfigOption<?>> requiredOptions() {
+    return new HashSet<>(Collections.singleton(BASE_URI));
+  }
+
+  /** @return an empty set; this factory declares no optional options. */
+  @Override
+  public Set<ConfigOption<?>> optionalOptions() {
+    return Collections.emptySet();
+  }
+
+  /**
+   * Creates the gateway service for the given context.
+   *
+   * @param context factory context carrying the gateway service options
+   * @return a {@link FlinkSqlGatewayImpl} built from the {@code base-uri} option value
+   */
+  @Override
+  public SqlGatewayService createSqlGatewayService(Context context) {
+    // Run the endpoint helper's validation before the option value is read.
+    SqlGatewayServiceFactoryUtils.createEndpointFactoryHelper(this, context).validate();
+    return new FlinkSqlGatewayImpl(context.getGateWayServiceOptions().get(BASE_URI.getKey()));
+  }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
new file mode 100644
index 000000000..bbefc2c25
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/META-INF/services/org.apache.streampark.gateway.factories.Factory
@@ -0,0 +1,19 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+org.apache.streampark.gateway.flink.FlinkSqlGatewayServiceFactory
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
new file mode 100644
index 000000000..3b9614a43
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/main/resources/flink_sql_gateway_rest_v1.yml
@@ -0,0 +1,573 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+openapi: 3.0.1
+info:
+ title: Flink SQL Gateway REST API
+ contact:
+ email: [email protected]
+ license:
+ name: Apache 2.0
+ url: https://www.apache.org/licenses/LICENSE-2.0.html
+ version: v1/1.16
+paths:
+ /api_versions:
+ get:
+ description: Get the current available versions for the Rest Endpoint. The client
+ can choose one of the returned versions as the protocol for later communication.
+ operationId: getApiVersion
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetApiVersionResponseBody'
+ /info:
+ get:
+ description: Get meta data for this cluster.
+ operationId: getInfo
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetInfoResponseBody'
+ /sessions:
+ post:
+ description: Opens a new session with specific properties. Specific
properties
+ can be given for current session which will override the default
properties
+ of gateway.
+ operationId: openSession
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/OpenSessionRequestBody'
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/OpenSessionResponseBody'
+ /sessions/{session_handle}:
+ get:
+ description: Get the session configuration.
+ operationId: getSessionConfig
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/GetSessionConfigResponseBody'
+ delete:
+ description: Closes the specific session.
+ operationId: closeSession
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/CloseSessionResponseBody'
+ /sessions/{session_handle}/heartbeat:
+ post:
+ description: "Trigger heartbeat to tell the server that the client is
active,\
+ \ and to keep the session alive as long as configured timeout value."
+ operationId: triggerSession
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ $ref: '#/components/schemas/SessionHandle'
+ responses:
+ "200":
+ description: The request was successful.
+ /sessions/{session_handle}/operations/{operation_handle}/cancel:
+ post:
+ description: Cancel the operation.
+ operationId: cancelOperation
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ $ref: '#/components/schemas/SessionHandle'
+ - name: operation_handle
+ in: path
+ description: The OperationHandle that identifies a operation.
+ required: true
+ schema:
+ $ref: '#/components/schemas/OperationHandle'
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/OperationStatusResponseBody'
+ /sessions/{session_handle}/operations/{operation_handle}/close:
+ delete:
+ description: Close the operation.
+ operationId: closeOperation
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ - name: operation_handle
+ in: path
+ description: The OperationHandle that identifies a operation.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/OperationStatusResponseBody'
+ /sessions/{session_handle}/operations/{operation_handle}/result/{token}:
+ get:
+ description: Fetch results of Operation.
+ operationId: fetchResults
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ - name: operation_handle
+ in: path
+ description: The OperationHandle that identifies a operation.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ - name: token
+ in: path
+ description: The token that identifies which batch of data to fetch.
+ required: true
+ schema:
+ type: integer
+ format: int64
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/FetchResultsResponseBody'
+ /sessions/{session_handle}/operations/{operation_handle}/status:
+ get:
+ description: Get the status of operation.
+ operationId: getOperationStatus
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ - name: operation_handle
+ in: path
+ description: The OperationHandle that identifies a operation.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/OperationStatusResponseBody'
+ /sessions/{session_handle}/statements:
+ post:
+ description: Execute a statement.
+ operationId: executeStatement
+ parameters:
+ - name: session_handle
+ in: path
+ description: The SessionHandle that identifies a session.
+ required: true
+ schema:
+ type: string
+ format: uuid
+ requestBody:
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecuteStatementRequestBody'
+ responses:
+ "200":
+ description: The request was successful.
+ content:
+ application/json:
+ schema:
+ $ref: '#/components/schemas/ExecuteStatementResponseBody'
+components:
+ schemas:
+ CloseSessionResponseBody:
+ type: object
+ properties:
+ status:
+ type: string
+ WatermarkSpec:
+ type: object
+ properties:
+ rowtimeAttribute:
+ type: string
+ watermarkExpression:
+ $ref: '#/components/schemas/ResolvedExpression'
+ GetInfoResponseBody:
+ type: object
+ properties:
+ productName:
+ type: string
+ version:
+ type: string
+ JobVertexID:
+ pattern: "[0-9a-f]{32}"
+ type: string
+ ResolvedExpression:
+ type: object
+ properties:
+ outputDataType:
+ $ref: '#/components/schemas/DataType'
+ resolvedChildren:
+ type: array
+ items:
+ $ref: '#/components/schemas/ResolvedExpression'
+ children:
+ type: array
+ items:
+ $ref: '#/components/schemas/Expression'
+ LogicalType:
+ type: object
+ properties:
+ typeRoot:
+ $ref: '#/components/schemas/LogicalTypeRoot'
+ children:
+ type: array
+ items:
+ $ref: '#/components/schemas/LogicalType'
+ nullable:
+ type: boolean
+ UniqueConstraint:
+ type: object
+ properties:
+ name:
+ type: string
+ enforced:
+ type: boolean
+ columns:
+ type: array
+ items:
+ type: string
+ type:
+ $ref: '#/components/schemas/ConstraintType'
+ ExecuteStatementRequestBody:
+ type: object
+ properties:
+ statement:
+ type: string
+ executionTimeout:
+ type: integer
+ format: int64
+ executionConfig:
+ type: object
+ additionalProperties:
+ type: string
+ LogicalTypeRoot:
+ type: string
+ enum:
+ - CHAR
+ - VARCHAR
+ - BOOLEAN
+ - BINARY
+ - VARBINARY
+ - DECIMAL
+ - TINYINT
+ - SMALLINT
+ - INTEGER
+ - BIGINT
+ - FLOAT
+ - DOUBLE
+ - DATE
+ - TIME_WITHOUT_TIME_ZONE
+ - TIMESTAMP_WITHOUT_TIME_ZONE
+ - TIMESTAMP_WITH_TIME_ZONE
+ - TIMESTAMP_WITH_LOCAL_TIME_ZONE
+ - INTERVAL_YEAR_MONTH
+ - INTERVAL_DAY_TIME
+ - ARRAY
+ - MULTISET
+ - MAP
+ - ROW
+ - DISTINCT_TYPE
+ - STRUCTURED_TYPE
+ - "NULL"
+ - RAW
+ - SYMBOL
+ - UNRESOLVED
+ OperationHandle:
+ type: object
+ properties:
+ identifier:
+ type: string
+ format: uuid
+ ExecuteStatementResponseBody:
+ type: object
+ properties:
+ operationHandle:
+ type: string
+ FetchResultsResponseBody:
+ type: object
+ properties:
+ results:
+ $ref: '#/components/schemas/ResultSet'
+ resultType:
+ type: string
+ nextResultUri:
+ type: string
+ RowData:
+ type: object
+ properties:
+ arity:
+ type: integer
+ format: int32
+ rowKind:
+ $ref: '#/components/schemas/RowKind'
+ Column:
+ type: object
+ properties:
+ name:
+ type: string
+ dataType:
+ $ref: '#/components/schemas/DataType'
+ comment:
+ type: string
+ physical:
+ type: boolean
+ persisted:
+ type: boolean
+ TriggerId:
+ pattern: "[0-9a-f]{32}"
+ type: string
+ ResourceID:
+ pattern: "[0-9a-f]{32}"
+ type: string
+ OpenSessionRequestBody:
+ type: object
+ properties:
+ sessionName:
+ type: string
+ properties:
+ type: object
+ additionalProperties:
+ type: string
+ ResultSet:
+ type: object
+ properties:
+ columns:
+ type: array
+ items:
+ properties:
+ name:
+ type: string
+ logicalType:
+ type: object
+ properties:
+ type:
+ $ref: '#/components/schemas/LogicalTypeRoot'
+ fields:
+ type: array
+ items:
+ properties:
+ name:
+ type: string
+ fieldType:
+ type: object
+ properties:
+ type:
+ $ref: '#/components/schemas/LogicalTypeRoot'
+ nullable:
+ type: boolean
+ length:
+ type: integer
+ format: int32
+ precision:
+ type: integer
+ format: int32
+ scale:
+ type: integer
+ format: int32
+ nullable:
+ type: boolean
+ length:
+ type: integer
+ format: int32
+ precision:
+ type: integer
+ format: int32
+ scale:
+ type: integer
+ format: int32
+ comment:
+ type: string
+ data:
+ type: array
+ items:
+ type: object
+ properties:
+ kind:
+ $ref: '#/components/schemas/RowKind'
+ fields:
+ type: array
+ items:
+ type: object
+ DataType:
+ type: object
+ properties:
+ logicalType:
+ $ref: '#/components/schemas/LogicalType'
+ children:
+ type: array
+ items:
+ $ref: '#/components/schemas/DataType'
+ ResolvedSchema:
+ type: object
+ properties:
+ columns:
+ type: array
+ items:
+ $ref: '#/components/schemas/Column'
+ watermarkSpecs:
+ type: array
+ items:
+ $ref: '#/components/schemas/WatermarkSpec'
+ primaryKey:
+ $ref: '#/components/schemas/UniqueConstraint'
+ primaryKeyIndexes:
+ type: array
+ items:
+ type: integer
+ format: int32
+ columnCount:
+ type: integer
+ format: int32
+ columnDataTypes:
+ type: array
+ items:
+ $ref: '#/components/schemas/DataType'
+ columnNames:
+ type: array
+ items:
+ type: string
+ GetSessionConfigResponseBody:
+ type: object
+ properties:
+ properties:
+ type: object
+ additionalProperties:
+ type: string
+ SerializedThrowable:
+ type: object
+ properties:
+ serialized-throwable:
+ type: string
+ format: binary
+ GetApiVersionResponseBody:
+ type: object
+ properties:
+ versions:
+ type: array
+ items:
+ type: string
+ OperationStatusResponseBody:
+ type: object
+ properties:
+ status:
+ type: string
+ SessionHandle:
+ type: object
+ properties:
+ identifier:
+ type: string
+ format: uuid
+ ResultType:
+ type: string
+ enum:
+ - NOT_READY
+ - PAYLOAD
+ - EOS
+ Expression:
+ type: object
+ RowKind:
+ type: string
+ enum:
+ - INSERT
+ - UPDATE_BEFORE
+ - UPDATE_AFTER
+ - DELETE
+ OpenSessionResponseBody:
+ type: object
+ properties:
+ sessionHandle:
+ type: string
+ ConstraintType:
+ type: string
+ enum:
+ - PRIMARY_KEY
+ - UNIQUE_KEY
+ IntermediateDataSetID:
+ pattern: "[0-9a-f]{32}"
+ type: string
+ JobID:
+ pattern: "[0-9a-f]{32}"
+ type: string
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java
new file mode 100644
index 000000000..a18f59335
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGateway.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody;
+import org.apache.streampark.gateway.flink.client.rest.ApiClient;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+
+import java.util.Collections;
+
+/**
+ * Test-scope helper that builds {@link DefaultApi} clients, plus a minimal {@code main} that opens
+ * one session against a default-configured {@link ApiClient}.
+ */
+public class FlinkSqlGateway {
+
+  private FlinkSqlGateway() {}
+
+  /**
+   * Builds a REST client for the gateway.
+   *
+   * @param basePath base path set on the underlying {@link ApiClient}
+   * @return a {@link DefaultApi} backed by that client
+   */
+  public static DefaultApi sqlGatewayApi(String basePath) {
+    ApiClient apiClient = new ApiClient();
+    apiClient.setBasePath(basePath);
+    return new DefaultApi(apiClient);
+  }
+
+  /**
+   * Opens a session named {@code example} with the single property {@code foo=bar}. The client is
+   * created with {@code new ApiClient()}, i.e. without an explicit base path.
+   */
+  public static void main(String[] args) throws ApiException {
+    DefaultApi gateway = new DefaultApi(new ApiClient());
+    OpenSessionRequestBody sessionRequest =
+        new OpenSessionRequestBody()
+            .sessionName("example")
+            .properties(Collections.singletonMap("foo", "bar"));
+    // The response is not inspected; the call only has to complete without an ApiException.
+    OpenSessionResponseBody ignoredResponse = gateway.openSession(sessionRequest);
+  }
+}
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java
new file mode 100644
index 000000000..51943270b
--- /dev/null
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-flink-v1/src/test/java/org/apache/streampark/gateway/flink/FlinkSqlGatewayExample.java
@@ -0,0 +1,229 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.streampark.gateway.flink;
+
+import
org.apache.streampark.gateway.flink.client.dto.ExecuteStatementRequestBody;
+import
org.apache.streampark.gateway.flink.client.dto.ExecuteStatementResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.FetchResultsResponseBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionRequestBody;
+import org.apache.streampark.gateway.flink.client.dto.OpenSessionResponseBody;
+import org.apache.streampark.gateway.flink.client.rest.ApiException;
+import org.apache.streampark.gateway.flink.client.rest.v1.DefaultApi;
+
+import java.util.UUID;
+
+/**
+ * Manual examples that drive a running Flink SQL Gateway through the generated {@link DefaultApi}
+ * client. Each {@code runOn*} method opens its own session with hard-coded properties and submits
+ * a few statements, printing the returned handles to standard output.
+ */
+public class FlinkSqlGatewayExample {
+
+  private FlinkSqlGatewayExample() {}
+
+  /** Runs the remote-cluster example against a hard-coded gateway address. */
+  public static void main(String[] args) throws Exception {
+    DefaultApi api = FlinkSqlGateway.sqlGatewayApi("http://192.168.20.144:8083");
+    runOnRemote(api);
+    // runOnYarn(api);
+    // runOnKubernetes(api);
+  }
+
+  /**
+   * Creates a datagen-backed {@code Orders} table, selects from it, waits ten seconds and prints
+   * the columns of the first result batch (token {@code 0}).
+   */
+  public static void runOnRemote(DefaultApi api) throws ApiException, InterruptedException {
+    OpenSessionRequestBody sessionRequest =
+        new OpenSessionRequestBody()
+            .putPropertiesItem("rest.address", "192.168.20.239")
+            .putPropertiesItem("rest.port", "8081")
+            .putPropertiesItem("execution.target", "remote");
+    String sessionHandle = api.openSession(sessionRequest).getSessionHandle();
+    System.out.println("SessionHandle: " + sessionHandle);
+
+    String pipelineName = "Flink SQL Gateway SDK on flink cluster Example";
+    String createOrders =
+        "CREATE TABLE Orders (\n"
+            + " order_number BIGINT,\n"
+            + " price DECIMAL(32,2),\n"
+            + " buyer ROW<first_name STRING, last_name STRING>,\n"
+            + " order_time TIMESTAMP(3)\n"
+            + ") WITH (\n"
+            + " 'connector' = 'datagen',\n"
+            + " 'number-of-rows' = '2'\n"
+            + ")";
+    ExecuteStatementResponseBody created = submit(api, sessionHandle, createOrders, pipelineName);
+    System.out.println("create table: " + created.getOperationHandle());
+
+    ExecuteStatementResponseBody selected =
+        submit(api, sessionHandle, "select * from Orders;", pipelineName);
+    System.out.println("select * from Orders: " + selected.getOperationHandle());
+
+    // Fixed pause before fetching; the operation status is not polled.
+    Thread.sleep(1000 * 10);
+
+    FetchResultsResponseBody firstBatch =
+        api.fetchResults(
+            UUID.fromString(sessionHandle), UUID.fromString(selected.getOperationHandle()), 0L);
+    firstBatch.getResults().getColumns().forEach(System.out::println);
+  }
+
+  /**
+   * Opens a {@code kubernetes-session} session, creates datagen, blackhole and print tables, then
+   * submits a statement set that feeds both sinks from the datagen table.
+   */
+  private static void runOnKubernetes(DefaultApi api) throws ApiException {
+    OpenSessionResponseBody response =
+        api.openSession(
+            new OpenSessionRequestBody()
+                .putPropertiesItem("kubernetes.cluster-id", "custom-flink-cluster")
+                .putPropertiesItem("kubernetes.jobmanager.service-account", "flink")
+                .putPropertiesItem("kubernetes.namespace", "flink-cluster")
+                .putPropertiesItem("rest.address", "127.0.0.1")
+                .putPropertiesItem("rest.port", "8081")
+                .putPropertiesItem("execution.target", "kubernetes-session"));
+    String sessionHandle = response.getSessionHandle();
+    System.out.println(sessionHandle);
+
+    String pipelineName = "Flink SQL Gateway SDK on K8S Example";
+
+    String createDatagen =
+        "CREATE TABLE datagen (\n"
+            + " f_sequence INT,\n"
+            + " f_random INT,\n"
+            + " f_random_str STRING\n"
+            + ") WITH (\n"
+            + " 'connector' = 'datagen',\n"
+            + " 'rows-per-second'='10',\n"
+            + " 'fields.f_sequence.kind'='sequence',\n"
+            + " 'fields.f_sequence.start'='1',\n"
+            + " 'fields.f_sequence.end'='1000',\n"
+            + " 'fields.f_random.min'='1',\n"
+            + " 'fields.f_random.max'='1000',\n"
+            + " 'fields.f_random_str.length'='10'\n"
+            + ")";
+    System.out.println(
+        submit(api, sessionHandle, createDatagen, pipelineName).getOperationHandle());
+
+    String createBlackhole =
+        "CREATE TABLE blackhole_table (\n"
+            + " f_sequence INT,\n"
+            + " f_random INT,\n"
+            + " f_random_str STRING\n"
+            + ") WITH (\n"
+            + " 'connector' = 'blackhole'\n"
+            + ")";
+    System.out.println(
+        submit(api, sessionHandle, createBlackhole, pipelineName).getOperationHandle());
+
+    String createPrint =
+        "CREATE TABLE print_table (\n"
+            + " f_sequence INT,\n"
+            + " f_random INT,\n"
+            + " f_random_str STRING\n"
+            + ") WITH (\n"
+            + " 'connector' = 'print'\n"
+            + ")";
+    System.out.println(submit(api, sessionHandle, createPrint, pipelineName).getOperationHandle());
+
+    String statementSet =
+        "EXECUTE STATEMENT SET\n"
+            + "BEGIN\n"
+            + " insert into blackhole_table select * from datagen;\n"
+            + " insert into print_table select * from datagen;\n"
+            + "END;";
+    System.out.println(
+        submit(api, sessionHandle, statementSet, pipelineName).getOperationHandle());
+  }
+
+  /** Opens a {@code yarn-session} session and submits {@code select 1}. */
+  public static void runOnYarn(DefaultApi api) throws ApiException {
+    OpenSessionResponseBody response = api.openSession(yarnSessionRequest("yarn-session"));
+    System.out.println(response.getSessionHandle());
+
+    ExecuteStatementResponseBody selectOne =
+        submit(
+            api,
+            response.getSessionHandle(),
+            "select 1",
+            "Flink SQL Gateway SDK on YARN Example");
+    System.out.println(selectOne.getOperationHandle());
+  }
+
+  /**
+   * Registers a temporary UDF from a jar on HDFS and then calls it in a query whose pipeline name
+   * carries a random UUID suffix.
+   */
+  public static void runOnYarnWithUDF(DefaultApi api) throws ApiException {
+    // NOTE(review): execution.target is "local" here although the remaining properties describe
+    // a YARN cluster -- confirm this is intended.
+    OpenSessionResponseBody response = api.openSession(yarnSessionRequest("local"));
+    String sessionHandle = response.getSessionHandle();
+
+    String createFunction =
+        "create TEMPORARY FUNCTION \n"
+            + " FakeFunction as 'com.fortycoderplus.flink.udf.FakeFunction'\n"
+            + "using JAR 'hdfs://MyHdfsService/udf-test/fake-func.jar'";
+    ExecuteStatementResponseBody registered =
+        submit(api, sessionHandle, createFunction, "Flink SQL Gateway UDF on YARN Example");
+    System.out.println(registered.getOperationHandle());
+
+    ExecuteStatementResponseBody invoked =
+        submit(
+            api,
+            sessionHandle,
+            "select FakeFunction('Flink SQL Gateway UDF on YARN Example')",
+            "Flink SQL Gateway UDF on YARN Example-" + UUID.randomUUID());
+    System.out.println(invoked.getOperationHandle());
+  }
+
+  /** Submits {@code sql} in the given session with {@code pipeline.name} set to pipelineName. */
+  private static ExecuteStatementResponseBody submit(
+      DefaultApi api, String sessionHandle, String sql, String pipelineName) throws ApiException {
+    return api.executeStatement(
+        UUID.fromString(sessionHandle),
+        new ExecuteStatementRequestBody()
+            .statement(sql)
+            .putExecutionConfigItem("pipeline.name", pipelineName));
+  }
+
+  /** Builds the open-session request shared by the YARN examples for the given target. */
+  private static OpenSessionRequestBody yarnSessionRequest(String executionTarget) {
+    return new OpenSessionRequestBody()
+        .putPropertiesItem("execution.target", executionTarget)
+        .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.enabled", "true")
+        .putPropertiesItem("flink.hadoop.yarn.resourcemanager.ha.rm-ids", "rm1,rm2")
+        .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm1", "yarn01")
+        .putPropertiesItem("flink.hadoop.yarn.resourcemanager.hostname.rm2", "yarn01")
+        .putPropertiesItem("flink.hadoop.yarn.resourcemanager.cluster-id", "yarn-cluster")
+        .putPropertiesItem(
+            "flink.hadoop.yarn.client.failover-proxy-provider",
+            "org.apache.hadoop.yarn.client.ConfiguredRMFailoverProxyProvider")
+        .putPropertiesItem("yarn.application.id", "application_1667789375191_XXXX");
+  }
+}
diff --git a/streampark-flink/streampark-flink-sql-gateway/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
similarity index 74%
copy from streampark-flink/streampark-flink-sql-gateway/pom.xml
copy to
streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
index fe1824b59..7cc7b1750 100644
--- a/streampark-flink/streampark-flink-sql-gateway/pom.xml
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/pom.xml
@@ -17,18 +17,22 @@
-->
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
+
<parent>
<groupId>org.apache.streampark</groupId>
- <artifactId>streampark-flink</artifactId>
+ <artifactId>streampark-flink-sql-gateway</artifactId>
<version>2.2.0-SNAPSHOT</version>
</parent>
- <artifactId>streampark-flink-sql-gateway</artifactId>
- <name>StreamPark : SQL Gateway Parent</name>
- <packaging>pom</packaging>
+ <artifactId>streampark-flink-sql-gateway-kyuubi</artifactId>
+ <name>StreamPark : Kyuubi SQL Gateway</name>
- <modules>
- <module>streampark-flink-sql-gateway-base</module>
- </modules>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-sql-gateway-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
diff --git
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
similarity index 52%
copy from
streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
copy to
streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
index 1a359ee49..e5e040f6a 100644
---
a/streampark-console/streampark-console-webapp/src/locales/lang/zh-CN/menu.ts
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-kyuubi/src/main/java/org/apache/streampark/gateway/kyuubi/package-info.java
@@ -6,7 +6,7 @@
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
- * https://www.apache.org/licenses/LICENSE-2.0
+ * http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
@@ -14,27 +14,5 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-export default {
- menu: {
- system: '系统管理',
- userManagement: '用户管理',
- roleManagement: '角色管理',
- menuManagement: '菜单管理',
- tokenManagement: 'Token管理',
- teamManagement: '团队管理',
- memberManagement: '成员管理',
- project: '项目管理',
- application: '作业管理',
- variable: '变量管理',
- resource: '资源管理',
- setting: '设置中心',
- },
- setting: {
- system: '系统设置',
- alarm: '告警设置',
- flinkHome: 'Flink 版本',
- flinkCluster: 'Flink集群',
- externalLink: '扩展链接',
- yarnQueue: 'Yarn 队列',
- },
-};
+
+package org.apache.streampark.gateway.kyuubi;