This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 37402fd55 [Improve] job on yarn state bug fixed. (#3973)
37402fd55 is described below
commit 37402fd55f9c4e9be1d17b06a44c8b9eb372352d
Author: benjobs <[email protected]>
AuthorDate: Mon Aug 19 23:28:28 2024 +0800
[Improve] job on yarn state bug fixed. (#3973)
---
.../apache/streampark/common/util/YarnUtils.scala | 9 +--
.../console/core/mapper/ApplicationMapper.java | 2 +
.../console/core/service/ApplicationService.java | 2 +
.../core/service/impl/ApplicationServiceImpl.java | 5 ++
.../console/core/task/FlinkAppHttpWatcher.java | 6 ++
.../resources/mapper/core/ApplicationMapper.xml | 66 ++--------------------
6 files changed, 26 insertions(+), 64 deletions(-)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 6dbe28ad2..e541c66ff 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -28,7 +28,7 @@ import org.apache.hadoop.yarn.util.{ConverterUtils, RMHAUtils}
import org.apache.http.client.config.RequestConfig
import java.io.IOException
-import java.net.InetAddress
+import java.net.{ConnectException, InetAddress}
import java.security.PrivilegedExceptionAction
import java.util
import java.util.{List => JavaList}
@@ -231,9 +231,9 @@ object YarnUtils extends Logger {
case Success(v) => v
case Failure(e) =>
if (hasYarnHttpKerberosAuth) {
- throw new IOException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
+ throw new ConnectException(s"yarnUtils authRestRequest error, url: $u, detail: $e")
} else {
- throw new IOException(s"yarnUtils restRequest error, url: $u, detail: $e")
+ throw new ConnectException(s"yarnUtils restRequest error, url: $u, detail: $e")
}
}
case _ =>
@@ -243,7 +243,8 @@ object YarnUtils extends Logger {
Utils.retry[String](5)(request(s"${getRMWebAppURL(true)}/$url",
timeout)) match {
case Success(v) => v
case Failure(e) =>
- throw new IOException(s"yarnUtils restRequest retry 5 times all failed. detail: $e")
+ throw new ConnectException(
+ s"yarnUtils restRequest retry 5 times all failed. detail: $e")
}
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
index cf7b8e9cc..c96bbfd42 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationMapper.java
@@ -62,4 +62,6 @@ public interface ApplicationMapper extends BaseMapper<Application> {
boolean existsRunningJobByClusterId(@Param("clusterId") Long clusterId);
boolean existsJobByClusterId(@Param("clusterId") Long clusterId);
+
+ void updateJobManagerUrl(@Param("id") Long id, @Param("url") String url);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
index 952728973..cfc887f0d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationService.java
@@ -131,4 +131,6 @@ public interface ApplicationService extends IService<Application> {
List<ApplicationReport> getYARNApplication(String appName);
RestResponse buildApplication(Long appId, boolean forceBuild) throws Exception;
+
+ void updateJobManagerUrl(Long id, String url);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 971c6d8ae..11b973bc6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -2013,6 +2013,11 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
return RestResponse.success(actionResult);
}
+ @Override
+ public void updateJobManagerUrl(Long id, String url) {
+ baseMapper.updateJobManagerUrl(id, url);
+ }
+
private Tuple2<String, String> getNamespaceClusterId(Application application) {
String clusterId = null;
String k8sNamespace = null;
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 82939b8c1..5a390886a 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -571,6 +571,12 @@ public class FlinkAppHttpWatcher {
}
} else {
try {
+ String trackingUrl = yarnAppInfo.getApp().getTrackingUrl();
+ if (trackingUrl != null && !trackingUrl.equals(application.getJobManagerUrl())) {
+ application.setJobManagerUrl(trackingUrl);
+ applicationService.updateJobManagerUrl(application.getId(), trackingUrl);
+ }
+
String state = yarnAppInfo.getApp().getFinalStatus();
FlinkAppState flinkAppState = FlinkAppState.of(state);
if (FlinkAppState.OTHER.equals(flinkAppState)) {
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 679473f63..4704bf45d 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -18,66 +18,6 @@
<!DOCTYPE mapper PUBLIC "-//mybatis.org//DTD Mapper 3.0//EN"
"http://mybatis.org/dtd/mybatis-3-mapper.dtd">
<mapper
namespace="org.apache.streampark.console.core.mapper.ApplicationMapper">
- <resultMap id="BaseResultMap" type="org.apache.streampark.console.core.entity.Application">
- <id column="id" jdbcType="BIGINT" property="id"/>
- <result column="team_id" jdbcType="BIGINT" property="teamId"/>
- <result column="project_id" jdbcType="BIGINT" property="projectId"/>
- <result column="module" jdbcType="VARCHAR" property="module"/>
- <result column="args" jdbcType="LONGVARCHAR" property="args"/>
- <result column="options" jdbcType="LONGVARCHAR" property="options"/>
- <result column="dynamic_properties" jdbcType="LONGVARCHAR" property="dynamicProperties"/>
- <result column="hot_params" jdbcType="VARCHAR" property="hotParams"/>
- <result column="job_name" jdbcType="VARCHAR" property="jobName"/>
- <result column="version_id" jdbcType="BIGINT" property="versionId"/>
- <result column="cluster_id" jdbcType="VARCHAR" property="clusterId"/>
- <result column="flink_cluster_id" jdbcType="BIGINT" property="flinkClusterId"/>
- <result column="flink_image" jdbcType="VARCHAR" property="flinkImage"/>
- <result column="k8s_namespace" jdbcType="VARCHAR" property="k8sNamespace"/>
- <result column="app_type" jdbcType="INTEGER" property="appType"/>
- <result column="job_type" jdbcType="INTEGER" property="jobType"/>
- <result column="resource_from" jdbcType="INTEGER" property="resourceFrom"/>
- <result column="execution_mode" jdbcType="INTEGER" property="executionMode"/>
- <result column="tracking" jdbcType="INTEGER" property="tracking"/>
- <result column="jar" jdbcType="VARCHAR" property="jar"/>
- <result column="jar_check_sum" jdbcType="VARCHAR" property="jarCheckSum"/>
- <result column="dependency" jdbcType="LONGVARCHAR" property="dependency"/>
- <result column="main_class" jdbcType="VARCHAR" property="mainClass"/>
- <result column="job_id" jdbcType="VARCHAR" property="jobId"/>
- <result column="job_manager_url" jdbcType="VARCHAR" property="jobManagerUrl"/>
- <result column="user_id" jdbcType="BIGINT" property="userId"/>
- <result column="start_time" jdbcType="DATE" property="startTime"/>
- <result column="end_time" jdbcType="DATE" property="endTime"/>
- <result column="duration" jdbcType="BIGINT" property="duration"/>
- <result column="state" jdbcType="INTEGER" property="state"/>
- <result column="cp_max_failure_interval" jdbcType="INTEGER" property="cpMaxFailureInterval"/>
- <result column="cp_failure_rate_interval" jdbcType="INTEGER" property="cpFailureRateInterval"/>
- <result column="cp_failure_action" jdbcType="INTEGER" property="cpFailureAction"/>
- <result column="restart_size" jdbcType="INTEGER" property="restartSize"/>
- <result column="restart_count" jdbcType="INTEGER" property="restartCount"/>
- <result column="release" jdbcType="INTEGER" property="release"/>
- <result column="build" jdbcType="BOOLEAN" property="build"/>
- <result column="resolve_order" jdbcType="INTEGER" property="resolveOrder"/>
- <result column="total_tm" jdbcType="INTEGER" property="totalTM"/>
- <result column="total_slot" jdbcType="INTEGER" property="totalSlot"/>
- <result column="available_slot" jdbcType="INTEGER" property="availableSlot"/>
- <result column="total_task" jdbcType="INTEGER" property="totalTask"/>
- <result column="jm_memory" jdbcType="INTEGER" property="jmMemory"/>
- <result column="tm_memory" jdbcType="INTEGER" property="tmMemory"/>
- <result column="option_state" jdbcType="INTEGER" property="optionState"/>
- <result column="alert_id" jdbcType="BIGINT" property="alertId"/>
- <result column="description" jdbcType="VARCHAR" property="description"/>
- <result column="create_time" jdbcType="DATE" property="createTime"/>
- <result column="option_time" jdbcType="DATE" property="optionTime"/>
- <result column="k8s_rest_exposed_type" jdbcType="INTEGER" property="k8sRestExposedType"/>
- <result column="k8s_pod_template" jdbcType="LONGVARCHAR" property="k8sPodTemplate"/>
- <result column="k8s_jm_pod_template" jdbcType="LONGVARCHAR" property="k8sJmPodTemplate"/>
- <result column="k8s_tm_pod_template" jdbcType="LONGVARCHAR" property="k8sTmPodTemplate"/>
- <result column="ingress_template" jdbcType="LONGVARCHAR" property="ingressTemplate"/>
- <result column="k8s_hadoop_integration" jdbcType="TINYINT" property="k8sHadoopIntegration"/>
- <result column="rest_url" jdbcType="VARCHAR" property="restUrl"/>
- <result column="rest_port" jdbcType="INTEGER" property="restPort"/>
- <result column="tags" jdbcType="VARCHAR" property="tags"/>
- </resultMap>
<update id="resetOptionState">
update t_flink_app
@@ -294,6 +234,12 @@
where id=#{application.id}
</update>
+ <update id="updateJobManagerUrl" >
+ update t_flink_app
+ set job_manager_url = #{url}
+ where id = #{id}
+ </update>
+
<select id="getRecentK8sNamespace" resultType="java.lang.String"
parameterType="java.lang.Integer">
select k8s_namespace
from (