This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 117fc8e41 [Improve] flink on yarn application tag improvement
117fc8e41 is described below
commit 117fc8e41f934a1e12aad5a484656880996cd8dc
Author: benjobs <[email protected]>
AuthorDate: Fri Dec 22 10:54:30 2023 +0800
[Improve] flink on yarn application tag improvement
---
.../console/core/service/impl/ApplicationServiceImpl.java | 6 +++++-
.../org/apache/streampark/flink/client/trait/YarnClientTrait.scala | 2 +-
2 files changed, 6 insertions(+), 2 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 349d16c0d..2ed6346dc 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -1835,8 +1835,12 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
YarnApplicationState.SUBMITTED,
YarnApplicationState.ACCEPTED,
YarnApplicationState.RUNNING);
- Set<String> yarnTag =
Sets.newHashSet(ApplicationType.STREAMPARK_FLINK.getName());
+ Set<String> yarnTag = Sets.newHashSet("streampark");
List<ApplicationReport> applications = yarnClient.getApplications(types,
states, yarnTag);
+ // Compatible with historical versions.
+ if (applications.isEmpty()) {
+ applications = yarnClient.getApplications(types, states);
+ }
return applications.stream()
.filter(report -> report.getName().equals(jobName))
.collect(Collectors.toList());
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 1a0a24268..0021184b1 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -46,7 +46,7 @@ trait YarnClientTrait extends FlinkClientTrait {
// yarn application Type
.safeSet(YarnConfigOptions.APPLICATION_TYPE,
submitRequest.applicationType.getName)
// yarn application Tag
- .safeSet(YarnConfigOptions.APPLICATION_TAGS,
ApplicationType.STREAMPARK_FLINK.getName)
+ .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
}
private[this] def executeClientAction[R <: SavepointRequestTrait, O](