This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 1db4400be [Improve] yarn submission mode: Check whether the same task
is submitted repeatedly (#3378)
1db4400be is described below
commit 1db4400be0d621ad1287ea9d9bc8b2e4ab61b23d
Author: hackallan <[email protected]>
AuthorDate: Thu Dec 7 11:29:41 2023 +0800
[Improve] yarn submission mode: Check whether the same task is submitted
repeatedly (#3378)
* When submitting a task, check whether tasks with the same name are
running in the yarn queue to prevent multiple tasks from running in the yarn
queue.
* yarn submission mode: Check whether the submission job name is running in
the queue. If yes, the submission cannot be performed
---------
Co-authored-by: qinjiyong <[email protected]>
---
.../impl/ApplicationActionServiceImpl.java | 41 ++++++++++++++++++++++
1 file changed, 41 insertions(+)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index 49799d4b8..401eb67d4 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -21,6 +21,7 @@ import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.FlinkRestoreMode;
@@ -88,10 +89,14 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
import com.baomidou.mybatisplus.core.toolkit.Wrappers;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
+import com.google.common.collect.Sets;
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -102,9 +107,12 @@ import
org.springframework.transaction.annotation.Transactional;
import java.io.File;
import java.net.URI;
import java.util.Date;
+import java.util.EnumSet;
import java.util.HashMap;
+import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -381,6 +389,13 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
ApiAlertException.throwIfTrue(
!application.isCanBeStart(), "[StreamPark] The application cannot be
started repeatedly.");
+ if (FlinkExecutionMode.isYarnMode(application.getFlinkExecutionMode())) {
+
+ ApiAlertException.throwIfTrue(
+ checkAppRepeatInYarn(application.getJobName()),
+ "[StreamPark] The same task name is already running in the yarn
queue");
+ }
+
AppBuildPipeline buildPipeline =
appBuildPipeService.getById(application.getId());
Utils.notNull(buildPipeline);
@@ -565,6 +580,32 @@ public class ApplicationActionServiceImpl extends
ServiceImpl<ApplicationMapper,
});
}
+ /**
+ * Check whether a job with the same name is running in the yarn queue
+ *
+ * @param jobName
+ * @return
+ */
+ private boolean checkAppRepeatInYarn(String jobName) {
+ try {
+ YarnClient yarnClient = HadoopUtils.yarnClient();
+ Set<String> types =
+ Sets.newHashSet(
+ ApplicationType.STREAMPARK_FLINK.getName(),
ApplicationType.APACHE_FLINK.getName());
+ EnumSet<YarnApplicationState> states =
+ EnumSet.of(YarnApplicationState.RUNNING,
YarnApplicationState.ACCEPTED);
+ List<ApplicationReport> applications = yarnClient.getApplications(types,
states);
+ for (ApplicationReport report : applications) {
+ if (report.getName().equals(jobName)) {
+ return true;
+ }
+ }
+ return false;
+ } catch (Exception e) {
+ throw new RuntimeException("The yarn api is abnormal. Ensure that yarn
is running properly.");
+ }
+ }
+
private void starting(Application application) {
application.setState(FlinkAppStateEnum.STARTING.getValue());
application.setOptionTime(new Date());