This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 1db4400be [Improve] yarn submission mode: Check whether the same task is submitted repeatedly (#3378)
1db4400be is described below

commit 1db4400be0d621ad1287ea9d9bc8b2e4ab61b23d
Author: hackallan <[email protected]>
AuthorDate: Thu Dec 7 11:29:41 2023 +0800

    [Improve] yarn submission mode: Check whether the same task is submitted repeatedly (#3378)
    
    * When submitting a task, check whether a task with the same name is already running in the yarn queue, so that duplicate tasks cannot end up running there at the same time.
    
    * yarn submission mode: Check whether a job with the submitted name is already running in the queue; if it is, the submission is rejected.
    
    ---------
    
    Co-authored-by: qinjiyong <[email protected]>
---
 .../impl/ApplicationActionServiceImpl.java         | 41 ++++++++++++++++++++++
 1 file changed, 41 insertions(+)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 49799d4b8..401eb67d4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.FlinkRestoreMode;
@@ -88,10 +89,14 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
 import com.baomidou.mybatisplus.core.toolkit.Wrappers;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -102,9 +107,12 @@ import org.springframework.transaction.annotation.Transactional;
 import java.io.File;
 import java.net.URI;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -381,6 +389,13 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     ApiAlertException.throwIfTrue(
         !application.isCanBeStart(), "[StreamPark] The application cannot be started repeatedly.");
 
+    if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
+
+      ApiAlertException.throwIfTrue(
+          checkAppRepeatInYarn(application.getJobName()),
+          "[StreamPark] The same task name is already running in the yarn 
queue");
+    }
+
     AppBuildPipeline buildPipeline = appBuildPipeService.getById(application.getId());
     Utils.notNull(buildPipeline);
 
@@ -565,6 +580,32 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
             });
   }
 
+  /**
+   * Check whether a job with the same name is already running in the yarn queue.
+   *
+   * @param jobName the job name to look up in yarn
+   * @return true if a yarn application with the same name is in RUNNING or ACCEPTED state
+   */
+  private boolean checkAppRepeatInYarn(String jobName) {
+    try {
+      YarnClient yarnClient = HadoopUtils.yarnClient();
+      Set<String> types =
+          Sets.newHashSet(
+              ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+      EnumSet<YarnApplicationState> states =
+          EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED);
+      List<ApplicationReport> applications = yarnClient.getApplications(types, states);
+      for (ApplicationReport report : applications) {
+        if (report.getName().equals(jobName)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("The yarn api is abnormal. Ensure that yarn is running properly.");
+    }
+  }
+
   private void starting(Application application) {
     application.setState(FlinkAppStateEnum.STARTING.getValue());
     application.setOptionTime(new Date());
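
For readers who want to try the same check outside StreamPark, here is a minimal standalone sketch built only on the plain YARN client API. It is not the StreamPark code path: in the diff above the client comes from HadoopUtils.yarnClient() and the application type names from ApplicationType, whereas the job name "my-flink-job", the type string "Apache Flink", and the default Configuration below are illustrative placeholders that assume HADOOP_CONF_DIR points at the target cluster.

    import java.util.Collections;
    import java.util.EnumSet;
    import java.util.List;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.yarn.api.records.ApplicationReport;
    import org.apache.hadoop.yarn.api.records.YarnApplicationState;
    import org.apache.hadoop.yarn.client.api.YarnClient;

    public class YarnDuplicateJobCheck {

      // Returns true if a YARN application with the given name is currently
      // RUNNING or ACCEPTED for any of the given application types.
      static boolean isJobNameActive(YarnClient yarnClient, String jobName, Set<String> appTypes)
          throws Exception {
        EnumSet<YarnApplicationState> activeStates =
            EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED);
        List<ApplicationReport> reports = yarnClient.getApplications(appTypes, activeStates);
        for (ApplicationReport report : reports) {
          if (jobName.equals(report.getName())) {
            return true;
          }
        }
        return false;
      }

      public static void main(String[] args) throws Exception {
        // Placeholder setup: assumes the YARN client configuration (yarn-site.xml)
        // is on the classpath, e.g. via HADOOP_CONF_DIR.
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new Configuration());
        yarnClient.start();
        try {
          boolean duplicate =
              isJobNameActive(yarnClient, "my-flink-job", Collections.singleton("Apache Flink"));
          System.out.println("duplicate job name in YARN queue: " + duplicate);
        } finally {
          yarnClient.stop();
        }
      }
    }

The main method is only a smoke test for the lookup; in the commit itself the check is invoked from the start path and guarded by FlinkExecutionMode.isYarnMode, as the hunk at +389 above shows.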
