This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch submit
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 0ce7e260b7500df66196fa14f58849b3506efe50
Author: benjobs <[email protected]>
AuthorDate: Fri Dec 22 13:13:11 2023 +0800

    [Improve] submit flink job on yarn job name check improvement
---
 .../impl/ApplicationActionServiceImpl.java         | 337 ++++++++++-----------
 .../flink/client/impl/YarnApplicationClient.scala  |   1 +
 .../flink/client/impl/YarnPerJobClient.scala       |   1 +
 .../flink/client/impl/YarnSessionClient.scala      |   1 +
 .../flink/client/trait/FlinkClientTrait.scala      |  10 +-
 .../flink/client/trait/YarnClientTrait.scala       |   7 +
 6 files changed, 182 insertions(+), 175 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 401eb67d4..a7447cb0c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -19,7 +19,6 @@ package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -27,7 +26,6 @@ import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.FlinkRestoreMode;
 import org.apache.streampark.common.enums.ResolveOrder;
 import org.apache.streampark.common.fs.FsOperator;
-import org.apache.streampark.common.util.CompletableFutureUtils;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.ExceptionUtils;
 import org.apache.streampark.common.util.HadoopUtils;
@@ -120,6 +118,7 @@ import java.util.concurrent.ExecutorService;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadPoolExecutor;
 import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
 
 import static 
org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.Bridge.toTrackId;
 import static 
org.apache.streampark.console.core.watcher.FlinkK8sWatcherWrapper.isKubernetesApp;
@@ -229,7 +228,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
       cancelFuture.cancel(true);
     }
     if (startFuture == null && cancelFuture == null) {
-      this.updateToStopped(appParam);
+      this.doStopped(appParam);
     }
   }
 
@@ -325,60 +324,55 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
 
     cancelFutureMap.put(application.getId(), cancelFuture);
 
-    CompletableFutureUtils.runTimeout(
-            cancelFuture,
-            10L,
-            TimeUnit.MINUTES,
-            cancelResponse -> {
-              applicationLog.setSuccess(true);
-              if (cancelResponse != null && cancelResponse.savePointDir() != 
null) {
-                String savePointDir = cancelResponse.savePointDir();
-                log.info("savePoint path: {}", savePointDir);
-                SavePoint savePoint = new SavePoint();
-                savePoint.setPath(savePointDir);
-                savePoint.setAppId(application.getId());
-                savePoint.setLatest(true);
-                savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
-                savePoint.setCreateTime(new Date());
-                savePoint.setTriggerTime(triggerTime);
-                savePointService.save(savePoint);
+    cancelFuture.whenComplete(
+        (cancelResponse, throwable) -> {
+          cancelFutureMap.remove(application.getId());
+          if (throwable != null) {
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+            } else {
+              log.error("stop flink job fail.", throwable);
+              application.setOptionState(OptionStateEnum.NONE.getValue());
+              application.setState(FlinkAppStateEnum.FAILED.getValue());
+              updateById(application);
+
+              if (appParam.getSavePointed()) {
+                savePointService.expire(application.getId());
               }
+              // re-tracking flink job on kubernetes and logging exception
               if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.unWatching(toTrackId(application));
-              }
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
+                TrackId id = toTrackId(application);
+                k8SFlinkTrackMonitor.unWatching(id);
+                k8SFlinkTrackMonitor.doWatching(id);
               } else {
-                log.error("stop flink job fail.", e);
-                application.setOptionState(OptionStateEnum.NONE.getValue());
-                application.setState(FlinkAppStateEnum.FAILED.getValue());
-                updateById(application);
-
-                if (appParam.getSavePointed()) {
-                  savePointService.expire(application.getId());
-                }
-
-                // re-tracking flink job on kubernetes and logging exception
-                if (isKubernetesApp(application)) {
-                  TrackId id = toTrackId(application);
-                  k8SFlinkTrackMonitor.unWatching(id);
-                  k8SFlinkTrackMonitor.doWatching(id);
-                } else {
-                  FlinkAppHttpWatcher.unWatching(application.getId());
-                }
-
-                String exception = ExceptionUtils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
+                FlinkAppHttpWatcher.unWatching(application.getId());
               }
-            })
-        .whenComplete(
-            (t, e) -> {
-              cancelFutureMap.remove(application.getId());
-              applicationLogService.save(applicationLog);
-            });
+
+              String exception = ExceptionUtils.stringifyException(throwable);
+              applicationLog.setException(exception);
+              applicationLog.setSuccess(false);
+            }
+          } else {
+            applicationLog.setSuccess(true);
+            if (cancelResponse != null && cancelResponse.savePointDir() != 
null) {
+              String savePointDir = cancelResponse.savePointDir();
+              log.info("savePoint path: {}", savePointDir);
+              SavePoint savePoint = new SavePoint();
+              savePoint.setPath(savePointDir);
+              savePoint.setAppId(application.getId());
+              savePoint.setLatest(true);
+              savePoint.setType(CheckPointTypeEnum.SAVEPOINT.get());
+              savePoint.setCreateTime(new Date());
+              savePoint.setTriggerTime(triggerTime);
+              savePointService.save(savePoint);
+            }
+            if (isKubernetesApp(application)) {
+              k8SFlinkTrackMonitor.unWatching(toTrackId(application));
+            }
+          }
+          // save log...
+          applicationLogService.save(applicationLog);
+        });
   }
 
   @Override
