This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.5
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.5 by this push:
new 44ee2e9db Fix the issue of state update failure. (#4030)
44ee2e9db is described below
commit 44ee2e9db214235e8ac638ca46a12d8271f1b6a1
Author: Darcy <[email protected]>
AuthorDate: Wed Sep 4 18:15:34 2024 +0800
Fix the issue of state update failure. (#4030)
---
.../java/org/apache/streampark/console/core/enums/FlinkAppState.java | 3 +++
.../streampark/console/core/service/impl/ApplicationServiceImpl.java | 2 +-
.../streampark/console/core/task/FlinkK8sChangeEventListener.java | 5 +++--
.../src/main/resources/mapper/core/ApplicationMapper.xml | 4 +++-
4 files changed, 10 insertions(+), 4 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
index 630e7c02d..0dc72a4ad 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/enums/FlinkAppState.java
@@ -127,6 +127,9 @@ public enum FlinkAppState implements Serializable {
}
public static boolean isEndState(Integer appState) {
+ if (appState == null) {
+ return false;
+ }
FlinkAppState flinkAppState = FlinkAppState.of(appState);
return FlinkAppState.CANCELED == flinkAppState
|| FlinkAppState.FAILED == flinkAppState
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index ce5c86ed1..ed38dabd7 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1477,7 +1477,7 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
@Override
public void persistMetrics(Application appParam) {
- if (appParam.getFlinkAppStateEnum() == FlinkAppState.RUNNING) {
+ if (appParam.getState() != null && appParam.getFlinkAppStateEnum() ==
FlinkAppState.RUNNING) {
appParam.setEndTime(null);
appParam.setDuration(null);
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
index ec1791074..ef6d0845b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkK8sChangeEventListener.java
@@ -115,12 +115,13 @@ public class FlinkK8sChangeEventListener {
return;
}
- Application app = applicationService.getById(trackId.appId());
- if (app == null) {
+ if (applicationService.getById(trackId.appId()) == null) {
return;
}
FlinkMetricCV metrics = event.metrics();
+ Application app = new Application();
+ app.setId(trackId.appId());
app.setJmMemory(metrics.totalJmMemory());
app.setTmMemory(metrics.totalTmMemory());
app.setTotalTM(metrics.totalTm());
diff --git
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
index 4704bf45d..bd62adc52 100644
---
a/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
+++
b/streampark-console/streampark-console-service/src/main/resources/mapper/core/ApplicationMapper.xml
@@ -168,6 +168,9 @@
<if test="application.duration != null">
duration=#{application.duration},
</if>
+ <if test="application.state != null">
+ state=#{application.state},
+ </if>
<choose>
<when
test="@org.apache.streampark.console.core.enums.FlinkAppState@isEndState(application.state)">
total_tm=null,
@@ -198,7 +201,6 @@
</if>
</otherwise>
</choose>
- state=#{application.state}
</set>
where id=#{application.id}
</update>