This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch const in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit b847ca86a2a94bf33793737bee290b3ad91aa66e Author: benjobs <[email protected]> AuthorDate: Mon Oct 23 15:46:02 2023 +0800 [Improve] Constant improvement --- .../org/apache/streampark/common/Constant.java | 23 +++++++++++++- .../conf/{ConfigConst.scala => ConfigKeys.scala} | 36 +--------------------- .../common/conf/InternalConfigHolder.scala | 3 +- .../streampark/common/util/ConfigUtils.scala | 5 +-- .../streampark/common/util/HBaseClient.scala | 2 +- .../streampark/common/util/HadoopUtils.scala | 6 ++-- .../apache/streampark/common/util/JdbcUtils.scala | 2 +- .../streampark/common/util/MongoConfig.scala | 2 +- .../streampark/common/util/RedisClient.scala | 5 +-- .../streampark/common/util/RedisEndpoint.scala | 2 +- .../org/apache/streampark/common/util/Utils.scala | 19 +++++++++++- .../streampark/console/base/util/WebUtils.java | 4 +-- .../console/core/entity/Application.java | 8 ++--- .../console/core/entity/ApplicationConfig.java | 22 ++++++------- .../console/core/entity/FlinkCluster.java | 4 +-- .../streampark/console/core/entity/Variable.java | 4 +-- .../console/core/runner/EnvInitializer.java | 6 ++-- .../impl/ApplicationActionServiceImpl.java | 26 ++++++++-------- .../core/service/impl/AppBuildPipeServiceImpl.java | 3 +- .../core/service/impl/ResourceServiceImpl.java | 4 +-- .../core/utils/YarnQueueLabelExpression.java | 6 ++-- .../streampark/console/system/entity/User.java | 4 +-- .../console/system/runner/StartedUpRunner.java | 4 +-- .../console/SpringIntegrationTestBase.java | 4 +-- .../streampark/console/SpringUnitTestBase.java | 4 +-- .../flink/client/bean/SubmitRequest.scala | 9 +++--- .../flink/client/impl/YarnApplicationClient.scala | 9 +++--- .../flink/client/trait/FlinkClientTrait.scala | 9 +++--- .../clickhouse/conf/ClickHouseHttpConfig.scala | 4 +-- .../connector/doris/conf/DorisConfig.scala | 6 ++-- .../connector/influx/function/InfluxFunction.scala | 2 +- .../connector/jdbc/internal/JdbcSinkFunction.scala | 2 +- .../flink/connector/jdbc/sink/JdbcSink.scala | 2 +- .../flink/connector/kafka/sink/KafkaSink.scala | 6 ++-- .../flink/connector/kafka/source/KafkaSource.scala | 4 +-- .../flink/core/scala/FlinkStreamTable.scala | 2 +- .../flink/core/scala/FlinkStreaming.scala | 6 ++-- .../streampark/flink/core/scala/FlinkTable.scala | 2 +- .../flink/kubernetes/KubernetesRetriever.scala | 4 +-- .../flink/kubernetes/ingress/IngressStrategy.scala | 4 +-- .../kubernetes/watcher/FlinkK8sEventWatcher.scala | 4 +-- .../streampark/flink/proxy/FlinkShimsProxy.scala | 6 ++-- .../streampark/flink/core/EnhancerImplicit.scala | 2 +- .../streampark/flink/core/FlinkSqlExecutor.scala | 2 +- .../flink/core/FlinkStreamTableTrait.scala | 5 +-- .../flink/core/FlinkStreamingInitializer.scala | 2 +- .../flink/core/FlinkTableInitializer.scala | 2 +- .../streampark/flink/core/FlinkTableTrait.scala | 5 +-- .../streampark/flink/core/SqlCommandParser.scala | 2 +- .../streampark/flink/core/conf/ParameterCli.scala | 8 ++--- .../flink/core/test/FlinkSqlExecuteFunSuite.scala | 2 +- .../streampark/flink/core/TableContext.scala | 4 +-- .../streampark/flink/core/TableContext.scala | 4 +-- .../streampark/flink/core/TableContext.scala | 4 +-- .../streampark/flink/core/TableContext.scala | 4 +-- .../apache/streampark/flink/cli/SqlClient.scala | 2 +- 56 files changed, 174 insertions(+), 163 deletions(-) diff --git a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java index ac02bb8a1..ec99a5b06 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/Constant.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java @@ -18,15 +18,36 @@ package org.apache.streampark.common; /** A constant class to hold the constants variables. */ -public class Constant { +public final class Constant { private Constant() {} public static final String DEFAULT = "default"; + public static final String STREAM_PARK = "streampark"; + public static final String HTTP_SCHEMA = "http://"; + public static final String HTTPS_SCHEMA = "https://"; + public static final String JAR_SUFFIX = ".jar"; + public static final String ZIP_SUFFIX = ".zip"; + + public static final String EMPTY_STRING = ""; + + public static final String PYTHON_SUFFIX = ".py"; + public static final String SEMICOLON = ";"; + + public static final String DEFAULT_DATAMASK_STRING = "********"; + + public static final String PYTHON_FLINK_DRIVER_CLASS_NAME = + "org.apache.flink.client.python.PythonDriver"; + + public static final String STREAMPARK_FLINKSQL_CLIENT_CLASS = + "org.apache.streampark.flink.cli.SqlClient"; + + public static final String PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3"; + } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala similarity index 75% rename from streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala rename to streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala index f465f25e5..f2fb98fb0 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigConst.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/ConfigKeys.scala @@ -16,22 +16,10 @@ */ package org.apache.streampark.common.conf -import java.time.LocalDateTime - -object ConfigConst { - - /** common const */ - val DEFAULT_DATAMASK_STRING = "********" +object ConfigKeys { val PARAM_PREFIX = "--" - /** pyflink */ - val PYTHON_SUFFIX = ".py" - - val PYTHON_DRIVER_CLASS_NAME = "org.apache.flink.client.python.PythonDriver" - - val PYTHON_EXECUTABLE = "venv.zip/venv/bin/python3" - /** about parameter... */ val KEY_APP_HOME = "app.home" @@ -49,9 +37,6 @@ object ConfigConst { val KEY_SEMANTIC = "semantic" - /** sign.... */ - val SIGN_EMPTY = "" - /** kerberos */ val KEY_KERBEROS = "kerberos" @@ -180,23 +165,4 @@ object ConfigConst { val KEY_FLINK_TM_PROCESS_MEMORY = "taskmanager.memory.process.size" - val STREAMPARK_FLINKSQL_CLIENT_CLASS = "org.apache.streampark.flink.cli.SqlClient" - - def printLogo(info: String): Unit = { - // scalastyle:off println - println("\n") - println(" _____ __ __ ") - println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ") - println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/") - println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ") - println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ") - println(" /_/ \n\n") - println(" Version: 2.2.0-SNAPSHOT ") - println(" WebSite: https://streampark.apache.org ") - println(" GitHub : https://github.com/apache/incubator-streampark ") - println(s" Info : $info ") - println(s" Time : ${LocalDateTime.now} \n\n") - // scalastyle:on println - } - } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala index f6631b697..18aae4e8e 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/conf/InternalConfigHolder.scala @@ -17,6 +17,7 @@ package org.apache.streampark.common.conf +import org.apache.streampark.common.Constant import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.common.util.ImplicitsUtils._ @@ -164,7 +165,7 @@ object InternalConfigHolder extends Logger { | ${configKeys .map( key => - s"$key = ${if (key.contains("password")) ConfigConst.DEFAULT_DATAMASK_STRING + s"$key = ${if (key.contains("password")) Constant.DEFAULT_DATAMASK_STRING else get(key)}") .mkString("\n ")}""".stripMargin) } diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala index e0c159299..4cb23a2ad 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys._ import java.util.{Map => JavaMap, Properties} @@ -52,7 +53,7 @@ object ConfigUtils { val kafkaProperty = new Properties() param.foreach(x => kafkaProperty.put(x._1, x._2.trim)) val _topic = topic match { - case SIGN_EMPTY => + case Constant.EMPTY_STRING => val top = kafkaProperty.getOrElse(KEY_KAFKA_TOPIC, null) if (top == null || top.split(",|\\s+").length > 1) { throw new IllegalArgumentException( diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala index f128f252b..e1ec974e7 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HBaseClient.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.hadoop.conf.Configuration import org.apache.hadoop.hbase.{HBaseConfiguration, TableName} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala index a2ae4bb90..93d0573a8 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/HadoopUtils.scala @@ -17,8 +17,8 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.{CommonConfig, ConfigConst, InternalConfigHolder} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.{CommonConfig, ConfigKeys, InternalConfigHolder} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.commons.collections.CollectionUtils import org.apache.commons.lang3.StringUtils @@ -62,7 +62,7 @@ object HadoopUtils extends Logger { InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME) private[this] lazy val kerberosConf: Map[String, String] = - SystemPropertyUtils.get(ConfigConst.KEY_APP_HOME, null) match { + SystemPropertyUtils.get(ConfigKeys.KEY_APP_HOME, null) match { case null => getClass.getResourceAsStream("/kerberos.yml") match { case x if x != null => PropertiesUtils.fromYamlFile(x) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala index 6029d88e9..22bbfb889 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/JdbcUtils.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import com.zaxxer.hikari.{HikariConfig, HikariDataSource} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala index e04c97d13..7a6764df9 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/MongoConfig.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import com.mongodb._ diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala index 2b2a4420f..c208411cb 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisClient.scala @@ -17,7 +17,8 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys import redis.clients.jedis._ import redis.clients.jedis.exceptions.JedisConnectionException @@ -92,7 +93,7 @@ object RedisClient extends Logger { * @return */ def createJedisPool(endpoint: RedisEndpoint): JedisPool = { - val endpointEn: RedisEndpoint = endpoint.copy(auth = ConfigConst.DEFAULT_DATAMASK_STRING) + val endpointEn: RedisEndpoint = endpoint.copy(auth = Constant.DEFAULT_DATAMASK_STRING) logInfo(s"[StreamPark] RedisClient: createJedisPool with $endpointEn ") new JedisPool( poolConfig, diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala index ab765e691..82de51421 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/RedisEndpoint.scala @@ -17,7 +17,7 @@ package org.apache.streampark.common.util -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import redis.clients.jedis.{Jedis, Protocol} import redis.clients.jedis.util.{JedisClusterCRC16, JedisURIHelper, SafeEncoder} diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala index d56322982..5ab002ab2 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/Utils.scala @@ -20,7 +20,7 @@ import org.apache.commons.lang3.StringUtils import java.io._ import java.net.URL -import java.time.Duration +import java.time.{Duration, LocalDateTime} import java.util.{jar, Collection => JavaCollection, Map => JavaMap, Properties, UUID} import java.util.concurrent.locks.LockSupport import java.util.jar.{JarFile, JarInputStream} @@ -156,4 +156,21 @@ object Utils extends Logger { } } + def printLogo(info: String): Unit = { + // scalastyle:off println + println("\n") + println(" _____ __ __ ") + println(" / ___// /_________ ____ _____ ___ ____ ____ ______/ /__ ") + println(" \\__ \\/ __/ ___/ _ \\/ __ `/ __ `__ \\/ __ \\ __ `/ ___/ //_/") + println(" ___/ / /_/ / / __/ /_/ / / / / / / /_/ / /_/ / / / ,< ") + println(" /____/\\__/_/ \\___/\\__,_/_/ /_/ /_/ ____/\\__,_/_/ /_/|_| ") + println(" /_/ \n\n") + println(" Version: 2.2.0-SNAPSHOT ") + println(" WebSite: https://streampark.apache.org ") + println(" GitHub : https://github.com/apache/incubator-streampark ") + println(s" Info : $info ") + println(s" Time : ${LocalDateTime.now} \n\n") + // scalastyle:on println + } + } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java index cfef4cbe0..7313788e5 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/base/util/WebUtils.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.base.util; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.commons.lang3.StringUtils; @@ -97,7 +97,7 @@ public final class WebUtils { } public static String getAppHome() { - return System.getProperty(ConfigConst.KEY_APP_HOME()); + return System.getProperty(ConfigKeys.KEY_APP_HOME()); } public static File getAppDir(String dir) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index 670795012..09deab73d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.entity; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; import org.apache.streampark.common.enums.FlinkDevelopmentMode; @@ -278,10 +278,10 @@ public class Application implements Serializable { Map<String, Object> hotParamsMap = this.getHotParamsMap(); if (MapUtils.isNotEmpty(hotParamsMap) - && hotParamsMap.containsKey(ConfigConst.KEY_YARN_APP_QUEUE())) { - String yarnQueue = hotParamsMap.get(ConfigConst.KEY_YARN_APP_QUEUE()).toString(); + && hotParamsMap.containsKey(ConfigKeys.KEY_YARN_APP_QUEUE())) { + String yarnQueue = hotParamsMap.get(ConfigKeys.KEY_YARN_APP_QUEUE()).toString(); String labelExpr = - Optional.ofNullable(hotParamsMap.get(ConfigConst.KEY_YARN_APP_NODE_LABEL())) + Optional.ofNullable(hotParamsMap.get(ConfigKeys.KEY_YARN_APP_NODE_LABEL())) .map(Object::toString) .orElse(null); this.setYarnQueue(YarnQueueLabelExpression.of(yarnQueue, labelExpr).toString()); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java index b920d6e56..a50414839 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/ApplicationConfig.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.util.DeflaterUtils; import org.apache.streampark.common.util.PropertiesUtils; import org.apache.streampark.console.core.enums.ConfigFileTypeEnum; @@ -103,16 +103,16 @@ public class ApplicationConfig { Collectors.toMap( entry -> { String key = entry.getKey(); - if (key.startsWith(ConfigConst.KEY_FLINK_OPTION_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_OPTION_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_FLINK_PROPERTY_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_PROPERTY_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_FLINK_TABLE_PREFIX())) { - key = key.substring(ConfigConst.KEY_FLINK_TABLE_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_APP_PREFIX())) { - key = key.substring(ConfigConst.KEY_APP_PREFIX().length()); - } else if (key.startsWith(ConfigConst.KEY_SQL_PREFIX())) { - key = key.substring(ConfigConst.KEY_SQL_PREFIX().length()); + if (key.startsWith(ConfigKeys.KEY_FLINK_OPTION_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_OPTION_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_PROPERTY_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_FLINK_TABLE_PREFIX())) { + key = key.substring(ConfigKeys.KEY_FLINK_TABLE_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_APP_PREFIX())) { + key = key.substring(ConfigKeys.KEY_APP_PREFIX().length()); + } else if (key.startsWith(ConfigKeys.KEY_SQL_PREFIX())) { + key = key.substring(ConfigKeys.KEY_SQL_PREFIX().length()); } return key; }, diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java index 48f3d8b15..611fc038e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/FlinkCluster.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.enums.ClusterState; import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.common.enums.FlinkK8sRestExposedType; @@ -138,7 +138,7 @@ public class FlinkCluster implements Serializable { } Map<String, Object> map = JacksonUtils.read(this.options, Map.class); if (FlinkExecutionMode.YARN_SESSION == getFlinkExecutionModeEnum()) { - map.put(ConfigConst.KEY_YARN_APP_NAME(), this.clusterName); + map.put(ConfigKeys.KEY_YARN_APP_NAME(), this.clusterName); map.putAll(YarnQueueLabelExpression.getQueueLabelMap(yarnQueue)); } map.entrySet().removeIf(entry -> entry.getValue() == null); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java index 458c74d3d..6d4f1f9e4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Variable.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import com.baomidou.mybatisplus.annotation.IdType; import com.baomidou.mybatisplus.annotation.TableId; @@ -70,7 +70,7 @@ public class Variable implements Serializable { public void dataMasking() { if (desensitization) { - this.setVariableValue(ConfigConst.DEFAULT_DATAMASK_STRING()); + this.setVariableValue(Constant.DEFAULT_DATAMASK_STRING); } } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java index 13d631d4f..86dc54169 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/runner/EnvInitializer.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.runner; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.InternalConfigHolder; import org.apache.streampark.common.conf.InternalOption; import org.apache.streampark.common.conf.Workspace; @@ -92,14 +92,14 @@ public class EnvInitializer implements ApplicationRunner { + " The system initialization check failed. If started local for development and debugging," + " please ensure the -D%s parameter is clearly specified," + " more detail: https://streampark.apache.org/docs/user-guide/deployment", - ConfigConst.KEY_APP_HOME())); + ConfigKeys.KEY_APP_HOME())); } // init InternalConfig initInternalConfig(context.getEnvironment()); // overwrite system variable HADOOP_USER_NAME String hadoopUserName = InternalConfigHolder.get(CommonConfig.STREAMPARK_HADOOP_USER_NAME()); - overrideSystemProp(ConfigConst.KEY_HADOOP_USER_NAME(), hadoopUserName); + overrideSystemProp(ConfigKeys.KEY_HADOOP_USER_NAME(), hadoopUserName); // initialize local file system resources storageInitialize(LFS); // Launch the embedded http file server. diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java index b60bd0d34..ff0745284 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java @@ -18,7 +18,7 @@ package org.apache.streampark.console.core.service.application.impl; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.FlinkDevelopmentMode; @@ -418,7 +418,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, // Get the sql of the replaced placeholder String realSql = variableService.replaceVariable(application.getTeamId(), flinkSql.getSql()); flinkSql.setSql(DeflaterUtils.zipString(realSql)); - extraParameter.put(ConfigConst.KEY_FLINK_SQL(null), flinkSql.getSql()); + extraParameter.put(ConfigKeys.KEY_FLINK_SQL(null), flinkSql.getSql()); } // TODO Need to display more K8s submission parameters in the front-end UI. @@ -478,12 +478,12 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, submitResponse -> { if (submitResponse.flinkConfig() != null) { String jmMemory = - submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_JM_PROCESS_MEMORY()); + submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_JM_PROCESS_MEMORY()); if (jmMemory != null) { application.setJmMemory(MemorySize.parse(jmMemory).getMebiBytes()); } String tmMemory = - submitResponse.flinkConfig().get(ConfigConst.KEY_FLINK_TM_PROCESS_MEMORY()); + submitResponse.flinkConfig().get(ConfigKeys.KEY_FLINK_TM_PROCESS_MEMORY()); if (tmMemory != null) { application.setTmMemory(MemorySize.parse(tmMemory).getMebiBytes()); } @@ -610,7 +610,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, resource.getFilePath(), "pyflink file can't be null, start application failed."); ApiAlertException.throwIfFalse( - resource.getFilePath().endsWith(ConfigConst.PYTHON_SUFFIX()), + resource.getFilePath().endsWith(Constant.PYTHON_SUFFIX), "pyflink format error, must be a \".py\" suffix, start application failed."); flinkUserJar = resource.getFilePath(); @@ -621,7 +621,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, appConf = String.format( "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); } else { switch (application.getApplicationType()) { case STREAMPARK_FLINK: @@ -639,7 +639,7 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, appConf = String.format( "json://{\"%s\":\"%s\"}", - ConfigConst.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); + ConfigKeys.KEY_FLINK_APPLICATION_MAIN_CLASS(), application.getMainClass()); break; default: throw new IllegalArgumentException( @@ -701,19 +701,19 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper, "The yarn session clusterId=%s cannot be find, maybe the clusterId is wrong or " + "the cluster has been deleted. Please contact the Admin.", application.getFlinkClusterId())); - properties.put(ConfigConst.KEY_YARN_APP_ID(), cluster.getClusterId()); + properties.put(ConfigKeys.KEY_YARN_APP_ID(), cluster.getClusterId()); } else { String yarnQueue = - (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_QUEUE()); + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_QUEUE()); String yarnLabelExpr = - (String) application.getHotParamsMap().get(ConfigConst.KEY_YARN_APP_NODE_LABEL()); + (String) application.getHotParamsMap().get(ConfigKeys.KEY_YARN_APP_NODE_LABEL()); Optional.ofNullable(yarnQueue) - .ifPresent(yq -> properties.put(ConfigConst.KEY_YARN_APP_QUEUE(), yq)); + .ifPresent(yq -> properties.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yq)); Optional.ofNullable(yarnLabelExpr) - .ifPresent(yLabel -> properties.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), yLabel)); + .ifPresent(yLabel -> properties.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), yLabel)); } } else if (FlinkExecutionMode.isKubernetesMode(application.getFlinkExecutionMode())) { - properties.put(ConfigConst.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); + properties.put(ConfigKeys.KEY_K8S_IMAGE_PULL_POLICY(), "Always"); } if (FlinkExecutionMode.isKubernetesApplicationMode(application.getExecutionMode())) { diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 222dc56bf..15946b7f6 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -18,7 +18,6 @@ package org.apache.streampark.console.core.service.impl; import org.apache.streampark.common.Constant; -import org.apache.streampark.common.conf.ConfigConst; import org.apache.streampark.common.conf.K8sFlinkConfig; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.enums.ApplicationType; @@ -441,7 +440,7 @@ public class AppBuildPipeServiceImpl } FlinkExecutionMode executionModeEnum = app.getFlinkExecutionMode(); - String mainClass = ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS(); + String mainClass = Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS; switch (executionModeEnum) { case YARN_APPLICATION: String yarnProvidedPath = app.getAppLib(); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java index 47b113616..c1b4b4564 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.service.impl; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import org.apache.streampark.common.conf.Workspace; import org.apache.streampark.common.fs.FsOperator; import org.apache.streampark.common.util.ExceptionUtils; @@ -289,7 +289,7 @@ public class ResourceServiceImpl extends ServiceImpl<ResourceMapper, Resource> resp.put(EXCEPTION, ExceptionUtils.stringifyException(e)); return RestResponse.success().data(resp); } - if (jarFile.getName().endsWith(ConfigConst.PYTHON_SUFFIX())) { + if (jarFile.getName().endsWith(Constant.PYTHON_SUFFIX)) { return RestResponse.success().data(resp); } Manifest manifest = Utils.getJarManifest(jarFile); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java index e44482b58..ab0a35bea 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/utils/YarnQueueLabelExpression.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.core.utils; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.console.base.exception.ApiAlertException; import org.apache.commons.lang3.StringUtils; @@ -111,8 +111,8 @@ public class YarnQueueLabelExpression { Map<String, String> map = new HashMap<>(2); yarnQueueLabelExpression .getLabelExpression() - .ifPresent(labelExp -> map.put(ConfigConst.KEY_YARN_APP_NODE_LABEL(), labelExp)); - map.put(ConfigConst.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue); + .ifPresent(labelExp -> map.put(ConfigKeys.KEY_YARN_APP_NODE_LABEL(), labelExp)); + map.put(ConfigKeys.KEY_YARN_APP_QUEUE(), yarnQueueLabelExpression.queue); return map; } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java index 88a2f5733..1699e0b6e 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/entity/User.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.entity; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.Constant; import org.apache.streampark.console.core.enums.LoginTypeEnum; import org.apache.streampark.console.core.enums.UserTypeEnum; @@ -101,7 +101,7 @@ public class User implements Serializable { private Long lastTeamId; public void dataMasking() { - String dataMask = ConfigConst.DEFAULT_DATAMASK_STRING(); + String dataMask = Constant.DEFAULT_DATAMASK_STRING; this.setPassword(dataMask); this.setSalt(dataMask); } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java index 4a19b709f..3228c2266 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/system/runner/StartedUpRunner.java @@ -17,7 +17,7 @@ package org.apache.streampark.console.system.runner; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.util.Utils; import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; @@ -37,7 +37,7 @@ public class StartedUpRunner implements ApplicationRunner { @Override public void run(ApplicationArguments args) { if (context.isActive()) { - ConfigConst.printLogo("streampark-console start successful"); + Utils.printLogo("streampark-console start successful"); } } } diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java index 8780e221e..398fce3d5 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringIntegrationTestBase.java @@ -18,7 +18,7 @@ package org.apache.streampark.console; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.util.SystemPropertyUtils; import org.apache.commons.io.FileUtils; @@ -91,7 +91,7 @@ public abstract class SpringIntegrationTestBase { Files.createDirectories(new File(tempAbsPath, DEFAULT_LOCAL_WORKSPACE_DIR_NAME).toPath()); appHome = new File(tempAbsPath, DEFAULT_APP_HOME_DIR_NAME).getAbsolutePath(); - System.setProperty(ConfigConst.KEY_APP_HOME(), appHome); + System.setProperty(ConfigKeys.KEY_APP_HOME(), appHome); System.setProperty( CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), localWorkspace.toAbsolutePath().toString()); diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java index 5e40b3122..88f5284d8 100644 --- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java +++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/SpringUnitTestBase.java @@ -18,7 +18,7 @@ package org.apache.streampark.console; import org.apache.streampark.common.conf.CommonConfig; -import org.apache.streampark.common.conf.ConfigConst; +import org.apache.streampark.common.conf.ConfigKeys; import org.apache.streampark.common.enums.FlinkExecutionMode; import org.apache.streampark.console.core.entity.Application; import org.apache.streampark.console.core.entity.FlinkCluster; @@ -68,7 +68,7 @@ public abstract class SpringUnitTestBase { Path localWorkspace = Files.createDirectories(new File(mockedHome + "/localWorkspace").toPath()); - System.setProperty(ConfigConst.KEY_APP_HOME(), mockedHome); + System.setProperty(ConfigKeys.KEY_APP_HOME(), mockedHome); System.setProperty( CommonConfig.STREAMPARK_WORKSPACE_LOCAL().key(), localWorkspace.toAbsolutePath().toString()); diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala index 18665186f..25e17120d 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SubmitRequest.scala @@ -17,8 +17,9 @@ package org.apache.streampark.flink.client.bean -import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion, Workspace} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion, Workspace} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums._ import org.apache.streampark.common.util.{DeflaterUtils, HdfsUtils, PropertiesUtils} import org.apache.streampark.flink.packer.pipeline.{BuildResult, ShadedBuildResponse} @@ -59,8 +60,8 @@ case class SubmitRequest( lazy val appOption: Map[String, String] = getParameterMap(KEY_FLINK_OPTION_PREFIX) lazy val appMain: String = this.developmentMode match { - case FlinkDevelopmentMode.FLINK_SQL => ConfigConst.STREAMPARK_FLINKSQL_CLIENT_CLASS - case FlinkDevelopmentMode.PYFLINK => ConfigConst.PYTHON_DRIVER_CLASS_NAME + case FlinkDevelopmentMode.FLINK_SQL => Constant.STREAMPARK_FLINKSQL_CLIENT_CLASS + case FlinkDevelopmentMode.PYFLINK => Constant.PYTHON_FLINK_DRIVER_CLASS_NAME case _ => appProperties(KEY_FLINK_APPLICATION_MAIN_CLASS) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala index f39fa341a..f5862a082 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnApplicationClient.scala @@ -17,7 +17,8 @@ package org.apache.streampark.flink.client.impl -import org.apache.streampark.common.conf.{ConfigConst, Workspace} +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, Workspace} import org.apache.streampark.common.enums.FlinkDevelopmentMode import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util.{FileUtils, HdfsUtils, Utils} @@ -117,15 +118,15 @@ object YarnApplicationClient extends YarnClientTrait { // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pyVenv) // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) val args: util.List[String] = flinkConfig.get(ApplicationConfiguration.APPLICATION_ARGS) // Caused by: java.lang.UnsupportedOperationException val argsList: util.ArrayList[String] = new util.ArrayList[String](args) argsList.add("-pym") - argsList.add(submitRequest.userJarFile.getName.dropRight(ConfigConst.PYTHON_SUFFIX.length)) + argsList.add(submitRequest.userJarFile.getName.dropRight(Constant.PYTHON_SUFFIX.length)) flinkConfig.safeSet(ApplicationConfiguration.APPLICATION_ARGS, argsList) } diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 7fb6aa148..75a05326f 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -17,8 +17,9 @@ package org.apache.streampark.flink.client.`trait` -import org.apache.streampark.common.conf.{ConfigConst, Workspace} -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.{ConfigKeys, Workspace} +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode} import org.apache.streampark.common.fs.FsOperator import org.apache.streampark.common.util.{DeflaterUtils, FileUtils, Logger, SystemPropertyUtils} @@ -251,9 +252,9 @@ trait FlinkClientTrait extends Logger { // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv) // python.client.executable - .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constant.PYTHON_EXECUTABLE) // python.executable - .safeSet(PythonOptions.PYTHON_EXECUTABLE, ConfigConst.PYTHON_EXECUTABLE) + .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constant.PYTHON_EXECUTABLE) } val packageProgram = PackagedProgram.newBuilder diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala index c75eb1790..041dbe5ac 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseHttpConfig.scala @@ -17,9 +17,9 @@ package org.apache.streampark.flink.connector.clickhouse.conf -import org.apache.streampark.common.conf.ConfigConst import org.apache.streampark.flink.connector.conf.ThresholdConf +import org.apache.streampark.common.Constant import java.util.{Base64, Properties} import java.util.concurrent.ThreadLocalRandom @@ -69,7 +69,7 @@ class ClickHouseHttpConfig(parameters: Properties) override def toString: String = { s""" - |{ user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} } + |{ user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${hosts.mkString(",")} } |""".stripMargin } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala index 40a97ff1e..df09056c1 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisConfig.scala @@ -16,10 +16,10 @@ */ package org.apache.streampark.connector.doris.conf -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.Constant +import org.apache.streampark.common.conf.ConfigKeys import java.util.Properties - import scala.collection.convert.ImplicitConversions._ object DorisConfig { @@ -83,7 +83,7 @@ class DorisConfig(parameters: Properties) { override def toString: String = { s""" - |{ doris user: $user, password: ${ConfigConst.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} } + |{ doris user: $user, password: ${Constant.DEFAULT_DATAMASK_STRING}, hosts: ${loadUrl.mkString(",")} } |""".stripMargin } } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala index 41e8e324e..c3565c9a2 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-influx/src/main/scala/org/apache/streampark/flink/connector/influx/function/InfluxFunction.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.influx.function -import org.apache.streampark.common.conf.ConfigConst.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_JDBC_PASSWORD, KEY_JDBC_URL, KEY_JDBC_USER} import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.connector.influx.bean.InfluxEntity diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala index f00647dd3..6fb365e31 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSinkFunction.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.internal -import org.apache.streampark.common.conf.ConfigConst.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH} +import org.apache.streampark.common.conf.ConfigKeys.{DEFAULT_JDBC_INSERT_BATCH, KEY_JDBC_INSERT_BATCH} import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util.{JdbcUtils, Logger} diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala index 1303e3661..c5ca134e2 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.jdbc.sink -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.Semantic import org.apache.streampark.common.util.{ConfigUtils, Logger} import org.apache.streampark.flink.connector.jdbc.internal.{Jdbc2PCSinkFunction, JdbcSinkFunction} diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala index 9bfe176e0..8a60922a8 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/sink/KafkaSink.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.kafka.sink -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.{ConfigUtils, Utils} import org.apache.streampark.flink.connector.kafka.bean.KafkaEqualityPartitioner import org.apache.streampark.flink.connector.sink.Sink @@ -83,10 +83,10 @@ class KafkaSink( val producer = { val prop = ConfigUtils.getKafkaSinkConf(ctx.parameter.toMap, topic, alias) Utils.copyProperties(property, prop) - val topicId = prop.remove(ConfigConst.KEY_KAFKA_TOPIC).toString + val topicId = prop.remove(ConfigKeys.KEY_KAFKA_TOPIC).toString /** kafkaProducersPoolSize will be used under EXACTLY_ONCE semantics */ - val semantic = Try(Some(prop.remove(ConfigConst.KEY_KAFKA_SEMANTIC).toString.toUpperCase)) + val semantic = Try(Some(prop.remove(ConfigKeys.KEY_KAFKA_SEMANTIC).toString.toUpperCase)) .getOrElse(None) match { case None => Semantic.AT_LEAST_ONCE case Some("AT_LEAST_ONCE") => Semantic.AT_LEAST_ONCE diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala index 05e8e7b8c..aea660284 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-kafka/src/main/scala/org/apache/streampark/flink/connector/kafka/source/KafkaSource.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.connector.kafka.source -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{ConfigUtils, Utils} import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord import org.apache.streampark.flink.core.scala.StreamingContext @@ -25,7 +25,7 @@ import org.apache.streampark.flink.core.scala.StreamingContext import org.apache.flink.api.common.eventtime.WatermarkStrategy import org.apache.flink.api.common.typeinfo.{BasicTypeInfo, TypeInformation} import org.apache.flink.api.java.typeutils.TypeExtractor.getForClass -import org.apache.flink.streaming.api.scala.{DataStream, _} +import org.apache.flink.streaming.api.scala._ import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer, KafkaDeserializationSchema} import org.apache.flink.streaming.connectors.kafka.internals.KafkaTopicPartition import org.apache.kafka.clients.consumer.{ConsumerConfig, ConsumerRecord} diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala index c7bfbd155..b480231c0 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreamTable.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.flink.core.{FlinkTableInitializer, StreamTableContext} import org.apache.streampark.flink.core.TableExt diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala index 7dffccd29..8d5dca940 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkStreaming.scala @@ -17,8 +17,8 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ -import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.{Logger, SystemPropertyUtils, Utils} import org.apache.streampark.flink.core.{FlinkStreamingInitializer, StreamEnvConfig} import org.apache.streampark.flink.core.EnhancerImplicit._ @@ -50,7 +50,7 @@ class StreamingContext( } @Deprecated override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkStreaming $jobName Starting...") + Utils.printLogo(s"FlinkStreaming $jobName Starting...") super.execute(jobName) } } diff --git a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala index 78ead4fb2..44ed2b972 100644 --- a/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala +++ b/streampark-flink/streampark-flink-core/src/main/scala/org/apache/streampark/flink/core/scala/FlinkTable.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core.scala -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.util.{Logger, SystemPropertyUtils} import org.apache.streampark.flink.core.{FlinkTableInitializer, TableContext} diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala index edd679ad7..ee67a202d 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.ImplicitsUtils._ import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum @@ -116,7 +116,7 @@ object KubernetesRetriever extends Logger { .apps() .deployments() .inNamespace(namespace) - .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) + .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL) .list() .getItems .asScala diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala index e97fcfc1f..5cba3b36f 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/ingress/IngressStrategy.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.ingress -import org.apache.streampark.common.conf.{ConfigConst, InternalConfigHolder, K8sFlinkConfig} +import org.apache.streampark.common.conf.{ConfigKeys, InternalConfigHolder, K8sFlinkConfig} import io.fabric8.kubernetes.api.model.{OwnerReference, OwnerReferenceBuilder} import io.fabric8.kubernetes.client.DefaultKubernetesClient @@ -63,7 +63,7 @@ trait IngressStrategy { def buildIngressLabels(clusterId: String): Map[String, String] = { Map( "app" -> clusterId, - "type" -> ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL, + "type" -> ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL, "component" -> "ingress" ) } diff --git a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala index f49bf4406..4e3681a81 100644 --- a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala +++ b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkK8sEventWatcher.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.kubernetes.watcher -import org.apache.streampark.common.conf.ConfigConst +import org.apache.streampark.common.conf.ConfigKeys import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.kubernetes.{FlinkK8sWatchController, KubernetesRetriever} import org.apache.streampark.flink.kubernetes.model.{K8sDeploymentEventCV, K8sEventKey} @@ -67,7 +67,7 @@ class FlinkK8sEventWatcher(implicit watchController: FlinkK8sWatchController) k8sClient .apps() .deployments() - .withLabel("type", ConfigConst.FLINK_NATIVE_KUBERNETES_LABEL) + .withLabel("type", ConfigKeys.FLINK_NATIVE_KUBERNETES_LABEL) .watch(new CompatibleKubernetesWatcher[Deployment, CompKubernetesDeployment] { override def eventReceived(action: Watcher.Action, event: Deployment): Unit = { handleDeploymentEvent(action, event) diff --git a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala index 2a8050f35..de3b9eaed 100644 --- a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala +++ b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala @@ -18,7 +18,7 @@ package org.apache.streampark.flink.proxy import org.apache.streampark.common.Constant -import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion} +import org.apache.streampark.common.conf.{ConfigKeys, FlinkVersion} import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils} import org.apache.streampark.common.util.ImplicitsUtils._ @@ -108,10 +108,10 @@ object FlinkShimsProxy extends Logger { } def addShimsUrls(flinkVersion: FlinkVersion, addShimUrl: File => Unit): Unit = { - val appHome = System.getProperty(ConfigConst.KEY_APP_HOME) + val appHome = System.getProperty(ConfigKeys.KEY_APP_HOME) require( appHome != null, - String.format("%s is not found on System env.", ConfigConst.KEY_APP_HOME)) + String.format("%s is not found on System env.", ConfigKeys.KEY_APP_HOME)) val libPath = new File(s"$appHome/lib") require(libPath.exists()) diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala index 528e47907..d855ee030 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/EnhancerImplicit.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_NAME, KEY_FLINK_APP_NAME} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_NAME, KEY_FLINK_APP_NAME} import org.apache.streampark.common.util.DeflaterUtils import org.apache.flink.api.java.utils.ParameterTool diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala index 8d18a31e2..9d6a09339 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlExecutor.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.KEY_FLINK_SQL +import org.apache.streampark.common.conf.ConfigKeys.KEY_FLINK_SQL import org.apache.streampark.common.util.Logger import org.apache.streampark.flink.core.SqlCommand._ diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala index 53e381275..517ae1ae6 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamTableTrait.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.core.EnhancerImplicit._ import com.esotericsoftware.kryo.Serializer @@ -77,7 +78,7 @@ abstract class FlinkStreamTableTrait( } @Deprecated def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkStreamTable $jobName Starting...") + Utils.printLogo(s"FlinkStreamTable $jobName Starting...") if (isConvertedToDataStream) { streamEnv.execute(jobName) } else null diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala index 87a54bd07..bfb4be1cc 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkStreamingInitializer.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.ApiType import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util._ diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala index 2169b8ea5..ce8c93823 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableInitializer.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.enums.{ApiType, PlannerType} import org.apache.streampark.common.enums.ApiType.ApiType import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala index e581f857c..71b1a1c48 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkTableTrait.scala @@ -16,7 +16,8 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst._ +import org.apache.streampark.common.conf.ConfigKeys._ +import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.core.EnhancerImplicit._ import org.apache.flink.api.common.JobExecutionResult @@ -40,7 +41,7 @@ abstract class FlinkTableTrait(val parameter: ParameterTool, private val tableEn } def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala index dc4de596c..157216ce8 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/SqlCommandParser.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.PARAM_PREFIX +import org.apache.streampark.common.conf.ConfigKeys.PARAM_PREFIX import org.apache.streampark.common.enums.FlinkSqlValidationFailedType import org.apache.streampark.common.util.Logger diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala index 1206c817e..3331253b3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/conf/ParameterCli.scala @@ -16,8 +16,8 @@ */ package org.apache.streampark.flink.core.conf -import org.apache.streampark.common.conf.ConfigConst -import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX} +import org.apache.streampark.common.conf.ConfigKeys +import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_OPTION_PREFIX, KEY_FLINK_PROPERTY_PREFIX} import org.apache.streampark.common.util.PropertiesUtils import org.apache.commons.cli.{DefaultParser, Options} @@ -96,7 +96,7 @@ object ParameterCli { x => val key = x._1.drop(propertyPrefix.length).trim val value = x._2.trim - if (key == ConfigConst.KEY_FLINK_APP_NAME) { + if (key == ConfigKeys.KEY_FLINK_APP_NAME) { buffer.append(s" -D$key=${value.replace(" ", "_")}") } else { buffer.append(s" -D$key=$value") @@ -104,7 +104,7 @@ object ParameterCli { } buffer.toString.trim case "--name" => - map.getOrElse(propertyPrefix.concat(ConfigConst.KEY_FLINK_APP_NAME), "").trim match { + map.getOrElse(propertyPrefix.concat(ConfigKeys.KEY_FLINK_APP_NAME), "").trim match { case appName if appName.nonEmpty => appName case _ => "" } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala index a681018a3..ed6040fa3 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims-test/src/test/scala/org/apache/streampark/flink/core/test/FlinkSqlExecuteFunSuite.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core.test -import org.apache.streampark.common.conf.ConfigConst.{KEY_FLINK_SQL, PARAM_PREFIX} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_FLINK_SQL, PARAM_PREFIX} import org.apache.streampark.common.util.DeflaterUtils import org.apache.streampark.flink.core.{FlinkSqlExecutor, FlinkTableInitializer, StreamTableContext} diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index bbe35967c..f7aedd8ea 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.12/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: tableEnv.connect(connectorDescriptor) override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index 96ac48549..f0f2ad135 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.13/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -51,7 +51,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: tableEnv.connect(connectorDescriptor) override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index fcd7708f3..d0815b5cb 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.14/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -16,7 +16,7 @@ */ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -64,7 +64,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: def from(descriptor: TableDescriptor): Table = tableEnv.from(descriptor) @Deprecated override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala index 64337d26b..8ca8e3fe1 100644 --- a/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala +++ b/streampark-flink/streampark-flink-shims/streampark-flink-shims_flink-1.15/src/main/scala/org/apache/streampark/flink/core/TableContext.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.core -import org.apache.streampark.common.conf.ConfigConst.printLogo +import org.apache.streampark.common.util.Utils import org.apache.flink.api.common.JobExecutionResult import org.apache.flink.api.java.utils.ParameterTool @@ -46,7 +46,7 @@ class TableContext(override val parameter: ParameterTool, private val tableEnv: override def listFullModules(): Array[ModuleEntry] = tableEnv.listFullModules() override def execute(jobName: String): JobExecutionResult = { - printLogo(s"FlinkTable $jobName Starting...") + Utils.printLogo(s"FlinkTable $jobName Starting...") null } diff --git a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala index 3b094d086..e305f428b 100644 --- a/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala +++ b/streampark-flink/streampark-flink-sqlclient/src/main/scala/org/apache/streampark/flink/cli/SqlClient.scala @@ -17,7 +17,7 @@ package org.apache.streampark.flink.cli -import org.apache.streampark.common.conf.ConfigConst.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE} +import org.apache.streampark.common.conf.ConfigKeys.{KEY_APP_CONF, KEY_FLINK_SQL, KEY_FLINK_TABLE_MODE} import org.apache.streampark.common.util.{DeflaterUtils, PropertiesUtils} import org.apache.streampark.flink.core.{SqlCommand, SqlCommandParser} import org.apache.streampark.flink.core.scala.{FlinkStreamTable, FlinkTable}
