This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new 60c50918d [ISSUE-3044][Improve] Improve streampark-common module base
on 3.1 Naming Style (#3130)
60c50918d is described below
commit 60c50918dc7da9abe69a10a4290cee2dc1f74366
Author: Kick156 <[email protected]>
AuthorDate: Sat Sep 23 12:30:28 2023 +0800
[ISSUE-3044][Improve] Improve streampark-common module base on 3.1 Naming
Style (#3130)
* [Improve] Improve streampark-common module base on 3.1 Naming Style
---
.../streampark/common/enums/ApplicationType.java | 4 +++
.../streampark/common/enums/ClusterState.java | 29 +++++++++++++-----
.../streampark/common/enums/DevelopmentMode.java | 18 ++++++------
.../streampark/common/enums/ExecutionMode.java | 6 ++++
.../common/enums/FlinkK8sRestExposedType.java | 12 ++++----
.../common/enums/FlinkSqlValidationFailedType.java | 17 +++++++----
.../streampark/common/enums/ResolveOrder.java | 14 +++++----
.../streampark/common/enums/RestoreMode.java | 34 +++++++++++++++++-----
.../apache/streampark/common/enums/Semantic.java | 9 ++++--
.../streampark/common/enums/StorageType.java | 12 ++++----
.../streampark/common/conf/ConfigConst.scala | 17 ++---------
.../streampark/common/conf/FlinkVersion.scala | 2 --
.../streampark/common/conf/K8sFlinkConfig.scala | 1 -
.../apache/streampark/common/util/HdfsUtils.scala | 9 ++++--
.../streampark/common/util/SqlConvertUtils.scala | 3 +-
.../core/controller/FlinkSqlController.java | 2 +-
.../console/core/entity/Application.java | 12 ++++----
.../core/service/impl/FlinkClusterServiceImpl.java | 16 +++++-----
.../console/core/task/FlinkClusterWatcher.java | 4 +--
.../console/core/task/FlinkK8sObserverBroker.scala | 2 +-
.../console/core/service/SavePointServiceTest.java | 4 +--
.../flink/client/bean/SubmitRequest.scala | 10 +++++--
22 files changed, 142 insertions(+), 95 deletions(-)
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
index ed87b8a10..70c992d82 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
@@ -18,12 +18,16 @@
package org.apache.streampark.common.enums;
public enum ApplicationType {
+
/** StreamPark Flink */
STREAMPARK_FLINK(1, "StreamPark Flink"),
+
/** Apache Flink */
APACHE_FLINK(2, "Apache Flink"),
+
/** StreamPark Spark */
STREAMPARK_SPARK(3, "StreamPark Spark"),
+
/** Apache Spark */
APACHE_SPARK(4, "Apache Spark");
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 284b60a36..b2599b74c 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -19,47 +19,60 @@ package org.apache.streampark.common.enums;
/** @since 1.2.3 */
public enum ClusterState {
+
/** The cluster was just created but not started */
CREATED(0),
+
/** cluster started */
RUNNING(1),
+
/** cluster stopped */
CANCELED(2),
+
/** cluster lost */
LOST(3),
+
/** cluster unknown */
UNKNOWN(4),
+
+ /** cluster starting */
STARTING(5),
+
+ /** cluster cancelling */
CANCELLING(6),
+
+ /** cluster failed */
FAILED(7),
+
+ /** cluster killed */
KILLED(8);
- private final Integer value;
+ private final Integer state;
- ClusterState(Integer value) {
- this.value = value;
+ ClusterState(Integer state) {
+ this.state = state;
}
public static ClusterState of(Integer value) {
for (ClusterState clusterState : values()) {
- if (clusterState.value.equals(value)) {
+ if (clusterState.state.equals(value)) {
return clusterState;
}
}
return ClusterState.UNKNOWN;
}
- public static ClusterState of(String value) {
+ public static ClusterState of(String name) {
for (ClusterState clusterState : values()) {
- if (clusterState.name().equals(value)) {
+ if (clusterState.name().equals(name)) {
return clusterState;
}
}
return ClusterState.UNKNOWN;
}
- public Integer getValue() {
- return value;
+ public Integer getState() {
+ return state;
}
public static boolean isRunning(ClusterState state) {
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
index 7bf3109ee..00580a2cd 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -28,25 +28,25 @@ public enum DevelopmentMode {
/** Py flink */
PYFLINK("Python Flink", 3);
- private final String mode;
+ private final String name;
- private final Integer value;
+ private final Integer mode;
- DevelopmentMode(String mode, Integer value) {
+ DevelopmentMode(String name, Integer mode) {
+ this.name = name;
this.mode = mode;
- this.value = value;
}
public static DevelopmentMode of(Integer value) {
- for (DevelopmentMode mode : values()) {
- if (mode.value.equals(value)) {
- return mode;
+ for (DevelopmentMode developmentMode : values()) {
+ if (developmentMode.mode.equals(value)) {
+ return developmentMode;
}
}
return null;
}
- public Integer getValue() {
- return value;
+ public Integer getMode() {
+ return mode;
}
}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
index 889cb915d..c903a5bcd 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
@@ -25,16 +25,22 @@ public enum ExecutionMode {
/** Local mode */
LOCAL(0, "local"),
+
/** remote */
REMOTE(1, "remote"),
+
/** yarn-per-job mode */
YARN_PER_JOB(2, "yarn-per-job"),
+
/** yarn session */
YARN_SESSION(3, "yarn-session"),
+
/** yarn application */
YARN_APPLICATION(4, "yarn-application"),
+
/** kubernetes session */
KUBERNETES_NATIVE_SESSION(5, "kubernetes-session"),
+
/** kubernetes application */
KUBERNETES_NATIVE_APPLICATION(6, "kubernetes-application");
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
index 79f6e4196..c87c5a3a6 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
@@ -31,16 +31,16 @@ public enum FlinkK8sRestExposedType {
private final String name;
- private final Integer value;
+ private final Integer type;
- FlinkK8sRestExposedType(String name, Integer value) {
+ FlinkK8sRestExposedType(String name, Integer type) {
this.name = name;
- this.value = value;
+ this.type = type;
}
public static FlinkK8sRestExposedType of(Integer value) {
for (FlinkK8sRestExposedType order : values()) {
- if (order.value.equals(value)) {
+ if (order.type.equals(value)) {
return order;
}
}
@@ -51,7 +51,7 @@ public enum FlinkK8sRestExposedType {
return name;
}
- public Integer getValue() {
- return value;
+ public Integer getType() {
+ return type;
}
}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
index 856c39bf9..6f9b8a067 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
@@ -18,33 +18,38 @@
package org.apache.streampark.common.enums;
public enum FlinkSqlValidationFailedType {
+
/** Basic test failed (such as null, etc.) */
VERIFY_FAILED(1),
+
/** syntax error */
SYNTAX_ERROR(2),
+
/** unsupported dialect */
UNSUPPORTED_DIALECT(3),
+
/** unsupported sql command */
UNSUPPORTED_SQL(4),
+
/** Not at the end of ";" */
ENDS_WITH(5);
- private final int value;
+ private final int failedType;
- FlinkSqlValidationFailedType(int value) {
- this.value = value;
+ FlinkSqlValidationFailedType(int failedType) {
+ this.failedType = failedType;
}
public static FlinkSqlValidationFailedType of(Integer value) {
for (FlinkSqlValidationFailedType type : values()) {
- if (type.value == value) {
+ if (type.failedType == value) {
return type;
}
}
return null;
}
- public int getValue() {
- return value;
+ public int getFailedType() {
+ return failedType;
}
}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
index 8a2b207e1..e69823e27 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
@@ -19,23 +19,25 @@ package org.apache.streampark.common.enums;
/** classloader.resolve-order */
public enum ResolveOrder {
+
/** parent-first */
PARENT_FIRST("parent-first", 0),
+
/** child-first */
CHILD_FIRST("child-first", 1);
private final String name;
- private final Integer value;
+ private final Integer order;
- ResolveOrder(String name, Integer value) {
+ ResolveOrder(String name, Integer order) {
this.name = name;
- this.value = value;
+ this.order = order;
}
public static ResolveOrder of(Integer value) {
for (ResolveOrder order : values()) {
- if (order.value.equals(value)) {
+ if (order.order.equals(value)) {
return order;
}
}
@@ -46,7 +48,7 @@ public enum ResolveOrder {
return name;
}
- public Integer getValue() {
- return value;
+ public Integer getOrder() {
+ return order;
}
}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
index 131507798..63ae3a819 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
@@ -20,28 +20,46 @@ package org.apache.streampark.common.enums;
import java.util.Arrays;
public enum RestoreMode {
- NO_CLAIM(1),
- CLAIM(2),
+
+ /**
+ * In this mode Flink claims ownership of the snapshot and essentially treats it like a
+ * checkpoint: it controls the lifecycle and might delete it if it is not needed for recovery
+ * anymore. Hence, it is not safe to manually delete the snapshot or to start two jobs from the
+ * same snapshot. Flink keeps around a configured number of checkpoints.
+ */
+ CLAIM(1),
+
+ /**
+ * In the NO_CLAIM mode Flink will not assume ownership of the snapshot. It will leave the files
+ * in the user's control and never delete any of the files. In this mode you can start multiple
+ * jobs from the same snapshot.
+ */
+ NO_CLAIM(2),
+
+ /**
+ * The legacy mode is how Flink worked until 1.15. In this mode Flink will
never delete the
+ * initial checkpoint. At the same time, it is not clear if a user can ever
delete it as well.
+ */
LEGACY(3);
public static final String RESTORE_MODE = "execution.savepoint-restore-mode";
public static final int SINCE_FLINK_VERSION = 15;
- private final int value;
+ private final int mode;
public int get() {
- return this.value;
+ return this.mode;
}
- RestoreMode(int value) {
- this.value = value;
+ RestoreMode(int mode) {
+ this.mode = mode;
}
public String getName() {
- return String.valueOf(RestoreMode.of(this.value));
+ return RestoreMode.of(this.mode).toString();
}
public static RestoreMode of(Integer value) {
- return Arrays.stream(values()).filter((x) -> x.value ==
value).findFirst().orElse(null);
+ return Arrays.stream(values()).filter((x) -> x.mode ==
value).findFirst().orElse(null);
}
}
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
index 65dc3c5ea..f405f2818 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
@@ -17,15 +17,18 @@
package org.apache.streampark.common.enums;
+/** Flink consistency semantics */
public enum Semantic {
- /** */
+ /**
+ * Ensure that the counting results obtained after a fault are consistent
with the correct values.
+ */
EXACTLY_ONCE,
- /** */
+ /** After a failure, the results may be over-counted, but they will never be under-counted. */
AT_LEAST_ONCE,
- /** */
+ /** After the fault occurs, the counting results may be lost. */
NONE;
public static Semantic of(String name) {
diff --git
a/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
b/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
index 6890d669f..11be2cbec 100644
---
a/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
+++
b/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
@@ -27,14 +27,14 @@ public enum StorageType {
/** local File system */
LFS("lfs");
- private final String identifier;
+ private final String type;
- StorageType(String identifier) {
- this.identifier = identifier;
+ StorageType(String type) {
+ this.type = type;
}
- public String getIdentifier() {
- return identifier;
+ public String getType() {
+ return type;
}
public static StorageType of(String identifier) {
@@ -42,7 +42,7 @@ public enum StorageType {
return LFS;
}
for (StorageType type : values()) {
- if (type.identifier.equals(identifier)) {
+ if (type.type.equals(identifier)) {
return type;
}
}
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 6c5c6a6d5..f465f25e5 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -21,13 +21,11 @@ import java.time.LocalDateTime
object ConfigConst {
/** common const */
-
val DEFAULT_DATAMASK_STRING = "********"
val PARAM_PREFIX = "--"
/** pyflink */
-
val PYTHON_SUFFIX = ".py"
val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"
@@ -35,7 +33,6 @@ object ConfigConst {
val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"
/** about parameter... */
-
val KEY_APP_HOME = "app.home"
val KEY_HOST = "host"
@@ -53,12 +50,6 @@ object ConfigConst {
val KEY_SEMANTIC = "semantic"
/** sign.... */
- val SIGN_COLON = ":"
-
- val SIGN_SEMICOLON = ";"
-
- val SIGN_COMMA = ","
-
val SIGN_EMPTY = ""
/** kerberos */
@@ -79,15 +70,14 @@ object ConfigConst {
val KEY_SECURITY_KERBEROS_KRB5_CONF = "security.kerberos.login.krb5"
- // spark
-
+ /** about spark */
val KEY_SPARK_MAIN_CLASS = "spark.main.class"
val KEY_SPARK_APP_NAME = "spark.app.name"
val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"
- // flink
+ /** about config flink */
def KEY_APP_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}conf"
def KEY_FLINK_CONF(prefix: String = null): String =
s"${Option(prefix).getOrElse("")}flink.conf"
@@ -123,7 +113,7 @@ object ConfigConst {
val FLINK_NATIVE_KUBERNETES_LABEL = "flink-native-kubernetes"
- // ---table---
+ /** about flink table */
val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
val KEY_FLINK_TABLE_MODE = "flink.table.mode"
@@ -133,7 +123,6 @@ object ConfigConst {
val KEY_FLINK_TABLE_DATABASE = "flink.table.database"
/** about config Kafka */
-
val KAFKA_SINK_PREFIX = "kafka.sink."
val KAFKA_SOURCE_PREFIX = "kafka.source."
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 6585f7e56..033157ac0 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -59,8 +59,6 @@ class FlinkVersion(val flinkHome: String) extends
java.io.Serializable with Logg
lib
}
- lazy val flinkLibs: List[NetURL] =
flinkLib.listFiles().map(_.toURI.toURL).toList
-
lazy val version: String = {
val flinkVersion = new AtomicReference[String]
val cmd = List(
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 39f3fc3d4..95ed2de1c 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -18,7 +18,6 @@
package org.apache.streampark.common.conf
/** Flink kubernetes Configuration for v1 version */
-
object K8sFlinkConfig {
lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
index 5a74ab73d..2565841ba 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
@@ -31,9 +31,11 @@ object HdfsUtils extends Logger {
def getDefaultFS: String =
HadoopUtils.hadoopConf.get(FileSystem.FS_DEFAULT_NAME_KEY)
- def list(src: String): List[FileStatus] =
HadoopUtils.hdfs.listStatus(getPath(src)).toList
+ def list(src: String): List[FileStatus] =
+ HadoopUtils.hdfs.listStatus(getPath(src)).toList
- def move(src: String, dst: String): Unit =
HadoopUtils.hdfs.rename(getPath(src), getPath(dst))
+ def move(src: String, dst: String): Unit =
+ HadoopUtils.hdfs.rename(getPath(src), getPath(dst))
def mkdirs(path: String): Unit = HadoopUtils.hdfs.mkdirs(getPath(path))
@@ -121,7 +123,8 @@ object HdfsUtils extends Logger {
val path: Path = getPath(fileName)
require(
HadoopUtils.hdfs.exists(path) && !HadoopUtils.hdfs.isDirectory(path),
- s"[StreamPark] HdfsUtils.read: path($fileName) not exists or isDirectory
")
+ s"[StreamPark] HdfsUtils.read: path($fileName) not exists or isDirectory
"
+ )
val in = HadoopUtils.hdfs.open(path)
val out = new ByteArrayOutputStream()
IOUtils.copyBytes(in, out, 4096, false)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
index ba38150c2..3b629cab3 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
@@ -247,7 +247,8 @@ object SqlConvertUtils extends Logger {
case _ => null
}
},
- postfix)
+ postfix
+ )
def mysqlToClickhouse(sql: String, postfix: String): String =
convertSql(sql, toClickhouseDataType, postfix = postfix)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index 2fad82e5b..512aefb4d 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -68,7 +68,7 @@ public class FlinkSqlController {
RestResponse.success()
.data(false)
.message(exception)
- .put("type", flinkSqlValidationResult.failedType().getValue())
+ .put("type",
flinkSqlValidationResult.failedType().getFailedType())
.put("start", flinkSqlValidationResult.lineStart())
.put("end", flinkSqlValidationResult.lineEnd());
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 6299bcb24..2e8a06b61 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -441,24 +441,24 @@ public class Application implements Serializable {
@JsonIgnore
public boolean isFlinkSqlJob() {
- return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType());
+ return DevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isFlinkSqlJobOrPyFlinkJob() {
- return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType())
- || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+ return DevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType())
+ || DevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isCustomCodeJob() {
- return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
+ return DevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
}
@JsonIgnore
public boolean isCustomCodeOrPyFlinkJob() {
- return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType())
- || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+ return DevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
+ || DevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
}
@JsonIgnore
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index bbf30c1f5..d7f62ab08 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -153,11 +153,11 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
successful, String.format(ERROR_CLUSTER_QUEUE_HINT,
flinkCluster.getYarnQueue()));
flinkCluster.setCreateTime(new Date());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
- flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getState());
flinkCluster.setStartTime(new Date());
flinkCluster.setEndTime(null);
} else {
- flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+ flinkCluster.setClusterState(ClusterState.CREATED.getState());
}
boolean ret = save(flinkCluster);
if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
@@ -188,7 +188,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setAddress(deployResponse.address());
}
flinkCluster.setClusterId(deployResponse.clusterId());
- flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getState());
flinkCluster.setException(null);
flinkCluster.setEndTime(null);
updateById(flinkCluster);
@@ -198,7 +198,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
}
} catch (Exception e) {
log.error(e.getMessage(), e);
- flinkCluster.setClusterState(ClusterState.FAILED.getValue());
+ flinkCluster.setClusterState(ClusterState.FAILED.getState());
flinkCluster.setException(e.toString());
updateById(flinkCluster);
throw new ApiDetailException(e);
@@ -217,7 +217,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
flinkCluster.setDescription(paramOfCluster.getDescription());
if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
flinkCluster.setAddress(paramOfCluster.getAddress());
- flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+ flinkCluster.setClusterState(ClusterState.RUNNING.getState());
flinkCluster.setStartTime(new Date());
flinkCluster.setEndTime(null);
FlinkClusterWatcher.addWatching(flinkCluster);
@@ -249,7 +249,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ShutDownResponse shutDownResponse =
shutdownInternal(flinkCluster, flinkCluster.getClusterId());
ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response
failed");
- flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
+ flinkCluster.setClusterState(ClusterState.CANCELED.getState());
flinkCluster.setEndTime(new Date());
updateById(flinkCluster);
FlinkClusterWatcher.unWatching(flinkCluster);
@@ -321,7 +321,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
LambdaUpdateWrapper<FlinkCluster> updateWrapper =
new LambdaUpdateWrapper<FlinkCluster>()
.eq(FlinkCluster::getId, id)
- .set(FlinkCluster::getClusterState, state.getValue());
+ .set(FlinkCluster::getClusterState, state.getState());
switch (state) {
case KILLED:
@@ -447,7 +447,7 @@ public class FlinkClusterServiceImpl extends
ServiceImpl<FlinkClusterMapper, Fli
ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
"Current cluster is not active, please check!");
if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
- flinkCluster.setClusterState(ClusterState.LOST.getValue());
+ flinkCluster.setClusterState(ClusterState.LOST.getState());
updateById(flinkCluster);
throw new ApiAlertException("Current cluster is not active, please
check!");
}
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 5aba86351..f94df947b 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -99,7 +99,7 @@ public class FlinkClusterWatcher {
List<FlinkCluster> flinkClusters =
flinkClusterService.list(
new LambdaQueryWrapper<FlinkCluster>()
- .eq(FlinkCluster::getClusterState,
ClusterState.RUNNING.getValue())
+ .eq(FlinkCluster::getClusterState,
ClusterState.RUNNING.getState())
// excluding flink clusters on kubernetes
.notIn(FlinkCluster::getExecutionMode,
ExecutionMode.getKubernetesMode()));
flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(),
cluster));
@@ -138,7 +138,7 @@ public class FlinkClusterWatcher {
cluster.setAffectedJobs(
applicationInfoService.countAffectedByClusterId(
cluster.getId(),
InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
- cluster.setClusterState(state.getValue());
+ cluster.setClusterState(state.getState());
cluster.setEndTime(new Date());
alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster,
state));
}
diff --git
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
index 47ef5c536..6725bba4b 100644
---
a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
+++
b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -221,7 +221,7 @@ class FlinkK8sObserverBroker @Autowired() (
.tap { case (id, state, error) =>
safeUpdateFlinkClusterRecord(id)(
new LambdaUpdateWrapper[FlinkCluster]
- .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getValue)
+ .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getState)
.set(error.isDefined, FlinkCluster.SFunc.EXCEPTION, error.get))
}
// Alter for unhealthy state in parallel
diff --git
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index 5d37b6200..47decc93b 100644
---
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -97,14 +97,14 @@ class SavePointServiceTest extends SpringUnitTestBase {
app.setAppType(ApplicationType.APACHE_FLINK.getType());
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+ app.setJobType(DevelopmentMode.CUSTOM_CODE.getMode());
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) without application config.
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
- app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+ app.setJobType(DevelopmentMode.CUSTOM_CODE.getMode());
assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
// Test for (StreamPark job Or FlinkSQL job) with application config just
disabled checkpoint.
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 4989abaf8..7650b84b3 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -116,7 +116,10 @@ case class SubmitRequest(
case "conf://" => PropertiesUtils.fromHoconText(content)
case "prop://" => PropertiesUtils.fromPropertiesText(content)
case "hdfs://" =>
- // 如果配置文件为hdfs方式,则需要用户将hdfs相关配置文件copy到resources下...
+ /**
+ * If the configuration file is stored on HDFS, the user needs to copy the HDFS-related
+ * configuration files into the resources directory.
+ */
val text = HdfsUtils.read(this.appConf)
val extension = this.appConf.split("\\.").last.toLowerCase
extension match {
@@ -139,7 +142,10 @@ case class SubmitRequest(
private[client] lazy val hdfsWorkspace = {
- /** 必须保持本机flink和hdfs里的flink版本和配置都完全一致. */
+ /**
+ * The Flink version and configuration on the local machine and in HDFS must be kept exactly
+ * the same.
+ */
val workspace = Workspace.remote
val flinkHome = flinkVersion.flinkHome
val flinkHomeDir = new File(flinkHome)