This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
     new 37402fd55 [Improve] job on yarn state bug fixed. (#3973)
37402fd55 is described below

commit 37402fd55f9c4e9be1d17b06a44c8b9eb372352d
Author: benjobs <[email protected]>
AuthorDate: Mon Aug 19 23:28:28 2024 +0800

    [Improve] job on yarn state bug fixed. (#3973)
---
 .../apache/streampark/common/util/YarnUtils.scala  |  9 +--
 .../console/core/mapper/ApplicationMapper.java     |  2 +
 .../console/core/service/ApplicationService.java   |  2 +
 .../core/service/impl/ApplicationServiceImpl.java  |  5 ++
 .../console/core/task/FlinkAppHttpWatcher.java     |  6 ++
 .../resources/mapper/core/ApplicationMapper.xml    | 66 ++--------------------
 6 files changed, 26 insertions(+), 64 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 6dbe28ad2..e541c66ff 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, RMHAUtils}
 import org.apache.http.client.config.RequestConfig
 
 import java.io.IOException
-import java.net.InetAddress
+import java.net.{ConnectException, InetAddress}
 import java.security.PrivilegedExceptionAction
 import java.util
 import java.util.{List => JavaList}
@@ -231,9 +231,9 @@ object YarnUtils extends Logger {
           case Success(v) => v
           case Failure(e) =>
             if (hasYarnHttpKerberosAuth) {
-              throw new IOException(s"yarnUtils authRestRequest error, url: 
$u, detail: $e")
+              throw new ConnectException(s"yarnUtils authRestRequest error, 
url: $u, detail: $e")
             } else {
-              throw new IOException(s"yarnUtils restRequest error, url: $u, 
detail: $e")
+              throw new ConnectException(s"yarnUtils restRequest error, url: 
$u, detail: $e")
             }
         }
       case _ =>
@@ -243,7 +243,8 @@ object YarnUtils extends Logger {
             Utils.retry[String](5)(request(s"${getRMWebAppURL(true)}/$url", 
timeout)) match {
               case Success(v) => v
               case Failure(e) =>
-                throw new IOException(s"yarnUtils restRequest retry 5 times 
all failed. detail: $e")
+                throw new ConnectException(
+                  s"yarnUtils restRequest retry 5 times all failed. detail: 
$e")
             }
         }
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index cf7b8e9cc..c96bbfd42 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -62,4 +62,6 @@ public interface ApplicationMapper extends 
BaseMapper<Application> {
   boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
 
   boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
+
+  void updateJobManagerUrl(@Param("id") Long id, @Param("url") String url);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 952728973..cfc887f0d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -131,4 +131,6 @@ public interface ApplicationService extends 
IService<Application> {
   List<ApplicationReport> getYARNApplication(String appName);
 
   RestResponse buildApplication(Long appId, boolean forceBuild) throws 
Exception;
+
+  void updateJobManagerUrl(Long id, String url);
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 971c6d8ae..11b973bc6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -2013,6 +2013,11 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return RestResponse.success(actionResult);
   }
 
+  @Override
+  public void updateJobManagerUrl(Long id, String url) {
+    baseMapper.updateJobManagerUrl(id, url);
+  }
+
   private Tuple2<String, String> getNamespaceClusterId(Application 
application) {
     String clusterId = null;
     String k8sNamespace = null;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 82939b8c1..5a390886a 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -571,6 +571,12 @@ public class FlinkAppHttpWatcher {
         }
       } else {
         try {
+          String trackingUrl = yarnAppInfo.getApp().getTrackingUrl();
+          if (trackingUrl != null && 
!trackingUrl.equals(application.getJobManagerUrl())) {
+            application.setJobManagerUrl(trackingUrl);
+            applicationService.updateJobManagerUrl(application.getId(), 
trackingUrl);
+          }
+
           String state = yarnAppInfo.getApp().getFinalStatus();
           FlinkAppState flinkAppState = FlinkAppState.of(state);
           if (FlinkAppState.OTHER.equals(flinkAppState)) {
diff --git 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 679473f63..4704bf45d 100644
--- 
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ 
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -18,66 +18,6 @@
 
 <!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN" 
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
 <mapper 
namespace="org.apache.streampark.console.core.mapper.ApplicationMapper">
-    <resultMap id="BaseResultMap" 
type="org.apache.streampark.console.core.entity.Application">
-        <id column="id" jdbcType="BIGINT" property="id"/>
-        <result column="team_id" jdbcType="BIGINT" property="teamId"/>
-        <result column="project_id" jdbcType="BIGINT" property="projectId"/>
-        <result column="module" jdbcType="VARCHAR" property="module"/>
-        <result column="args" jdbcType="LONGVARCHAR" property="args"/>
-        <result column="options" jdbcType="LONGVARCHAR" property="options"/>
-        <result column="dynamic_properties" jdbcType="LONGVARCHAR" 
property="dynamicProperties"/>
-        <result column="hot_params" jdbcType="VARCHAR" property="hotParams"/>
-        <result column="job_name" jdbcType="VARCHAR" property="jobName"/>
-        <result column="version_id" jdbcType="BIGINT" property="versionId"/>
-        <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
-        <result column="flink_cluster_id" jdbcType="BIGINT" 
property="flinkClusterId"/>
-        <result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
-        <result column="k8s_namespace" jdbcType="VARCHAR" 
property="k8sNamespace"/>
-        <result column="app_type" jdbcType="INTEGER" property="appType"/>
-        <result column="job_type" jdbcType="INTEGER" property="jobType"/>
-        <result column="resource_from" jdbcType="INTEGER" 
property="resourceFrom"/>
-        <result column="execution_mode" jdbcType="INTEGER" 
property="executionMode"/>
-        <result column="tracking" jdbcType="INTEGER" property="tracking"/>
-        <result column="jar" jdbcType="VARCHAR" property="jar"/>
-        <result column="jar_check_sum" jdbcType="VARCHAR" 
property="jarCheckSum"/>
-        <result column="dependency" jdbcType="LONGVARCHAR" 
property="dependency"/>
-        <result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
-        <result column="job_id" jdbcType="VARCHAR" property="jobId"/>
-        <result column="job_manager_url" jdbcType="VARCHAR" 
property="jobManagerUrl"/>
-        <result column="user_id" jdbcType="BIGINT" property="userId"/>
-        <result column="start_time" jdbcType="DATE" property="startTime"/>
-        <result column="end_time" jdbcType="DATE" property="endTime"/>
-        <result column="duration" jdbcType="BIGINT" property="duration"/>
-        <result column="state" jdbcType="INTEGER" property="state"/>
-        <result column="cp_max_failure_interval" jdbcType="INTEGER" 
property="cpMaxFailureInterval"/>
-        <result column="cp_failure_rate_interval" jdbcType="INTEGER" 
property="cpFailureRateInterval"/>
-        <result column="cp_failure_action" jdbcType="INTEGER" 
property="cpFailureAction"/>
-        <result column="restart_size" jdbcType="INTEGER" 
property="restartSize"/>
-        <result column="restart_count" jdbcType="INTEGER" 
property="restartCount"/>
-        <result column="release" jdbcType="INTEGER" property="release"/>
-        <result column="build" jdbcType="BOOLEAN" property="build"/>
-        <result column="resolve_order" jdbcType="INTEGER" 
property="resolveOrder"/>
-        <result column="total_tm" jdbcType="INTEGER" property="totalTM"/>
-        <result column="total_slot" jdbcType="INTEGER" property="totalSlot"/>
-        <result column="available_slot" jdbcType="INTEGER" 
property="availableSlot"/>
-        <result column="total_task" jdbcType="INTEGER" property="totalTask"/>
-        <result column="jm_memory" jdbcType="INTEGER" property="jmMemory"/>
-        <result column="tm_memory" jdbcType="INTEGER" property="tmMemory"/>
-        <result column="option_state" jdbcType="INTEGER" 
property="optionState"/>
-        <result column="alert_id" jdbcType="BIGINT" property="alertId"/>
-        <result column="description" jdbcType="VARCHAR" 
property="description"/>
-        <result column="create_time" jdbcType="DATE" property="createTime"/>
-        <result column="option_time" jdbcType="DATE" property="optionTime"/>
-        <result column="k8s_rest_exposed_type" jdbcType="INTEGER" 
property="k8sRestExposedType"/>
-        <result column="k8s_pod_template" jdbcType="LONGVARCHAR" 
property="k8sPodTemplate"/>
-        <result column="k8s_jm_pod_template" jdbcType="LONGVARCHAR" 
property="k8sJmPodTemplate"/>
-        <result column="k8s_tm_pod_template" jdbcType="LONGVARCHAR" 
property="k8sTmPodTemplate"/>
-        <result column="ingress_template" jdbcType="LONGVARCHAR" 
property="ingressTemplate"/>
-        <result column="k8s_hadoop_integration" jdbcType="TINYINT" 
property="k8sHadoopIntegration"/>
-        <result column="rest_url" jdbcType="VARCHAR" property="restUrl"/>
-        <result column="rest_port" jdbcType="INTEGER" property="restPort"/>
-        <result column="tags" jdbcType="VARCHAR" property="tags"/>
-    </resultMap>
 
     <update id="resetOptionState">
         update t_flink_app
@@ -294,6 +234,12 @@
         where id=#{application.id}
     </update>
 
+    <update id="updateJobManagerUrl" >
+        update t_flink_app
+        set job_manager_url = #{url}
+        where id = #{id}
+    </update>
+
     <select id="getRecentK8sNamespace" resultType="java.lang.String" 
parameterType="java.lang.Integer">
         select k8s_namespace
         from (

Reply via email to