This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/cluster-state by this push:
new de7410303 flink cluster jobManagerUrl improvement
de7410303 is described below
commit de74103030f195cf9eed58d508803c66c2c3b900
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 9 23:33:02 2023 +0800
flink cluster jobManagerUrl improvement
---
.../src/main/assembly/script/schema/mysql-schema.sql | 1 +
.../src/main/assembly/script/schema/pgsql-schema.sql | 2 ++
.../src/main/assembly/script/upgrade/mysql/2.2.0.sql | 1 +
.../src/main/assembly/script/upgrade/pgsql/2.2.0.sql | 1 +
.../apache/streampark/console/core/entity/FlinkCluster.java | 3 +++
.../console/core/service/impl/FlinkClusterServiceImpl.java | 1 +
.../streampark/console/core/task/FlinkClusterWatcher.java | 10 +++++++++-
.../src/main/resources/mapper/core/FlinkClusterMapper.xml | 1 +
8 files changed, 19 insertions(+), 1 deletion(-)
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index f03815299..7783e6c66 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -438,6 +438,7 @@ drop table if exists `t_flink_cluster`;
create table `t_flink_cluster` (
`id` bigint not null auto_increment,
`address` varchar(150) default null comment 'url address of cluster',
+ `job_manager_url` varchar(150) default null comment 'url address of
jobmanager',
`cluster_id` varchar(45) default null comment 'clusterid of session
mode(yarn-session:application-id,k8s-session:cluster-id)',
`cluster_name` varchar(128) not null comment 'cluster name',
`options` text comment 'json form of parameter collection ',
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 1bae96cf9..b1b8d1b1d 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -271,6 +271,7 @@ create sequence "public"."streampark_t_flink_cluster_id_seq"
create table "public"."t_flink_cluster" (
"id" int8 not null default
nextval('streampark_t_flink_cluster_id_seq'::regclass),
"address" varchar(150) collate "pg_catalog"."default",
+ "job_manager_url" varchar(150) collate "pg_catalog"."default",
"cluster_id" varchar(45) collate "pg_catalog"."default",
"cluster_name" varchar(128) collate "pg_catalog"."default" not null,
"options" text collate "pg_catalog"."default",
@@ -296,6 +297,7 @@ create table "public"."t_flink_cluster" (
)
;
comment on column "public"."t_flink_cluster"."address" is 'url address of
cluster';
+comment on column "public"."t_flink_cluster"."job_manager_url" is 'url address
of jobmanager';
comment on column "public"."t_flink_cluster"."cluster_id" is 'clusterid of
session mode(yarn-session:application-id,k8s-session:cluster-id)';
comment on column "public"."t_flink_cluster"."cluster_name" is 'cluster name';
comment on column "public"."t_flink_cluster"."options" is 'parameter
collection json form';
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index fed66d2ba..91ce47d14 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -44,6 +44,7 @@ alter table `t_flink_sql`
add column `team_resource` varchar(64) default null;
alter table `t_flink_cluster`
+ add column `job_manager_url` varchar(150) default null comment 'url
address of jobmanager' after `address`,
add column `start_time` datetime default null comment 'start time',
add column `end_time` datetime default null comment 'end time',
add column `alert_id` bigint default null comment 'alert id';
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index e050ab884..744681436 100644
---
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -57,6 +57,7 @@ alter table "public"."t_flink_sql"
add column "team_resource" varchar(64) default null;
alter table "public"."t_flink_cluster"
+ add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
add column "start_time" timestamp(6) collate "pg_catalog"."default",
add column "end_time" timestamp(6) collate "pg_catalog"."default",
add column "alert_id" int8 collate "pg_catalog"."default";
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 2cd105840..32feec747 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,6 +64,9 @@ public class FlinkCluster implements Serializable {
@TableField(updateStrategy = FieldStrategy.IGNORED)
private String address;
+ @TableField(updateStrategy = FieldStrategy.IGNORED)
+ private String jobManagerUrl;
+
private String clusterId;
private String clusterName;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1dfc09e13..ad6af6617 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -169,6 +169,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
String.format(
"%s/proxy/%s/", YarnUtils.getRMWebAppURL(true),
deployResponse.clusterId());
flinkCluster.setAddress(address);
+ flinkCluster.setJobManagerUrl(deployResponse.address());
} else {
flinkCluster.setAddress(deployResponse.address());
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 4ed37d9d3..ed545b0dd 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -30,6 +30,7 @@ import
org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.alert.AlertService;
+import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hc.client5.http.config.RequestConfig;
@@ -167,7 +168,14 @@ public class FlinkClusterWatcher {
*/
private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
final String address = flinkCluster.getAddress();
- final String flinkUrl = address.concat("/overview");
+ if (StringUtils.isEmpty(address)) {
+ return ClusterState.STOPPED;
+ }
+ final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+ final String flinkUrl =
+ StringUtils.isEmpty(jobManagerUrl)
+ ? address.concat("/overview")
+ : jobManagerUrl.concat("/overview");
try {
String res =
HttpClientUtils.httpGetRequest(
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 27173c0a6..769fe521c 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -20,6 +20,7 @@
<resultMap id="BaseResultMap"
type="org.apache.streampark.console.core.entity.FlinkCluster">
<id column="id" jdbcType="BIGINT" property="id"/>
<result column="address" jdbcType="VARCHAR" property="address"/>
+ <result column="job_manager_url" jdbcType="VARCHAR"
property="jobManagerUrl"/>
<result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
<result column="cluster_name" jdbcType="VARCHAR"
property="clusterName"/>
<result column="options" jdbcType="LONGVARCHAR" property="options"/>