@@ -390,10 +384,9 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
         !application.isCanBeStart(), "[StreamPark] The application cannot be 
started repeatedly.");
 
     if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
-
       ApiAlertException.throwIfTrue(
-          checkAppRepeatInYarn(application.getJobName()),
-          "[StreamPark] The same task name is already running in the yarn 
queue");
+          !getApplicationReports(application.getJobName()).isEmpty(),
+          "The same job name is already running in the yarn queue");
     }
 
     AppBuildPipeline buildPipeline = 
appBuildPipeService.getById(application.getId());
@@ -486,124 +479,90 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
 
     startFutureMap.put(application.getId(), future);
 
-    CompletableFutureUtils.runTimeout(
-            future,
-            2L,
-            TimeUnit.MINUTES,
-            submitResponse -> {
-              if (submitResponse.flinkConfig() != null) {
-                String jmMemory =
-                    
submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
-                if (jmMemory != null) {
-                  
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
-                }
-                String tmMemory =
-                    
submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
-                if (tmMemory != null) {
-                  
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
-                }
-              }
-              application.setAppId(submitResponse.clusterId());
-              if (StringUtils.isNoneEmpty(submitResponse.jobId())) {
-                application.setJobId(submitResponse.jobId());
-              }
-
-              if (StringUtils.isNoneEmpty(submitResponse.jobManagerUrl())) {
-                application.setJobManagerUrl(submitResponse.jobManagerUrl());
-                
applicationLog.setJobManagerUrl(submitResponse.jobManagerUrl());
-              }
-              applicationLog.setYarnAppId(submitResponse.clusterId());
-              application.setStartTime(new Date());
-              application.setEndTime(null);
-              if (isKubernetesApp(application)) {
-                application.setRelease(ReleaseStateEnum.DONE.get());
-              }
-              updateById(application);
-
-              // if start completed, will be added task to tracking queue
-              if (isKubernetesApp(application)) {
-                k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+    future.whenComplete(
+        (response, throwable) -> {
+          // 1) remove Future
+          startFutureMap.remove(application.getId());
+
+          // 2) exception
+          if (throwable != null) {
+            if (throwable instanceof CancellationException) {
+              doStopped(application);
+              return;
+            } else {
+              Application app = getById(appParam.getId());
+              app.setState(FlinkAppStateEnum.FAILED.getValue());
+              app.setOptionState(OptionStateEnum.NONE.getValue());
+              updateById(app);
+              if (isKubernetesApp(app)) {
+                k8SFlinkTrackMonitor.unWatching(toTrackId(app));
               } else {
-                FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionStateEnum.STARTING);
-                FlinkAppHttpWatcher.doWatching(application);
+                FlinkAppHttpWatcher.unWatching(appParam.getId());
               }
+            }
+            String exception = ExceptionUtils.stringifyException(throwable);
+            applicationLog.setException(exception);
+            applicationLog.setSuccess(false);
+            applicationLogService.save(applicationLog);
+            // set savepoint to expire
+            savePointService.expire(application.getId());
+            return;
+          }
 
-              applicationLog.setSuccess(true);
-              // set savepoint to expire
-              savePointService.expire(application.getId());
-            },
-            e -> {
-              if (e.getCause() instanceof CancellationException) {
-                updateToStopped(application);
-              } else {
-                String exception = ExceptionUtils.stringifyException(e);
-                applicationLog.setException(exception);
-                applicationLog.setSuccess(false);
-                Application app = getById(appParam.getId());
-                app.setState(FlinkAppStateEnum.FAILED.getValue());
-                app.setOptionState(OptionStateEnum.NONE.getValue());
-                updateById(app);
-                if (isKubernetesApp(app)) {
-                  k8SFlinkTrackMonitor.unWatching(toTrackId(app));
-                } else {
-                  FlinkAppHttpWatcher.unWatching(appParam.getId());
-                }
-              }
-            })
-        .whenComplete(
-            (t, e) -> {
-              if (!K8sFlinkConfig.isV2Enabled()
-                  && FlinkExecutionMode.isKubernetesApplicationMode(
-                      application.getExecutionMode())) {
-                String domainName = settingService.getIngressModeDefault();
-                if (StringUtils.isNotBlank(domainName)) {
-                  try {
-                    IngressController.configureIngress(
-                        domainName, application.getClusterId(), 
application.getK8sNamespace());
-                  } catch (KubernetesClientException 
kubernetesClientException) {
-                    log.info(
-                        "Failed to create ingress, stack info:{}",
-                        kubernetesClientException.getMessage());
-                    applicationLog.setException(e.getMessage());
-                    applicationLog.setSuccess(false);
-                    applicationLogService.save(applicationLog);
-                    application.setState(FlinkAppStateEnum.FAILED.getValue());
-                    
application.setOptionState(OptionStateEnum.NONE.getValue());
-                    updateById(application);
-                    return;
-                  }
+          // 3) success
+          applicationLog.setSuccess(true);
+          if (response.flinkConfig() != null) {
+            String jmMemory = 
response.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
+            if (jmMemory != null) {
+              
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
+            }
+            String tmMemory = 
response.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
+            if (tmMemory != null) {
+              
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
+            }
+          }
+          application.setAppId(response.clusterId());
+          if (StringUtils.isNoneEmpty(response.jobId())) {
+            application.setJobId(response.jobId());
+          }
+
+          if (StringUtils.isNoneEmpty(response.jobManagerUrl())) {
+            application.setJobManagerUrl(response.jobManagerUrl());
+            applicationLog.setJobManagerUrl(response.jobManagerUrl());
+          }
+          applicationLog.setYarnAppId(response.clusterId());
+          application.setStartTime(new Date());
+          application.setEndTime(null);
+
+          // if start completed, will be added task to tracking queue
+          if (isKubernetesApp(application)) {
+            application.setRelease(ReleaseStateEnum.DONE.get());
+            k8SFlinkTrackMonitor.doWatching(toTrackId(application));
+            if 
(FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode()))
 {
+              String domainName = settingService.getIngressModeDefault();
+              if (StringUtils.isNotBlank(domainName)) {
+                try {
+                  IngressController.configureIngress(
+                      domainName, application.getClusterId(), 
application.getK8sNamespace());
+                } catch (KubernetesClientException e) {
+                  log.info("Failed to create ingress, stack info:{}", 
e.getMessage());
+                  applicationLog.setException(e.getMessage());
+                  applicationLog.setSuccess(false);
+                  applicationLogService.save(applicationLog);
+                  application.setState(FlinkAppStateEnum.FAILED.getValue());
+                  application.setOptionState(OptionStateEnum.NONE.getValue());
                 }
               }
-
-              applicationLogService.save(applicationLog);
-              startFutureMap.remove(application.getId());
-            });
-  }
-
-  /**
-   * Check whether a job with the same name is running in the yarn queue
-   *
-   * @param jobName
-   * @return
-   */
-  private boolean checkAppRepeatInYarn(String jobName) {
-    try {
-      YarnClient yarnClient = HadoopUtils.yarnClient();
-      Set<String> types =
-          Sets.newHashSet(
-              ApplicationType.STREAMPARK_FLINK.getName(), 
ApplicationType.APACHE_FLINK.getName());
-      EnumSet<YarnApplicationState> states =
-          EnumSet.of(YarnApplicationState.RUNNING, 
YarnApplicationState.ACCEPTED);
-      List<ApplicationReport> applications = yarnClient.getApplications(types, 
states);
-      for (ApplicationReport report : applications) {
-        if (report.getName().equals(jobName)) {
-          return true;
-        }
-      }
-      return false;
-    } catch (Exception e) {
-      throw new RuntimeException("The yarn api is abnormal. Ensure that yarn 
is running properly.");
-    }
+            }
+          } else {
+            FlinkAppHttpWatcher.setOptionState(appParam.getId(), 
OptionStateEnum.STARTING);
+            FlinkAppHttpWatcher.doWatching(application);
+          }
+          // update app
+          updateById(application);
+          // save log
+          applicationLogService.save(applicationLog);
+        });
   }
 
   private void starting(Application application) {
@@ -781,7 +740,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     return properties;
   }
 
-  private void updateToStopped(Application app) {
+  private void doStopped(Application app) {
     Application application = getById(app);
     application.setOptionState(OptionStateEnum.NONE.getValue());
     application.setState(FlinkAppStateEnum.CANCELED.getValue());
@@ -796,6 +755,17 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     } else {
       FlinkAppHttpWatcher.unWatching(application.getId());
     }
+    // kill application
+    if (FlinkExecutionMode.isYarnMode(app.getFlinkExecutionMode())) {
+      try {
+        List<ApplicationReport> applications = 
getApplicationReports(application.getJobName());
+        if (!applications.isEmpty()) {
+          YarnClient yarnClient = HadoopUtils.yarnClient();
+          yarnClient.killApplication(applications.get(0).getApplicationId());
+        }
+      } catch (Exception ignored) {
+      }
+    }
   }
 
   private String getSavePointed(Application appParam) {
@@ -811,4 +781,31 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     }
     return null;
   }
