This is an automated email from the ASF dual-hosted git repository.

panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new a35618b24 [Improve] Constant improvement (#3272)
a35618b24 is described below

commit a35618b2479936ef6915f201f959b5275bc3e19c
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 23 03:27:35 2023 -0500

    [Improve] Constant improvement (#3272)
    
    * [Improve] Constant improvement
    
    * minor improvement
    
    ---------
    
    Co-authored-by: benjobs <[email protected]>
---
 .../org/apache/streampark/common/Constant.java     | 22 ++++++++++++-
 .../conf/{ConfigConst.scala => ConfigKeys.scala}   | 36 +---------------------
 .../common/conf/InternalConfigHolder.scala         |  3 +-
 .../streampark/common/util/ConfigUtils.scala       |  5 +--
 .../streampark/common/util/HBaseClient.scala       |  2 +-
 .../streampark/common/util/HadoopUtils.scala       |  6 ++--
 .../apache/streampark/common/util/JdbcUtils.scala  |  2 +-
 .../streampark/common/util/MongoConfig.scala       |  2 +-
 .../streampark/common/util/RedisClient.scala       |  5 +--
 .../streampark/common/util/RedisEndpoint.scala     |  2 +-
 .../org/apache/streampark/common/util/Utils.scala  | 19 +++++++++++-
 .../streampark/console/base/util/WebUtils.java     |  4 +--
 .../console/core/entity/Application.java           |  8 ++---
 .../console/core/entity/ApplicationConfig.java     | 22 ++++++-------
 .../console/core/entity/FlinkCluster.java          |  4 +--
 .../streampark/console/core/entity/Variable.java   |  4 +--
 .../console/core/runner/EnvInitializer.java        |  6 ++--
 .../impl/ApplicationActionServiceImpl.java         | 26 ++++++++--------
 .../core/service/impl/AppBuildPipeServiceImpl.java |  3 +-
 .../core/service/impl/ResourceServiceImpl.java     |  4 +--
 .../core/utils/YarnQueueLabelExpression.java       |  6 ++--
 .../streampark/console/system/entity/User.java     |  4 +--
 .../console/system/runner/StartedUpRunner.java     |  4 +--
 .../console/SpringIntegrationTestBase.java         |  4 +--
 .../streampark/console/SpringUnitTestBase.java     |  4 +--
 .../flink/client/bean/SubmitRequest.scala          |  9 +++---
 .../flink/client/impl/YarnApplicationClient.scala  |  9 +++---
 .../flink/client/trait/FlinkClientTrait.scala      |  9 +++---
 .../clickhouse/conf/ClickHouseHttpConfig.scala     |  4 +--
 .../connector/doris/conf/DorisConfig.scala         |  6 ++--
 .../connector/influx/function/InfluxFunction.scala |  2 +-
 .../connector/jdbc/internal/JdbcSinkFunction.scala |  2 +-
 .../flink/connector/jdbc/sink/JdbcSink.scala       |  2 +-
 .../flink/connector/kafka/sink/KafkaSink.scala     |  6 ++--
 .../flink/connector/kafka/source/KafkaSource.scala |  4 +--
 .../flink/core/scala/FlinkStreamTable.scala        |  2 +-
 .../flink/core/scala/FlinkStreaming.scala          |  6 ++--
 .../streampark/flink/core/scala/FlinkTable.scala   |  2 +-
 .../flink/kubernetes/KubernetesRetriever.scala     |  4 +--
 .../flink/kubernetes/ingress/IngressStrategy.scala |  4 +--
 .../kubernetes/watcher/FlinkK8sEventWatcher.scala  |  4 +--
 .../streampark/flink/proxy/FlinkShimsProxy.scala   |  6 ++--
 .../streampark/flink/core/EnhancerImplicit.scala   |  2 +-
 .../streampark/flink/core/FlinkSqlExecutor.scala   |  2 +-
 .../flink/core/FlinkStreamTableTrait.scala         |  5 +--
 .../flink/core/FlinkStreamingInitializer.scala     |  2 +-
 .../flink/core/FlinkTableInitializer.scala         |  2 +-
 .../streampark/flink/core/FlinkTableTrait.scala    |  5 +--
 .../streampark/flink/core/SqlCommandParser.scala   |  2 +-
 .../streampark/flink/core/conf/ParameterCli.scala  |  8 ++---
 .../flink/core/test/FlinkSqlExecuteFunSuite.scala  |  2 +-
 .../streampark/flink/core/TableContext.scala       |  4 +--
 .../streampark/flink/core/TableContext.scala       |  4 +--
 .../streampark/flink/core/TableContext.scala       |  4 +--
 .../streampark/flink/core/TableContext.scala       |  4 +--
 .../apache/streampark/flink/cli/SqlClient.scala    |  2 +-
 56 files changed, 173 insertions(+), 163 deletions(-)

diff --git 
a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java 
b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
index ac02bb8a1..553d68bb3 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
@@ -18,15 +18,35 @@
 package org.apache.streampark.common;
 
 /** A constant class to hold the constants variables. */
-public class Constant {
+public final class Constant {
 
   private Constant() {}
 
   public static final String DEFAULT = "default";
+
   public static final String STREAM_PARK = "streampark";
+
   public static final String HTTP_SCHEMA = "http://";
+
   public static final String HTTPS_SCHEMA = "https://";
+
   public static final String JAR_SUFFIX = ".jar";
+
   public static final String ZIP_SUFFIX = ".zip";
+
+  public static final String EMPTY_STRING = "";
+
+  public static final String PYTHON_SUFFIX = ".py";
+
   public static final String SEMICOLON = ";";
+
+  public static final String DEFAULT_DATAMASK_STRING = "********";
+
+  public static final String PYTHON_FLINK_DRIVER_CLASS_NAME =
+      "org.apache.flink.client.python.PythonDriver";
+
+  public static final String STREAMPARK_FLINKSQL_CLIENT_CLASS =
+      "org.apache.streampark.flink.cli.SqlClient";
+
+  public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
similarity index 75%
rename from 
streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
rename to 
streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
index f465f25e5..f2fb98fb0 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
@@ -16,22 +16,10 @@
  */
 package org.apache.streampark.common.conf
 
-import java.time.LocalDateTime
-
-object ConfigConst {
-
-  /** common const */
-  val DEFAULT_DATAMASK_STRING = "********"
+object ConfigKeys {
 
   val PARAM_PREFIX = "--"
 
-  /** pyflink */
-  val PYTHON_SUFFIX = ".py"
-
-  val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"
-
-  val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"
-
   /** about parameter... */
   val KEY_APP_HOME = "app.home"
 
@@ -49,9 +37,6 @@ object ConfigConst {
 
   val KEY_SEMANTIC = "semantic"
 
-  /** sign.... */
-  val SIGN_EMPTY = ""
-
   /** kerberos */
   val KEY_KERBEROS = "kerberos"
 
@@ -180,23 +165,4 @@ object ConfigConst {
 
   val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size"
 
-  val STREAMPARK_FLINKSQL_CLIENT_CLASS = 
"org.apache.streampark.flink.cli.SqlClient"
-
-  def printLogo(info: String): Unit = {
-    // scalastyle:off println
-    println("\n")
-    println("        _____ __                                             __   
    ")
-    println("       / ___// /_________  ____ _____ ___  ____  ____ ______/ /__ 
    ")
-    println("       \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\  __ `/ ___/ 
//_/")
-    println("      ___/ / /_/ /  /  __/ /_/ / / / / / / /_/ / /_/ / /  / ,<    
    ")
-    println("     /____/\\__/_/   \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/  
/_/|_|   ")
-    println("                                       /_/                        
\n\n")
-    println("    Version:  2.2.0-SNAPSHOT                                      
    ")
-    println("    WebSite:  https://streampark.apache.org                       
    ")
-    println("    GitHub :  https://github.com/apache/incubator-streampark      
              ")
-    println(s"    Info   :  $info                                 ")
-    println(s"    Time   :  ${LocalDateTime.now}              \n\n")
-    // scalastyle:on println
-  }
-
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index f6631b697..18aae4e8e 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.common.conf
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.common.util.ImplicitsUtils._
 
@@ -164,7 +165,7 @@ object InternalConfigHolder extends Logger {
                |  ${configKeys
                 .map(
                   key =>
-                    s"$key = ${if (key.contains("password")) 
ConfigConst.DEFAULT_DATAMASK_STRING
+                    s"$key = ${if (key.contains("password")) 
Constant.DEFAULT_DATAMASK_STRING
                       else get(key)}")
                 .mkString("\n  ")}""".stripMargin)
   }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index e0c159299..4cb23a2ad 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import java.util.{Map => JavaMap, Properties}
 
@@ -52,7 +53,7 @@ object ConfigUtils {
       val kafkaProperty = new Properties()
       param.foreach(x => kafkaProperty.put(x._1, x._2.trim))
       val _topic = topic match {
-        case SIGN_EMPTY =>
+        case Constant.EMPTY_STRING =>
           val top = kafkaProperty.getOrElse(KEY_KAFKA_TOPIC, null)
           if (top == null || top.split(",|\\s+").length > 1) {
             throw new IllegalArgumentException(
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
index f128f252b..e1ec974e7 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import org.apache.hadoop.conf.Configuration
 import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index a2ae4bb90..93d0573a8 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, 
InternalConfigHolder}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.{CommonConfig, ConfigKeys, 
InternalConfigHolder}
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import org.apache.commons.collections.CollectionUtils
 import org.apache.commons.lang3.StringUtils
@@ -62,7 +62,7 @@ object HadoopUtils extends Logger {
     InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
 
   private[this] lazy val kerberosConf: Map[String, String] =
-    SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match {
+    SystemPropertyUtils.get(ConfigKeys.KEY_APP_HOME, null) match {
       case null =>
         getClass.getResourceAsStream("/kerberos.yml") match {
           case x if x != null => PropertiesUtils.fromYamlFile(x)
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
index 6029d88e9..22bbfb889 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
 
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
index e04c97d13..7a6764df9 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import com.mongodb._
 
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
index 2b2a4420f..c208411cb 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
@@ -17,7 +17,8 @@
 
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys
 
 import redis.clients.jedis._
 import redis.clients.jedis.exceptions.JedisConnectionException
@@ -92,7 +93,7 @@ object RedisClient extends Logger {
    * @return
    */
   def createJedisPool(endpoint: RedisEndpoint): JedisPool = {
-    val endpointEn: RedisEndpoint = endpoint.copy(auth = 
ConfigConst.DEFAULT_DATAMASK_STRING)
+    val endpointEn: RedisEndpoint = endpoint.copy(auth = 
Constant.DEFAULT_DATAMASK_STRING)
     logInfo(s"[StreamPark] RedisClient: createJedisPool with $endpointEn ")
     new JedisPool(
       poolConfig,
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
index ab765e691..82de51421 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.common.util
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 
 import redis.clients.jedis.{Jedis, Protocol}
 import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, 
SafeEncoder}
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index d56322982..5ab002ab2 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -20,7 +20,7 @@ import org.apache.commons.lang3.StringUtils
 
 import java.io._
 import java.net.URL
-import java.time.Duration
+import java.time.{Duration, LocalDateTime}
 import java.util.{jar, Collection => JavaCollection, Map => JavaMap, 
Properties, UUID}
 import java.util.concurrent.locks.LockSupport
 import java.util.jar.{JarFile, JarInputStream}
@@ -156,4 +156,21 @@ object Utils extends Logger {
     }
   }
 
+  def printLogo(info: String): Unit = {
+    // scalastyle:off println
+    println("\n")
+    println("        _____ __                                             __   
    ")
+    println("       / ___// /_________  ____ _____ ___  ____  ____ ______/ /__ 
    ")
+    println("       \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\  __ `/ ___/ 
//_/")
+    println("      ___/ / /_/ /  /  __/ /_/ / / / / / / /_/ / /_/ / /  / ,<    
    ")
+    println("     /____/\\__/_/   \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/  
/_/|_|   ")
+    println("                                       /_/                        
\n\n")
+    println("    Version:  2.2.0-SNAPSHOT                                      
    ")
+    println("    WebSite:  https://streampark.apache.org                       
    ")
+    println("    GitHub :  https://github.com/apache/incubator-streampark      
              ")
+    println(s"    Info   :  $info                                 ")
+    println(s"    Time   :  ${LocalDateTime.now}              \n\n")
+    // scalastyle:on println
+  }
+
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
index cfef4cbe0..7313788e5 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.base.util;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 
 import org.apache.commons.lang3.StringUtils;
 
@@ -97,7 +97,7 @@ public final class WebUtils {
   }
 
   public static String getAppHome() {
-    return System.getProperty(ConfigConst.KEY_APP_HOME());
+    return System.getProperty(ConfigKeys.KEY_APP_HOME());
   }
 
   public static File getAppDir(String dir) {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 670795012..09deab73d 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.entity;
 
 import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -278,10 +278,10 @@ public class Application implements Serializable {
 
     Map<String, Object> hotParamsMap = this.getHotParamsMap();
     if (MapUtils.isNotEmpty(hotParamsMap)
-        && hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
-      String yarnQueue = 
hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
+        && hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) {
+      String yarnQueue = 
hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString();
       String labelExpr =
-          
Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
+          
Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()))
               .map(Object::toString)
               .orElse(null);
       this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, 
labelExpr).toString());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index b920d6e56..a50414839 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.entity;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.util.DeflaterUtils;
 import org.apache.streampark.common.util.PropertiesUtils;
 import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
@@ -103,16 +103,16 @@ public class ApplicationConfig {
               Collectors.toMap(
                   entry -> {
                     String key = entry.getKey();
-                    if (key.startsWith(ConfigConst.KEY_FLINK_OPTION_PREFIX())) 
{
-                      key = 
key.substring(ConfigConst.KEY_FLINK_OPTION_PREFIX().length());
-                    } else if 
(key.startsWith(ConfigConst.KEY_FLINK_PROPERTY_PREFIX())) {
-                      key = 
key.substring(ConfigConst.KEY_FLINK_PROPERTY_PREFIX().length());
-                    } else if 
(key.startsWith(ConfigConst.KEY_FLINK_TABLE_PREFIX())) {
-                      key = 
key.substring(ConfigConst.KEY_FLINK_TABLE_PREFIX().length());
-                    } else if (key.startsWith(ConfigConst.KEY_APP_PREFIX())) {
-                      key = 
key.substring(ConfigConst.KEY_APP_PREFIX().length());
-                    } else if (key.startsWith(ConfigConst.KEY_SQL_PREFIX())) {
-                      key = 
key.substring(ConfigConst.KEY_SQL_PREFIX().length());
+                    if (key.startsWith(ConfigKeys.KEY_FLINK_OPTION_PREFIX())) {
+                      key = 
key.substring(ConfigKeys.KEY_FLINK_OPTION_PREFIX().length());
+                    } else if 
(key.startsWith(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX())) {
+                      key = 
key.substring(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX().length());
+                    } else if 
(key.startsWith(ConfigKeys.KEY_FLINK_TABLE_PREFIX())) {
+                      key = 
key.substring(ConfigKeys.KEY_FLINK_TABLE_PREFIX().length());
+                    } else if (key.startsWith(ConfigKeys.KEY_APP_PREFIX())) {
+                      key = 
key.substring(ConfigKeys.KEY_APP_PREFIX().length());
+                    } else if (key.startsWith(ConfigKeys.KEY_SQL_PREFIX())) {
+                      key = 
key.substring(ConfigKeys.KEY_SQL_PREFIX().length());
                     }
                     return key;
                   },
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 48f3d8b15..611fc038e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.entity;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.enums.ClusterState;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
@@ -138,7 +138,7 @@ public class FlinkCluster implements Serializable {
     }
     Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
     if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) {
-      map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
+      map.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName);
       map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue));
     }
     map.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
index 458c74d3d..6d4f1f9e4 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.entity;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
 
 import com.baomidou.mybatisplus.annotation.IdType;
 import com.baomidou.mybatisplus.annotation.TableId;
@@ -70,7 +70,7 @@ public class Variable implements Serializable {
 
   public void dataMasking() {
     if (desensitization) {
-      this.setVariableValue(ConfigConst.DEFAULT_DATAMASK_STRING());
+      this.setVariableValue(Constant.DEFAULT_DATAMASK_STRING);
     }
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 13d631d4f..86dc54169 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.runner;
 
 import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.InternalOption;
 import org.apache.streampark.common.conf.Workspace;
@@ -92,14 +92,14 @@ public class EnvInitializer implements ApplicationRunner {
                   + " The system initialization check failed. If started local 
for development and debugging,"
                   + " please ensure the -D%s parameter is clearly specified,"
                   + " more detail: 
https://streampark.apache.org/docs/user-guide/deployment",
-              ConfigConst.KEY_APP_HOME()));
+              ConfigKeys.KEY_APP_HOME()));
     }
 
     // init InternalConfig
     initInternalConfig(context.getEnvironment());
     // overwrite system variable HADOOP_USER_NAME
     String hadoopUserName = 
InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME());
-    overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName);
+    overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
     // initialize local file system resources
     storageInitialize(LFS);
     // Launch the embedded http file server.
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index b60bd0d34..ff0745284 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console.core.service.application.impl;
 
 import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -418,7 +418,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
       // Get the sql of the replaced placeholder
       String realSql = 
variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
       flinkSql.setSql(DeflaterUtils.zipString(realSql));
-      extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
+      extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
     }
 
     // TODO Need to display more K8s submission parameters in the front-end UI.
@@ -478,12 +478,12 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             submitResponse -> {
               if (submitResponse.flinkConfig() != null) {
                 String jmMemory =
-                    
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
+                    
submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
                 if (jmMemory != null) {
                   
application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
                 }
                 String tmMemory =
-                    
submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
+                    
submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
                 if (tmMemory != null) {
                   
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
                 }
@@ -610,7 +610,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             resource.getFilePath(), "pyflink file can't be null, start 
application failed.");
 
         ApiAlertException.throwIfFalse(
-            resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()),
+            resource.getFilePath().endsWith(Constant.PYTHON_SUFFIX),
             "pyflink format error, must be a \".py\" suffix, start application 
failed.");
 
         flinkUserJar = resource.getFilePath();
@@ -621,7 +621,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
           appConf =
               String.format(
                   "json://{\"%s\":\"%s\"}",
-                  ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), 
application.getMainClass());
+                  ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), 
application.getMainClass());
         } else {
           switch (application.getApplicationType()) {
             case STREAMPARK_FLINK:
@@ -639,7 +639,7 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
               appConf =
                   String.format(
                       "json://{\"%s\":\"%s\"}",
-                      ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), 
application.getMainClass());
+                      ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), 
application.getMainClass());
               break;
             default:
               throw new IllegalArgumentException(
@@ -701,19 +701,19 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
                 "The yarn session clusterId=%s cannot be find, maybe the 
clusterId is wrong or "
                     + "the cluster has been deleted. Please contact the 
Admin.",
                 application.getFlinkClusterId()));
-        properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+        properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId());
       } else {
         String yarnQueue =
-            (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
+            (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
         String yarnLabelExpr =
-            (String) 
application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_NODE_LABEL());
+            (String) 
application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
         Optional.ofNullable(yarnQueue)
-            .ifPresent(yq -> properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), 
yq));
+            .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), 
yq));
         Optional.ofNullable(yarnLabelExpr)
-            .ifPresent(yLabel -> 
properties.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), yLabel));
+            .ifPresent(yLabel -> 
properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
       }
     } else if 
(FlinkExecutionMode.isKubernetesMode(application.getFlinkExecutionMode())) {
-      properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
+      properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
     }
 
     if 
(FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode()))
 {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 222dc56bf..15946b7f6 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -18,7 +18,6 @@
 package org.apache.streampark.console.core.service.impl;
 
 import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
@@ -441,7 +440,7 @@ public class AppBuildPipeServiceImpl
     }
 
     FlinkExecutionMode executionModeEnum = app.getFlinkExecutionMode();
-    String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
+    String mainClass = Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS;
     switch (executionModeEnum) {
       case YARN_APPLICATION:
         String yarnProvidedPath = app.getAppLib();
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 47b113616..c1b4b4564 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.fs.FsOperator;
 import org.apache.streampark.common.util.ExceptionUtils;
@@ -289,7 +289,7 @@ public class ResourceServiceImpl extends 
ServiceImpl<ResourceMapper, Resource>
           resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
           return RestResponse.success().data(resp);
         }
-        if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+        if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
           return RestResponse.success().data(resp);
         }
         Manifest manifest = Utils.getJarManifest(jarFile);
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
index e44482b58..ab0a35bea 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.core.utils;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.console.base.exception.ApiAlertException;
 
 import org.apache.commons.lang3.StringUtils;
@@ -111,8 +111,8 @@ public class YarnQueueLabelExpression {
     Map<String, String> map = new HashMap<>(2);
     yarnQueueLabelExpression
         .getLabelExpression()
-        .ifPresent(labelExp -> map.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), 
labelExp));
-    map.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue);
+        .ifPresent(labelExp -> map.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), 
labelExp));
+    map.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue);
     return map;
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
index 88a2f5733..1699e0b6e 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.entity;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.console.core.enums.LoginTypeEnum;
 import org.apache.streampark.console.core.enums.UserTypeEnum;
 
@@ -101,7 +101,7 @@ public class User implements Serializable {
   private Long lastTeamId;
 
   public void dataMasking() {
-    String dataMask = ConfigConst.DEFAULT_DATAMASK_STRING();
+    String dataMask = Constant.DEFAULT_DATAMASK_STRING;
     this.setPassword(dataMask);
     this.setSalt(dataMask);
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 4a19b709f..3228c2266 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.console.system.runner;
 
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.Utils;
 
 import lombok.extern.slf4j.Slf4j;
 import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +37,7 @@ public class StartedUpRunner implements ApplicationRunner {
   @Override
   public void run(ApplicationArguments args) {
     if (context.isActive()) {
-      ConfigConst.printLogo("streampark-console start successful");
+      Utils.printLogo("streampark-console start successful");
     }
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
index 8780e221e..398fce3d5 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console;
 
 import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.util.SystemPropertyUtils;
 
 import org.apache.commons.io.FileUtils;
@@ -91,7 +91,7 @@ public abstract class SpringIntegrationTestBase {
         Files.createDirectories(new File(tempAbsPath, 
DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());
 
     appHome = new File(tempAbsPath, 
DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
-    System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
+    System.setProperty(ConfigKeys.KEY_APP_HOME(), appHome);
     System.setProperty(
         CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
         localWorkspace.toAbsolutePath().toString());
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
index 5e40b3122..88f5284d8 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.console;
 
 import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
 import org.apache.streampark.console.core.entity.Application;
 import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -68,7 +68,7 @@ public abstract class SpringUnitTestBase {
     Path localWorkspace =
         Files.createDirectories(new File(mockedHome + 
"/localWorkspace").toPath());
 
-    System.setProperty(ConfigConst.KEY_APP_HOME(), mockedHome);
+    System.setProperty(ConfigKeys.KEY_APP_HOME(), mockedHome);
     System.setProperty(
         CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
         localWorkspace.toAbsolutePath().toString());
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 18665186f..25e17120d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -17,8 +17,9 @@
 
 package org.apache.streampark.flink.client.bean
 
-import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion, Workspace}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums._
 import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, 
PropertiesUtils}
 import org.apache.streampark.flink.packer.pipeline.{BuildResult, 
ShadedBuildResponse}
@@ -59,8 +60,8 @@ case class SubmitRequest(
   lazy val appOption: Map[String, String] = 
getParameterMap(KEY_FLINK_OPTION_PREFIX)
 
   lazy val appMain: String = this.developmentMode match {
-    case FlinkDevelopmentMode.FLINK_SQL => 
ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS
-    case FlinkDevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME
+    case FlinkDevelopmentMode.FLINK_SQL => 
Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS
+    case FlinkDevelopmentMode.PYFLINK => 
Constant.PYTHON_FLINK_DRIVER_CLASS_NAME
     case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
   }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f39fa341a..f5862a082 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -17,7 +17,8 @@
 
 package org.apache.streampark.flink.client.impl
 
-import org.apache.streampark.common.conf.{ConfigConst, Workspace}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
 import org.apache.streampark.common.enums.FlinkDevelopmentMode
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
@@ -117,15 +118,15 @@ object YarnApplicationClient extends YarnClientTrait {
         // python.archives
         .safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv)
         // python.client.executable
-        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
Constant.PYTHON_EXECUTABLE)
         // python.executable
-        .safeSet(PythonOptions.PYTHON_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
 
       val args: util.List[String] = 
flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
       // Caused by: java.lang.UnsupportedOperationException
       val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
       argsList.add("-pym")
-      
argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+      
argsList.add(submitRequest.userJarFile.getName.dropRight(Constant.PYTHON_SUFFIX.length))
       flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
     }
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 7fb6aa148..75a05326f 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -17,8 +17,9 @@
 
 package org.apache.streampark.flink.client.`trait`
 
-import org.apache.streampark.common.conf.{ConfigConst, Workspace}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums.{ApplicationType, 
FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
 import org.apache.streampark.common.fs.FsOperator
 import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, 
SystemPropertyUtils}
@@ -251,9 +252,9 @@ trait FlinkClientTrait extends Logger {
         // python.archives
         .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
         // python.client.executable
-        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, 
Constant.PYTHON_EXECUTABLE)
         // python.executable
-        .safeSet(PythonOptions.PYTHON_EXECUTABLE, 
ConfigConst.PYTHON_EXECUTABLE)
+        .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
     }
 
     val packageProgram = PackagedProgram.newBuilder
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
index c75eb1790..041dbe5ac 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
@@ -17,9 +17,9 @@
 
 package org.apache.streampark.flink.connector.clickhouse.conf
 
-import org.apache.streampark.common.conf.ConfigConst
 import org.apache.streampark.flink.connector.conf.ThresholdConf
 
+import org.apache.streampark.common.Constant
 import java.util.{Base64, Properties}
 import java.util.concurrent.ThreadLocalRandom
 
@@ -69,7 +69,7 @@ class ClickHouseHttpConfig(parameters: Properties)
 
   override def toString: String = {
     s"""
-       |{ user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, 
hosts: ${hosts.mkString(",")} }
+       |{ user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: 
${hosts.mkString(",")} }
        |""".stripMargin
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
index 40a97ff1e..df09056c1 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
@@ -16,10 +16,10 @@
  */
 package org.apache.streampark.connector.doris.conf
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys
 
 import java.util.Properties
-
 import scala.collection.convert.ImplicitConversions._
 
 object DorisConfig {
@@ -83,7 +83,7 @@ class DorisConfig(parameters: Properties) {
 
   override def toString: String = {
     s"""
-       |{ doris user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, 
hosts: ${loadUrl.mkString(",")} }
+       |{ doris user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, 
hosts: ${loadUrl.mkString(",")} }
        |""".stripMargin
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
index 41e8e324e..c3565c9a2 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.influx.function
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_JDBC_PASSWORD, 
KEY_JDBC_URL, KEY_JDBC_USER}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_JDBC_PASSWORD, 
KEY_JDBC_URL, KEY_JDBC_USER}
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
index f00647dd3..6fb365e31 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.jdbc.internal
 
-import 
org.apache.streampark.common.conf.ConfigConst.{DEFAULT_JDBC_INSERT_BATCH, 
KEY_JDBC_INSERT_BATCH}
+import 
org.apache.streampark.common.conf.ConfigKeys.{DEFAULT_JDBC_INSERT_BATCH, 
KEY_JDBC_INSERT_BATCH}
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.{JdbcUtils, Logger}
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 1303e3661..c5ca134e2 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.jdbc.sink
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums.Semantic
 import org.apache.streampark.common.util.{ConfigUtils, Logger}
 import 
org.apache.streampark.flink.connector.jdbc.internal.{Jdbc2PCSinkFunction, 
JdbcSinkFunction}
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
index 9bfe176e0..8a60922a8 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.kafka.sink
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
 import org.apache.streampark.common.util.{ConfigUtils, Utils}
 import 
org.apache.streampark.flink.connector.kafka.bean.KafkaEqualityPartitioner
 import org.apache.streampark.flink.connector.sink.Sink
@@ -83,10 +83,10 @@ class KafkaSink(
     val producer = {
       val prop = ConfigUtils.getKafkaSinkConf(ctx.parameter.toMap, topic, 
alias)
       Utils.copyProperties(property, prop)
-      val topicId = prop.remove(ConfigConst.KEY_KAFKA_TOPIC).toString
+      val topicId = prop.remove(ConfigKeys.KEY_KAFKA_TOPIC).toString
 
       /** kafkaProducersPoolSize will be used under EXACTLY_ONCE semantics */
-      val semantic = 
Try(Some(prop.remove(ConfigConst.KEY_KAFKA_SEMANTIC).toString.toUpperCase))
+      val semantic = 
Try(Some(prop.remove(ConfigKeys.KEY_KAFKA_SEMANTIC).toString.toUpperCase))
         .getOrElse(None) match {
         case None => Semantic.AT_LEAST_ONCE
         case Some("AT_LEAST_ONCE") => Semantic.AT_LEAST_ONCE
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
index 05e8e7b8c..aea660284 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.kafka.source
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.util.{ConfigUtils, Utils}
 import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord
 import org.apache.streampark.flink.core.scala.StreamingContext
@@ -25,7 +25,7 @@ import org.apache.streampark.flink.core.scala.StreamingContext
 import org.apache.flink.api.common.eventtime.WatermarkStrategy
 import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
 import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
-import org.apache.flink.streaming.api.scala.{DataStream, _}
+import org.apache.flink.streaming.api.scala._
 import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, 
KafkaDeserializationSchema}
 import 
org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
 import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
diff --git 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index c7bfbd155..b480231c0 100644
--- 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++ 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.core.scala
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.core.{FlinkTableInitializer, 
StreamTableContext}
 import org.apache.streampark.flink.core.TableExt
diff --git 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 7dffccd29..8d5dca940 100644
--- 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.flink.core.scala
 
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
 import org.apache.streampark.flink.core.{FlinkStreamingInitializer, 
StreamEnvConfig}
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
@@ -50,7 +50,7 @@ class StreamingContext(
   }
 
   @Deprecated override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkStreaming $jobName Starting...")
+    Utils.printLogo(s"FlinkStreaming $jobName Starting...")
     super.execute(jobName)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 78ead4fb2..44ed2b972 100644
--- 
a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ 
b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.core.scala
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
 import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext}
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index edd679ad7..ee67a202d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
 import org.apache.streampark.common.util.ImplicitsUtils._
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
@@ -116,7 +116,7 @@ object KubernetesRetriever extends Logger {
             .apps()
             .deployments()
             .inNamespace(namespace)
-            .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+            .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL)
             .list()
             .getItems
             .asScala
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e97fcfc1f..5cba3b36f 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.ingress
 
-import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig}
+import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig}
 
 import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
 import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -63,7 +63,7 @@ trait IngressStrategy {
   def buildIngressLabels(clusterId: String): Map[String, String] = {
     Map(
       "app" -> clusterId,
-      "type" -> ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL,
+      "type" -> ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL,
       "component" -> "ingress"
     )
   }
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index f49bf4406..4e3681a81 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.kubernetes.{FlinkK8sWatchController, KubernetesRetriever}
 import org.apache.streampark.flink.kubernetes.model.{K8sDeploymentEventCV, K8sEventKey}
@@ -67,7 +67,7 @@ class FlinkK8sEventWatcher(implicit watchController: FlinkK8sWatchController)
     k8sClient
       .apps()
       .deployments()
-      .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+      .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL)
       .watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] {
         override def eventReceived(action: Watcher.Action, event: Deployment): Unit = {
           handleDeploymentEvent(action, event)
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 2a8050f35..de3b9eaed 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.proxy
 
 import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
+import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion}
 import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
 import org.apache.streampark.common.util.ImplicitsUtils._
 
@@ -108,10 +108,10 @@ object FlinkShimsProxy extends Logger {
   }
 
   def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = {
-    val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
+    val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
     require(
       appHome != null,
-      String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME))
+      String.format("%s is not found on System env.", ConfigKeys.KEY_APP_HOME))
 
     val libPath = new File(s"$appHome/lib")
     require(libPath.exists())
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index 528e47907..d855ee030 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
 import org.apache.streampark.common.util.DeflaterUtils
 
 import org.apache.flink.api.java.utils.ParameterTool
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 8d18a31e2..9d6a09339 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
+import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.flink.core.SqlCommand._
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 53e381275..517ae1ae6 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
 import com.esotericsoftware.kryo.Serializer
@@ -77,7 +78,7 @@ abstract class FlinkStreamTableTrait(
   }
 
   @Deprecated def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkStreamTable $jobName Starting...")
+    Utils.printLogo(s"FlinkStreamTable $jobName Starting...")
     if (isConvertedToDataStream) {
       streamEnv.execute(jobName)
     } else null
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 87a54bd07..bfb4be1cc 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util._
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 2169b8ea5..ce8c93823 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
 import org.apache.streampark.common.enums.{ApiType, PlannerType}
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index e581f857c..71b1a1c48 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -16,7 +16,8 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.core.EnhancerImplicit._
 
 import org.apache.flink.api.common.JobExecutionResult
@@ -40,7 +41,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn
   }
 
   def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
+    Utils.printLogo(s"FlinkTable $jobName Starting...")
     null
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index dc4de596c..157216ce8 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX
+import org.apache.streampark.common.conf.ConfigKeys.PARAM_PREFIX
 import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
 import org.apache.streampark.common.util.Logger
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 1206c817e..3331253b3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -16,8 +16,8 @@
  */
 package org.apache.streampark.flink.core.conf
 
-import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
+import org.apache.streampark.common.conf.ConfigKeys
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
 import org.apache.streampark.common.util.PropertiesUtils
 
 import org.apache.commons.cli.{DefaultParser, Options}
@@ -96,7 +96,7 @@ object ParameterCli {
                 x =>
                   val key = x._1.drop(propertyPrefix.length).trim
                   val value = x._2.trim
-                  if (key == ConfigConst.KEY_FLINK_APP_NAME) {
+                  if (key == ConfigKeys.KEY_FLINK_APP_NAME) {
                     buffer.append(s" -D$key=${value.replace(" ", "_")}")
                   } else {
                     buffer.append(s" -D$key=$value")
@@ -104,7 +104,7 @@ object ParameterCli {
               }
             buffer.toString.trim
           case "--name" =>
-            map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim match {
+            map.getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "").trim match {
               case appName if appName.nonEmpty => appName
               case _ => ""
             }
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
index a681018a3..ed6040fa3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core.test
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_SQL, PARAM_PREFIX}
 import org.apache.streampark.common.util.DeflaterUtils
 import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext}
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index bbe35967c..f7aedd8ea 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
@@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
     tableEnv.connect(connectorDescriptor)
 
   override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
+    Utils.printLogo(s"FlinkTable $jobName Starting...")
     null
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 96ac48549..f0f2ad135 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
@@ -51,7 +51,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
     tableEnv.connect(connectorDescriptor)
 
   override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
+    Utils.printLogo(s"FlinkTable $jobName Starting...")
     null
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index fcd7708f3..d0815b5cb 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
@@ -64,7 +64,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
   def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
 
   @Deprecated override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
+    Utils.printLogo(s"FlinkTable $jobName Starting...")
     null
   }
 
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 64337d26b..8ca8e3fe1 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.core
 
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
 
 import org.apache.flink.api.common.JobExecutionResult
 import org.apache.flink.api.java.utils.ParameterTool
@@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
   override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
 
   override def execute(jobName: String): JobExecutionResult = {
-    printLogo(s"FlinkTable $jobName Starting...")
+    Utils.printLogo(s"FlinkTable $jobName Starting...")
     null
   }
 
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 3b094d086..e305f428b 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.cli
 
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
 import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
 import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
 import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}

Reply via email to