This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.3
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.3 by this push:
new 24d02a681 [Improve] upgrade ddl improvement
24d02a681 is described below
commit 24d02a681a12ef931c1b699f05656b5aed7dfe50
Author: benjobs <[email protected]>
AuthorDate: Wed Dec 13 18:21:43 2023 +0800
[Improve] upgrade ddl improvement
---
.../apache/streampark/common/conf/Workspace.scala | 11 +++++-
.../main/assembly/script/upgrade/mysql/2.1.3.sql | 26 ++++++++++++++
.../core/service/impl/ApplicationServiceImpl.java | 40 ++++++++++++++++++++++
3 files changed, 76 insertions(+), 1 deletion(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
index 58ef8e2e9..c33baec0b 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/Workspace.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.enums.StorageType
import org.apache.streampark.common.util.{HdfsUtils, SystemPropertyUtils}
import org.apache.streampark.common.util.Utils.StringCasts
+import java.io.File
import java.net.URI
object Workspace {
@@ -36,7 +37,15 @@ object Workspace {
lazy val APP_LOCAL_DIST = s"$localWorkspace/dist"
/** dirPath of the maven local repository with built-in compilation process
*/
- lazy val MAVEN_LOCAL_PATH = s"$localWorkspace/mvnrepo"
+ lazy val MAVEN_LOCAL_PATH: String = {
+   // Prefer the current user's default Maven repository (<user.home>/.m2/repository) when it
+   // is an existing directory; otherwise fall back to a repository under the local workspace.
+   // Option(...) guards against a missing "user.home" property, and getOrElse takes its
+   // argument by name, so the workspace path is only resolved when the fallback is needed.
+   Option(SystemPropertyUtils.get("user.home"))
+     .map(userHome => new File(s"$userHome/.m2/repository"))
+     .filter(_.isDirectory)
+     .map(_.getAbsolutePath)
+     .getOrElse(s"$localWorkspace/mvnrepo")
+ }
/** local sourceCode path.(for git...) */
lazy val PROJECT_LOCAL_PATH = s"$localWorkspace/project"
diff --git
a/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
new file mode 100644
index 000000000..077d4903b
--- /dev/null
+++
b/streampark-console/streampark-console-service/src/main/assembly/script/upgrade/mysql/2.1.3.sql
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+use streampark;
+
+set names utf8mb4;
+-- foreign key checks are switched off while the table is altered and switched back on afterwards
+set foreign_key_checks = 0;
+
+-- redefine `modify_time` as a non-null datetime that defaults to the current timestamp;
+-- no `on update` clause is given, so MySQL does not refresh the value automatically on row updates
+alter table `t_flink_app`
+    modify column `modify_time` datetime not null default current_timestamp comment 'modify time';
+
+set foreign_key_checks = 1;
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
index 8b8b5b53b..87066681e 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java
@@ -19,6 +19,7 @@ package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.Workspace;
+import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.DevelopmentMode;
import org.apache.streampark.common.enums.ExecutionMode;
import org.apache.streampark.common.enums.ResolveOrder;
@@ -103,6 +104,9 @@ import org.apache.flink.configuration.JobManagerOptions;
import org.apache.flink.configuration.MemorySize;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.api.YarnClient;
import com.baomidou.mybatisplus.core.conditions.query.LambdaQueryWrapper;
import com.baomidou.mybatisplus.core.conditions.update.LambdaUpdateWrapper;
@@ -112,6 +116,7 @@ import
com.baomidou.mybatisplus.core.toolkit.support.SFunction;
import com.baomidou.mybatisplus.extension.plugins.pagination.Page;
import com.baomidou.mybatisplus.extension.service.impl.ServiceImpl;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
import io.fabric8.kubernetes.client.KubernetesClientException;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
@@ -134,11 +139,13 @@ import java.util.Base64;
import java.util.Collection;
import java.util.Comparator;
import java.util.Date;
+import java.util.EnumSet;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.Optional;
+import java.util.Set;
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
@@ -1423,6 +1430,13 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
throw new ApiAlertException("[StreamPark] can no found flink version");
}
+ // check job on yarn is already running
+ if (ExecutionMode.isYarnMode(application.getExecutionMode())) {
+ ApiAlertException.throwIfTrue(
+ checkAppRepeatInYarn(application.getJobName()),
+ "[StreamPark] The same job name is already running in the yarn
queue");
+ }
+
// if manually started, clear the restart flag
if (!auto) {
application.setRestartCount(0);
@@ -1808,4 +1822,30 @@ public class ApplicationServiceImpl extends
ServiceImpl<ApplicationMapper, Appli
return
ExecutionMode.isYarnPerJobOrAppMode(application.getExecutionModeEnum())
&& !yarnQueueService.isDefaultQueue(application.getYarnQueue());
}
+
+ /**
+  * Checks whether YARN already has an application whose name equals the given job name.
+  *
+  * <p>Only applications of type {@code ApplicationType.STREAMPARK_FLINK} or {@code
+  * ApplicationType.APACHE_FLINK} that are in state {@code RUNNING} or {@code ACCEPTED} are
+  * considered.
+  *
+  * @param jobName the job name to look for
+  * @return {@code true} if a matching application is found, {@code false} otherwise
+  * @throws RuntimeException if the YARN query fails; the original failure is attached as the
+  *     cause
+  */
+ private boolean checkAppRepeatInYarn(String jobName) {
+   try {
+     YarnClient yarnClient = HadoopUtils.yarnClient();
+     Set<String> types =
+         Sets.newHashSet(
+             ApplicationType.STREAMPARK_FLINK.getName(), ApplicationType.APACHE_FLINK.getName());
+     EnumSet<YarnApplicationState> states =
+         EnumSet.of(YarnApplicationState.RUNNING, YarnApplicationState.ACCEPTED);
+     List<ApplicationReport> applications = yarnClient.getApplications(types, states);
+     for (ApplicationReport report : applications) {
+       // Compare from the jobName side so that a report without a name cannot raise a
+       // NullPointerException that would be misreported as a YARN failure below.
+       if (jobName != null && jobName.equals(report.getName())) {
+         return true;
+       }
+     }
+     return false;
+   } catch (Exception e) {
+     // Keep the underlying exception as the cause so the real YARN failure is not lost.
+     throw new RuntimeException(
+         "The yarn api is abnormal. Ensure that yarn is running properly.", e);
+   }
+ }
}