This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new 60c50918d [ISSUE-3044][Improve] Improve streampark-common module based on 3.1 Naming Style (#3130)
60c50918d is described below

commit 60c50918dc7da9abe69a10a4290cee2dc1f74366
Author: Kick156 <[email protected]>
AuthorDate: Sat Sep 23 12:30:28 2023 +0800

    [ISSUE-3044][Improve] Improve streampark-common module based on 3.1 Naming Style (#3130)
    
    * [Improve] Improve streampark-common module based on 3.1 Naming Style
---
 .../streampark/common/enums/ApplicationType.java   |  4 +++
 .../streampark/common/enums/ClusterState.java      | 29 +++++++++++++-----
 .../streampark/common/enums/DevelopmentMode.java   | 18 ++++++------
 .../streampark/common/enums/ExecutionMode.java     |  6 ++++
 .../common/enums/FlinkK8sRestExposedType.java      | 12 ++++----
 .../common/enums/FlinkSqlValidationFailedType.java | 17 +++++++----
 .../streampark/common/enums/ResolveOrder.java      | 14 +++++----
 .../streampark/common/enums/RestoreMode.java       | 34 +++++++++++++++++-----
 .../apache/streampark/common/enums/Semantic.java   |  9 ++++--
 .../streampark/common/enums/StorageType.java       | 12 ++++----
 .../streampark/common/conf/ConfigConst.scala       | 17 ++---------
 .../streampark/common/conf/FlinkVersion.scala      |  2 --
 .../streampark/common/conf/K8sFlinkConfig.scala    |  1 -
 .../apache/streampark/common/util/HdfsUtils.scala  |  9 ++++--
 .../streampark/common/util/SqlConvertUtils.scala   |  3 +-
 .../core/controller/FlinkSqlController.java        |  2 +-
 .../console/core/entity/Application.java           | 12 ++++----
 .../core/service/impl/FlinkClusterServiceImpl.java | 16 +++++-----
 .../console/core/task/FlinkClusterWatcher.java     |  4 +--
 .../console/core/task/FlinkK8sObserverBroker.scala |  2 +-
 .../console/core/service/SavePointServiceTest.java |  4 +--
 .../flink/client/bean/SubmitRequest.scala          | 10 +++++--
 22 files changed, 142 insertions(+), 95 deletions(-)

diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
index ed87b8a10..70c992d82 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ApplicationType.java
@@ -18,12 +18,16 @@
 package org.apache.streampark.common.enums;
 
 public enum ApplicationType {
+
   /** StreamPark Flink */
   STREAMPARK_FLINK(1, "StreamPark Flink"),
+
   /** Apache Flink */
   APACHE_FLINK(2, "Apache Flink"),
+
   /** StreamPark Spark */
   STREAMPARK_SPARK(3, "StreamPark Spark"),
+
   /** Apache Spark */
   APACHE_SPARK(4, "Apache Spark");
 
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
index 284b60a36..b2599b74c 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ClusterState.java
@@ -19,47 +19,60 @@ package org.apache.streampark.common.enums;
 
 /** @since 1.2.3 */
 public enum ClusterState {
+
   /** The cluster was just created but not started */
   CREATED(0),
+
   /** cluster started */
   RUNNING(1),
+
   /** cluster stopped */
   CANCELED(2),
+
   /** cluster lost */
   LOST(3),
+
   /** cluster unknown */
   UNKNOWN(4),
+
+  /** cluster starting */
   STARTING(5),
+
+  /** cluster cancelling */
   CANCELLING(6),
+
+  /** cluster failed */
   FAILED(7),
+
+  /** cluster killed */
   KILLED(8);
 
-  private final Integer value;
+  private final Integer state;
 
-  ClusterState(Integer value) {
-    this.value = value;
+  ClusterState(Integer state) {
+    this.state = state;
   }
 
   public static ClusterState of(Integer value) {
     for (ClusterState clusterState : values()) {
-      if (clusterState.value.equals(value)) {
+      if (clusterState.state.equals(value)) {
         return clusterState;
       }
     }
     return ClusterState.UNKNOWN;
   }
 
-  public static ClusterState of(String value) {
+  public static ClusterState of(String name) {
     for (ClusterState clusterState : values()) {
-      if (clusterState.name().equals(value)) {
+      if (clusterState.name().equals(name)) {
         return clusterState;
       }
     }
     return ClusterState.UNKNOWN;
   }
 
-  public Integer getValue() {
-    return value;
+  public Integer getState() {
+    return state;
   }
 
   public static boolean isRunning(ClusterState state) {
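
For reference, the rename above (value -> state, getValue() -> getState()) leaves the lookup
behavior intact: both of() overloads fall back to UNKNOWN instead of returning null. A minimal
caller sketch in Java (assumes the ClusterState enum from this hunk; the codes are the ones
declared above):

    // Unrecognized codes resolve to UNKNOWN rather than null.
    ClusterState running = ClusterState.of(1);   // RUNNING
    ClusterState unknown = ClusterState.of(99);  // UNKNOWN, not null
    boolean active = ClusterState.isRunning(running);
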
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
index 7bf3109ee..00580a2cd 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/DevelopmentMode.java
@@ -28,25 +28,25 @@ public enum DevelopmentMode {
   /** Py flink */
   PYFLINK("Python Flink", 3);
 
-  private final String mode;
+  private final String name;
 
-  private final Integer value;
+  private final Integer mode;
 
-  DevelopmentMode(String mode, Integer value) {
+  DevelopmentMode(String name, Integer mode) {
+    this.name = name;
     this.mode = mode;
-    this.value = value;
   }
 
   public static DevelopmentMode of(Integer value) {
-    for (DevelopmentMode mode : values()) {
-      if (mode.value.equals(value)) {
-        return mode;
+    for (DevelopmentMode developmentMode : values()) {
+      if (developmentMode.mode.equals(value)) {
+        return developmentMode;
       }
     }
     return null;
   }
 
-  public Integer getValue() {
-    return value;
+  public Integer getMode() {
+    return mode;
   }
 }
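
The same rename pattern applies here (value -> mode, getValue() -> getMode()), with one contrast
worth noting: DevelopmentMode.of(Integer) returns null for an unknown code, so callers must
null-check. A small sketch (assumes the enum above):

    // No UNKNOWN fallback here, unlike ClusterState.of().
    DevelopmentMode mode = DevelopmentMode.of(3); // PYFLINK, per the codes above
    if (mode != null && DevelopmentMode.PYFLINK.getMode().equals(mode.getMode())) {
      // handle a PyFlink job
    }
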
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
index 889cb915d..c903a5bcd 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java
@@ -25,16 +25,22 @@ public enum ExecutionMode {
 
   /** Local mode */
   LOCAL(0, "local"),
+
   /** remote */
   REMOTE(1, "remote"),
+
   /** yarn-per-job mode */
   YARN_PER_JOB(2, "yarn-per-job"),
+
   /** yarn session */
   YARN_SESSION(3, "yarn-session"),
+
   /** yarn application */
   YARN_APPLICATION(4, "yarn-application"),
+
   /** kubernetes session */
   KUBERNETES_NATIVE_SESSION(5, "kubernetes-session"),
+
   /** kubernetes application */
   KUBERNETES_NATIVE_APPLICATION(6, "kubernetes-application");
 
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
index 79f6e4196..c87c5a3a6 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkK8sRestExposedType.java
@@ -31,16 +31,16 @@ public enum FlinkK8sRestExposedType {
 
   private final String name;
 
-  private final Integer value;
+  private final Integer type;
 
-  FlinkK8sRestExposedType(String name, Integer value) {
+  FlinkK8sRestExposedType(String name, Integer type) {
     this.name = name;
-    this.value = value;
+    this.type = type;
   }
 
   public static FlinkK8sRestExposedType of(Integer value) {
     for (FlinkK8sRestExposedType order : values()) {
-      if (order.value.equals(value)) {
+      if (order.type.equals(value)) {
         return order;
       }
     }
@@ -51,7 +51,7 @@ public enum FlinkK8sRestExposedType {
     return name;
   }
 
-  public Integer getValue() {
-    return value;
+  public Integer getType() {
+    return type;
   }
 }
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
index 856c39bf9..6f9b8a067 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/FlinkSqlValidationFailedType.java
@@ -18,33 +18,38 @@
 package org.apache.streampark.common.enums;
 
 public enum FlinkSqlValidationFailedType {
+
   /** Basic test failed (such as null, etc.) */
   VERIFY_FAILED(1),
+
   /** syntax error */
   SYNTAX_ERROR(2),
+
   /** unsupported dialect */
   UNSUPPORTED_DIALECT(3),
+
   /** unsupported sql command */
   UNSUPPORTED_SQL(4),
+
   /** Not at the end of ";" */
   ENDS_WITH(5);
 
-  private final int value;
+  private final int failedType;
 
-  FlinkSqlValidationFailedType(int value) {
-    this.value = value;
+  FlinkSqlValidationFailedType(int failedType) {
+    this.failedType = failedType;
   }
 
   public static FlinkSqlValidationFailedType of(Integer value) {
     for (FlinkSqlValidationFailedType type : values()) {
-      if (type.value == value) {
+      if (type.failedType == value) {
         return type;
       }
     }
     return null;
   }
 
-  public int getValue() {
-    return value;
+  public int getFailedType() {
+    return failedType;
   }
 }
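
A behavioral note on the comparison above: failedType is a primitive int while the parameter is
an Integer, so type.failedType == value compares after auto-unboxing, and passing null would
throw a NullPointerException. Sketch (assumes the enum above):

    FlinkSqlValidationFailedType syntax = FlinkSqlValidationFailedType.of(2); // SYNTAX_ERROR
    FlinkSqlValidationFailedType none = FlinkSqlValidationFailedType.of(42);  // null
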
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
index 8a2b207e1..e69823e27 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ResolveOrder.java
@@ -19,23 +19,25 @@ package org.apache.streampark.common.enums;
 
 /** classloader.resolve-order */
 public enum ResolveOrder {
+
   /** parent-first */
   PARENT_FIRST("parent-first", 0),
+
   /** child-first */
   CHILD_FIRST("child-first", 1);
 
   private final String name;
 
-  private final Integer value;
+  private final Integer order;
 
-  ResolveOrder(String name, Integer value) {
+  ResolveOrder(String name, Integer order) {
     this.name = name;
-    this.value = value;
+    this.order = order;
   }
 
   public static ResolveOrder of(Integer value) {
     for (ResolveOrder order : values()) {
-      if (order.value.equals(value)) {
+      if (order.order.equals(value)) {
         return order;
       }
     }
@@ -46,7 +48,7 @@ public enum ResolveOrder {
     return name;
   }
 
-  public Integer getValue() {
-    return value;
+  public Integer getOrder() {
+    return order;
   }
 }
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
index 131507798..63ae3a819 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/RestoreMode.java
@@ -20,28 +20,46 @@ package org.apache.streampark.common.enums;
 import java.util.Arrays;
 
 public enum RestoreMode {
-  NO_CLAIM(1),
-  CLAIM(2),
+
+  /**
+   * In this mode Flink claims ownership of the snapshot and essentially treats it like a
+   * checkpoint: it controls the lifecycle and might delete it if it is not needed for recovery
+   * anymore. Hence, it is not safe to manually delete the snapshot or to start two jobs from the
+   * same snapshot. Flink keeps around a configured number of checkpoints.
+   */
+  CLAIM(1),
+
+  /**
+   * In the NO_CLAIM mode Flink will not assume ownership of the snapshot. It will leave the files
+   * in user’s control and never delete any of the files. In this mode you can start multiple jobs
+   * from the same snapshot.
+   */
+  NO_CLAIM(2),
+
+  /**
+   * The legacy mode is how Flink worked until 1.15. In this mode Flink will never delete the
+   * initial checkpoint. At the same time, it is not clear if a user can ever delete it as well.
+   */
   LEGACY(3);
 
   public static final String RESTORE_MODE = "execution.savepoint-restore-mode";
   public static final int SINCE_FLINK_VERSION = 15;
 
-  private final int value;
+  private final int mode;
 
   public int get() {
-    return this.value;
+    return this.mode;
   }
 
-  RestoreMode(int value) {
-    this.value = value;
+  RestoreMode(int mode) {
+    this.mode = mode;
   }
 
   public String getName() {
-    return String.valueOf(RestoreMode.of(this.value));
+    return RestoreMode.of(this.mode).toString();
   }
 
   public static RestoreMode of(Integer value) {
-    return Arrays.stream(values()).filter((x) -> x.value == value).findFirst().orElse(null);
+    return Arrays.stream(values()).filter((x) -> x.mode == value).findFirst().orElse(null);
   }
 }
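
The javadoc added above follows Flink's documented savepoint restore-mode semantics, and the
RESTORE_MODE key plus the SINCE_FLINK_VERSION guard indicate how the enum is meant to be applied.
A hedged sketch (the config map and version variable are illustrative, not from this commit):

    // Gate the restore-mode option on the minimum supported Flink minor version.
    java.util.Map<String, String> flinkConf = new java.util.HashMap<>();
    int flinkMinorVersion = 16; // assumed: Flink 1.16
    if (flinkMinorVersion >= RestoreMode.SINCE_FLINK_VERSION) {
      // "execution.savepoint-restore-mode" -> "CLAIM"
      flinkConf.put(RestoreMode.RESTORE_MODE, RestoreMode.CLAIM.getName());
    }
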
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
index 65dc3c5ea..f405f2818 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/Semantic.java
@@ -17,15 +17,18 @@
 
 package org.apache.streampark.common.enums;
 
+/** Flink consistency semantics */
 public enum Semantic {
 
-  /** */
+  /**
+   * Ensure that the counting results obtained after a fault are consistent with the correct values.
+   */
   EXACTLY_ONCE,
 
-  /** */
+  /** The program may calculate more after a malfunction, but it will never calculate less. */
   AT_LEAST_ONCE,
 
-  /** */
+  /** After the fault occurs, the counting results may be lost. */
   NONE;
 
   public static Semantic of(String name) {
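
The new javadocs describe Flink's delivery guarantees. As a purely hypothetical illustration
(not part of this commit), the enum could be mapped onto Flink's CheckpointingMode, with NONE
modeled as leaving checkpointing disabled:

    import org.apache.flink.streaming.api.CheckpointingMode;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    // Hypothetical mapping; the 60s interval is an arbitrary example value.
    static void applySemantic(StreamExecutionEnvironment env, Semantic semantic) {
      switch (semantic) {
        case EXACTLY_ONCE:
          env.enableCheckpointing(60_000L, CheckpointingMode.EXACTLY_ONCE);
          break;
        case AT_LEAST_ONCE:
          env.enableCheckpointing(60_000L, CheckpointingMode.AT_LEAST_ONCE);
          break;
        case NONE:
        default:
          break; // checkpointing stays disabled; results may be lost on failure
      }
    }
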
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
index 6890d669f..11be2cbec 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/StorageType.java
@@ -27,14 +27,14 @@ public enum StorageType {
   /** local File system */
   LFS("lfs");
 
-  private final String identifier;
+  private final String type;
 
-  StorageType(String identifier) {
-    this.identifier = identifier;
+  StorageType(String type) {
+    this.type = type;
   }
 
-  public String getIdentifier() {
-    return identifier;
+  public String getType() {
+    return type;
   }
 
   public static StorageType of(String identifier) {
@@ -42,7 +42,7 @@ public enum StorageType {
       return LFS;
     }
     for (StorageType type : values()) {
-      if (type.identifier.equals(identifier)) {
+      if (type.type.equals(identifier)) {
         return type;
       }
     }
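
Unlike the Integer-based lookups above, StorageType.of(String) treats a null identifier as the
local-file-system default rather than an error. Sketch (assumes the enum above):

    StorageType byDefault = StorageType.of(null); // LFS
    StorageType local = StorageType.of("lfs");    // LFS, matched by its type string
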
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
index 6c5c6a6d5..f465f25e5 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
@@ -21,13 +21,11 @@ import java.time.LocalDateTime
 object ConfigConst {
 
   /** common const */
-
   val DEFAULT_DATAMASK_STRING = "********"
 
   val PARAM_PREFIX = "--"
 
   /** pyflink */
-
   val PYTHON_SUFFIX = ".py"
 
   val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"
@@ -35,7 +33,6 @@ object ConfigConst {
   val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"
 
   /** about parameter... */
-
   val KEY_APP_HOME = "app.home"
 
   val KEY_HOST = "host"
@@ -53,12 +50,6 @@ object ConfigConst {
   val KEY_SEMANTIC = "semantic"
 
   /** sign.... */
-  val SIGN_COLON = ":"
-
-  val SIGN_SEMICOLON = ";"
-
-  val SIGN_COMMA = ","
-
   val SIGN_EMPTY = ""
 
   /** kerberos */
@@ -79,15 +70,14 @@ object ConfigConst {
 
   val KEY_SECURITY_KERBEROS_KRB5_CONF = "security.kerberos.login.krb5"
 
-  // spark
-
+  /** about spark */
   val KEY_SPARK_MAIN_CLASS = "spark.main.class"
 
   val KEY_SPARK_APP_NAME = "spark.app.name"
 
   val KEY_SPARK_BATCH_DURATION = "spark.batch.duration"
 
-  // flink
+  /** about config flink */
   def KEY_APP_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}conf"
 
   def KEY_FLINK_CONF(prefix: String = null): String = s"${Option(prefix).getOrElse("")}flink.conf"
@@ -123,7 +113,7 @@ object ConfigConst {
 
   val FLINK_NATIVE_KUBERNETES_LABEL = "flink-native-kubernetes"
 
-  // ---table---
+  /** about flink table */
   val KEY_FLINK_TABLE_PLANNER = "flink.table.planner"
 
   val KEY_FLINK_TABLE_MODE = "flink.table.mode"
@@ -133,7 +123,6 @@ object ConfigConst {
   val KEY_FLINK_TABLE_DATABASE = "flink.table.database"
 
   /** about config Kafka */
-
   val KAFKA_SINK_PREFIX = "kafka.sink."
 
   val KAFKA_SOURCE_PREFIX = "kafka.source."
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
index 6585f7e56..033157ac0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/FlinkVersion.scala
@@ -59,8 +59,6 @@ class FlinkVersion(val flinkHome: String) extends java.io.Serializable with Logg
     lib
   }
 
-  lazy val flinkLibs: List[NetURL] = flinkLib.listFiles().map(_.toURI.toURL).toList
-
   lazy val version: String = {
     val flinkVersion = new AtomicReference[String]
     val cmd = List(
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
index 39f3fc3d4..95ed2de1c 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/K8sFlinkConfig.scala
@@ -18,7 +18,6 @@
 package org.apache.streampark.common.conf
 
 /** Flink kubernetes Configuration for v1 version */
-
 object K8sFlinkConfig {
 
   lazy val isV2Enabled: Boolean = InternalConfigHolder.get(ENABLE_V2)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
index 5a74ab73d..2565841ba 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HdfsUtils.scala
@@ -31,9 +31,11 @@ object HdfsUtils extends Logger {
 
   def getDefaultFS: String = HadoopUtils.hadoopConf.get(FileSystem.FS_DEFAULT_NAME_KEY)
 
-  def list(src: String): List[FileStatus] = HadoopUtils.hdfs.listStatus(getPath(src)).toList
+  def list(src: String): List[FileStatus] =
+    HadoopUtils.hdfs.listStatus(getPath(src)).toList
 
-  def move(src: String, dst: String): Unit = HadoopUtils.hdfs.rename(getPath(src), getPath(dst))
+  def move(src: String, dst: String): Unit =
+    HadoopUtils.hdfs.rename(getPath(src), getPath(dst))
 
   def mkdirs(path: String): Unit = HadoopUtils.hdfs.mkdirs(getPath(path))
 
@@ -121,7 +123,8 @@ object HdfsUtils extends Logger {
     val path: Path = getPath(fileName)
     require(
       HadoopUtils.hdfs.exists(path) && !HadoopUtils.hdfs.isDirectory(path),
-      s"[StreamPark] HdfsUtils.read: path($fileName) not exists or isDirectory ")
+      s"[StreamPark] HdfsUtils.read: path($fileName) not exists or isDirectory "
+    )
     val in = HadoopUtils.hdfs.open(path)
     val out = new ByteArrayOutputStream()
     IOUtils.copyBytes(in, out, 4096, false)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
index ba38150c2..3b629cab3 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/SqlConvertUtils.scala
@@ -247,7 +247,8 @@ object SqlConvertUtils extends Logger {
         case _ => null
       }
     },
-    postfix)
+    postfix
+  )
 
   def mysqlToClickhouse(sql: String, postfix: String): String =
     convertSql(sql, toClickhouseDataType, postfix = postfix)
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
index 2fad82e5b..512aefb4d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/FlinkSqlController.java
@@ -68,7 +68,7 @@ public class FlinkSqlController {
           RestResponse.success()
               .data(false)
               .message(exception)
-              .put("type", flinkSqlValidationResult.failedType().getValue())
+              .put("type", flinkSqlValidationResult.failedType().getFailedType())
               .put("start", flinkSqlValidationResult.lineStart())
               .put("end", flinkSqlValidationResult.lineEnd());
 
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 6299bcb24..2e8a06b61 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -441,24 +441,24 @@ public class Application implements Serializable {
 
   @JsonIgnore
   public boolean isFlinkSqlJob() {
-    return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType());
+    return DevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType());
   }
 
   @JsonIgnore
   public boolean isFlinkSqlJobOrPyFlinkJob() {
-    return DevelopmentMode.FLINK_SQL.getValue().equals(this.getJobType())
-        || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+    return DevelopmentMode.FLINK_SQL.getMode().equals(this.getJobType())
+        || DevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
   }
 
   @JsonIgnore
   public boolean isCustomCodeJob() {
-    return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType());
+    return DevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType());
   }
 
   @JsonIgnore
   public boolean isCustomCodeOrPyFlinkJob() {
-    return DevelopmentMode.CUSTOM_CODE.getValue().equals(this.getJobType())
-        || DevelopmentMode.PYFLINK.getValue().equals(this.getJobType());
+    return DevelopmentMode.CUSTOM_CODE.getMode().equals(this.getJobType())
+        || DevelopmentMode.PYFLINK.getMode().equals(this.getJobType());
   }
 
   @JsonIgnore
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
index bbf30c1f5..d7f62ab08 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/FlinkClusterServiceImpl.java
@@ -153,11 +153,11 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         successful, String.format(ERROR_CLUSTER_QUEUE_HINT, flinkCluster.getYarnQueue()));
     flinkCluster.setCreateTime(new Date());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
-      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getState());
       flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
     } else {
-      flinkCluster.setClusterState(ClusterState.CREATED.getValue());
+      flinkCluster.setClusterState(ClusterState.CREATED.getState());
     }
     boolean ret = save(flinkCluster);
     if (ret && ExecutionMode.isRemoteMode(flinkCluster.getExecutionMode())) {
@@ -188,7 +188,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
         flinkCluster.setAddress(deployResponse.address());
       }
       flinkCluster.setClusterId(deployResponse.clusterId());
-      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getState());
       flinkCluster.setException(null);
       flinkCluster.setEndTime(null);
       updateById(flinkCluster);
@@ -198,7 +198,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       }
     } catch (Exception e) {
       log.error(e.getMessage(), e);
-      flinkCluster.setClusterState(ClusterState.FAILED.getValue());
+      flinkCluster.setClusterState(ClusterState.FAILED.getState());
       flinkCluster.setException(e.toString());
       updateById(flinkCluster);
       throw new ApiDetailException(e);
@@ -217,7 +217,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     flinkCluster.setDescription(paramOfCluster.getDescription());
     if (ExecutionMode.isRemoteMode(flinkCluster.getExecutionModeEnum())) {
       flinkCluster.setAddress(paramOfCluster.getAddress());
-      flinkCluster.setClusterState(ClusterState.RUNNING.getValue());
+      flinkCluster.setClusterState(ClusterState.RUNNING.getState());
       flinkCluster.setStartTime(new Date());
       flinkCluster.setEndTime(null);
       FlinkClusterWatcher.addWatching(flinkCluster);
@@ -249,7 +249,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
       ShutDownResponse shutDownResponse =
           shutdownInternal(flinkCluster, flinkCluster.getClusterId());
      ApiAlertException.throwIfNull(shutDownResponse, "Get shutdown response failed");
-      flinkCluster.setClusterState(ClusterState.CANCELED.getValue());
+      flinkCluster.setClusterState(ClusterState.CANCELED.getState());
       flinkCluster.setEndTime(new Date());
       updateById(flinkCluster);
       FlinkClusterWatcher.unWatching(flinkCluster);
@@ -321,7 +321,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
     LambdaUpdateWrapper<FlinkCluster> updateWrapper =
         new LambdaUpdateWrapper<FlinkCluster>()
             .eq(FlinkCluster::getId, id)
-            .set(FlinkCluster::getClusterState, state.getValue());
+            .set(FlinkCluster::getClusterState, state.getState());
 
     switch (state) {
       case KILLED:
@@ -447,7 +447,7 @@ public class FlinkClusterServiceImpl extends ServiceImpl<FlinkClusterMapper, Fli
           ClusterState.isRunning(flinkCluster.getClusterStateEnum()),
           "Current cluster is not active, please check!");
       if (!flinkClusterWatcher.verifyClusterConnection(flinkCluster)) {
-        flinkCluster.setClusterState(ClusterState.LOST.getValue());
+        flinkCluster.setClusterState(ClusterState.LOST.getState());
         updateById(flinkCluster);
        throw new ApiAlertException("Current cluster is not active, please check!");
       }
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
index 5aba86351..f94df947b 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkClusterWatcher.java
@@ -99,7 +99,7 @@ public class FlinkClusterWatcher {
     List<FlinkCluster> flinkClusters =
         flinkClusterService.list(
             new LambdaQueryWrapper<FlinkCluster>()
-                .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getValue())
+                .eq(FlinkCluster::getClusterState, ClusterState.RUNNING.getState())
                 // excluding flink clusters on kubernetes
                 .notIn(FlinkCluster::getExecutionMode, ExecutionMode.getKubernetesMode()));
     flinkClusters.forEach(cluster -> WATCHER_CLUSTERS.put(cluster.getId(), cluster));
@@ -138,7 +138,7 @@ public class FlinkClusterWatcher {
       cluster.setAffectedJobs(
           applicationInfoService.countAffectedByClusterId(
              cluster.getId(), InternalConfigHolder.get(CommonConfig.SPRING_PROFILES_ACTIVE())));
-      cluster.setClusterState(state.getValue());
+      cluster.setClusterState(state.getState());
       cluster.setEndTime(new Date());
      alertService.alert(cluster.getAlertId(), AlertTemplate.of(cluster, state));
     }
diff --git a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
index 47ef5c536..6725bba4b 100644
--- a/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
+++ b/streampark-console/streampark-console-service/src/main/scala/org/apache/streampark/console/core/task/FlinkK8sObserverBroker.scala
@@ -221,7 +221,7 @@ class FlinkK8sObserverBroker @Autowired() (
         .tap { case (id, state, error) =>
           safeUpdateFlinkClusterRecord(id)(
             new LambdaUpdateWrapper[FlinkCluster]
-              .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getValue)
+              .set(FlinkCluster.SFunc.CLUSTER_STATE, state.getState)
               .set(error.isDefined, FlinkCluster.SFunc.EXCEPTION, error.get))
         }
         // Alter for unhealthy state in parallel
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
index 5d37b6200..47decc93b 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/core/service/SavePointServiceTest.java
@@ -97,14 +97,14 @@ class SavePointServiceTest extends SpringUnitTestBase {
     app.setAppType(ApplicationType.APACHE_FLINK.getType());
     assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
     app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
-    app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+    app.setJobType(DevelopmentMode.CUSTOM_CODE.getMode());
     assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
 
     // Test for (StreamPark job Or FlinkSQL job) without application config.
     app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
     assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
     app.setAppType(ApplicationType.STREAMPARK_FLINK.getType());
-    app.setJobType(DevelopmentMode.CUSTOM_CODE.getValue());
+    app.setJobType(DevelopmentMode.CUSTOM_CODE.getMode());
     assertThat(savePointServiceImpl.getSavepointFromAppCfgIfStreamParkOrSQLJob(app)).isNull();
 
     // Test for (StreamPark job Or FlinkSQL job) with application config just disabled checkpoint.
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 4989abaf8..7650b84b3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -116,7 +116,10 @@ case class SubmitRequest(
         case "conf://" => PropertiesUtils.fromHoconText(content)
         case "prop://" => PropertiesUtils.fromPropertiesText(content)
         case "hdfs://" =>
-          // 如果配置文件为hdfs方式,则需要用户将hdfs相关配置文件copy到resources下...
+          /**
+           * If the configuration file is on HDFS, you need to copy the HDFS-related configuration
+           * files to resources.
+           */
           val text = HdfsUtils.read(this.appConf)
           val extension = this.appConf.split("\\.").last.toLowerCase
           extension match {
@@ -139,7 +142,10 @@ case class SubmitRequest(
 
   private[client] lazy val hdfsWorkspace = {
 
-    /** 必须保持本机flink和hdfs里的flink版本和配置都完全一致. */
+    /**
+     * The flink version and configuration in the native flink and hdfs must be kept exactly the
+     * same.
+     */
     val workspace = Workspace.remote
     val flinkHome = flinkVersion.flinkHome
     val flinkHomeDir = new File(flinkHome)
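
For context on the appConf handling in the first hunk of this file: the scheme prefix selects
the parser, and hdfs:// paths are read via HdfsUtils before being dispatched on the file
extension. A minimal Java sketch of that dispatch (the HOCON and HDFS branches are stubbed here;
the real code delegates to PropertiesUtils and HdfsUtils):

    import java.io.StringReader;
    import java.util.Properties;

    // Scheme-prefix dispatch; only prop:// is fully implemented in this sketch.
    static Properties loadAppConf(String appConf, String content) throws Exception {
      if (appConf.startsWith("prop://")) {
        Properties props = new Properties();
        props.load(new StringReader(content)); // plain java.util.Properties text
        return props;
      }
      if (appConf.startsWith("conf://")) {
        // real code: PropertiesUtils.fromHoconText(content)
        throw new UnsupportedOperationException("HOCON parsing stubbed in this sketch");
      }
      if (appConf.startsWith("hdfs://")) {
        // real code: HdfsUtils.read(appConf), then dispatch on the file extension
        throw new UnsupportedOperationException("HDFS read stubbed in this sketch");
      }
      throw new IllegalArgumentException("Unsupported config scheme: " + appConf);
    }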

Reply via email to