This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch cluster-state
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/cluster-state by this push:
     new de7410303 flink cluster jobManagerUrl improvement
de7410303 is described below

commit de74103030f195cf9eed58d508803c66c2c3b900
Author: benjobs <[email protected]>
AuthorDate: Sun Jul 9 23:33:02 2023 +0800

    flink cluster jobManagerUrl improvement
---
 .../src/main/assembly/script/schema/mysql-schema.sql           |  1 +
 .../src/main/assembly/script/schema/pgsql-schema.sql           |  2 ++
 .../src/main/assembly/script/upgrade/mysql/2.2.0.sql           |  1 +
 .../src/main/assembly/script/upgrade/pgsql/2.2.0.sql           |  1 +
 .../apache/streampark/console/core/entity/FlinkCluster.java    |  3 +++
 .../console/core/service/impl/FlinkClusterServiceImpl.java     |  1 +
 .../streampark/console/core/task/FlinkClusterWatcher.java      | 10 +++++++++-
 .../src/main/resources/mapper/core/FlinkClusterMapper.xml      |  1 +
 8 files changed, 19 insertions(+), 1 deletion(-)

diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
index f03815299..7783e6c66 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/mysql-schema.sql
@@ -438,6 +438,7 @@ drop table if exists `t_flink_cluster`;
 create table `t_flink_cluster` (
   `id` bigint not null auto_increment,
   `address` varchar(150) default null comment 'url address of cluster',
+  `job_manager_url` varchar(150) default null comment 'url address of 
jobmanager',
   `cluster_id` varchar(45) default null comment 'clusterid of session 
mode(yarn-session:application-id,k8s-session:cluster-id)',
   `cluster_name` varchar(128) not null comment 'cluster name',
   `options` text comment 'json form of parameter collection ',
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
index 1bae96cf9..b1b8d1b1d 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/schema/pgsql-schema.sql
@@ -271,6 +271,7 @@ create sequence "public"."streampark_t_flink_cluster_id_seq"
 create table "public"."t_flink_cluster" (
   "id" int8 not null default 
nextval('streampark_t_flink_cluster_id_seq'::regclass),
   "address" varchar(150) collate "pg_catalog"."default",
+  "job_manager_url" varchar(150) collate "pg_catalog"."default",
   "cluster_id" varchar(45) collate "pg_catalog"."default",
   "cluster_name" varchar(128) collate "pg_catalog"."default" not null,
   "options" text collate "pg_catalog"."default",
@@ -296,6 +297,7 @@ create table "public"."t_flink_cluster" (
 )
 ;
 comment on column "public"."t_flink_cluster"."address" is 'url address of 
cluster';
+comment on column "public"."t_flink_cluster"."job_manager_url" is 'url address 
of jobmanager';
 comment on column "public"."t_flink_cluster"."cluster_id" is 'clusterid of 
session mode(yarn-session:application-id,k8s-session:cluster-id)';
 comment on column "public"."t_flink_cluster"."cluster_name" is 'cluster name';
 comment on column "public"."t_flink_cluster"."options" is 'parameter 
collection json form';
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
index fed66d2ba..91ce47d14 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.2.0.sql
@@ -44,6 +44,7 @@ alter table `t_flink_sql`
     add column `team_resource` varchar(64) default null;
 
 alter table `t_flink_cluster`
+    add column `job_manager_url` varchar(150) default null comment 'url 
address of jobmanager' after `address`,
     add column `start_time` datetime default null comment 'start time',
     add column `end_time` datetime default null comment 'end time',
     add column `alert_id` bigint default null comment 'alert id';
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
index e050ab884..744681436 100644
--- 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/pgsql/2.2.0.sql
@@ -57,6 +57,7 @@ alter table "public"."t_flink_sql"
     add column "team_resource" varchar(64) default null;
 
 alter table "public"."t_flink_cluster"
+    add column "job_manager_url" varchar(150) collate "pg_catalog"."default",
     add column "start_time" timestamp(6) collate "pg_catalog"."default",
     add column "end_time" timestamp(6) collate "pg_catalog"."default",
     add column "alert_id" int8 collate "pg_catalog"."default";
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 2cd105840..32feec747 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -64,6 +64,9 @@ public class FlinkCluster implements Serializable {
   @TableField(updateStrategy = FieldStrategy.IGNORED)
   private String address;
 
+  @TableField(updateStrategy = FieldStrategy.IGNORED)
+  private String jobManagerUrl;
+
   private String clusterId;
 
   private String clusterName;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index 1dfc09e13..ad6af6617 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -169,6 +169,7 @@ public class FlinkClusterServiceImpl extends 
ServiceImpl<FlinkClusterMapper, Fli
             String.format(
                 "%s/proxy/%s/", YarnUtils.getRMWebAppURL(true), 
deployResponse.clusterId());
         flinkCluster.setAddress(address);
+        flinkCluster.setJobManagerUrl(deployResponse.address());
       } else {
         flinkCluster.setAddress(deployResponse.address());
       }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 4ed37d9d3..ed545b0dd 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -30,6 +30,7 @@ import 
org.apache.streampark.console.core.service.ApplicationService;
 import org.apache.streampark.console.core.service.FlinkClusterService;
 import org.apache.streampark.console.core.service.alert.AlertService;
 
+import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
 import org.apache.hc.client5.http.config.RequestConfig;
 
@@ -167,7 +168,14 @@ public class FlinkClusterWatcher {
    */
   private ClusterState httpRemoteClusterState(FlinkCluster flinkCluster) {
     final String address = flinkCluster.getAddress();
-    final String flinkUrl = address.concat("/overview");
+    if (StringUtils.isEmpty(address)) {
+      return ClusterState.STOPPED;
+    }
+    final String jobManagerUrl = flinkCluster.getJobManagerUrl();
+    final String flinkUrl =
+        StringUtils.isEmpty(jobManagerUrl)
+            ? address.concat("/overview")
+            : jobManagerUrl.concat("/overview");
     try {
       String res =
           HttpClientUtils.httpGetRequest(
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
index 27173c0a6..769fe521c 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/FlinkClusterMapper.xml
@@ -20,6 +20,7 @@
     <resultMap id="BaseResultMap" 
type="org.apache.streampark.console.core.entity.FlinkCluster">
         <id column="id" jdbcType="BIGINT" property="id"/>
         <result column="address" jdbcType="VARCHAR" property="address"/>
+        <result column="job_manager_url" jdbcType="VARCHAR" 
property="jobManagerUrl"/>
         <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
         <result column="cluster_name" jdbcType="VARCHAR" 
property="clusterName"/>
         <result column="options" jdbcType="LONGVARCHAR" property="options"/>

Reply via email to