This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 117fc8e41 [Improve] flink on yarn application tag improvement
117fc8e41 is described below

commit 117fc8e41f934a1e12aad5a484656880996cd8dc
Author: benjobs <[email protected]>
AuthorDate: Fri Dec 22 10:54:30 2023 +0800

    [Improve] flink on yarn application tag improvement
---
 .../console/core/service/impl/ApplicationServiceImpl.java           | 6 +++++-
 .../org/apache/streampark/flink/client/trait/YarnClientTrait.scala  | 2 +-
 2 files changed, 6 insertions(+), 2 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 349d16c0d..2ed6346dc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1835,8 +1835,12 @@ public class ApplicationServiceImpl extends ServiceImpl<ApplicationMapper, Appli
               YarnApplicationState.SUBMITTED,
               YarnApplicationState.ACCEPTED,
               YarnApplicationState.RUNNING);
-      Set<String> yarnTag = Sets.newHashSet(ApplicationType.STREAMPARK_FLINK.getName());
+      Set<String> yarnTag = Sets.newHashSet("streampark");
       List<ApplicationReport> applications = yarnClient.getApplications(types, states, yarnTag);
+      // Compatible with historical versions.
+      if (applications.isEmpty()) {
+        applications = yarnClient.getApplications(types, states);
+      }
       return applications.stream()
           .filter(report -> report.getName().equals(jobName))
           .collect(Collectors.toList());
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 1a0a24268..0021184b1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -46,7 +46,7 @@ trait YarnClientTrait extends FlinkClientTrait {
       // yarn application Type
       .safeSet(YarnConfigOptions.APPLICATION_TYPE, submitRequest.applicationType.getName)
       // yarn application Tag
-      .safeSet(YarnConfigOptions.APPLICATION_TAGS, ApplicationType.STREAMPARK_FLINK.getName)
+      .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
   }
 
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](

Reply via email to