This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 6465c0c96 [improve] job on yarn state patch. (#3989)
6465c0c96 is described below
commit 6465c0c96975def13eec81611a3c3adba0e8d2a5
Author: ouyangwulin <[email protected]>
AuthorDate: Sat Aug 24 17:03:06 2024 +0800
[improve] job on yarn state patch. (#3989)
* [improve] job on yarn state patch.
* fixed checkstyle.
---
.../console/core/mapper/ApplicationLogMapper.java | 7 ++++++-
.../console/core/service/ApplicationLogService.java | 2 ++
.../core/service/impl/ApplicationLogServiceImpl.java | 5 +++++
.../streampark/console/core/task/FlinkAppHttpWatcher.java | 15 +++++++++------
.../main/resources/mapper/core/ApplicationLogMapper.xml | 5 +++++
5 files changed, 27 insertions(+), 7 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
index 610e58ba9..b3c295d49 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/mapper/ApplicationLogMapper.java
@@ -19,6 +19,11 @@ package org.apache.streampark.console.core.mapper;
import org.apache.streampark.console.core.entity.ApplicationLog;
+import org.apache.ibatis.annotations.Param;
+
import com.baomidou.mybatisplus.core.mapper.BaseMapper;
-public interface ApplicationLogMapper extends BaseMapper<ApplicationLog> {}
+public interface ApplicationLogMapper extends BaseMapper<ApplicationLog> {
+ void updateJobManagerUrl(
+ @Param("yarnAppId") String yarnAppId, @Param("jobManagerUrl") String jobManagerUrl);
+}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
index 62585e71d..945625b9c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/ApplicationLogService.java
@@ -30,4 +30,6 @@ public interface ApplicationLogService extends IService<ApplicationLog> {
void removeApp(Long appId);
Boolean delete(ApplicationLog applicationLog);
+
+ void updateJobManagerUrl(String yarnAppId, String jobManagerUrl);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
index 756bcc4ef..f7e7eb9c4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationLogServiceImpl.java
@@ -59,4 +59,9 @@ public class ApplicationLogServiceImpl extends ServiceImpl<ApplicationLogMapper,
public Boolean delete(ApplicationLog applicationLog) {
return removeById(applicationLog.getId());
}
+
+ @Override
+ public void updateJobManagerUrl(String yarnAppId, String jobManagerUrl) {
+ baseMapper.updateJobManagerUrl(yarnAppId, jobManagerUrl);
+ }
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
index 5a390886a..d36a6c958 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkAppHttpWatcher.java
@@ -33,6 +33,7 @@ import org.apache.streampark.console.core.metrics.flink.CheckPoints;
import org.apache.streampark.console.core.metrics.flink.JobsOverview;
import org.apache.streampark.console.core.metrics.flink.Overview;
import org.apache.streampark.console.core.metrics.yarn.YarnAppInfo;
+import org.apache.streampark.console.core.service.ApplicationLogService;
import org.apache.streampark.console.core.service.ApplicationService;
import org.apache.streampark.console.core.service.FlinkClusterService;
import org.apache.streampark.console.core.service.SavepointService;
@@ -76,6 +77,8 @@ public class FlinkAppHttpWatcher {
@Autowired private ApplicationService applicationService;
+ @Autowired private ApplicationLogService applicationLogService;
+
@Autowired private AlertService alertService;
@Autowired private CheckpointProcessor checkpointProcessor;
@@ -571,15 +574,15 @@ public class FlinkAppHttpWatcher {
}
} else {
try {
- String trackingUrl = yarnAppInfo.getApp().getTrackingUrl();
- if (trackingUrl != null && !trackingUrl.equals(application.getJobManagerUrl())) {
- application.setJobManagerUrl(trackingUrl);
- applicationService.updateJobManagerUrl(application.getId(), trackingUrl);
- }
-
String state = yarnAppInfo.getApp().getFinalStatus();
FlinkAppState flinkAppState = FlinkAppState.of(state);
if (FlinkAppState.OTHER.equals(flinkAppState)) {
+ String trackingUrl = yarnAppInfo.getApp().getTrackingUrl();
+ if (trackingUrl != null && !trackingUrl.equals(application.getJobManagerUrl())) {
+ application.setJobManagerUrl(trackingUrl);
+ applicationService.updateJobManagerUrl(application.getId(), trackingUrl);
+ applicationLogService.updateJobManagerUrl(application.getClusterId(), trackingUrl);
+ }
return;
}
if (FlinkAppState.KILLED.equals(flinkAppState)) {
diff --git a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
index fb0eb52da..f27703d7b 100644
--- a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
+++ b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationLogMapper.xml
@@ -29,4 +29,9 @@
<result column="option_name" jdbcType="INTEGER" property="optionName"/>
</resultMap>
+ <update id="updateJobManagerUrl" >
+ update t_flink_log
+ set job_manager_url = #{jobManagerUrl}
+ where yarn_app_id = #{yarnAppId}
+ </update>
</mapper>