+
+  private List<ApplicationReport> getApplicationReports(String jobName) {
+    try {
+      YarnClient yarnClient = HadoopUtils.yarnClient();
+      Set<String> types =
+          Sets.newHashSet(
+              ApplicationType.STREAMPARK_FLINK.getName(), 
ApplicationType.APACHE_FLINK.getName());
+      EnumSet<YarnApplicationState> states =
+          EnumSet.of(
+              YarnApplicationState.NEW,
+              YarnApplicationState.NEW_SAVING,
+              YarnApplicationState.SUBMITTED,
+              YarnApplicationState.ACCEPTED,
+              YarnApplicationState.RUNNING);
+      Set<String> yarnTag = Sets.newHashSet("streampark");
+      List<ApplicationReport> applications = yarnClient.getApplications(types, 
states, yarnTag);
+      // Compatible with historical versions.
+      if (applications.isEmpty()) {
+        applications = yarnClient.getApplications(types, states);
+      }
+      return applications.stream()
+          .filter(report -> report.getName().equals(jobName))
+          .collect(Collectors.toList());
+    } catch (Exception e) {
+      throw new RuntimeException("The yarn api is abnormal. Ensure that yarn 
is running properly.");
+    }
+  }
 }
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f4fe45761..4062896b1 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -49,6 +49,7 @@ object YarnApplicationClient extends YarnClientTrait {
   private[this] lazy val workspace = Workspace.remote
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     val flinkDefaultConfiguration = getFlinkDefaultConfiguration(
       submitRequest.flinkVersion.flinkHome)
     val currentUser = UserGroupInformation.getCurrentUser
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
index 370064f4a..b52263d7c 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala
@@ -40,6 +40,7 @@ import scala.collection.convert.ImplicitConversions._
 object YarnPerJobClient extends YarnClientTrait {
 
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     // execution.target
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.PER_JOB.getName)
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
index c91fc45f0..847786577 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnSessionClient.scala
@@ -47,6 +47,7 @@ object YarnSessionClient extends YarnClientTrait {
    * @param flinkConfig
    */
   override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    super.setConfig(submitRequest, flinkConfig)
     flinkConfig
       .safeSet(DeploymentOptions.TARGET, YarnDeploymentTarget.SESSION.getName)
     logInfo(s"""
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index a31250d4c..17a48c1e6 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -115,7 +115,6 @@ trait FlinkClientTrait extends Logger {
     flinkConfig
       .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
       .safeSet(DeploymentOptions.TARGET, submitRequest.executionMode.getName)
-      .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
       .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, 
submitRequest.appMain)
       .safeSet(ApplicationConfiguration.APPLICATION_ARGS, 
extractProgramArgs(submitRequest))
       .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, 
submitRequest.jobId)
@@ -132,10 +131,11 @@ trait FlinkClientTrait extends Logger {
 
     // set savepoint parameter
     if (submitRequest.savePoint != null) {
-      flinkConfig.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, 
submitRequest.savePoint)
-      flinkConfig.setBoolean(
-        SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
-        submitRequest.allowNonRestoredState)
+      flinkConfig
+        .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, 
submitRequest.savePoint)
+        .setBoolean(
+          SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
+          submitRequest.allowNonRestoredState)
       if (
         submitRequest.flinkVersion.checkVersion(
           FlinkRestoreMode.SINCE_FLINK_VERSION) && submitRequest.restoreMode 
!= null
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
index 4e58a50f9..20b245420 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/YarnClientTrait.scala
@@ -39,6 +39,13 @@ import scala.util.Try
 /** yarn application mode submit */
 trait YarnClientTrait extends FlinkClientTrait {
 
+  override def setConfig(submitRequest: SubmitRequest, flinkConfig: 
Configuration): Unit = {
+    flinkConfig
+      .safeSet(YarnConfigOptions.APPLICATION_NAME, 
submitRequest.effectiveAppName)
+      .safeSet(YarnConfigOptions.APPLICATION_TYPE, 
submitRequest.applicationType.getName)
+      .safeSet(YarnConfigOptions.APPLICATION_TAGS, "streampark")
+  }
+
   private[this] def executeClientAction[R <: SavepointRequestTrait, O](
       savepointRequestTrait: R,
       flinkConf: Configuration,

Reply via email to