This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
     new 24d02a681 [Improve] upgrade ddl improvement
24d02a681 is described below

commit 24d02a681a12ef931c1b699f05656b5aed7dfe50
Author: benjobs <[email protected]>
AuthorDate: Wed Dec 13 18:21:43 2023 +0800

    [Improve] upgrade ddl improvement
---
 .../apache/streampark/common/conf/Workspace.scala  | 11 +++++-
 .../main/assembly/script/upgrade/mysql/2.1.3.sql   | 26 ++++++++++++++
 .../core/service/impl/ApplicationServiceImpl.java  | 40 ++++++++++++++++++++++
 3 files changed, 76 insertions(+), 1 deletion(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 58ef8e2e9..c33baec0b 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.enums.StorageType
 import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
 import org.apache.streampark.common.util.Utils.StringCasts
 
+import java.io.File
 import java.net.URI
 
 object Workspace {
@@ -36,7 +37,15 @@ object Workspace {
   lazy val APP_LOCAL_DIST = s"$localWorkspace/dist"
 
   /** dirPath of the maven local repository with built-in compilation process 
*/
-  lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
+  lazy val MAVEN_LOCAL_PATH: String = {
+    val userName = SystemPropertyUtils.get("user.home")
+    val repository = new File(s"$userName/.m2/repository")
+    if (repository.exists()) {
+      repository.getAbsolutePath
+    } else {
+      s"$localWorkspace/mvnrepo"
+    }
+  }
 
   /** local sourceCode path.(for git...) */
   lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
diff --git 
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
new file mode 100644
index 000000000..077d4903b
--- /dev/null
+++ 
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use streampark;
+
+set names utf8mb4;
+set foreign_key_checks = 0;
+
+alter table `t_flink_app`
+    modify column `modify_time` datetime not null default current_timestamp 
comment 'modify time';
+
+set foreign_key_checks = 1;
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8b8b5b53b..87066681e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.DevelopmentMode;
 import org.apache.streampark.common.enums.ExecutionMode;
 import org.apache.streampark.common.enums.ResolveOrder;
@@ -103,6 +104,9 @@ import org.apache.flink.configuration.JobManagerOptions;
 import org.apache.flink.configuration.MemorySize;
 import org.apache.flink.configuration.RestOptions;
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
 
 import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
 import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -112,6 +116,7 @@ import 
com.baomidou.mybatisplus.core.toolkit.support.SFunction;
 import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
 import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
 import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
 import io.fabric8.kubernetes.client.KubernetesClientException;
 import lombok.SneakyThrows;
 import lombok.extern.slf4j.Slf4j;
@@ -134,11 +139,13 @@ import java.util.Base64;
 import java.util.Collection;
 import java.util.Comparator;
 import java.util.Date;
+import java.util.EnumSet;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.Optional;
+import java.util.Set;
 import java.util.concurrent.CancellationException;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
@@ -1423,6 +1430,13 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
       throw new ApiAlertException("[StreamPark] can no found flink version");
     }
 
+    // check job on yarn is already running
+    if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+      ApiAlertException.throwIfTrue(
+          checkAppRepeatInYarn(application.getJobName()),
+          "[StreamPark] The same job name is already running in the yarn 
queue");
+    }
+
     // if manually started, clear the restart flag
     if (!auto) {
       application.setRestartCount(0);
@@ -1808,4 +1822,30 @@ public class ApplicationServiceImpl extends 
ServiceImpl<ApplicationMapper, Appli
     return 
ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
         && !yarnQueueService.isDefaultQueue(application.getYarnQueue());
   }
+
+  /**
+   * Check whether a job with the same name is running in the yarn queue
+   *
+   * @param jobName
+   * @return
+   */
+  private boolean checkAppRepeatInYarn(String jobName) {
+    try {
+      YarnClient yarnClient = HadoopUtils.yarnClient();
+      Set<String> types =
+          Sets.newHashSet(
+              ApplicationType.STREAMPARK_FLINK.getName(), 
ApplicationType.APACHE_FLINK.getName());
+      EnumSet<YarnApplicationState> states =
+          EnumSet.of(YarnApplicationState.RUNNING, 
YarnApplicationState.ACCEPTED);
+      List<ApplicationReport> applications = yarnClient.getApplications(types, 
states);
+      for (ApplicationReport report : applications) {
+        if (report.getName().equals(jobName)) {
+          return true;
+        }
+      }
+      return false;
+    } catch (Exception e) {
+      throw new RuntimeException("The yarn api is abnormal. Ensure that yarn 
is running properly.");
+    }
+  }
 }

Reply via email to