This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev by this push:
     new db330bb25 [ISSUE-3083][Improve] Improve streampark-flink module base on [3.2 Constant Variables Definition] (#3264)
db330bb25 is described below

commit db330bb258a08459a04b6c93754c6af75a1c0ac0
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Oct 23 10:48:10 2023 +0800

    [ISSUE-3083][Improve] Improve streampark-flink module base on [3.2 Constant Variables Definition] (#3264)
---
 .../org/apache/streampark/common/Constant.java     | 22 ++++++++++----------
 .../streampark/common/util/ClassLoaderUtils.scala  |  6 +++++-
 .../apache/streampark/common/util/YarnUtils.scala  |  5 +++--
 .../console/core/entity/Application.java           |  9 +++-----
 .../impl/ApplicationActionServiceImpl.java         |  4 +++-
 .../impl/ApplicationInfoServiceImpl.java           |  3 ++-
 .../core/service/impl/AppBuildPipeServiceImpl.java |  4 +++-
 .../core/service/impl/ProjectServiceImpl.java      |  3 ++-
 .../console/core/task/ProjectBuildTask.java        |  5 +++--
 .../console/base/util/EncryptUtilsTest.java        |  6 ++++--
 .../console/base/util/ShaHashUtilsTest.java        |  4 +++-
 .../flink/client/bean/CancelRequest.scala          |  5 +++--
 .../flink/client/bean/SavepointRequestTrait.scala  |  5 +++--
 .../client/bean/TriggerSavepointRequest.scala      |  5 +++--
 .../impl/KubernetesApplicationClientV2.scala       |  3 ++-
 .../client/impl/KubernetesSessionClientV2.scala    |  5 +++--
 .../flink/connector/failover/SinkRequest.scala     |  4 +++-
 .../conf/ClickHouseSinkConfigOption.scala          |  5 +++--
 .../clickhouse/internal/ClickHouseWriterTask.scala |  2 +-
 .../connector/clickhouse/sink/ClickHouseSink.scala | 10 +++++----
 .../doris/conf/DorisSinkConfigOption.scala         |  2 +-
 .../connector/elasticsearch5/sink/ES5Sink.scala    | 11 ++++++----
 .../elasticsearch5/util/ElasticsearchUtils.scala   |  4 ++--
 .../connector/elasticsearch6/sink/ES6Sink.scala    | 11 ++++++----
 .../elasticsearch6/util/ElasticsearchUtils.scala   |  4 ++--
 .../bean/RestClientFactoryImpl.scala               |  2 +-
 .../connector/elasticsearch7/sink/ES7Sink.scala    | 12 +++++++----
 .../elasticsearch7/util/ElasticsearchUtils.scala   |  4 ++--
 .../connector/hbase/source/HBaseJavaSource.java    |  4 ++--
 .../flink/connector/hbase/sink/HBaseSink.scala     |  6 +++---
 .../connector/http/function/HttpSinkFunction.scala |  2 +-
 .../flink/connector/jdbc/sink/JdbcJavaSink.java    |  2 +-
 .../connector/jdbc/source/JdbcJavaSource.java      |  4 ++--
 .../flink/connector/jdbc/sink/JdbcSink.scala       |  2 +-
 .../connector/mongo/source/MongoJavaSource.java    |  6 +++---
 .../flink/connector/redis/bean/RedisMapper.scala   | 20 ++++++++++--------
 .../flink/connector/redis/conf/RedisConfig.scala   |  2 +-
 .../flink/connector/redis/sink/RedisSink.scala     | 14 ++++++-------
 .../flink/kubernetes/v2/operator/CROperator.scala  | 12 ++++++-----
 .../flink/kubernetes/model/ClusterKey.scala        |  4 ++--
 .../flink/kubernetes/model/TrackId.scala           |  4 ++--
 .../streampark/flink/packer/PackerResourceGC.scala |  3 ++-
 .../docker/FlinkDockerfileTemplateTrait.scala      |  8 ++++++--
 .../streampark/flink/packer/maven/MavenTool.scala  | 11 +++++++---
 .../flink/packer/pipeline/BuildPipeline.scala      | 14 ++++++-------
 .../impl/FlinkK8sApplicationBuildPipeline.scala    | 24 +++++++++++-----------
 .../impl/FlinkK8sApplicationBuildPipelineV2.scala  |  4 ++--
 .../impl/FlinkK8sSessionBuildPipeline.scala        |  4 ++--
 .../pipeline/impl/FlinkRemoteBuildPipeline.scala   |  2 +-
 .../impl/FlinkYarnApplicationBuildPipeline.scala   |  2 +-
 .../streampark/flink/proxy/FlinkShimsProxy.scala   | 11 +++++-----
 .../streampark/flink/core/FlinkSqlValidator.scala  |  3 ++-
 .../streampark-flink-sql-gateway-base/pom.xml      |  5 +++++
 .../streampark/gateway/factories/FactoryUtil.java  |  3 ++-
 .../factories/SqlGatewayServiceFactoryUtils.java   |  3 ++-
 55 files changed, 201 insertions(+), 143 deletions(-)

diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
 b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
similarity index 61%
copy from 
streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
copy to 
streampark-common/src/main/java/org/apache/streampark/common/Constant.java
index 398c14288..ac02bb8a1 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
@@ -15,18 +15,18 @@
  * limitations under the License.
  */
 
-package org.apache.streampark.console.base.util;
+package org.apache.streampark.common;
 
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+/** A constant class to hold the constants variables. */
+public class Constant {
 
-class EncryptUtilsTest {
+  private Constant() {}
 
-  @Test
-  void testEncrypt() throws Exception {
-    String value = "apache streampark";
-    String encrypt = EncryptUtils.encrypt(value, "streampark");
-    String decrypt = EncryptUtils.decrypt(encrypt, "streampark");
-    Assertions.assertEquals(value, decrypt);
-  }
+  public static final String DEFAULT = "default";
+  public static final String STREAM_PARK = "streampark";
+  public static final String HTTP_SCHEMA = "http://";
+  public static final String HTTPS_SCHEMA = "https://";
+  public static final String JAR_SUFFIX = ".jar";
+  public static final String ZIP_SUFFIX = ".zip";
+  public static final String SEMICOLON = ";";
 }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index e830d20c6..466674767 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,6 +16,8 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.streampark.common.Constant
+
 import java.io.{File, IOException}
 import java.net.{URL, URLClassLoader}
 import java.util.function.Supplier
@@ -106,7 +108,9 @@ object ClassLoaderUtils extends Logger {
     loopDirs(file)
   }
 
-  private[this] def loadPath(filepath: String, ext: List[String] = 
List(".jar", ".zip")): Unit = {
+  private[this] def loadPath(
+      filepath: String,
+      ext: List[String] = List(Constant.JAR_SUFFIX, Constant.ZIP_SUFFIX)): 
Unit = {
     val file = new File(filepath)
     loopFiles(file, ext)
   }
diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 849628362..2839bcc5e 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.common.util
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
 
 import org.apache.commons.lang3.StringUtils
@@ -127,8 +128,8 @@ object YarnUtils extends Logger {
         val conf = HadoopUtils.hadoopConf
         val useHttps = YarnConfiguration.useHttps(conf)
         val (addressPrefix, defaultPort, protocol) = useHttps match {
-          case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", "https://")
-          case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
+          case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", Constant.HTTPS_SCHEMA)
+          case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", Constant.HTTP_SCHEMA)
         }
 
         rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index c87736004..670795012 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -17,8 +17,8 @@
 
 package org.apache.streampark.console.core.entity;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigConst;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.ApplicationType;
 import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -103,7 +103,7 @@ public class Application implements Serializable {
   private String k8sName;
 
   /** k8s namespace */
-  private String k8sNamespace = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE();
+  private String k8sNamespace = Constant.DEFAULT;
 
   /** The exposed type of the rest service of 
K8s(kubernetes.rest-service.exposed.type) */
   private Integer k8sRestExposedType;
@@ -258,10 +258,7 @@ public class Application implements Serializable {
   }
 
   public void setK8sNamespace(String k8sNamespace) {
-    this.k8sNamespace =
-        StringUtils.isBlank(k8sNamespace)
-            ? K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE()
-            : k8sNamespace;
+    this.k8sNamespace = StringUtils.isBlank(k8sNamespace) ? Constant.DEFAULT : 
k8sNamespace;
   }
 
   public K8sPodTemplates getK8sPodTemplates() {
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index bb1aff272..b60bd0d34 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.application.impl;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
@@ -651,7 +652,8 @@ public class ApplicationActionServiceImpl extends 
ServiceImpl<ApplicationMapper,
             case STREAMPARK_FLINK:
               flinkUserJar =
                   String.format(
-                      "%s/%s", application.getAppLib(), 
application.getModule().concat(".jar"));
+                      "%s/%s",
+                      application.getAppLib(), 
application.getModule().concat(Constant.JAR_SUFFIX));
               break;
             case APACHE_FLINK:
               flinkUserJar = String.format("%s/%s", application.getAppHome(), 
application.getJar());
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 9c6decfa4..8b836c26c 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.application.impl;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
 import org.apache.streampark.common.enums.FlinkExecutionMode;
@@ -311,7 +312,7 @@ public class ApplicationInfoServiceImpl extends 
ServiceImpl<ApplicationMapper, A
         .filter(File::isFile)
         .sorted(Comparator.comparingLong(File::lastModified).reversed())
         .map(File::getName)
-        .filter(fn -> fn.endsWith(".jar"))
+        .filter(fn -> fn.endsWith(Constant.JAR_SUFFIX))
         .limit(DEFAULT_HISTORY_RECORD_LIMIT)
         .collect(Collectors.toList());
   }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1c9984752..222dc56bf 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.ConfigConst;
 import org.apache.streampark.common.conf.K8sFlinkConfig;
 import org.apache.streampark.common.conf.Workspace;
@@ -532,7 +533,8 @@ public class AppBuildPipeServiceImpl
       case CUSTOM_CODE:
         switch (app.getApplicationType()) {
           case STREAMPARK_FLINK:
-            return String.format("%s/%s", app.getAppLib(), 
app.getModule().concat(".jar"));
+            return String.format(
+                "%s/%s", app.getAppLib(), 
app.getModule().concat(Constant.JAR_SUFFIX));
           case APACHE_FLINK:
             return String.format("%s/%s", app.getAppHome(), app.getJar());
           default:
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 307d76fba..c4117b5fc 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.service.impl;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.conf.CommonConfig;
 import org.apache.streampark.common.conf.InternalConfigHolder;
 import org.apache.streampark.common.conf.Workspace;
@@ -251,7 +252,7 @@ public class ProjectServiceImpl extends 
ServiceImpl<ProjectMapper, Project>
         project.getModule(), "Project module can't be null, please check.");
     File apps = new File(project.getDistHome(), project.getModule());
     for (File file : Objects.requireNonNull(apps.listFiles())) {
-      if (file.getName().endsWith(".jar")) {
+      if (file.getName().endsWith(Constant.JAR_SUFFIX)) {
         list.add(file.getName());
       }
     }
diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
index ded8897e6..b860f49db 100644
--- 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
+++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.console.core.task;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.common.util.CommandUtils;
 import org.apache.streampark.common.util.Utils;
 import org.apache.streampark.console.base.util.GitUtils;
@@ -181,7 +182,7 @@ public class ProjectBuildTask extends AbstractLogFileTask {
       } else {
         // 2) .jar file(normal or official standard flink project)
         Utils.checkJarFile(app.toURI().toURL());
-        String moduleName = app.getName().replace(".jar", "");
+        String moduleName = app.getName().replace(Constant.JAR_SUFFIX, "");
         File distHome = project.getDistHome();
         File targetDir = new File(distHome, moduleName);
         if (!targetDir.exists()) {
@@ -211,7 +212,7 @@ public class ProjectBuildTask extends AbstractLogFileTask {
             // 2) try look for jar files, there may be multiple jars found.
             if (!targetFile.getName().startsWith("original-")
                 && !targetFile.getName().endsWith("-sources.jar")
-                && targetFile.getName().endsWith(".jar")) {
+                && targetFile.getName().endsWith(Constant.JAR_SUFFIX)) {
               if (jar == null) {
                 jar = targetFile;
               } else {
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
index 398c14288..939030b3c 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.Constant;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -25,8 +27,8 @@ class EncryptUtilsTest {
   @Test
   void testEncrypt() throws Exception {
     String value = "apache streampark";
-    String encrypt = EncryptUtils.encrypt(value, "streampark");
-    String decrypt = EncryptUtils.decrypt(encrypt, "streampark");
+    String encrypt = EncryptUtils.encrypt(value, Constant.STREAM_PARK);
+    String decrypt = EncryptUtils.decrypt(encrypt, Constant.STREAM_PARK);
     Assertions.assertEquals(value, decrypt);
   }
 }
diff --git 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
index 62f9891ad..2fd9e6b15 100644
--- 
a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
+++ 
b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.console.base.util;
 
+import org.apache.streampark.common.Constant;
+
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
 
@@ -26,7 +28,7 @@ class ShaHashUtilsTest {
   @Test
   void testEncrypt() {
     String randomSalt = "rh8b1ojwog777yrg0daesf04gk";
-    String encryptPassword = ShaHashUtils.encrypt(randomSalt, "streampark");
+    String encryptPassword = ShaHashUtils.encrypt(randomSalt, 
Constant.STREAM_PARK);
     Assertions.assertEquals(
         "2513f3748847298ea324dffbf67fe68681dd92315bda830065facd8efe08f54f", 
encryptPassword);
   }
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index 8487c3927..2ac83eb73 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -17,7 +17,8 @@
 
 package org.apache.streampark.flink.client.bean
 
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
 import org.apache.streampark.common.enums.FlinkExecutionMode
 
 import javax.annotation.Nullable
@@ -35,5 +36,5 @@ case class CancelRequest(
     withDrain: Boolean,
     savepointPath: String,
     nativeFormat: Boolean,
-    override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
+    override val kubernetesNamespace: String = Constant.DEFAULT)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
index 3165250aa..526d37163 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
@@ -17,7 +17,8 @@
 
 package org.apache.streampark.flink.client.bean
 
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
 import org.apache.streampark.common.enums.FlinkExecutionMode
 
 import javax.annotation.Nullable
@@ -40,7 +41,7 @@ trait SavepointRequestTrait {
 
   val nativeFormat: Boolean
 
-  val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE
+  val kubernetesNamespace: String = Constant.DEFAULT
 
   @Nullable val properties: JavaMap[String, Any]
 
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 065da0bea..fbc81a1f3 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -17,7 +17,8 @@
 
 package org.apache.streampark.flink.client.bean
 
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
 import org.apache.streampark.common.enums.FlinkExecutionMode
 
 import javax.annotation.Nullable
@@ -34,5 +35,5 @@ case class TriggerSavepointRequest(
     jobId: String,
     savepointPath: String,
     nativeFormat: Boolean,
-    override val kubernetesNamespace: String = 
K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
+    override val kubernetesNamespace: String = Constant.DEFAULT)
   extends SavepointRequestTrait
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index c768ea378..4b51af2d0 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.client.impl
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
@@ -90,7 +91,7 @@ object KubernetesApplicationClientV2 extends 
KubernetesClientV2Trait with Logger
     val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
 
     val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
-      .getOrElse("default")
+      .getOrElse(Constant.DEFAULT)
 
     val name = submitReq.k8sSubmitParam.kubernetesName
       .orElse(Option(submitReq.k8sSubmitParam.clusterId))
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index f72798f01..8b2c3969d 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.flink.client.impl
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.util.Logger
 import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
 import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
@@ -88,7 +89,7 @@ object KubernetesSessionClientV2 extends 
KubernetesClientV2Trait with Logger {
     val flinkConfObj = originFlinkConfig.clone()
 
     val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
-      .getOrElse("default")
+      .getOrElse(Constant.DEFAULT)
 
     val name = submitReq.k8sSubmitParam.kubernetesName
       .filter(str => StringUtils.isNotBlank(str))
@@ -196,7 +197,7 @@ object KubernetesSessionClientV2 extends 
KubernetesClientV2Trait with Logger {
     val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
 
     val namespace = Option(deployReq.k8sDeployParam.kubernetesNamespace)
-      .getOrElse("default")
+      .getOrElse(Constant.DEFAULT)
 
     val name = Option(deployReq.k8sDeployParam.clusterId)
       .filter(str => StringUtils.isNotBlank(str))
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 7f5fc5c9c..f1b71fba6 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.Logger
 import java.util
 import java.util.regex.Pattern
 
+import org.apache.streampark.common.Constant
+
 import scala.collection.convert.ImplicitConversions.`collection 
AsScalaIterable`
 import scala.collection.mutable
 import scala.collection.mutable.ListBuffer
@@ -62,7 +64,7 @@ case class SinkRequest(records: util.List[String], var 
attemptCounter: Int = 0)
       result = prefixMap.map(m => s"""${m._1} VALUES 
${m._2.mkString(",")}""").toList
     }
 
-    logDebug(s"script to commit: ${result.mkString(";")}")
+    logDebug(s"script to commit: ${result.mkString(Constant.SEMICOLON)}")
 
     result
   }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
index 527951a47..9cd0d7af4 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.connector.clickhouse.conf
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.ConfigOption
 import org.apache.streampark.common.util.ConfigUtils
 
@@ -57,7 +58,7 @@ class ClickHouseSinkConfigOption(prefixStr: String, 
properties: Properties) exte
         .getProperty(k)
         .split(SIGN_COMMA)
         .filter(_.nonEmpty)
-        .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", "http://"))
+        .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", Constant.HTTP_SCHEMA))
         .toList
     }
   )
@@ -71,7 +72,7 @@ class ClickHouseSinkConfigOption(prefixStr: String, 
properties: Properties) exte
   val database: ConfigOption[String] = ConfigOption(
     key = "database",
     required = true,
-    defaultValue = "default",
+    defaultValue = Constant.DEFAULT,
     classType = classOf[String])
 
   val requestTimeout: ConfigOption[Int] = ConfigOption(
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
index c6afc284e..65c4fdf9d 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
@@ -127,7 +127,7 @@ case class ClickHouseWriterTask(
         s"""Failed to send data to ClickHouse, cause: limit of attempts is 
exceeded. ClickHouse response = $response. Ready to flush data to 
${clickHouseConf.storageType}""")
       failoverWriter.write(sinkRequest)
       logInfo(
-        s"failover Successful, StorageType = ${clickHouseConf.storageType}, 
size = ${sinkRequest.size}")
+        s"Failover Successful, StorageType = ${clickHouseConf.storageType}, 
size = ${sinkRequest.size}")
     } else {
       sinkRequest.incrementCounter()
       logWarn(
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
index c2e5158cb..65c70d98f 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
@@ -33,6 +33,8 @@ import scala.annotation.meta.param
 
 object ClickHouseSink {
 
+  val sinkNullHintMsg = "Sink Stream must not null"
+
   /**
    * @param property
    * @param parallelism
@@ -84,7 +86,7 @@ class ClickHouseSink(
    */
   def asyncSink[T](stream: DataStream[T])(implicit
       toSQLFn: T => String = null): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
+    require(stream != null, () => sinkNullHintMsg)
     val sinkFun = new AsyncClickHouseSinkFunction[T](prop, toSQLFn)
     val sink = stream.addSink(sinkFun)
     afterSink(sink, parallelism, name, uid)
@@ -102,7 +104,7 @@ class ClickHouseSink(
   def asyncSink[T](
       stream: JavaDataStream[T],
       toSQLFn: TransformFunction[T, String]): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
+    require(stream != null, () => sinkNullHintMsg)
     val sinkFun = new AsyncClickHouseSinkFunction[T](prop, toSQLFn)
     val sink = stream.addSink(sinkFun)
     afterSink(sink, parallelism, name, uid)
@@ -128,7 +130,7 @@ class ClickHouseSink(
    */
   def jdbcSink[T](stream: DataStream[T])(implicit
       toSQLFn: T => String = null): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
+    require(stream != null, () => sinkNullHintMsg)
     val sinkFun = new ClickHouseSinkFunction[T](prop, toSQLFn)
     val sink = stream.addSink(sinkFun)
     afterSink(sink, parallelism, name, uid)
@@ -146,7 +148,7 @@ class ClickHouseSink(
   def jdbcSink[T](
       stream: JavaDataStream[T],
       sqlFromFn: TransformFunction[T, String]): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
+    require(stream != null, () => sinkNullHintMsg)
     val sinkFun = new ClickHouseSinkFunction[T](prop, sqlFromFn)
     val sink = stream.addSink(sinkFun)
     afterSink(sink, parallelism, name, uid)
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
index a4940df84..2af63c3ec 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
@@ -49,7 +49,7 @@ class DorisSinkConfigOption(prefixStr: String, properties: 
Properties) extends S
         .getProperty(k)
         .split(SIGN_COMMA)
         .filter(_.nonEmpty)
-        .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", "http://";))
+        .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", 
Constant.HTTP_SCHEMA))
         .toList
     }
   )
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
index f0fd6b736..c1e2ba742 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
@@ -38,6 +38,9 @@ import scala.collection.convert.ImplicitConversions._
 
 object ES5Sink {
 
+  val functionNullHintMsg = "ES process element func must not null"
+  val sinkNullHintMsg = "Sink Stream must not null"
+
   def apply(
       @(transient @param)
       property: Properties = new Properties(),
@@ -73,8 +76,8 @@ class ES5Sink(
       stream: JavaDataStream[T],
       failureHandler: ActionRequestFailureHandler,
       f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
-    require(f != null, () => s"es pocess element func  must not null")
+    require(stream != null, () => sinkNullHintMsg)
+    require(f != null, () => functionNullHintMsg)
     val esSink: ElasticsearchSink[T] =
       new ElasticsearchSink(userConfig, config.host, new ESSinkFunction(f), 
failureHandler)
     if (config.disableFlushOnCheckpoint) {
@@ -89,8 +92,8 @@ class ES5Sink(
       stream: DataStream[T],
       failureHandler: ActionRequestFailureHandler,
       f: T => ActionRequest): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
-    require(f != null, () => s"es pocess element fun  must not null")
+    require(stream != null, () => sinkNullHintMsg)
+    require(f != null, () => functionNullHintMsg)
     val esSink: ElasticsearchSink[T] =
       new ElasticsearchSink(userConfig, config.host, new ESSinkFunction(f), 
failureHandler)
     if (config.disableFlushOnCheckpoint) {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
index f05a59daf..519b4239f 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
 
   def indexRequest(index: String, indexType: String, id: String, source: 
String)(implicit
       xContentType: XContentType = XContentType.JSON): IndexRequest = {
-    require(source != null, "indexRequest error:source can not be null...")
-    require(xContentType != null, "indexRequest error:xContentType can not be 
null...")
+    require(source != null, "IndexRequest error:source can not be null...")
+    require(xContentType != null, "IndexRequest error:xContentType can not be 
null...")
     val indexReq = new IndexRequest(index, indexType, id)
     val mapping = List("source" -> new BytesArray(source), "contentType" -> 
xContentType)
     mapping.foreach {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
index 100c9b7b3..5eacc0bf2 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
@@ -40,6 +40,9 @@ import scala.collection.convert.ImplicitConversions._
 
 object ES6Sink {
 
+  val functionNullHintMsg = "ES process element func must not null"
+  val sinkNullHintMsg = "Sink Stream must not null"
+
   def apply(
       @(transient @param)
       property: Properties = new Properties(),
@@ -74,8 +77,8 @@ class ES6Sink(
       restClientFactory: Option[RestClientFactory],
       failureHandler: ActionRequestFailureHandler,
       f: T => ActionRequest): DataStreamSink[T] = {
-    require(stream != null, "sink Stream must not null")
-    require(f != null, "es pocess element func must not null")
+    require(stream != null, sinkNullHintMsg)
+    require(f != null, functionNullHintMsg)
 
     val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
 
@@ -92,8 +95,8 @@ class ES6Sink(
       restClientFactory: Option[RestClientFactory],
       failureHandler: ActionRequestFailureHandler,
       f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
-    require(f != null, () => s"es process element func  must not null")
+    require(stream != null, () => sinkNullHintMsg)
+    require(f != null, () => functionNullHintMsg)
 
     val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
 
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
index 1f91386fe..34cd2d7a8 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
 
   def indexRequest(index: String, indexType: String, id: String, source: 
String)(implicit
       xContentType: XContentType = XContentType.JSON): IndexRequest = {
-    require(source != null, "indexRequest error:source can not be null...")
-    require(xContentType != null, "indexRequest error:xContentType can not be 
null...")
+    require(source != null, "IndexRequest error:source can not be null...")
+    require(xContentType != null, "IndexRequest error:xContentType can not be 
null...")
     val indexReq = new IndexRequest(index, indexType, id)
     val mapping = List("source" -> new BytesArray(source), "contentType" -> 
xContentType)
     mapping.foreach {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
index b56c7281e..c30b64e19 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
@@ -55,7 +55,7 @@ class RestClientFactoryImpl(val config: ES7Config) extends 
RestClientFactory wit
             httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder 
= {
           if (credentialsProvider != null) {
             
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
-            logInfo("elasticsearch auth by userName,password...")
+            logInfo("Elasticsearch auth by userName,password...")
           }
           httpClientBuilder
         }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
index 22557d63c..981772188 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
@@ -41,6 +41,10 @@ import scala.annotation.meta.param
 import scala.collection.convert.ImplicitConversions._
 
 object ES7Sink {
+
+  val functionNullHintMsg = "ES process element func must not null"
+  val sinkNullHintMsg = "Sink Stream must not null"
+
   def apply(
       @(transient @param)
       property: Properties = new Properties(),
@@ -74,8 +78,8 @@ class ES7Sink(
       restClientFactory: Option[RestClientFactory],
       failureHandler: ActionRequestFailureHandler,
       f: T => ActionRequest): DataStreamSink[T] = {
-    require(stream != null, "sink Stream must not null")
-    require(f != null, "es process element func must not null")
+    require(stream != null, sinkNullHintMsg)
+    require(f != null, functionNullHintMsg)
     val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
     val esSink: ElasticsearchSink[T] = buildESSink(restClientFactory, 
failureHandler, sinkFunc)
     if (config.disableFlushOnCheckpoint) {
@@ -90,8 +94,8 @@ class ES7Sink(
       restClientFactory: Option[RestClientFactory],
       failureHandler: ActionRequestFailureHandler,
       f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
-    require(stream != null, "sink Stream must not null")
-    require(f != null, "es process element func must not null")
+    require(stream != null, sinkNullHintMsg)
+    require(f != null, functionNullHintMsg)
     val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
     val esSink: ElasticsearchSink[T] = buildESSink(restClientFactory, 
failureHandler, sinkFunc)
     if (config.disableFlushOnCheckpoint) {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
index 036a5581f..855070671 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
 
   def indexRequest(index: String, id: String, source: String)(implicit
       xContentType: XContentType = XContentType.JSON): IndexRequest = {
-    require(source != null, "indexRequest error:source can not be null...")
-    require(xContentType != null, "indexRequest error:xContentType can not be 
null...")
+    require(source != null, "IndexRequest error:source can not be null...")
+    require(xContentType != null, "IndexRequest error:xContentType can not be 
null...")
     val indexReq = new IndexRequest(index).id(id)
     val mapping = List("source" -> new BytesArray(source), "contentType" -> 
xContentType)
     mapping.foreach {
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index 4c6144a36..e0bf77d33 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -42,8 +42,8 @@ public class HBaseJavaSource<T> {
       HBaseResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    Utils.notNull(queryFunction, "QueryFunction must not be null");
+    Utils.notNull(resultFunction, "ResultFunction must not be null");
     HBaseSourceFunction<T> sourceFunction =
         new HBaseSourceFunction<>(property, queryFunction, resultFunction, 
runningFunc, null);
     return context.getJavaEnv().addSource(sourceFunction);
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
index d5df0fe94..43061a151 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
@@ -80,9 +80,9 @@ class HBaseSink(
     implicit val prop: Properties =
       ConfigUtils.getConf(ctx.parameter.toMap, HBASE_PREFIX, 
HBASE_PREFIX)(alias)
     Utils.copyProperties(property, prop)
-    require(stream != null, () => s"sink Stream must not null")
-    require(tableName != null, () => s"sink tableName must not null")
-    require(fun != null, () => s" fun must not null")
+    require(stream != null, () => s"Sink Stream must not null")
+    require(tableName != null, () => s"Sink tableName must not null")
+    require(fun != null, () => s"Func must not null")
     prop
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
index 369b8e90d..5bfbe42f6 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
@@ -62,7 +62,7 @@ class HttpSinkFunction(
         val table = thresholdConf.failoverTable
         require(
           table != null && table.nonEmpty,
-          () => s"http async  insert failoverTable must not null")
+          () => s"Http async  insert failoverTable must not null")
 
         httpSinkWriter = HttpSinkWriter(thresholdConf, header)
         failoverChecker = FailoverChecker(thresholdConf.delayTime)
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
index 86fca0388..df39f3d61 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
@@ -55,7 +55,7 @@ public class JdbcJavaSink<T> {
   }
 
   public DataStreamSink<T> sink(DataStream<T> dataStream) {
-    Utils.notNull(sqlFunc, "transformFunction can not be null");
+    Utils.notNull(sqlFunc, "TransformFunction can not be null");
     this.jdbc =
         this.jdbc == null ? 
ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
     JdbcSinkFunction<T> sinkFun = new JdbcSinkFunction<>(this.jdbc, 
this.sqlFunc);
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index 69f494f23..464f1b715 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -54,8 +54,8 @@ public class JdbcJavaSource<T> {
       SQLResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    Utils.notNull(queryFunction, "'queryFunction' must not be null");
+    Utils.notNull(resultFunction, "'resultFunction' must not be null");
     this.jdbc =
         this.jdbc == null ? 
ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
     JdbcSourceFunction<T> sourceFunction =
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 7d157c0db..1303e3661 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -78,7 +78,7 @@ class JdbcSink(
       case Semantic.EXACTLY_ONCE =>
         val sinkFun = new Jdbc2PCSinkFunction[T](prop, toSQLFn)
         if (parallelism > 1) {
-          logWarn(s"parallelism:$parallelism, Jdbc Semantic 
EXACTLY_ONCE,parallelism bust be 1.")
+          logWarn(s"'parallelism':$parallelism, Jdbc Semantic 
EXACTLY_ONCE,parallelism must be 1.")
         }
         stream.addSink(sinkFun)
       case _ =>
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index dc49f621d..d7dc70bd9 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -43,9 +43,9 @@ public class MongoJavaSource<T> {
       MongoResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    Utils.notNull(collectionName, "collectionName must not be null");
-    Utils.notNull(queryFunction, "queryFunction must not be null");
-    Utils.notNull(resultFunction, "resultFunction must not be null");
+    Utils.notNull(collectionName, "'collectionName' must not be null");
+    Utils.notNull(queryFunction, "'queryFunction' must not be null");
+    Utils.notNull(resultFunction, "'resultFunction' must not be null");
     MongoSourceFunction<T> sourceFunction =
         new MongoSourceFunction<>(
             collectionName, property, queryFunction, resultFunction, 
runningFunc, null);
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
index e15f35136..dabffa95f 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
@@ -25,15 +25,19 @@ import 
org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand,
 
 object RedisMapper {
 
+  val additionalFailoverTableNullHint = "Redis additionalKey insert 
failoverTable must not null";
+  val insertFailoverTableNullHint = "Redis cmd insert failoverTable must not 
null";
+
+
   def map[T](
       cmd: RedisCommand,
       additionalKey: String,
       scalaKeyFun: T => String,
       scalaValueFun: T => String): RedisMapper[T] = {
-    require(cmd != null, () => s"redis cmd  insert failoverTable must not 
null")
-    require(additionalKey != null, () => s"redis additionalKey  insert 
failoverTable must not null")
-    require(scalaKeyFun != null, () => s"redis scalaKeyFun  insert 
failoverTable must not null")
-    require(scalaValueFun != null, () => s"redis scalaValueFun  insert 
failoverTable must not null")
+    require(cmd != null, () => insertFailoverTableNullHint)
+    require(additionalKey != null, () => additionalFailoverTableNullHint)
+    require(scalaKeyFun != null, () => s"Redis scalaKeyFun insert 
failoverTable must not null")
+    require(scalaValueFun != null, () => s"Redis scalaValueFun insert 
failoverTable must not null")
     new RedisMapper[T](cmd, additionalKey, scalaKeyFun, scalaValueFun)
   }
 
@@ -42,10 +46,10 @@ object RedisMapper {
       additionalKey: String,
       javaKeyFun: TransformFunction[T, String],
       javaValueFun: TransformFunction[T, String]): RedisMapper[T] = {
-    require(cmd != null, () => s"redis cmd  insert failoverTable must not 
null")
-    require(additionalKey != null, () => s"redis additionalKey  insert 
failoverTable must not null")
-    require(javaKeyFun != null, () => s"redis javaKeyFun  insert failoverTable 
must not null")
-    require(javaValueFun != null, () => s"redis javaValueFun  insert 
failoverTable must not null")
+    require(cmd != null, () => insertFailoverTableNullHint)
+    require(additionalKey != null, () => additionalFailoverTableNullHint)
+    require(javaKeyFun != null, () => s"Redis javaKeyFun insert failoverTable 
must not null")
+    require(javaValueFun != null, () => s"Redis javaValueFun insert 
failoverTable must not null")
     new RedisMapper[T](cmd, additionalKey, javaKeyFun, javaValueFun)
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
index dfbb68681..7eda5b51d 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
@@ -39,7 +39,7 @@ class RedisConfig(parameters: Properties) extends 
Serializable {
             if (x.contains(sinkOption.SIGN_COLON)) x;
             else {
               throw new IllegalArgumentException(
-                s"redis sentinel host invalid {$x} must match host:port ")
+                s"Redis sentinel host invalid {$x} must match host:port ")
             }
           })
         .toSet
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
index 072e42544..9adb40a43 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
@@ -97,7 +97,7 @@ class RedisSink(
             val field = Try(builder.getClass.getDeclaredField(x._1)).getOrElse 
{
               throw new IllegalArgumentException(
                 s"""
-                   |redis config error,property:${x._1} invalid,init 
FlinkJedisSentinelConfig error, property options:
+                   |Redis config error,property:${x._1} invalid,init 
FlinkJedisSentinelConfig error, property options:
                    |<String masterName>,
                    |<Set<String> sentinels>,
                    |<int connectionTimeout>,
@@ -121,7 +121,7 @@ class RedisSink(
             val field = Try(builder.getClass.getDeclaredField(x._1)).getOrElse 
{
               throw new IllegalArgumentException(
                 s"""
-                   |redis config error,property:${x._1} invalid,init 
FlinkJedisPoolConfig error,property options:
+                   |Redis config error,property:${x._1} invalid,init 
FlinkJedisPoolConfig error,property options:
                    |<String host>,
                    |<int port>,
                    |<int timeout>,
@@ -138,7 +138,7 @@ class RedisSink(
         builder.build()
       case _ =>
         throw throw new IllegalArgumentException(
-          s"redis connectType must be jedisPool|sentinel $connectType")
+          s"Redis connectType must be jedisPool|sentinel $connectType")
     }
   }
 
@@ -155,12 +155,12 @@ class RedisSink(
       stream: DataStream[T],
       mapper: RedisMapper[T],
       ttl: Int = Int.MaxValue): DataStreamSink[T] = {
-    require(stream != null, () => s"sink Stream must not null")
-    require(mapper != null, () => s"redis mapper must not null")
-    require(ttl > 0, () => s"redis ttl must greater than 0")
+    require(stream != null, () => s"Sink Stream must not null")
+    require(mapper != null, () => s"Redis mapper must not null")
+    require(ttl > 0, () => s"Redis ttl must greater than 0")
     val sinkFun = (enableCheckpoint, cpMode) match {
       case (false, CheckpointingMode.EXACTLY_ONCE) =>
-        throw new IllegalArgumentException("redis sink EXACTLY_ONCE must 
enable checkpoint")
+        throw new IllegalArgumentException("Redis sink EXACTLY_ONCE must 
enable checkpoint")
       case (true, CheckpointingMode.EXACTLY_ONCE) =>
         new Redis2PCSinkFunction[T](config, mapper, ttl)
       case _ => new RedisSinkFunction[T](config, mapper, ttl)
diff --git 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index b57d7f478..a245508ee 100644
--- 
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.v2.operator
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.flink.kubernetes.v2.{pathLastSegment, yamlMapper}
 import org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
 import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
@@ -67,12 +68,13 @@ object CROperator extends CROperator {
       // Generate FlinkDeployment CR
       correctedJob        <- mirrorJobJarToHttpFileServer(spec.job, 
mirrorSpace)
       correctedExtJars    <- mirrorExtJarsToHttpFileServer(spec.extJarPaths, 
mirrorSpace)
-      correctedPod        <- correctPodSpec(
-                               spec.podTemplate,
-                               correctedExtJars ++ 
correctedJob.map(_.jarURI).filter(_.startsWith("http://")).toArray[String]
-                             )
+      correctedPod        <-
+        correctPodSpec(
+          spec.podTemplate,
+          correctedExtJars ++ 
correctedJob.map(_.jarURI).filter(_.startsWith(Constant.HTTP_SCHEMA)).toArray[String]
+        )
       correctedLocalUriJob = correctedJob.map { jobDef =>
-                               if (!jobDef.jarURI.startsWith("http://")) jobDef
+                               if 
(!jobDef.jarURI.startsWith(Constant.HTTP_SCHEMA)) jobDef
                                else jobDef.copy("local:///opt/flink/lib/" + 
pathLastSegment(jobDef.jarURI))
                              }
       correctedSpec        = spec.copy(
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 237dfd5d9..e67b74b28 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,13 +17,13 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
-import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.Constant
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
 
 /** flink cluster identifier on kubernetes */
 case class ClusterKey(
     executeMode: FlinkK8sExecuteModeEnum.Value,
-    namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+    namespace: String = Constant.DEFAULT,
     clusterId: String)
 
 object ClusterKey {
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index 34e6dd30d..2dc7d0c7d 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.kubernetes.model
 
-import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.util.Utils
 import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
 
@@ -26,7 +26,7 @@ import scala.util.Try
 /** tracking identifier for flink on kubernetes */
 case class TrackId(
     executeMode: FlinkK8sExecuteModeEnum.Value,
-    namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+    namespace: String = Constant.DEFAULT,
     clusterId: String,
     appId: Long,
     jobId: String,
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
index 7b3c1d1c0..b73cf0e84 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.packer
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.Workspace
 import org.apache.streampark.common.util.Logger
 
@@ -57,7 +58,7 @@ object PackerResourceGC extends Logger {
   }
 
   private def findLastModifiedOfSubFile(file: File): Array[(File, Long)] = {
-    val isApplicationMode = 
file.listFiles.map(_.getName).exists(_.contains(".jar"))
+    val isApplicationMode = 
file.listFiles.map(_.getName).exists(_.contains(Constant.JAR_SUFFIX))
     if (isApplicationMode) {
       Array(file -> file.listFiles.map(_.lastModified).max)
     } else {
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
index f24092ea4..319c862d4 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.packer.docker
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.fs.LfsOperator
 
 import org.apache.commons.io.FileUtils
@@ -80,10 +81,13 @@ trait FlinkDockerfileTemplateTrait {
     flinkExtraLibPaths
       .map(new File(_))
       .filter(_.exists())
-      .filter(_.getName.endsWith(".jar"))
+      .filter(_.getName.endsWith(Constant.JAR_SUFFIX))
       .flatMap {
         case f if f.isDirectory =>
-          
f.listFiles.filter(_.isFile).filter(_.getName.endsWith(".jar")).map(_.getAbsolutePath)
+          f.listFiles
+            .filter(_.isFile)
+            .filter(_.getName.endsWith(Constant.JAR_SUFFIX))
+            .map(_.getAbsolutePath)
         case f if f.isFile => Array(f.getAbsolutePath)
       }
       .foreach(LfsOperator.copy(_, s"${workspace.toString}/$FLINK_LIB_PATH"))
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 03eb83839..bd4648151 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.packer.maven
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{InternalConfigHolder, Workspace}
 import org.apache.streampark.common.conf.CommonConfig.{MAVEN_AUTH_PASSWORD, 
MAVEN_AUTH_USER, MAVEN_REMOTE_URL}
 import org.apache.streampark.common.util.{Logger, Utils}
@@ -60,7 +61,10 @@ object MavenTool extends Logger {
 
   private[this] def getRemoteRepos(): List[RemoteRepository] = {
     val builder =
-      new RemoteRepository.Builder("central", "default", 
InternalConfigHolder.get(MAVEN_REMOTE_URL))
+      new RemoteRepository.Builder(
+        "central",
+        Constant.DEFAULT,
+        InternalConfigHolder.get(MAVEN_REMOTE_URL))
     val remoteRepository =
       if (
         InternalConfigHolder.get(MAVEN_AUTH_USER) == null || 
InternalConfigHolder.get(
@@ -98,8 +102,9 @@ object MavenTool extends Logger {
     // check userJarPath
     val uberJar = new File(outFatJarPath)
     require(
-      outFatJarPath.endsWith(".jar") && !uberJar.isDirectory,
-      s"[StreamPark] streampark-packer: outFatJarPath($outFatJarPath) should 
be a JAR file.")
+      outFatJarPath.endsWith(Constant.JAR_SUFFIX) && !uberJar.isDirectory,
+      s"[StreamPark] streampark-packer: outFatJarPath($outFatJarPath) should 
be a JAR file."
+    )
     uberJar.delete()
     // resolve all jarLibs
     val jarSet = new util.HashSet[File]
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
index bd690667f..968b8efcd 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
@@ -97,20 +97,20 @@ trait BuildPipeline extends BuildPipelineProcess with 
BuildPipelineExpose with L
     Try {
       curStep = seq
       stepsStatus(seq) = PipelineStepStatusEnum.running -> 
System.currentTimeMillis
-      logInfo(s"building pipeline step[$seq/$allSteps] running => 
${pipeType.getSteps.get(seq)}")
+      logInfo(s"Building pipeline step[$seq/$allSteps] running => 
${pipeType.getSteps.get(seq)}")
       watcher.onStepStateChange(snapshot)
       process
     } match {
       case Success(result) =>
         stepsStatus(seq) = PipelineStepStatusEnum.success -> 
System.currentTimeMillis
-        logInfo(s"building pipeline step[$seq/$allSteps] success")
+        logInfo(s"Building pipeline step[$seq/$allSteps] success")
         watcher.onStepStateChange(snapshot)
         Some(result)
       case Failure(cause) =>
         stepsStatus(seq) = PipelineStepStatusEnum.failure -> 
System.currentTimeMillis
         pipeStatus = PipelineStatusEnum.failure
         error = PipeError.of(cause.getMessage, cause)
-        logInfo(s"building pipeline step[$seq/$allSteps] failure => 
${pipeType.getSteps.get(seq)}")
+        logInfo(s"Building pipeline step[$seq/$allSteps] failure => 
${pipeType.getSteps.get(seq)}")
         watcher.onStepStateChange(snapshot)
         None
     }
@@ -119,7 +119,7 @@ trait BuildPipeline extends BuildPipelineProcess with 
BuildPipelineExpose with L
   protected def skipStep(step: Int): Unit = {
     curStep = step
     stepsStatus(step) = PipelineStepStatusEnum.skipped -> 
System.currentTimeMillis
-    logInfo(s"building pipeline step[$step/$allSteps] skipped => 
${pipeType.getSteps.get(step)}")
+    logInfo(s"Building pipeline step[$step/$allSteps] skipped => 
${pipeType.getSteps.get(step)}")
     watcher.onStepStateChange(snapshot)
   }
 
@@ -128,7 +128,7 @@ trait BuildPipeline extends BuildPipelineProcess with 
BuildPipelineExpose with L
     pipeStatus = PipelineStatusEnum.running
     Try {
       watcher.onStart(snapshot)
-      logInfo(s"building pipeline is launching, 
params=${offerBuildParam.toString}")
+      logInfo(s"Building pipeline is launching, 
params=${offerBuildParam.toString}")
       executor
         .submit(new Callable[BuildResult] {
           override def call(): BuildResult = buildProcess()
@@ -137,14 +137,14 @@ trait BuildPipeline extends BuildPipelineProcess with 
BuildPipelineExpose with L
     } match {
       case Success(result) =>
         pipeStatus = PipelineStatusEnum.success
-        logInfo(s"building pipeline has finished successfully.")
+        logInfo(s"Building pipeline has finished successfully.")
         watcher.onFinish(snapshot, result)
         result
       case Failure(cause) =>
         pipeStatus = PipelineStatusEnum.failure
         error = PipeError.of(cause.getMessage, cause)
         // log and print error trace stack
-        logError(s"building pipeline has failed.", cause)
+        logError(s"Building pipeline has failed.", cause)
         val result = ErrorResult()
         watcher.onFinish(snapshot, result)
         result
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 6caba3e35..ba8af634b 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -67,7 +67,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
       execStep(1) {
         val buildWorkspace = 
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
         LfsOperator.mkCleanDirs(buildWorkspace)
-        logInfo(s"recreate building workspace: $buildWorkspace")
+        logInfo(s"Recreate building workspace: $buildWorkspace")
         buildWorkspace
       }.getOrElse(throw getError.exception)
 
@@ -80,7 +80,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
         execStep(2) {
           val podTemplateFiles =
             PodTemplateTool.preparePodTemplateFiles(buildWorkspace, 
podTemplate).tmplFiles
-          logInfo(s"export flink podTemplates: 
${podTemplateFiles.values.mkString(",")}")
+          logInfo(s"Export flink podTemplates: 
${podTemplateFiles.values.mkString(",")}")
           podTemplateFiles
         }.getOrElse(throw getError.exception)
     }
@@ -96,7 +96,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
         }
         val shadedJar =
           MavenTool.buildFatJar(request.mainClass, request.providedLibs, 
shadedJarOutputPath)
-        logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+        logInfo(s"Output shaded flink job jar: ${shadedJar.getAbsolutePath}")
         shadedJar -> extJarLibs
       }.getOrElse(throw getError.exception)
 
@@ -120,14 +120,14 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
         }
         val dockerFile = dockerFileTemplate.writeDockerfile
         logInfo(
-          s"output flink dockerfile: ${dockerFile.getAbsolutePath}, content: 
\n${dockerFileTemplate.offerDockerfileContent}")
+          s"Output flink dockerfile: ${dockerFile.getAbsolutePath}, content: 
\n${dockerFileTemplate.offerDockerfileContent}")
         dockerFile -> dockerFileTemplate
       }.getOrElse(throw getError.exception)
 
     val dockerConf = request.dockerConfig
     val baseImageTag = request.flinkBaseImage.trim
     val pushImageTag = {
-      val expectedImageTag = 
s"streamparkflinkjob-${request.k8sNamespace}-${request.clusterId}"
+      val expectedImageTag = s"streampark 
flinkjob-${request.k8sNamespace}-${request.clusterId}"
       compileTag(expectedImageTag, dockerConf.registerAddress, 
dockerConf.imageNamespace)
     }
 
@@ -155,8 +155,8 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
                 
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
             })
           pullCmdCallback.awaitCompletion
-          logInfo(s"already pulled docker image from remote register, 
imageTag=$baseImageTag")
-      }(err => throw new Exception(s"pull docker image failed, 
imageTag=$baseImageTag", err))
+          logInfo(s"Already pulled docker image from remote register, 
imageTag=$baseImageTag")
+      }(err => throw new Exception(s"Pull docker image failed, 
imageTag=$baseImageTag", err))
     }.getOrElse(throw getError.exception)
 
     // Step-6: build flink image
@@ -178,8 +178,8 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
                   
dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
             })
           val imageId = buildCmdCallback.awaitImageId
-          logInfo(s"built docker image, imageId=$imageId, 
imageTag=$pushImageTag")
-      }(err => throw new Exception(s"build docker image failed. 
tag=$pushImageTag", err))
+          logInfo(s"Built docker image, imageId=$imageId, 
imageTag=$pushImageTag")
+      }(err => throw new Exception(s"Build docker image failed. 
tag=$pushImageTag", err))
     }.getOrElse(throw getError.exception)
 
     // Step-7: push flink image
@@ -198,8 +198,8 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
                 
Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
             })
           pushCmdCallback.awaitCompletion
-          logInfo(s"already pushed docker image, imageTag=$pushImageTag")
-      }(err => throw new Exception(s"push docker image failed. 
tag=$pushImageTag", err))
+          logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
+      }(err => throw new Exception(s"Push docker image failed. 
tag=$pushImageTag", err))
     }.getOrElse(throw getError.exception)
 
     // Step-8:  init build workspace of ingress
@@ -211,7 +211,7 @@ class FlinkK8sApplicationBuildPipeline(request: 
FlinkK8sApplicationBuildRequest)
         execStep(8) {
           val ingressOutputPath =
             IngressController.prepareIngressTemplateFiles(buildWorkspace, 
request.ingressTemplate)
-          logInfo(s"export flink ingress: $ingressOutputPath")
+          logInfo(s"Export flink ingress: $ingressOutputPath")
           ingressOutputPath
         }.getOrElse(throw getError.exception)
     }
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
index 5e086e517..544e18e41 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -41,7 +41,7 @@ class FlinkK8sApplicationBuildPipelineV2(request: 
FlinkK8sApplicationBuildReques
       execStep(1) {
         val buildWorkspace = 
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
         LfsOperator.mkCleanDirs(buildWorkspace)
-        logInfo(s"recreate building workspace: $buildWorkspace")
+        logInfo(s"Recreate building workspace: $buildWorkspace")
         buildWorkspace
       }.getOrElse(throw getError.exception)
 
@@ -57,7 +57,7 @@ class FlinkK8sApplicationBuildPipelineV2(request: 
FlinkK8sApplicationBuildReques
         }
         val shadedJar =
           MavenTool.buildFatJar(request.mainClass, request.providedLibs, 
shadedJarOutputPath)
-        logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+        logInfo(s"Output shaded flink job jar: ${shadedJar.getAbsolutePath}")
         shadedJar -> extJarLibs
       }.getOrElse(throw getError.exception)
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
index 9b86e9b5f..5d6eb5696 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
@@ -38,7 +38,7 @@ class FlinkK8sSessionBuildPipeline(request: 
FlinkK8sSessionBuildRequest) extends
       execStep(1) {
         val buildWorkspace = 
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
         LfsOperator.mkCleanDirs(buildWorkspace)
-        logInfo(s"recreate building workspace: $buildWorkspace")
+        logInfo(s"Recreate building workspace: $buildWorkspace")
         buildWorkspace
       }.getOrElse(throw getError.exception)
 
@@ -50,7 +50,7 @@ class FlinkK8sSessionBuildPipeline(request: 
FlinkK8sSessionBuildRequest) extends
           request.mainClass,
           request.providedLibs,
           request.getShadedJarPath(buildWorkspace))
-        logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
+        logInfo(s"Output shaded flink job jar: ${output.getAbsolutePath}")
         output
       }.getOrElse(throw getError.exception)
 
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index fbd741070..11fd54a78 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -44,7 +44,7 @@ class FlinkRemoteBuildPipeline(request: 
FlinkRemotePerJobBuildRequest) extends B
     } else {
       execStep(1) {
         LfsOperator.mkCleanDirs(request.workspace)
-        logInfo(s"recreate building workspace: ${request.workspace}")
+        logInfo(s"Recreate building workspace: ${request.workspace}")
       }.getOrElse(throw getError.exception)
       // build flink job shaded jar
       val shadedJar =
diff --git 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index e804c834d..69ea6bf60 100644
--- 
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++ 
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -52,7 +52,7 @@ class FlinkYarnApplicationBuildPipeline(request: 
FlinkYarnApplicationBuildReques
           HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
         case _ =>
       }
-      logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
+      logInfo(s"Recreate building workspace: ${request.yarnProvidedPath}")
     }.getOrElse(throw getError.exception)
 
     val mavenJars =
diff --git 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 44294d756..2a8050f35 100644
--- 
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ 
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.flink.proxy
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
 import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
 import org.apache.streampark.common.util.ImplicitsUtils._
@@ -78,7 +79,7 @@ object FlinkShimsProxy extends Logger {
 
   // need to load all flink-table dependencies compatible with different 
versions
   def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
-    logInfo(s"add verify sql lib,flink version: $flinkVersion")
+    logInfo(s"Add verify sql lib,flink version: $flinkVersion")
     VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
       s"${flinkVersion.fullVersion}", {
         val getFlinkTable: File => Boolean = 
_.getName.startsWith("flink-table")
@@ -123,20 +124,20 @@ object FlinkShimsProxy extends Logger {
       .foreach(
         (jar: File) => {
           val jarName = jar.getName
-          if (jarName.endsWith(".jar")) {
+          if (jarName.endsWith(Constant.JAR_SUFFIX)) {
             if (jarName.startsWith(FLINK_SHIMS_PREFIX)) {
               val prefixVer = 
s"$FLINK_SHIMS_PREFIX-${majorVersion}_$scalaVersion"
               if (jarName.startsWith(prefixVer)) {
                 addShimUrl(jar)
-                logInfo(s"include flink shims jar lib: $jarName")
+                logInfo(s"Include flink shims jar lib: $jarName")
               }
             } else {
               if (INCLUDE_PATTERN.matcher(jarName).matches()) {
                 addShimUrl(jar)
-                logInfo(s"include jar lib: $jarName")
+                logInfo(s"Include jar lib: $jarName")
               } else if (jarName.matches(s"^streampark-.*_$scalaVersion.*$$")) 
{
                 addShimUrl(jar)
-                logInfo(s"include streampark lib: $jarName")
+                logInfo(s"Include streampark lib: $jarName")
               }
             }
           }
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 26c31f928..417d00adb 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -16,6 +16,7 @@
  */
 package org.apache.streampark.flink.core
 
+import org.apache.streampark.common.Constant
 import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
 import org.apache.streampark.common.util.{ExceptionUtils, Logger, Utils}
 import org.apache.streampark.flink.core.SqlCommand._
@@ -83,7 +84,7 @@ object FlinkSqlValidator extends Logger {
               .getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))
             sqlDialect.toUpperCase() match {
               case "HIVE" =>
-              case "DEFAULT" =>
+              case Constant.DEFAULT =>
                 val parser = calciteClass
                   .getConstructor(Array(classOf[Config]): _*)
                   .newInstance(sqlParserConfigMap(sqlDialect.toUpperCase()))
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
index b2f5a6eb7..0652a2e3d 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
@@ -94,6 +94,11 @@
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-common_${scala.binary.version}</artifactId>
+            <scope>provided</scope>
+        </dependency>
 
     </dependencies>
 
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
index a6e516657..6a34fa492 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.gateway.factories;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.gateway.ConfigOption;
 import org.apache.streampark.gateway.exception.ValidationException;
 
@@ -30,7 +31,7 @@ import java.util.stream.Collectors;
 /** Factory utils for {@link Factory}. */
 public class FactoryUtil {
 
-  private static final String DEFAULT_IDENTIFIER = "default";
+  private static final String DEFAULT_IDENTIFIER = Constant.DEFAULT;
   private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
   public static final ConfigOption<String> SQL_GATEWAY_SERVICE_TYPE =
       ConfigOption.key("streampark.sql-gateway.service")
diff --git 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
index a219739bb..e3e1f4464 100644
--- 
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
+++ 
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
@@ -17,6 +17,7 @@
 
 package org.apache.streampark.gateway.factories;
 
+import org.apache.streampark.common.Constant;
 import org.apache.streampark.gateway.ConfigOption;
 import org.apache.streampark.gateway.exception.ValidationException;
 import org.apache.streampark.gateway.service.SqlGatewayService;
@@ -56,7 +57,7 @@ public class SqlGatewayServiceFactoryUtils {
                             "Service options do not contain an option key '%s' 
for discovering an service.",
                             SQL_GATEWAY_SERVICE_TYPE.getKey())));
 
-    List<String> identifiers = Arrays.asList(identifiersStr.split(";"));
+    List<String> identifiers = 
Arrays.asList(identifiersStr.split(Constant.SEMICOLON));
 
     if (identifiers.isEmpty()) {
       throw new ValidationException(


Reply via email to