This is an automated email from the ASF dual-hosted git repository.
panyuepeng pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new a35618b24 [Improve] Constant improvement (#3272)
a35618b24 is described below
commit a35618b2479936ef6915f201f959b5275bc3e19c
Author: benjobs <[email protected]>
AuthorDate: Mon Oct 23 03:27:35 2023 -0500
[Improve] Constant improvement (#3272)
* [Improve] Constant improvement
* minor improvement
---------
Co-authored-by: benjobs <[email protected]>
---
.../org/apache/streampark/common/Constant.java | 22 ++++++++++++-
.../conf/{ConfigConst.scala => ConfigKeys.scala} | 36 +---------------------
.../common/conf/InternalConfigHolder.scala | 3 +-
.../streampark/common/util/ConfigUtils.scala | 5 +--
.../streampark/common/util/HBaseClient.scala | 2 +-
.../streampark/common/util/HadoopUtils.scala | 6 ++--
.../apache/streampark/common/util/JdbcUtils.scala | 2 +-
.../streampark/common/util/MongoConfig.scala | 2 +-
.../streampark/common/util/RedisClient.scala | 5 +--
.../streampark/common/util/RedisEndpoint.scala | 2 +-
.../org/apache/streampark/common/util/Utils.scala | 19 +++++++++++-
.../streampark/console/base/util/WebUtils.java | 4 +--
.../console/core/entity/Application.java | 8 ++---
.../console/core/entity/ApplicationConfig.java | 22 ++++++-------
.../console/core/entity/FlinkCluster.java | 4 +--
.../streampark/console/core/entity/Variable.java | 4 +--
.../console/core/runner/EnvInitializer.java | 6 ++--
.../impl/ApplicationActionServiceImpl.java | 26 ++++++++--------
.../core/service/impl/AppBuildPipeServiceImpl.java | 3 +-
.../core/service/impl/ResourceServiceImpl.java | 4 +--
.../core/utils/YarnQueueLabelExpression.java | 6 ++--
.../streampark/console/system/entity/User.java | 4 +--
.../console/system/runner/StartedUpRunner.java | 4 +--
.../console/SpringIntegrationTestBase.java | 4 +--
.../streampark/console/SpringUnitTestBase.java | 4 +--
.../flink/client/bean/SubmitRequest.scala | 9 +++---
.../flink/client/impl/YarnApplicationClient.scala | 9 +++---
.../flink/client/trait/FlinkClientTrait.scala | 9 +++---
.../clickhouse/conf/ClickHouseHttpConfig.scala | 4 +--
.../connector/doris/conf/DorisConfig.scala | 6 ++--
.../connector/influx/function/InfluxFunction.scala | 2 +-
.../connector/jdbc/internal/JdbcSinkFunction.scala | 2 +-
.../flink/connector/jdbc/sink/JdbcSink.scala | 2 +-
.../flink/connector/kafka/sink/KafkaSink.scala | 6 ++--
.../flink/connector/kafka/source/KafkaSource.scala | 4 +--
.../flink/core/scala/FlinkStreamTable.scala | 2 +-
.../flink/core/scala/FlinkStreaming.scala | 6 ++--
.../streampark/flink/core/scala/FlinkTable.scala | 2 +-
.../flink/kubernetes/KubernetesRetriever.scala | 4 +--
.../flink/kubernetes/ingress/IngressStrategy.scala | 4 +--
.../kubernetes/watcher/FlinkK8sEventWatcher.scala | 4 +--
.../streampark/flink/proxy/FlinkShimsProxy.scala | 6 ++--
.../streampark/flink/core/EnhancerImplicit.scala | 2 +-
.../streampark/flink/core/FlinkSqlExecutor.scala | 2 +-
.../flink/core/FlinkStreamTableTrait.scala | 5 +--
.../flink/core/FlinkStreamingInitializer.scala | 2 +-
.../flink/core/FlinkTableInitializer.scala | 2 +-
.../streampark/flink/core/FlinkTableTrait.scala | 5 +--
.../streampark/flink/core/SqlCommandParser.scala | 2 +-
.../streampark/flink/core/conf/ParameterCli.scala | 8 ++---
.../flink/core/test/FlinkSqlExecuteFunSuite.scala | 2 +-
.../streampark/flink/core/TableContext.scala | 4 +--
.../streampark/flink/core/TableContext.scala | 4 +--
.../streampark/flink/core/TableContext.scala | 4 +--
.../streampark/flink/core/TableContext.scala | 4 +--
.../apache/streampark/flink/cli/SqlClient.scala | 2 +-
56 files changed, 173 insertions(+), 163 deletions(-)
diff --git a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
index ac02bb8a1..553d68bb3 100644
--- a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
@@ -18,15 +18,35 @@
package org.apache.streampark.common;
/** A constant class to hold the constants variables. */
-public class Constant {
+public final class Constant {
private Constant() {}
public static final String DEFAULT = "default";
+
public static final String STREAM_PARK = "streampark";
+
public static final String HTTP_SCHEMA = "http://";
+
public static final String HTTPS_SCHEMA = "https://";
+
public static final String JAR_SUFFIX = ".jar";
+
public static final String ZIP_SUFFIX = ".zip";
+
+ public static final String EMPTY_STRING = "";
+
+ public static final String PYTHON_SUFFIX = ".py";
+
public static final String SEMICOLON = ";";
+
+ public static final String DEFAULT_DATAMASK_STRING = "********";
+
+ public static final String PYTHON_FLINK_DRIVER_CLASS_NAME =
+ "org.apache.flink.client.python.PythonDriver";
+
+ public static final String STREAMPARK_FLINKSQL_CLIENT_CLASS =
+ "org.apache.streampark.flink.cli.SqlClient";
+
+ public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3";
}
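
[Editor's illustration, not part of the patch] A minimal usage sketch of the new Constant fields, assuming a hypothetical caller class; it mirrors the data-masking and ".py"-suffix checks that appear in the console changes later in this diff:

    import org.apache.streampark.common.Constant;

    public class ConstantUsageSketch {
      // Mask a secret before logging, as User#dataMasking and Variable#dataMasking do below.
      public static String mask(String secret) {
        return secret == null ? Constant.EMPTY_STRING : Constant.DEFAULT_DATAMASK_STRING;
      }

      // Detect a PyFlink artifact by suffix, as ResourceServiceImpl does below.
      public static boolean isPyFlinkArtifact(String fileName) {
        return fileName != null && fileName.endsWith(Constant.PYTHON_SUFFIX);
      }
    }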
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
similarity index 75%
rename from streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
rename to streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
index f465f25e5..f2fb98fb0 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala
@@ -16,22 +16,10 @@
*/
package org.apache.streampark.common.conf
-import java.time.LocalDateTime
-
-object ConfigConst {
-
- /** common const */
- val DEFAULT_DATAMASK_STRING = "********"
+object ConfigKeys {
val PARAM_PREFIX = "--"
- /** pyflink */
- val PYTHON_SUFFIX = ".py"
-
- val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver"
-
- val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"
-
/** about parameter... */
val KEY_APP_HOME = "app.home"
@@ -49,9 +37,6 @@ object ConfigConst {
val KEY_SEMANTIC = "semantic"
- /** sign.... */
- val SIGN_EMPTY = ""
-
/** kerberos */
val KEY_KERBEROS = "kerberos"
@@ -180,23 +165,4 @@ object ConfigConst {
val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size"
- val STREAMPARK_FLINKSQL_CLIENT_CLASS = "org.apache.streampark.flink.cli.SqlClient"
-
- def printLogo(info: String): Unit = {
- // scalastyle:off println
- println("\n")
- println(" _____ __ __
")
- println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__
")
- println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/
//_/")
- println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,<
")
- println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/
/_/|_| ")
- println(" /_/
\n\n")
- println(" Version: 2.2.0-SNAPSHOT
")
- println(" WebSite: https://streampark.apache.org
")
- println(" GitHub : https://github.com/apache/incubator-streampark
")
- println(s" Info : $info ")
- println(s" Time : ${LocalDateTime.now} \n\n")
- // scalastyle:on println
- }
-
}
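
[Editor's illustration, not part of the patch] A short sketch (hypothetical class name) of how the renamed ConfigKeys object is consumed from Java after this rename, matching the WebUtils and test-base call sites later in this diff; Scala vals surface as no-arg methods on the Java side:

    import org.apache.streampark.common.conf.ConfigKeys;

    public class ConfigKeysUsageSketch {
      // Resolve the application home directory from the system property keyed by ConfigKeys.
      public static String appHome() {
        return System.getProperty(ConfigKeys.KEY_APP_HOME());
      }
    }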
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
index f6631b697..18aae4e8e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.common.conf
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.common.util.ImplicitsUtils._
@@ -164,7 +165,7 @@ object InternalConfigHolder extends Logger {
| ${configKeys
.map(
key =>
- s"$key = ${if (key.contains("password"))
ConfigConst.DEFAULT_DATAMASK_STRING
+ s"$key = ${if (key.contains("password"))
Constant.DEFAULT_DATAMASK_STRING
else get(key)}")
.mkString("\n ")}""".stripMargin)
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index e0c159299..4cb23a2ad 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -16,7 +16,8 @@
*/
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys._
import java.util.{Map => JavaMap, Properties}
@@ -52,7 +53,7 @@ object ConfigUtils {
val kafkaProperty = new Properties()
param.foreach(x => kafkaProperty.put(x._1, x._2.trim))
val _topic = topic match {
- case SIGN_EMPTY =>
+ case Constant.EMPTY_STRING =>
val top = kafkaProperty.getOrElse(KEY_KAFKA_TOPIC, null)
if (top == null || top.split(",|\\s+").length > 1) {
throw new IllegalArgumentException(
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
index f128f252b..e1ec974e7 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
index a2ae4bb90..93d0573a8 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.{CommonConfig, ConfigKeys, InternalConfigHolder}
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.commons.collections.CollectionUtils
import org.apache.commons.lang3.StringUtils
@@ -62,7 +62,7 @@ object HadoopUtils extends Logger {
InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME)
private[this] lazy val kerberosConf: Map[String, String] =
- SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match {
+ SystemPropertyUtils.get(ConfigKeys.KEY_APP_HOME, null) match {
case null =>
getClass.getResourceAsStream("/kerberos.yml") match {
case x if x != null => PropertiesUtils.fromYamlFile(x)
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
index 6029d88e9..22bbfb889 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import com.zaxxer.hikari.{HikariConfig, HikariDataSource}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
index e04c97d13..7a6764df9 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import com.mongodb._
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
index 2b2a4420f..c208411cb 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala
@@ -17,7 +17,8 @@
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys
import redis.clients.jedis._
import redis.clients.jedis.exceptions.JedisConnectionException
@@ -92,7 +93,7 @@ object RedisClient extends Logger {
* @return
*/
def createJedisPool(endpoint: RedisEndpoint): JedisPool = {
-    val endpointEn: RedisEndpoint = endpoint.copy(auth = ConfigConst.DEFAULT_DATAMASK_STRING)
+    val endpointEn: RedisEndpoint = endpoint.copy(auth = Constant.DEFAULT_DATAMASK_STRING)
logInfo(s"[StreamPark] RedisClient: createJedisPool with $endpointEn ")
new JedisPool(
poolConfig,
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
index ab765e691..82de51421 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.common.util
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import redis.clients.jedis.{Jedis, Protocol}
import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
index d56322982..5ab002ab2 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala
@@ -20,7 +20,7 @@ import org.apache.commons.lang3.StringUtils
import java.io._
import java.net.URL
-import java.time.Duration
+import java.time.{Duration, LocalDateTime}
import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID}
import java.util.concurrent.locks.LockSupport
import java.util.jar.{JarFile, JarInputStream}
@@ -156,4 +156,21 @@ object Utils extends Logger {
}
}
+ def printLogo(info: String): Unit = {
+ // scalastyle:off println
+ println("\n")
+ println(" _____ __ __
")
+ println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__
")
+ println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/
//_/")
+ println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,<
")
+ println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/
/_/|_| ")
+ println(" /_/
\n\n")
+ println(" Version: 2.2.0-SNAPSHOT
")
+ println(" WebSite: https://streampark.apache.org
")
+ println(" GitHub : https://github.com/apache/incubator-streampark
")
+ println(s" Info : $info ")
+ println(s" Time : ${LocalDateTime.now} \n\n")
+ // scalastyle:on println
+ }
+
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
index cfef4cbe0..7313788e5 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.base.util;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.commons.lang3.StringUtils;
@@ -97,7 +97,7 @@ public final class WebUtils {
}
public static String getAppHome() {
- return System.getProperty(ConfigConst.KEY_APP_HOME());
+ return System.getProperty(ConfigKeys.KEY_APP_HOME());
}
public static File getAppDir(String dir) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index 670795012..09deab73d 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.entity;
import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -278,10 +278,10 @@ public class Application implements Serializable {
Map<String, Object> hotParamsMap = this.getHotParamsMap();
if (MapUtils.isNotEmpty(hotParamsMap)
- && hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) {
-      String yarnQueue = hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString();
+        && hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) {
+      String yarnQueue = hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString();
       String labelExpr =
-          Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL()))
+          Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()))
              .map(Object::toString)
              .orElse(null);
       this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, labelExpr).toString());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
index b920d6e56..a50414839 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.entity;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.DeflaterUtils;
import org.apache.streampark.common.util.PropertiesUtils;
import org.apache.streampark.console.core.enums.ConfigFileTypeEnum;
@@ -103,16 +103,16 @@ public class ApplicationConfig {
Collectors.toMap(
entry -> {
String key = entry.getKey();
-            if (key.startsWith(ConfigConst.KEY_FLINK_OPTION_PREFIX())) {
-              key = key.substring(ConfigConst.KEY_FLINK_OPTION_PREFIX().length());
-            } else if (key.startsWith(ConfigConst.KEY_FLINK_PROPERTY_PREFIX())) {
-              key = key.substring(ConfigConst.KEY_FLINK_PROPERTY_PREFIX().length());
-            } else if (key.startsWith(ConfigConst.KEY_FLINK_TABLE_PREFIX())) {
-              key = key.substring(ConfigConst.KEY_FLINK_TABLE_PREFIX().length());
-            } else if (key.startsWith(ConfigConst.KEY_APP_PREFIX())) {
-              key = key.substring(ConfigConst.KEY_APP_PREFIX().length());
-            } else if (key.startsWith(ConfigConst.KEY_SQL_PREFIX())) {
-              key = key.substring(ConfigConst.KEY_SQL_PREFIX().length());
+            if (key.startsWith(ConfigKeys.KEY_FLINK_OPTION_PREFIX())) {
+              key = key.substring(ConfigKeys.KEY_FLINK_OPTION_PREFIX().length());
+            } else if (key.startsWith(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX())) {
+              key = key.substring(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX().length());
+            } else if (key.startsWith(ConfigKeys.KEY_FLINK_TABLE_PREFIX())) {
+              key = key.substring(ConfigKeys.KEY_FLINK_TABLE_PREFIX().length());
+            } else if (key.startsWith(ConfigKeys.KEY_APP_PREFIX())) {
+              key = key.substring(ConfigKeys.KEY_APP_PREFIX().length());
+            } else if (key.startsWith(ConfigKeys.KEY_SQL_PREFIX())) {
+              key = key.substring(ConfigKeys.KEY_SQL_PREFIX().length());
}
return key;
},
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
index 48f3d8b15..611fc038e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.entity;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.enums.ClusterState;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.common.enums.FlinkK8sRestExposedType;
@@ -138,7 +138,7 @@ public class FlinkCluster implements Serializable {
}
Map<String, Object> map = JacksonUtils.read(this.options, Map.class);
if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) {
- map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName);
+ map.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName);
map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue));
}
map.entrySet().removeIf(entry -> entry.getValue() == null);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
index 458c74d3d..6d4f1f9e4 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.entity;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
import com.baomidou.mybatisplus.annotation.IdType;
import com.baomidou.mybatisplus.annotation.TableId;
@@ -70,7 +70,7 @@ public class Variable implements Serializable {
public void dataMasking() {
if (desensitization) {
- this.setVariableValue(ConfigConst.DEFAULT_DATAMASK_STRING());
+ this.setVariableValue(Constant.DEFAULT_DATAMASK_STRING);
}
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
index 13d631d4f..86dc54169 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.runner;
import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.InternalOption;
import org.apache.streampark.common.conf.Workspace;
@@ -92,14 +92,14 @@ public class EnvInitializer implements ApplicationRunner {
                + " The system initialization check failed. If started local for development and debugging,"
                + " please ensure the -D%s parameter is clearly specified,"
                + " more detail: https://streampark.apache.org/docs/user-guide/deployment",
- ConfigConst.KEY_APP_HOME()));
+ ConfigKeys.KEY_APP_HOME()));
}
// init InternalConfig
initInternalConfig(context.getEnvironment());
// overwrite system variable HADOOP_USER_NAME
String hadoopUserName =
InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME());
- overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName);
+ overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName);
// initialize local file system resources
storageInitialize(LFS);
// Launch the embedded http file server.
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index b60bd0d34..ff0745284 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console.core.service.application.impl;
import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -418,7 +418,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
     // Get the sql of the replaced placeholder
     String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql());
flinkSql.setSql(DeflaterUtils.zipString(realSql));
- extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql());
+ extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql());
}
// TODO Need to display more K8s submission parameters in the front-end UI.
@@ -478,12 +478,12 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
          submitResponse -> {
            if (submitResponse.flinkConfig() != null) {
              String jmMemory =
-                  submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY());
+                  submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY());
            if (jmMemory != null) {
              application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes());
            }
              String tmMemory =
-                  submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY());
+                  submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY());
if (tmMemory != null) {
application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes());
}
@@ -610,7 +610,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
        resource.getFilePath(), "pyflink file can't be null, start application failed.");
      ApiAlertException.throwIfFalse(
-          resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()),
+          resource.getFilePath().endsWith(Constant.PYTHON_SUFFIX),
        "pyflink format error, must be a \".py\" suffix, start application failed.");
flinkUserJar = resource.getFilePath();
@@ -621,7 +621,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
        appConf =
            String.format(
                "json://{\"%s\":\"%s\"}",
-                ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+                ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
} else {
switch (application.getApplicationType()) {
case STREAMPARK_FLINK:
@@ -639,7 +639,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
            appConf =
                String.format(
                    "json://{\"%s\":\"%s\"}",
-                    ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
+                    ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass());
break;
default:
throw new IllegalArgumentException(
@@ -701,19 +701,19 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
                  "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or "
                      + "the cluster has been deleted. Please contact the Admin.",
                  application.getFlinkClusterId()));
-      properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId());
+      properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId());
    } else {
      String yarnQueue =
-          (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE());
+          (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE());
      String yarnLabelExpr =
-          (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_NODE_LABEL());
+          (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL());
      Optional.ofNullable(yarnQueue)
-          .ifPresent(yq -> properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yq));
+          .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq));
      Optional.ofNullable(yarnLabelExpr)
-          .ifPresent(yLabel -> properties.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), yLabel));
+          .ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel));
    }
  } else if (FlinkExecutionMode.isKubernetesMode(application.getFlinkExecutionMode())) {
-      properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
+      properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always");
    }
    if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 222dc56bf..15946b7f6 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -18,7 +18,6 @@
package org.apache.streampark.console.core.service.impl;
import org.apache.streampark.common.Constant;
-import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
@@ -441,7 +440,7 @@ public class AppBuildPipeServiceImpl
}
FlinkExecutionMode executionModeEnum = app.getFlinkExecutionMode();
- String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS();
+ String mainClass = Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS;
switch (executionModeEnum) {
case YARN_APPLICATION:
String yarnProvidedPath = app.getAppLib();
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
index 47b113616..c1b4b4564 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.service.impl;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.fs.FsOperator;
import org.apache.streampark.common.util.ExceptionUtils;
@@ -289,7 +289,7 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource>
resp.put(EXCEPTION, ExceptionUtils.stringifyException(e));
return RestResponse.success().data(resp);
}
- if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) {
+ if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) {
return RestResponse.success().data(resp);
}
Manifest manifest = Utils.getJarManifest(jarFile);
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
index e44482b58..ab0a35bea 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.core.utils;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.console.base.exception.ApiAlertException;
import org.apache.commons.lang3.StringUtils;
@@ -111,8 +111,8 @@ public class YarnQueueLabelExpression {
Map<String, String> map = new HashMap<>(2);
yarnQueueLabelExpression
.getLabelExpression()
-      .ifPresent(labelExp -> map.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), labelExp));
-    map.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue);
+      .ifPresent(labelExp -> map.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), labelExp));
+ map.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue);
return map;
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
index 88a2f5733..1699e0b6e 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.system.entity;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.console.core.enums.LoginTypeEnum;
import org.apache.streampark.console.core.enums.UserTypeEnum;
@@ -101,7 +101,7 @@ public class User implements Serializable {
private Long lastTeamId;
public void dataMasking() {
- String dataMask = ConfigConst.DEFAULT_DATAMASK_STRING();
+ String dataMask = Constant.DEFAULT_DATAMASK_STRING;
this.setPassword(dataMask);
this.setSalt(dataMask);
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
index 4a19b709f..3228c2266 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java
@@ -17,7 +17,7 @@
package org.apache.streampark.console.system.runner;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.util.Utils;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
@@ -37,7 +37,7 @@ public class StartedUpRunner implements ApplicationRunner {
@Override
public void run(ApplicationArguments args) {
if (context.isActive()) {
- ConfigConst.printLogo("streampark-console start successful");
+ Utils.printLogo("streampark-console start successful");
}
}
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
index 8780e221e..398fce3d5 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console;
import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.util.SystemPropertyUtils;
import org.apache.commons.io.FileUtils;
@@ -91,7 +91,7 @@ public abstract class SpringIntegrationTestBase {
    Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath());
    appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath();
- System.setProperty(ConfigConst.KEY_APP_HOME(), appHome);
+ System.setProperty(ConfigKeys.KEY_APP_HOME(), appHome);
System.setProperty(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath().toString());
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
index 5e40b3122..88f5284d8 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java
@@ -18,7 +18,7 @@
package org.apache.streampark.console;
import org.apache.streampark.common.conf.CommonConfig;
-import org.apache.streampark.common.conf.ConfigConst;
+import org.apache.streampark.common.conf.ConfigKeys;
import org.apache.streampark.common.enums.FlinkExecutionMode;
import org.apache.streampark.console.core.entity.Application;
import org.apache.streampark.console.core.entity.FlinkCluster;
@@ -68,7 +68,7 @@ public abstract class SpringUnitTestBase {
Path localWorkspace =
        Files.createDirectories(new File(mockedHome + "/localWorkspace").toPath());
- System.setProperty(ConfigConst.KEY_APP_HOME(), mockedHome);
+ System.setProperty(ConfigKeys.KEY_APP_HOME(), mockedHome);
System.setProperty(
CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(),
localWorkspace.toAbsolutePath().toString());
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
index 18665186f..25e17120d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala
@@ -17,8 +17,9 @@
package org.apache.streampark.flink.client.bean
-import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion, Workspace}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace}
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums._
import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils}
import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse}
@@ -59,8 +60,8 @@ case class SubmitRequest(
  lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX)
  lazy val appMain: String = this.developmentMode match {
-    case FlinkDevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS
-    case FlinkDevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME
+    case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS
+    case FlinkDevelopmentMode.PYFLINK => Constant.PYTHON_FLINK_DRIVER_CLASS_NAME
case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS)
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
index f39fa341a..f5862a082 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala
@@ -17,7 +17,8 @@
package org.apache.streampark.flink.client.impl
-import org.apache.streampark.common.conf.{ConfigConst, Workspace}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
import org.apache.streampark.common.enums.FlinkDevelopmentMode
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils}
@@ -117,15 +118,15 @@ object YarnApplicationClient extends YarnClientTrait {
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv)
// python.client.executable
-      .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+      .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
      // python.executable
-      .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+      .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
    val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS)
    // Caused by: java.lang.UnsupportedOperationException
    val argsList: util.ArrayList[String] = new util.ArrayList[String](args)
    argsList.add("-pym")
-    argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length))
+    argsList.add(submitRequest.userJarFile.getName.dropRight(Constant.PYTHON_SUFFIX.length))
flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList)
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
index 7fb6aa148..75a05326f 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala
@@ -17,8 +17,9 @@
package org.apache.streampark.flink.client.`trait`
-import org.apache.streampark.common.conf.{ConfigConst, Workspace}
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.{ConfigKeys, Workspace}
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode}
import org.apache.streampark.common.fs.FsOperator
import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils}
@@ -251,9 +252,9 @@ trait FlinkClientTrait extends Logger {
// python.archives
.safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)
// python.client.executable
-      .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+      .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
      // python.executable
-      .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE)
+ .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE)
}
val packageProgram = PackagedProgram.newBuilder
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
index c75eb1790..041dbe5ac 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala
@@ -17,9 +17,9 @@
package org.apache.streampark.flink.connector.clickhouse.conf
-import org.apache.streampark.common.conf.ConfigConst
import org.apache.streampark.flink.connector.conf.ThresholdConf
+import org.apache.streampark.common.Constant
import java.util.{Base64, Properties}
import java.util.concurrent.ThreadLocalRandom
@@ -69,7 +69,7 @@ class ClickHouseHttpConfig(parameters: Properties)
override def toString: String = {
s"""
-      |{ user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} }
+      |{ user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} }
|""".stripMargin
}
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
index 40a97ff1e..df09056c1 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala
@@ -16,10 +16,10 @@
*/
package org.apache.streampark.connector.doris.conf
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.ConfigKeys
import java.util.Properties
-
import scala.collection.convert.ImplicitConversions._
object DorisConfig {
@@ -83,7 +83,7 @@ class DorisConfig(parameters: Properties) {
override def toString: String = {
s"""
-      |{ doris user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} }
+      |{ doris user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} }
|""".stripMargin
}
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
index 41e8e324e..c3565c9a2 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.influx.function
-import org.apache.streampark.common.conf.ConfigConst.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER}
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
index f00647dd3..6fb365e31 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.jdbc.internal
-import org.apache.streampark.common.conf.ConfigConst.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH}
+import org.apache.streampark.common.conf.ConfigKeys.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH}
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.{JdbcUtils, Logger}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 1303e3661..c5ca134e2 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.jdbc.sink
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums.Semantic
import org.apache.streampark.common.util.{ConfigUtils, Logger}
import org.apache.streampark.flink.connector.jdbc.internal.{Jdbc2PCSinkFunction, JdbcSinkFunction}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
index 9bfe176e0..8a60922a8 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.kafka.sink
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.util.{ConfigUtils, Utils}
import org.apache.streampark.flink.connector.kafka.bean.KafkaEqualityPartitioner
import org.apache.streampark.flink.connector.sink.Sink
@@ -83,10 +83,10 @@ class KafkaSink(
val producer = {
    val prop = ConfigUtils.getKafkaSinkConf(ctx.parameter.toMap, topic, alias)
    Utils.copyProperties(property, prop)
-    val topicId = prop.remove(ConfigConst.KEY_KAFKA_TOPIC).toString
+    val topicId = prop.remove(ConfigKeys.KEY_KAFKA_TOPIC).toString
    /** kafkaProducersPoolSize will be used under EXACTLY_ONCE semantics */
-    val semantic = Try(Some(prop.remove(ConfigConst.KEY_KAFKA_SEMANTIC).toString.toUpperCase))
+    val semantic = Try(Some(prop.remove(ConfigKeys.KEY_KAFKA_SEMANTIC).toString.toUpperCase))
.getOrElse(None) match {
case None => Semantic.AT_LEAST_ONCE
case Some("AT_LEAST_ONCE") => Semantic.AT_LEAST_ONCE
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
index 05e8e7b8c..aea660284 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.kafka.source
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.util.{ConfigUtils, Utils}
import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord
import org.apache.streampark.flink.core.scala.StreamingContext
@@ -25,7 +25,7 @@ import org.apache.streampark.flink.core.scala.StreamingContext
import org.apache.flink.api.common.eventtime.WatermarkStrategy
import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation}
import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass
-import org.apache.flink.streaming.api.scala.{DataStream, _}
+import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema}
import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition
import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
index c7bfbd155..b480231c0 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core.scala
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.flink.core.{FlinkTableInitializer, StreamTableContext}
import org.apache.streampark.flink.core.TableExt
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
index 7dffccd29..8d5dca940 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala
@@ -17,8 +17,8 @@
package org.apache.streampark.flink.core.scala
-import org.apache.streampark.common.conf.ConfigConst._
-import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils}
import org.apache.streampark.flink.core.{FlinkStreamingInitializer, StreamEnvConfig}
import org.apache.streampark.flink.core.EnhancerImplicit._
@@ -50,7 +50,7 @@ class StreamingContext(
}
@Deprecated override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkStreaming $jobName Starting...")
+ Utils.printLogo(s"FlinkStreaming $jobName Starting...")
super.execute(jobName)
}
}
diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
index 78ead4fb2..44ed2b972 100644
--- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
+++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core.scala
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.util.{Logger, SystemPropertyUtils}
import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index edd679ad7..ee67a202d 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.util.ImplicitsUtils._
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
@@ -116,7 +116,7 @@ object KubernetesRetriever extends Logger {
.apps()
.deployments()
.inNamespace(namespace)
- .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+ .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL)
.list()
.getItems
.asScala
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
index e97fcfc1f..5cba3b36f 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.ingress
-import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig}
+import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig}
import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder}
import io.fabric8.kubernetes.client.DefaultKubernetesClient
@@ -63,7 +63,7 @@ trait IngressStrategy {
def buildIngressLabels(clusterId: String): Map[String, String] = {
Map(
"app" -> clusterId,
- "type" -> ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL,
+ "type" -> ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL,
"component" -> "ingress"
)
}
diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
index f49bf4406..4e3681a81 100644
--- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
+++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.watcher
-import org.apache.streampark.common.conf.ConfigConst
+import org.apache.streampark.common.conf.ConfigKeys
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.kubernetes.{FlinkK8sWatchController, KubernetesRetriever}
import org.apache.streampark.flink.kubernetes.model.{K8sDeploymentEventCV, K8sEventKey}
@@ -67,7 +67,7 @@ class FlinkK8sEventWatcher(implicit watchController: FlinkK8sWatchController)
k8sClient
.apps()
.deployments()
- .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL)
+ .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL)
.watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] {
override def eventReceived(action: Watcher.Action, event: Deployment): Unit = {
handleDeploymentEvent(action, event)
diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 2a8050f35..de3b9eaed 100644
--- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.proxy
import org.apache.streampark.common.Constant
-import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
+import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion}
import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
import org.apache.streampark.common.util.ImplicitsUtils._
@@ -108,10 +108,10 @@ object FlinkShimsProxy extends Logger {
}
def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = {
- val appHome = System.getProperty(ConfigConst.KEY_APP_HOME)
+ val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME)
require(
appHome != null,
- String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME))
+ String.format("%s is not found on System env.", ConfigKeys.KEY_APP_HOME))
val libPath = new File(s"$appHome/lib")
require(libPath.exists())
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
index 528e47907..d855ee030 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_NAME, KEY_FLINK_APP_NAME}
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.flink.api.java.utils.ParameterTool
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
index 8d18a31e2..9d6a09339 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL
+import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL
import org.apache.streampark.common.util.Logger
import org.apache.streampark.flink.core.SqlCommand._
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
index 53e381275..517ae1ae6 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala
@@ -16,7 +16,8 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.core.EnhancerImplicit._
import com.esotericsoftware.kryo.Serializer
@@ -77,7 +78,7 @@ abstract class FlinkStreamTableTrait(
}
@Deprecated def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkStreamTable $jobName Starting...")
+ Utils.printLogo(s"FlinkStreamTable $jobName Starting...")
if (isConvertedToDataStream) {
streamEnv.execute(jobName)
} else null
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
index 87a54bd07..bfb4be1cc 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util._
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
index 2169b8ea5..ce8c93823 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
import org.apache.streampark.common.enums.{ApiType, PlannerType}
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
index e581f857c..71b1a1c48 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala
@@ -16,7 +16,8 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst._
+import org.apache.streampark.common.conf.ConfigKeys._
+import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.core.EnhancerImplicit._
import org.apache.flink.api.common.JobExecutionResult
@@ -40,7 +41,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn
}
def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
+ Utils.printLogo(s"FlinkTable $jobName Starting...")
null
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
index dc4de596c..157216ce8 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX
+import org.apache.streampark.common.conf.ConfigKeys.PARAM_PREFIX
import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
import org.apache.streampark.common.util.Logger
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
index 1206c817e..3331253b3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala
@@ -16,8 +16,8 @@
*/
package org.apache.streampark.flink.core.conf
-import org.apache.streampark.common.conf.ConfigConst
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
+import org.apache.streampark.common.conf.ConfigKeys
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX}
import org.apache.streampark.common.util.PropertiesUtils
import org.apache.commons.cli.{DefaultParser, Options}
@@ -96,7 +96,7 @@ object ParameterCli {
x =>
val key = x._1.drop(propertyPrefix.length).trim
val value = x._2.trim
- if (key == ConfigConst.KEY_FLINK_APP_NAME) {
+ if (key == ConfigKeys.KEY_FLINK_APP_NAME) {
buffer.append(s" -D$key=${value.replace(" ", "_")}")
} else {
buffer.append(s" -D$key=$value")
@@ -104,7 +104,7 @@ object ParameterCli {
}
buffer.toString.trim
case "--name" =>
- map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim match {
+ map.getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "").trim match {
case appName if appName.nonEmpty => appName
case _ => ""
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
index a681018a3..ed6040fa3 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core.test
-import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_SQL, PARAM_PREFIX}
import org.apache.streampark.common.util.DeflaterUtils
import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index bbe35967c..f7aedd8ea 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
@@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
tableEnv.connect(connectorDescriptor)
override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
+ Utils.printLogo(s"FlinkTable $jobName Starting...")
null
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 96ac48549..f0f2ad135 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
@@ -51,7 +51,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
tableEnv.connect(connectorDescriptor)
override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
+ Utils.printLogo(s"FlinkTable $jobName Starting...")
null
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index fcd7708f3..d0815b5cb 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -16,7 +16,7 @@
*/
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
@@ -64,7 +64,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor)
@Deprecated override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
+ Utils.printLogo(s"FlinkTable $jobName Starting...")
null
}
diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
index 64337d26b..8ca8e3fe1 100644
--- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
+++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.core
-import org.apache.streampark.common.conf.ConfigConst.printLogo
+import org.apache.streampark.common.util.Utils
import org.apache.flink.api.common.JobExecutionResult
import org.apache.flink.api.java.utils.ParameterTool
@@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv:
override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules()
override def execute(jobName: String): JobExecutionResult = {
- printLogo(s"FlinkTable $jobName Starting...")
+ Utils.printLogo(s"FlinkTable $jobName Starting...")
null
}
diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
index 3b094d086..e305f428b 100644
--- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
+++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.cli
-import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
+import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE}
import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils}
import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser}
import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}