This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev by this push:
new db330bb25 [ISSUE-3083][Improve] Improve streampark-flink module base on [3.2 Constant Variables Definition] (#3264)
db330bb25 is described below
commit db330bb258a08459a04b6c93754c6af75a1c0ac0
Author: Yuepeng Pan <[email protected]>
AuthorDate: Mon Oct 23 10:48:10 2023 +0800
[ISSUE-3083][Improve] Improve streampark-flink module base on [3.2 Constant Variables Definition] (#3264)
---
.../org/apache/streampark/common/Constant.java | 22 ++++++++++----------
.../streampark/common/util/ClassLoaderUtils.scala | 6 +++++-
.../apache/streampark/common/util/YarnUtils.scala | 5 +++--
.../console/core/entity/Application.java | 9 +++-----
.../impl/ApplicationActionServiceImpl.java | 4 +++-
.../impl/ApplicationInfoServiceImpl.java | 3 ++-
.../core/service/impl/AppBuildPipeServiceImpl.java | 4 +++-
.../core/service/impl/ProjectServiceImpl.java | 3 ++-
.../console/core/task/ProjectBuildTask.java | 5 +++--
.../console/base/util/EncryptUtilsTest.java | 6 ++++--
.../console/base/util/ShaHashUtilsTest.java | 4 +++-
.../flink/client/bean/CancelRequest.scala | 5 +++--
.../flink/client/bean/SavepointRequestTrait.scala | 5 +++--
.../client/bean/TriggerSavepointRequest.scala | 5 +++--
.../impl/KubernetesApplicationClientV2.scala | 3 ++-
.../client/impl/KubernetesSessionClientV2.scala | 5 +++--
.../flink/connector/failover/SinkRequest.scala | 4 +++-
.../conf/ClickHouseSinkConfigOption.scala | 5 +++--
.../clickhouse/internal/ClickHouseWriterTask.scala | 2 +-
.../connector/clickhouse/sink/ClickHouseSink.scala | 10 +++++----
.../doris/conf/DorisSinkConfigOption.scala | 2 +-
.../connector/elasticsearch5/sink/ES5Sink.scala | 11 ++++++----
.../elasticsearch5/util/ElasticsearchUtils.scala | 4 ++--
.../connector/elasticsearch6/sink/ES6Sink.scala | 11 ++++++----
.../elasticsearch6/util/ElasticsearchUtils.scala | 4 ++--
.../bean/RestClientFactoryImpl.scala | 2 +-
.../connector/elasticsearch7/sink/ES7Sink.scala | 12 +++++++----
.../elasticsearch7/util/ElasticsearchUtils.scala | 4 ++--
.../connector/hbase/source/HBaseJavaSource.java | 4 ++--
.../flink/connector/hbase/sink/HBaseSink.scala | 6 +++---
.../connector/http/function/HttpSinkFunction.scala | 2 +-
.../flink/connector/jdbc/sink/JdbcJavaSink.java | 2 +-
.../connector/jdbc/source/JdbcJavaSource.java | 4 ++--
.../flink/connector/jdbc/sink/JdbcSink.scala | 2 +-
.../connector/mongo/source/MongoJavaSource.java | 6 +++---
.../flink/connector/redis/bean/RedisMapper.scala | 20 ++++++++++--------
.../flink/connector/redis/conf/RedisConfig.scala | 2 +-
.../flink/connector/redis/sink/RedisSink.scala | 14 ++++++-------
.../flink/kubernetes/v2/operator/CROperator.scala | 12 ++++++-----
.../flink/kubernetes/model/ClusterKey.scala | 4 ++--
.../flink/kubernetes/model/TrackId.scala | 4 ++--
.../streampark/flink/packer/PackerResourceGC.scala | 3 ++-
.../docker/FlinkDockerfileTemplateTrait.scala | 8 ++++++--
.../streampark/flink/packer/maven/MavenTool.scala | 11 +++++++---
.../flink/packer/pipeline/BuildPipeline.scala | 14 ++++++-------
.../impl/FlinkK8sApplicationBuildPipeline.scala | 24 +++++++++++-----------
.../impl/FlinkK8sApplicationBuildPipelineV2.scala | 4 ++--
.../impl/FlinkK8sSessionBuildPipeline.scala | 4 ++--
.../pipeline/impl/FlinkRemoteBuildPipeline.scala | 2 +-
.../impl/FlinkYarnApplicationBuildPipeline.scala | 2 +-
.../streampark/flink/proxy/FlinkShimsProxy.scala | 11 +++++-----
.../streampark/flink/core/FlinkSqlValidator.scala | 3 ++-
.../streampark-flink-sql-gateway-base/pom.xml | 5 +++++
.../streampark/gateway/factories/FactoryUtil.java | 3 ++-
.../factories/SqlGatewayServiceFactoryUtils.java | 3 ++-
55 files changed, 201 insertions(+), 143 deletions(-)
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
similarity index 61%
copy from streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
copy to streampark-common/src/main/java/org/apache/streampark/common/Constant.java
index 398c14288..ac02bb8a1 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
+++ b/streampark-common/src/main/java/org/apache/streampark/common/Constant.java
@@ -15,18 +15,18 @@
* limitations under the License.
*/
-package org.apache.streampark.console.base.util;
+package org.apache.streampark.common;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.Test;
+/** A constant class to hold the constant variables. */
+public class Constant {
-class EncryptUtilsTest {
+ private Constant() {}
- @Test
- void testEncrypt() throws Exception {
- String value = "apache streampark";
- String encrypt = EncryptUtils.encrypt(value, "streampark");
- String decrypt = EncryptUtils.decrypt(encrypt, "streampark");
- Assertions.assertEquals(value, decrypt);
- }
+ public static final String DEFAULT = "default";
+ public static final String STREAM_PARK = "streampark";
+ public static final String HTTP_SCHEMA = "http://";
+ public static final String HTTPS_SCHEMA = "https://";
+ public static final String JAR_SUFFIX = ".jar";
+ public static final String ZIP_SUFFIX = ".zip";
+ public static final String SEMICOLON = ";";
}
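
For reference, the new holder above follows the standard constants-class pattern: public static final fields plus a private constructor so the class can never be instantiated. A minimal sketch of a call site consuming it (the demo class below is illustrative, not part of this commit):

    import org.apache.streampark.common.Constant;

    // Hypothetical demo class, shown only to illustrate how the holder is consumed.
    public class ArtifactFilterDemo {
      public static boolean isDeployableArtifact(String fileName) {
        // Replaces scattered ".jar"/".zip" literals with the shared constants.
        return fileName.endsWith(Constant.JAR_SUFFIX) || fileName.endsWith(Constant.ZIP_SUFFIX);
      }

      public static void main(String[] args) {
        System.out.println(isDeployableArtifact("streampark-console.jar")); // true
        System.out.println(isDeployableArtifact("README.md"));              // false
      }
    }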
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
index e830d20c6..466674767 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ClassLoaderUtils.scala
@@ -16,6 +16,8 @@
*/
package org.apache.streampark.common.util
+import org.apache.streampark.common.Constant
+
import java.io.{File, IOException}
import java.net.{URL, URLClassLoader}
import java.util.function.Supplier
@@ -106,7 +108,9 @@ object ClassLoaderUtils extends Logger {
loopDirs(file)
}
- private[this] def loadPath(filepath: String, ext: List[String] = List(".jar", ".zip")): Unit = {
+ private[this] def loadPath(
+ filepath: String,
+ ext: List[String] = List(Constant.JAR_SUFFIX, Constant.ZIP_SUFFIX)): Unit = {
val file = new File(filepath)
loopFiles(file, ext)
}
diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
index 849628362..2839bcc5e 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/YarnUtils.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.common.util
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{CommonConfig, InternalConfigHolder}
import org.apache.commons.lang3.StringUtils
@@ -127,8 +128,8 @@ object YarnUtils extends Logger {
val conf = HadoopUtils.hadoopConf
val useHttps = YarnConfiguration.useHttps(conf)
val (addressPrefix, defaultPort, protocol) = useHttps match {
- case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", "https://")
- case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", "http://")
+ case x if x => (YarnConfiguration.RM_WEBAPP_HTTPS_ADDRESS, "8090", Constant.HTTPS_SCHEMA)
+ case _ => (YarnConfiguration.RM_WEBAPP_ADDRESS, "8088", Constant.HTTP_SCHEMA)
}
rmHttpURL = Option(conf.get("yarn.web-proxy.address", null)) match {
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
index c87736004..670795012 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java
@@ -17,8 +17,8 @@
package org.apache.streampark.console.core.entity;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigConst;
-import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.ApplicationType;
import org.apache.streampark.common.enums.FlinkDevelopmentMode;
@@ -103,7 +103,7 @@ public class Application implements Serializable {
private String k8sName;
/** k8s namespace */
- private String k8sNamespace = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE();
+ private String k8sNamespace = Constant.DEFAULT;
/** The exposed type of the rest service of K8s(kubernetes.rest-service.exposed.type) */
private Integer k8sRestExposedType;
@@ -258,10 +258,7 @@ public class Application implements Serializable {
}
public void setK8sNamespace(String k8sNamespace) {
- this.k8sNamespace =
- StringUtils.isBlank(k8sNamespace)
- ? K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE()
- : k8sNamespace;
+ this.k8sNamespace = StringUtils.isBlank(k8sNamespace) ? Constant.DEFAULT : k8sNamespace;
}
public K8sPodTemplates getK8sPodTemplates() {
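
The setter change above is the null-safe defaulting idiom this commit applies in several places: blank input falls back to the shared Constant.DEFAULT instead of the removed K8sFlinkConfig default. A self-contained sketch of the same idiom, assuming commons-lang3 on the classpath as in the console module (the demo class name is illustrative):

    import org.apache.commons.lang3.StringUtils;

    // Hypothetical demo mirroring the defaulting logic in Application.setK8sNamespace.
    public class NamespaceDemo {
      private static final String DEFAULT = "default";
      private String k8sNamespace = DEFAULT;

      public void setK8sNamespace(String k8sNamespace) {
        // Null, empty, or whitespace-only input falls back to the default constant.
        this.k8sNamespace = StringUtils.isBlank(k8sNamespace) ? DEFAULT : k8sNamespace;
      }

      public String getK8sNamespace() {
        return k8sNamespace;
      }
    }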
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
index bb1aff272..b60bd0d34 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationActionServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.application.impl;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
@@ -651,7 +652,8 @@ public class ApplicationActionServiceImpl extends ServiceImpl<ApplicationMapper,
case STREAMPARK_FLINK:
flinkUserJar =
String.format(
- "%s/%s", application.getAppLib(),
application.getModule().concat(".jar"));
+ "%s/%s",
+ application.getAppLib(),
application.getModule().concat(Constant.JAR_SUFFIX));
break;
case APACHE_FLINK:
flinkUserJar = String.format("%s/%s", application.getAppHome(),
application.getJar());
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
index 9c6decfa4..8b836c26c 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/ApplicationInfoServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.application.impl;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
import org.apache.streampark.common.enums.FlinkExecutionMode;
@@ -311,7 +312,7 @@ public class ApplicationInfoServiceImpl extends ServiceImpl<ApplicationMapper, A
.filter(File::isFile)
.sorted(Comparator.comparingLong(File::lastModified).reversed())
.map(File::getName)
- .filter(fn -> fn.endsWith(".jar"))
+ .filter(fn -> fn.endsWith(Constant.JAR_SUFFIX))
.limit(DEFAULT_HISTORY_RECORD_LIMIT)
.collect(Collectors.toList());
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
index 1c9984752..222dc56bf 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.ConfigConst;
import org.apache.streampark.common.conf.K8sFlinkConfig;
import org.apache.streampark.common.conf.Workspace;
@@ -532,7 +533,8 @@ public class AppBuildPipeServiceImpl
case CUSTOM_CODE:
switch (app.getApplicationType()) {
case STREAMPARK_FLINK:
- return String.format("%s/%s", app.getAppLib(), app.getModule().concat(".jar"));
+ return String.format(
+ "%s/%s", app.getAppLib(), app.getModule().concat(Constant.JAR_SUFFIX));
case APACHE_FLINK:
return String.format("%s/%s", app.getAppHome(), app.getJar());
default:
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
index 307d76fba..c4117b5fc 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ProjectServiceImpl.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.service.impl;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.conf.CommonConfig;
import org.apache.streampark.common.conf.InternalConfigHolder;
import org.apache.streampark.common.conf.Workspace;
@@ -251,7 +252,7 @@ public class ProjectServiceImpl extends ServiceImpl<ProjectMapper, Project>
project.getModule(), "Project module can't be null, please check.");
File apps = new File(project.getDistHome(), project.getModule());
for (File file : Objects.requireNonNull(apps.listFiles())) {
- if (file.getName().endsWith(".jar")) {
+ if (file.getName().endsWith(Constant.JAR_SUFFIX)) {
list.add(file.getName());
}
}
diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
index ded8897e6..b860f49db 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/ProjectBuildTask.java
@@ -17,6 +17,7 @@
package org.apache.streampark.console.core.task;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.common.util.CommandUtils;
import org.apache.streampark.common.util.Utils;
import org.apache.streampark.console.base.util.GitUtils;
@@ -181,7 +182,7 @@ public class ProjectBuildTask extends AbstractLogFileTask {
} else {
// 2) .jar file(normal or official standard flink project)
Utils.checkJarFile(app.toURI().toURL());
- String moduleName = app.getName().replace(".jar", "");
+ String moduleName = app.getName().replace(Constant.JAR_SUFFIX, "");
File distHome = project.getDistHome();
File targetDir = new File(distHome, moduleName);
if (!targetDir.exists()) {
@@ -211,7 +212,7 @@ public class ProjectBuildTask extends AbstractLogFileTask {
// 2) try look for jar files, there may be multiple jars found.
if (!targetFile.getName().startsWith("original-")
&& !targetFile.getName().endsWith("-sources.jar")
- && targetFile.getName().endsWith(".jar")) {
+ && targetFile.getName().endsWith(Constant.JAR_SUFFIX)) {
if (jar == null) {
jar = targetFile;
} else {
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
index 398c14288..939030b3c 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/EncryptUtilsTest.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.common.Constant;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -25,8 +27,8 @@ class EncryptUtilsTest {
@Test
void testEncrypt() throws Exception {
String value = "apache streampark";
- String encrypt = EncryptUtils.encrypt(value, "streampark");
- String decrypt = EncryptUtils.decrypt(encrypt, "streampark");
+ String encrypt = EncryptUtils.encrypt(value, Constant.STREAM_PARK);
+ String decrypt = EncryptUtils.decrypt(encrypt, Constant.STREAM_PARK);
Assertions.assertEquals(value, decrypt);
}
}
diff --git a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
index 62f9891ad..2fd9e6b15 100644
--- a/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
+++ b/streampark-console/streampark-console-service/src/test/java/org/apache/streampark/console/base/util/ShaHashUtilsTest.java
@@ -17,6 +17,8 @@
package org.apache.streampark.console.base.util;
+import org.apache.streampark.common.Constant;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -26,7 +28,7 @@ class ShaHashUtilsTest {
@Test
void testEncrypt() {
String randomSalt = "rh8b1ojwog777yrg0daesf04gk";
- String encryptPassword = ShaHashUtils.encrypt(randomSalt, "streampark");
+ String encryptPassword = ShaHashUtils.encrypt(randomSalt, Constant.STREAM_PARK);
Assertions.assertEquals(
"2513f3748847298ea324dffbf67fe68681dd92315bda830065facd8efe08f54f",
encryptPassword);
}
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
index 8487c3927..2ac83eb73 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/CancelRequest.scala
@@ -17,7 +17,8 @@
package org.apache.streampark.flink.client.bean
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
import org.apache.streampark.common.enums.FlinkExecutionMode
import javax.annotation.Nullable
@@ -35,5 +36,5 @@ case class CancelRequest(
withDrain: Boolean,
savepointPath: String,
nativeFormat: Boolean,
- override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
+ override val kubernetesNamespace: String = Constant.DEFAULT)
extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
index 3165250aa..526d37163 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/SavepointRequestTrait.scala
@@ -17,7 +17,8 @@
package org.apache.streampark.flink.client.bean
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
import org.apache.streampark.common.enums.FlinkExecutionMode
import javax.annotation.Nullable
@@ -40,7 +41,7 @@ trait SavepointRequestTrait {
val nativeFormat: Boolean
- val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE
+ val kubernetesNamespace: String = Constant.DEFAULT
@Nullable val properties: JavaMap[String, Any]
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
index 065da0bea..fbc81a1f3 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-api/src/main/scala/org/apache/streampark/flink/client/bean/TriggerSavepointRequest.scala
@@ -17,7 +17,8 @@
package org.apache.streampark.flink.client.bean
-import org.apache.streampark.common.conf.{FlinkVersion, K8sFlinkConfig}
+import org.apache.streampark.common.Constant
+import org.apache.streampark.common.conf.FlinkVersion
import org.apache.streampark.common.enums.FlinkExecutionMode
import javax.annotation.Nullable
@@ -34,5 +35,5 @@ case class TriggerSavepointRequest(
jobId: String,
savepointPath: String,
nativeFormat: Boolean,
- override val kubernetesNamespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE)
+ override val kubernetesNamespace: String = Constant.DEFAULT)
extends SavepointRequestTrait
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
index c768ea378..4b51af2d0 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesApplicationClientV2.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.client.impl
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
@@ -90,7 +91,7 @@ object KubernetesApplicationClientV2 extends KubernetesClientV2Trait with Logger
val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
- .getOrElse("default")
+ .getOrElse(Constant.DEFAULT)
val name = submitReq.k8sSubmitParam.kubernetesName
.orElse(Option(submitReq.k8sSubmitParam.clusterId))
diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
index f72798f01..8b2c3969d 100644
--- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
+++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesSessionClientV2.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.flink.client.impl
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.Logger
import org.apache.streampark.common.zio.ZIOExt.{IOOps, OptionZIOOps}
import org.apache.streampark.flink.client.`trait`.KubernetesClientV2Trait
@@ -88,7 +89,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
val flinkConfObj = originFlinkConfig.clone()
val namespace = Option(submitReq.k8sSubmitParam.kubernetesNamespace)
- .getOrElse("default")
+ .getOrElse(Constant.DEFAULT)
val name = submitReq.k8sSubmitParam.kubernetesName
.filter(str => StringUtils.isNotBlank(str))
@@ -196,7 +197,7 @@ object KubernetesSessionClientV2 extends KubernetesClientV2Trait with Logger {
val flinkConfMap = originFlinkConfig.toMap.asScala.toMap
val namespace = Option(deployReq.k8sDeployParam.kubernetesNamespace)
- .getOrElse("default")
+ .getOrElse(Constant.DEFAULT)
val name = Option(deployReq.k8sDeployParam.clusterId)
.filter(str => StringUtils.isNotBlank(str))
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
index 7f5fc5c9c..f1b71fba6 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/scala/org/apache/streampark/flink/connector/failover/SinkRequest.scala
@@ -22,6 +22,8 @@ import org.apache.streampark.common.util.Logger
import java.util
import java.util.regex.Pattern
+import org.apache.streampark.common.Constant
+
import scala.collection.convert.ImplicitConversions.`collection AsScalaIterable`
import scala.collection.mutable
import scala.collection.mutable.ListBuffer
@@ -62,7 +64,7 @@ case class SinkRequest(records: util.List[String], var attemptCounter: Int = 0)
result = prefixMap.map(m => s"""${m._1} VALUES ${m._2.mkString(",")}""").toList
}
- logDebug(s"script to commit: ${result.mkString(";")}")
+ logDebug(s"script to commit: ${result.mkString(Constant.SEMICOLON)}")
result
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
index 527951a47..9cd0d7af4 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/conf/ClickHouseSinkConfigOption.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.connector.clickhouse.conf
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.ConfigOption
import org.apache.streampark.common.util.ConfigUtils
@@ -57,7 +58,7 @@ class ClickHouseSinkConfigOption(prefixStr: String, properties: Properties) exte
.getProperty(k)
.split(SIGN_COMMA)
.filter(_.nonEmpty)
- .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", "http://"))
+ .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", Constant.HTTP_SCHEMA))
.toList
}
)
@@ -71,7 +72,7 @@ class ClickHouseSinkConfigOption(prefixStr: String, properties: Properties) exte
val database: ConfigOption[String] = ConfigOption(
key = "database",
required = true,
- defaultValue = "default",
+ defaultValue = Constant.DEFAULT,
classType = classOf[String])
val requestTimeout: ConfigOption[Int] = ConfigOption(
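
The replaceFirst("^http://|^", ...) call above deserves a note: the alternation first tries to match a literal leading "http://"; when that fails, it matches the empty string at position 0, so the scheme is written exactly once whether or not the host already carries it. A minimal sketch of the idiom (class and method names are illustrative):

    // Hypothetical demo of the scheme-prefixing regex used in the sink config options.
    public class SchemePrefixDemo {
      static String ensureHttpScheme(String host) {
        // Strip whitespace, then replace either the existing "http://" prefix
        // or the empty match at the start of the string with "http://".
        return host.replaceAll("\\s+", "").replaceFirst("^http://|^", "http://");
      }

      public static void main(String[] args) {
        System.out.println(ensureHttpScheme("node1:8123"));        // http://node1:8123
        System.out.println(ensureHttpScheme("http://node1:8123")); // http://node1:8123
      }
    }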
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
index c6afc284e..65c4fdf9d 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/internal/ClickHouseWriterTask.scala
@@ -127,7 +127,7 @@ case class ClickHouseWriterTask(
s"""Failed to send data to ClickHouse, cause: limit of attempts is
exceeded. ClickHouse response = $response. Ready to flush data to
${clickHouseConf.storageType}""")
failoverWriter.write(sinkRequest)
logInfo(
- s"failover Successful, StorageType = ${clickHouseConf.storageType},
size = ${sinkRequest.size}")
+ s"Failover Successful, StorageType = ${clickHouseConf.storageType},
size = ${sinkRequest.size}")
} else {
sinkRequest.incrementCounter()
logWarn(
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
index c2e5158cb..65c70d98f 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-clickhouse/src/main/scala/org/apache/streampark/flink/connector/clickhouse/sink/ClickHouseSink.scala
@@ -33,6 +33,8 @@ import scala.annotation.meta.param
object ClickHouseSink {
+ val sinkNullHintMsg = "Sink Stream must not null"
+
/**
* @param property
* @param parallelism
@@ -84,7 +86,7 @@ class ClickHouseSink(
*/
def asyncSink[T](stream: DataStream[T])(implicit
toSQLFn: T => String = null): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
+ require(stream != null, () => sinkNullHintMsg)
val sinkFun = new AsyncClickHouseSinkFunction[T](prop, toSQLFn)
val sink = stream.addSink(sinkFun)
afterSink(sink, parallelism, name, uid)
@@ -102,7 +104,7 @@ class ClickHouseSink(
def asyncSink[T](
stream: JavaDataStream[T],
toSQLFn: TransformFunction[T, String]): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
+ require(stream != null, () => sinkNullHintMsg)
val sinkFun = new AsyncClickHouseSinkFunction[T](prop, toSQLFn)
val sink = stream.addSink(sinkFun)
afterSink(sink, parallelism, name, uid)
@@ -128,7 +130,7 @@ class ClickHouseSink(
*/
def jdbcSink[T](stream: DataStream[T])(implicit
toSQLFn: T => String = null): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
+ require(stream != null, () => sinkNullHintMsg)
val sinkFun = new ClickHouseSinkFunction[T](prop, toSQLFn)
val sink = stream.addSink(sinkFun)
afterSink(sink, parallelism, name, uid)
@@ -146,7 +148,7 @@ class ClickHouseSink(
def jdbcSink[T](
stream: JavaDataStream[T],
sqlFromFn: TransformFunction[T, String]): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
+ require(stream != null, () => sinkNullHintMsg)
val sinkFun = new ClickHouseSinkFunction[T](prop, sqlFromFn)
val sink = stream.addSink(sinkFun)
afterSink(sink, parallelism, name, uid)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
index a4940df84..2af63c3ec 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-doris/src/main/scala/org/apache/streampark/connector/doris/conf/DorisSinkConfigOption.scala
@@ -49,7 +49,7 @@ class DorisSinkConfigOption(prefixStr: String, properties: Properties) extends S
.getProperty(k)
.split(SIGN_COMMA)
.filter(_.nonEmpty)
- .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", "http://"))
+ .map(_.replaceAll("\\s+", "").replaceFirst("^http://|^", Constant.HTTP_SCHEMA))
.toList
}
)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
index f0fd6b736..c1e2ba742 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/sink/ES5Sink.scala
@@ -38,6 +38,9 @@ import scala.collection.convert.ImplicitConversions._
object ES5Sink {
+ val functionNullHintMsg = "ES process element func must not null"
+ val sinkNullHintMsg = "Sink Stream must not null"
+
def apply(
@(transient @param)
property: Properties = new Properties(),
@@ -73,8 +76,8 @@ class ES5Sink(
stream: JavaDataStream[T],
failureHandler: ActionRequestFailureHandler,
f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
- require(f != null, () => s"es pocess element func must not null")
+ require(stream != null, () => sinkNullHintMsg)
+ require(f != null, () => functionNullHintMsg)
val esSink: ElasticsearchSink[T] =
new ElasticsearchSink(userConfig, config.host, new ESSinkFunction(f), failureHandler)
if (config.disableFlushOnCheckpoint) {
@@ -89,8 +92,8 @@ class ES5Sink(
stream: DataStream[T],
failureHandler: ActionRequestFailureHandler,
f: T => ActionRequest): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
- require(f != null, () => s"es pocess element fun must not null")
+ require(stream != null, () => sinkNullHintMsg)
+ require(f != null, () => functionNullHintMsg)
val esSink: ElasticsearchSink[T] =
new ElasticsearchSink(userConfig, config.host, new ESSinkFunction(f), failureHandler)
if (config.disableFlushOnCheckpoint) {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
index f05a59daf..519b4239f 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch5/src/main/scala/org/apache/streampark/flink/connector/elasticsearch5/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
def indexRequest(index: String, indexType: String, id: String, source: String)(implicit
xContentType: XContentType = XContentType.JSON): IndexRequest = {
- require(source != null, "indexRequest error:source can not be null...")
- require(xContentType != null, "indexRequest error:xContentType can not be null...")
+ require(source != null, "IndexRequest error:source can not be null...")
+ require(xContentType != null, "IndexRequest error:xContentType can not be null...")
val indexReq = new IndexRequest(index, indexType, id)
val mapping = List("source" -> new BytesArray(source), "contentType" -> xContentType)
mapping.foreach {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
index 100c9b7b3..5eacc0bf2 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/sink/ES6Sink.scala
@@ -40,6 +40,9 @@ import scala.collection.convert.ImplicitConversions._
object ES6Sink {
+ val functionNullHintMsg = "ES process element func must not null"
+ val sinkNullHintMsg = "Sink Stream must not null"
+
def apply(
@(transient @param)
property: Properties = new Properties(),
@@ -74,8 +77,8 @@ class ES6Sink(
restClientFactory: Option[RestClientFactory],
failureHandler: ActionRequestFailureHandler,
f: T => ActionRequest): DataStreamSink[T] = {
- require(stream != null, "sink Stream must not null")
- require(f != null, "es pocess element func must not null")
+ require(stream != null, sinkNullHintMsg)
+ require(f != null, functionNullHintMsg)
val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
@@ -92,8 +95,8 @@ class ES6Sink(
restClientFactory: Option[RestClientFactory],
failureHandler: ActionRequestFailureHandler,
f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
- require(f != null, () => s"es process element func must not null")
+ require(stream != null, () => sinkNullHintMsg)
+ require(f != null, () => functionNullHintMsg)
val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
index 1f91386fe..34cd2d7a8 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch6/src/main/scala/org/apache/streampark/flink/connector/elasticsearch6/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
def indexRequest(index: String, indexType: String, id: String, source: String)(implicit
xContentType: XContentType = XContentType.JSON): IndexRequest = {
- require(source != null, "indexRequest error:source can not be null...")
- require(xContentType != null, "indexRequest error:xContentType can not be null...")
+ require(source != null, "EndexRequest error:source can not be null...")
+ require(xContentType != null, "IndexRequest error:xContentType can not be null...")
val indexReq = new IndexRequest(index, indexType, id)
val mapping = List("source" -> new BytesArray(source), "contentType" -> xContentType)
mapping.foreach {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
index b56c7281e..c30b64e19 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/bean/RestClientFactoryImpl.scala
@@ -55,7 +55,7 @@ class RestClientFactoryImpl(val config: ES7Config) extends RestClientFactory wit
httpClientBuilder: HttpAsyncClientBuilder): HttpAsyncClientBuilder = {
if (credentialsProvider != null) {
httpClientBuilder.setDefaultCredentialsProvider(credentialsProvider)
- logInfo("elasticsearch auth by userName,password...")
+ logInfo("Elasticsearch auth by userName,password...")
}
httpClientBuilder
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
index 22557d63c..981772188 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/sink/ES7Sink.scala
@@ -41,6 +41,10 @@ import scala.annotation.meta.param
import scala.collection.convert.ImplicitConversions._
object ES7Sink {
+
+ val functionNullHintMsg = "ES process element func must not null"
+ val sinkNullHintMsg = "Sink Stream must not null"
+
def apply(
@(transient @param)
property: Properties = new Properties(),
@@ -74,8 +78,8 @@ class ES7Sink(
restClientFactory: Option[RestClientFactory],
failureHandler: ActionRequestFailureHandler,
f: T => ActionRequest): DataStreamSink[T] = {
- require(stream != null, "sink Stream must not null")
- require(f != null, "es process element func must not null")
+ require(stream != null, sinkNullHintMsg)
+ require(f != null, functionNullHintMsg)
val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
val esSink: ElasticsearchSink[T] = buildESSink(restClientFactory, failureHandler, sinkFunc)
if (config.disableFlushOnCheckpoint) {
@@ -90,8 +94,8 @@ class ES7Sink(
restClientFactory: Option[RestClientFactory],
failureHandler: ActionRequestFailureHandler,
f: TransformFunction[T, ActionRequest]): DataStreamSink[T] = {
- require(stream != null, "sink Stream must not null")
- require(f != null, "es process element func must not null")
+ require(stream != null, sinkNullHintMsg)
+ require(f != null, functionNullHintMsg)
val sinkFunc: ESSinkFunction[T] = new ESSinkFunction(f)
val esSink: ElasticsearchSink[T] = buildESSink(restClientFactory, failureHandler, sinkFunc)
if (config.disableFlushOnCheckpoint) {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
index 036a5581f..855070671 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-elasticsearch/streampark-flink-connector-elasticsearch7/src/main/scala/org/apache/streampark/flink/connector/elasticsearch7/util/ElasticsearchUtils.scala
@@ -25,8 +25,8 @@ object ElasticsearchUtils {
def indexRequest(index: String, id: String, source: String)(implicit
xContentType: XContentType = XContentType.JSON): IndexRequest = {
- require(source != null, "indexRequest error:source can not be null...")
- require(xContentType != null, "indexRequest error:xContentType can not be null...")
+ require(source != null, "IndexRequest error:source can not be null...")
+ require(xContentType != null, "IndexRequest error:xContentType can not be null...")
val indexReq = new IndexRequest(index).id(id)
val mapping = List("source" -> new BytesArray(source), "contentType" -> xContentType)
mapping.foreach {
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index 4c6144a36..e0bf77d33 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -42,8 +42,8 @@ public class HBaseJavaSource<T> {
HBaseResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- Utils.notNull(queryFunction, "queryFunction must not be null");
- Utils.notNull(resultFunction, "resultFunction must not be null");
+ Utils.notNull(queryFunction, "QueryFunction must not be null");
+ Utils.notNull(resultFunction, "ResultFunction must not be null");
HBaseSourceFunction<T> sourceFunction =
new HBaseSourceFunction<>(property, queryFunction, resultFunction, runningFunc, null);
return context.getJavaEnv().addSource(sourceFunction);
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
index d5df0fe94..43061a151 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/sink/HBaseSink.scala
@@ -80,9 +80,9 @@ class HBaseSink(
implicit val prop: Properties =
ConfigUtils.getConf(ctx.parameter.toMap, HBASE_PREFIX, HBASE_PREFIX)(alias)
Utils.copyProperties(property, prop)
- require(stream != null, () => s"sink Stream must not null")
- require(tableName != null, () => s"sink tableName must not null")
- require(fun != null, () => s" fun must not null")
+ require(stream != null, () => s"Sink Stream must not null")
+ require(tableName != null, () => s"Sink tableName must not null")
+ require(fun != null, () => s"Func must not null")
prop
}
}
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
index 369b8e90d..5bfbe42f6 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-http/src/main/scala/org/apache/streampark/flink/connector/http/function/HttpSinkFunction.scala
@@ -62,7 +62,7 @@ class HttpSinkFunction(
val table = thresholdConf.failoverTable
require(
table != null && table.nonEmpty,
- () => s"http async insert failoverTable must not null")
+ () => s"Http async insert failoverTable must not null")
httpSinkWriter = HttpSinkWriter(thresholdConf, header)
failoverChecker = FailoverChecker(thresholdConf.delayTime)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
index 86fca0388..df39f3d61 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
@@ -55,7 +55,7 @@ public class JdbcJavaSink<T> {
}
public DataStreamSink<T> sink(DataStream<T> dataStream) {
- Utils.notNull(sqlFunc, "transformFunction can not be null");
+ Utils.notNull(sqlFunc, "TransformFunction can not be null");
this.jdbc =
this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
JdbcSinkFunction<T> sinkFun = new JdbcSinkFunction<>(this.jdbc, this.sqlFunc);
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index 69f494f23..464f1b715 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -54,8 +54,8 @@ public class JdbcJavaSource<T> {
SQLResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- Utils.notNull(queryFunction, "queryFunction must not be null");
- Utils.notNull(resultFunction, "resultFunction must not be null");
+ Utils.notNull(queryFunction, "'queryFunction' must not be null");
+ Utils.notNull(resultFunction, "'resultFunction' must not be null");
this.jdbc =
this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
JdbcSourceFunction<T> sourceFunction =
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 7d157c0db..1303e3661 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -78,7 +78,7 @@ class JdbcSink(
case Semantic.EXACTLY_ONCE =>
val sinkFun = new Jdbc2PCSinkFunction[T](prop, toSQLFn)
if (parallelism > 1) {
- logWarn(s"parallelism:$parallelism, Jdbc Semantic
EXACTLY_ONCE,parallelism bust be 1.")
+ logWarn(s"'parallelism':$parallelism, Jdbc Semantic
EXACTLY_ONCE,parallelism bust be 1.")
}
stream.addSink(sinkFun)
case _ =>
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index dc49f621d..d7dc70bd9 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -43,9 +43,9 @@ public class MongoJavaSource<T> {
MongoResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- Utils.notNull(collectionName, "collectionName must not be null");
- Utils.notNull(queryFunction, "queryFunction must not be null");
- Utils.notNull(resultFunction, "resultFunction must not be null");
+ Utils.notNull(collectionName, "'collectionName' must not be null");
+ Utils.notNull(queryFunction, "'queryFunction' must not be null");
+ Utils.notNull(resultFunction, "'resultFunction' must not be null");
MongoSourceFunction<T> sourceFunction =
new MongoSourceFunction<>(
collectionName, property, queryFunction, resultFunction, runningFunc, null);
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
index e15f35136..dabffa95f 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/bean/RedisMapper.scala
@@ -25,15 +25,19 @@ import org.apache.flink.streaming.connectors.redis.common.mapper.{RedisCommand,
object RedisMapper {
+ val additionalFailoverTableNullHint = "Redis additionalKey insert failoverTable must not null";
+ val insertFailoverTableNullHint = "Redis cmd insert failoverTable must not null";
+
+
def map[T](
cmd: RedisCommand,
additionalKey: String,
scalaKeyFun: T => String,
scalaValueFun: T => String): RedisMapper[T] = {
- require(cmd != null, () => s"redis cmd insert failoverTable must not null")
- require(additionalKey != null, () => s"redis additionalKey insert failoverTable must not null")
- require(scalaKeyFun != null, () => s"redis scalaKeyFun insert failoverTable must not null")
- require(scalaValueFun != null, () => s"redis scalaValueFun insert failoverTable must not null")
+ require(cmd != null, () => insertFailoverTableNullHint)
+ require(additionalKey != null, () => additionalFailoverTableNullHint)
+ require(scalaKeyFun != null, () => s"Redis scalaKeyFun insert failoverTable must not null")
+ require(scalaValueFun != null, () => s"Redis scalaValueFun insert failoverTable must not null")
new RedisMapper[T](cmd, additionalKey, scalaKeyFun, scalaValueFun)
}
@@ -42,10 +46,10 @@ object RedisMapper {
additionalKey: String,
javaKeyFun: TransformFunction[T, String],
javaValueFun: TransformFunction[T, String]): RedisMapper[T] = {
- require(cmd != null, () => s"redis cmd insert failoverTable must not
null")
- require(additionalKey != null, () => s"redis additionalKey insert
failoverTable must not null")
- require(javaKeyFun != null, () => s"redis javaKeyFun insert failoverTable
must not null")
- require(javaValueFun != null, () => s"redis javaValueFun insert
failoverTable must not null")
+ require(cmd != null, () => insertFailoverTableNullHint)
+ require(additionalKey != null, () => additionalFailoverTableNullHint)
+ require(javaKeyFun != null, () => "Redis javaKeyFun insert failoverTable must not be null")
+ require(javaValueFun != null, () => "Redis javaValueFun insert failoverTable must not be null")
new RedisMapper[T](cmd, additionalKey, javaKeyFun, javaValueFun)
}
}
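
The hunk above hoists two of the four repeated require messages into named constants; a minimal stand-alone sketch of the completed pattern (names illustrative, not the repository's):

  object RedisNullHints {
    val cmdNullHint           = "Redis cmd insert failoverTable must not be null"
    val additionalKeyNullHint = "Redis additionalKey insert failoverTable must not be null"

    // One shared message per argument keeps the Scala and Java
    // overloads reporting identical wording.
    def check(cmd: AnyRef, additionalKey: String): Unit = {
      require(cmd != null, cmdNullHint)
      require(additionalKey != null, additionalKeyNullHint)
    }

    def main(args: Array[String]): Unit = check("SET", "myKey")
  }
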
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
index dfbb68681..7eda5b51d 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/conf/RedisConfig.scala
@@ -39,7 +39,7 @@ class RedisConfig(parameters: Properties) extends
Serializable {
if (x.contains(sinkOption.SIGN_COLON)) x;
else {
throw new IllegalArgumentException(
- s"redis sentinel host invalid {$x} must match host:port ")
+ s"Redis sentinel host invalid {$x} must match host:port ")
}
})
.toSet
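
A hedged sketch of the sentinel-host check above, assuming sinkOption.SIGN_COLON is the literal ":":

  object SentinelHostCheck {
    // Every sentinel entry must carry an explicit port.
    def validate(host: String): String =
      if (host.contains(":")) host
      else throw new IllegalArgumentException(
        s"Redis sentinel host invalid: '$host', must match host:port")

    def main(args: Array[String]): Unit = {
      println(validate("10.0.0.1:26379")) // passes
      // validate("10.0.0.1")             // would throw
    }
  }
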
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
index 072e42544..9adb40a43 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-redis/src/main/scala/org/apache/streampark/flink/connector/redis/sink/RedisSink.scala
@@ -97,7 +97,7 @@ class RedisSink(
val field = Try(builder.getClass.getDeclaredField(x._1)).getOrElse
{
throw new IllegalArgumentException(
s"""
- |redis config error,property:${x._1} invalid,init
FlinkJedisSentinelConfig error, property options:
+ |Redis config error: property '${x._1}' is invalid for FlinkJedisSentinelConfig, valid property options:
|<String masterName>,
|<Set<String> sentinels>,
|<int connectionTimeout>,
@@ -121,7 +121,7 @@ class RedisSink(
val field = Try(builder.getClass.getDeclaredField(x._1)).getOrElse
{
throw new IllegalArgumentException(
s"""
- |redis config error,property:${x._1} invalid,init
FlinkJedisPoolConfig error,property options:
+ |Redis config error: property '${x._1}' is invalid for FlinkJedisPoolConfig, valid property options:
|<String host>,
|<int port>,
|<int timeout>,
@@ -138,7 +138,7 @@ class RedisSink(
builder.build()
case _ =>
throw new IllegalArgumentException(
- s"redis connectType must be jedisPool|sentinel $connectType")
+ s"Redis connectType must be jedisPool|sentinel $connectType")
}
}
@@ -155,12 +155,12 @@ class RedisSink(
stream: DataStream[T],
mapper: RedisMapper[T],
ttl: Int = Int.MaxValue): DataStreamSink[T] = {
- require(stream != null, () => s"sink Stream must not null")
- require(mapper != null, () => s"redis mapper must not null")
- require(ttl > 0, () => s"redis ttl must greater than 0")
+ require(stream != null, () => "Sink stream must not be null")
+ require(mapper != null, () => "Redis mapper must not be null")
+ require(ttl > 0, () => "Redis ttl must be greater than 0")
val sinkFun = (enableCheckpoint, cpMode) match {
case (false, CheckpointingMode.EXACTLY_ONCE) =>
- throw new IllegalArgumentException("redis sink EXACTLY_ONCE must
enable checkpoint")
+ throw new IllegalArgumentException("Redis sink with EXACTLY_ONCE semantics requires checkpointing to be enabled")
case (true, CheckpointingMode.EXACTLY_ONCE) =>
new Redis2PCSinkFunction[T](config, mapper, ttl)
case _ => new RedisSinkFunction[T](config, mapper, ttl)
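
A toy sketch of the (checkpointing, mode) decision above; Flink's CheckpointingMode and the two sink functions are replaced with local stand-ins so the snippet compiles on its own:

  object RedisSinkChoice {
    sealed trait Mode
    case object ExactlyOnce extends Mode
    case object AtLeastOnce extends Mode

    def choose(checkpointEnabled: Boolean, mode: Mode): String =
      (checkpointEnabled, mode) match {
        case (false, ExactlyOnce) =>
          throw new IllegalArgumentException(
            "Redis sink with EXACTLY_ONCE semantics requires checkpointing to be enabled")
        case (true, ExactlyOnce) => "Redis2PCSinkFunction" // two-phase commit
        case _                   => "RedisSinkFunction"    // best-effort writes
      }

    def main(args: Array[String]): Unit =
      println(choose(checkpointEnabled = true, ExactlyOnce))
  }
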
diff --git
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
index b57d7f478..a245508ee 100644
---
a/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
+++
b/streampark-flink/streampark-flink-kubernetes-v2/streampark-flink-kubernetes-core/src/main/scala/org/apache/streampark/flink/kubernetes/v2/operator/CROperator.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.kubernetes.v2.operator
+import org.apache.streampark.common.Constant
import org.apache.streampark.flink.kubernetes.v2.{pathLastSegment, yamlMapper}
import org.apache.streampark.flink.kubernetes.v2.K8sTools.usingK8sClient
import org.apache.streampark.flink.kubernetes.v2.fs.FileMirror
@@ -67,12 +68,13 @@ object CROperator extends CROperator {
// Generate FlinkDeployment CR
correctedJob <- mirrorJobJarToHttpFileServer(spec.job,
mirrorSpace)
correctedExtJars <- mirrorExtJarsToHttpFileServer(spec.extJarPaths,
mirrorSpace)
- correctedPod <- correctPodSpec(
- spec.podTemplate,
- correctedExtJars ++
correctedJob.map(_.jarURI).filter(_.startsWith("http://")).toArray[String]
- )
+ correctedPod <-
+ correctPodSpec(
+ spec.podTemplate,
+ correctedExtJars ++
correctedJob.map(_.jarURI).filter(_.startsWith(Constant.HTTP_SCHEMA)).toArray[String]
+ )
correctedLocalUriJob = correctedJob.map { jobDef =>
- if (!jobDef.jarURI.startsWith("http://")) jobDef
+ if
(!jobDef.jarURI.startsWith(Constant.HTTP_SCHEMA)) jobDef
else jobDef.copy("local:///opt/flink/lib/" +
pathLastSegment(jobDef.jarURI))
}
correctedSpec = spec.copy(
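
The change above assumes Constant.HTTP_SCHEMA equals the "http://" literal it replaces. A hedged sketch of the URI rewrite being performed (pathLastSegment approximated with a split):

  object JarUriRewrite {
    val HTTP_SCHEMA = "http://" // assumed value of Constant.HTTP_SCHEMA

    // Jars mirrored to the HTTP file server get baked into the image,
    // so their URIs are rewritten to the in-container local:// path.
    def rewrite(jarURI: String): String =
      if (!jarURI.startsWith(HTTP_SCHEMA)) jarURI
      else "local:///opt/flink/lib/" + jarURI.split('/').last

    def main(args: Array[String]): Unit =
      println(rewrite("http://fileserver/space/job.jar")) // local:///opt/flink/lib/job.jar
  }
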
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
index 237dfd5d9..e67b74b28 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/ClusterKey.scala
@@ -17,13 +17,13 @@
package org.apache.streampark.flink.kubernetes.model
-import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.Constant
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
/** flink cluster identifier on kubernetes */
case class ClusterKey(
executeMode: FlinkK8sExecuteModeEnum.Value,
- namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+ namespace: String = Constant.DEFAULT,
clusterId: String)
object ClusterKey {
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
index 34e6dd30d..2dc7d0c7d 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/model/TrackId.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.kubernetes.model
-import org.apache.streampark.common.conf.K8sFlinkConfig
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteModeEnum
@@ -26,7 +26,7 @@ import scala.util.Try
/** tracking identifier for flink on kubernetes */
case class TrackId(
executeMode: FlinkK8sExecuteModeEnum.Value,
- namespace: String = K8sFlinkConfig.DEFAULT_KUBERNETES_NAMESPACE,
+ namespace: String = Constant.DEFAULT,
clusterId: String,
appId: Long,
jobId: String,
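
Both ClusterKey and TrackId now default the namespace to Constant.DEFAULT, presumably the string "default" (also Kubernetes' default namespace, matching the replaced K8sFlinkConfig constant). A minimal sketch of the resulting default:

  object TrackIdDefaults {
    val DEFAULT = "default" // assumed value of Constant.DEFAULT

    case class TrackIdSketch(namespace: String = DEFAULT, clusterId: String)

    def main(args: Array[String]): Unit =
      println(TrackIdSketch(clusterId = "session-a")) // namespace falls back to "default"
  }
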
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
index 7b3c1d1c0..b73cf0e84 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/PackerResourceGC.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.packer
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.Workspace
import org.apache.streampark.common.util.Logger
@@ -57,7 +58,7 @@ object PackerResourceGC extends Logger {
}
private def findLastModifiedOfSubFile(file: File): Array[(File, Long)] = {
- val isApplicationMode =
file.listFiles.map(_.getName).exists(_.contains(".jar"))
+ val isApplicationMode =
file.listFiles.map(_.getName).exists(_.contains(Constant.JAR_SUFFIX))
if (isApplicationMode) {
Array(file -> file.listFiles.map(_.lastModified).max)
} else {
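
A hedged sketch of the detection above, with Constant.JAR_SUFFIX assumed to be ".jar": a workspace counts as application mode when any file name contains the jar suffix.

  import java.io.File

  object AppModeProbe {
    // Mirrors the check above, including its assumption that the
    // argument is a readable directory (listFiles is null otherwise).
    def isApplicationMode(dir: File): Boolean =
      dir.listFiles.map(_.getName).exists(_.contains(".jar"))

    def main(args: Array[String]): Unit =
      println(isApplicationMode(new File(".")))
  }
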
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
index f24092ea4..319c862d4 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/docker/FlinkDockerfileTemplateTrait.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.packer.docker
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.fs.LfsOperator
import org.apache.commons.io.FileUtils
@@ -80,10 +81,13 @@ trait FlinkDockerfileTemplateTrait {
flinkExtraLibPaths
.map(new File(_))
.filter(_.exists())
- .filter(_.getName.endsWith(".jar"))
+ .filter(_.getName.endsWith(Constant.JAR_SUFFIX))
.flatMap {
case f if f.isDirectory =>
-
f.listFiles.filter(_.isFile).filter(_.getName.endsWith(".jar")).map(_.getAbsolutePath)
+ f.listFiles
+ .filter(_.isFile)
+ .filter(_.getName.endsWith(Constant.JAR_SUFFIX))
+ .map(_.getAbsolutePath)
case f if f.isFile => Array(f.getAbsolutePath)
}
.foreach(LfsOperator.copy(_, s"${workspace.toString}/$FLINK_LIB_PATH"))
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
index 03eb83839..bd4648151 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/maven/MavenTool.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.packer.maven
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{InternalConfigHolder, Workspace}
import org.apache.streampark.common.conf.CommonConfig.{MAVEN_AUTH_PASSWORD,
MAVEN_AUTH_USER, MAVEN_REMOTE_URL}
import org.apache.streampark.common.util.{Logger, Utils}
@@ -60,7 +61,10 @@ object MavenTool extends Logger {
private[this] def getRemoteRepos(): List[RemoteRepository] = {
val builder =
- new RemoteRepository.Builder("central", "default",
InternalConfigHolder.get(MAVEN_REMOTE_URL))
+ new RemoteRepository.Builder(
+ "central",
+ Constant.DEFAULT,
+ InternalConfigHolder.get(MAVEN_REMOTE_URL))
val remoteRepository =
if (
InternalConfigHolder.get(MAVEN_AUTH_USER) == null ||
InternalConfigHolder.get(
@@ -98,8 +102,9 @@ object MavenTool extends Logger {
// check userJarPath
val uberJar = new File(outFatJarPath)
require(
- outFatJarPath.endsWith(".jar") && !uberJar.isDirectory,
- s"[StreamPark] streampark-packer: outFatJarPath($outFatJarPath) should
be a JAR file.")
+ outFatJarPath.endsWith(Constant.JAR_SUFFIX) && !uberJar.isDirectory,
+ s"[StreamPark] streampark-packer: outFatJarPath($outFatJarPath) should
be a JAR file."
+ )
uberJar.delete()
// resolve all jarLibs
val jarSet = new util.HashSet[File]
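
A hedged sketch of the output-path guard above (Constant.JAR_SUFFIX again assumed to be ".jar"): the fat-jar destination must name a jar file, never a directory.

  import java.io.File

  object FatJarPathCheck {
    def check(outFatJarPath: String): Unit = {
      val uberJar = new File(outFatJarPath)
      require(
        outFatJarPath.endsWith(".jar") && !uberJar.isDirectory,
        s"[StreamPark] streampark-packer: outFatJarPath($outFatJarPath) should be a JAR file.")
    }

    def main(args: Array[String]): Unit = check("/tmp/job-fat.jar")
  }
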
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
index bd690667f..968b8efcd 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/BuildPipeline.scala
@@ -97,20 +97,20 @@ trait BuildPipeline extends BuildPipelineProcess with
BuildPipelineExpose with L
Try {
curStep = seq
stepsStatus(seq) = PipelineStepStatusEnum.running ->
System.currentTimeMillis
- logInfo(s"building pipeline step[$seq/$allSteps] running =>
${pipeType.getSteps.get(seq)}")
+ logInfo(s"Building pipeline step[$seq/$allSteps] running =>
${pipeType.getSteps.get(seq)}")
watcher.onStepStateChange(snapshot)
process
} match {
case Success(result) =>
stepsStatus(seq) = PipelineStepStatusEnum.success ->
System.currentTimeMillis
- logInfo(s"building pipeline step[$seq/$allSteps] success")
+ logInfo(s"Building pipeline step[$seq/$allSteps] success")
watcher.onStepStateChange(snapshot)
Some(result)
case Failure(cause) =>
stepsStatus(seq) = PipelineStepStatusEnum.failure ->
System.currentTimeMillis
pipeStatus = PipelineStatusEnum.failure
error = PipeError.of(cause.getMessage, cause)
- logInfo(s"building pipeline step[$seq/$allSteps] failure =>
${pipeType.getSteps.get(seq)}")
+ logInfo(s"Building pipeline step[$seq/$allSteps] failure =>
${pipeType.getSteps.get(seq)}")
watcher.onStepStateChange(snapshot)
None
}
@@ -119,7 +119,7 @@ trait BuildPipeline extends BuildPipelineProcess with
BuildPipelineExpose with L
protected def skipStep(step: Int): Unit = {
curStep = step
stepsStatus(step) = PipelineStepStatusEnum.skipped ->
System.currentTimeMillis
- logInfo(s"building pipeline step[$step/$allSteps] skipped =>
${pipeType.getSteps.get(step)}")
+ logInfo(s"Building pipeline step[$step/$allSteps] skipped =>
${pipeType.getSteps.get(step)}")
watcher.onStepStateChange(snapshot)
}
@@ -128,7 +128,7 @@ trait BuildPipeline extends BuildPipelineProcess with
BuildPipelineExpose with L
pipeStatus = PipelineStatusEnum.running
Try {
watcher.onStart(snapshot)
- logInfo(s"building pipeline is launching,
params=${offerBuildParam.toString}")
+ logInfo(s"Building pipeline is launching,
params=${offerBuildParam.toString}")
executor
.submit(new Callable[BuildResult] {
override def call(): BuildResult = buildProcess()
@@ -137,14 +137,14 @@ trait BuildPipeline extends BuildPipelineProcess with
BuildPipelineExpose with L
} match {
case Success(result) =>
pipeStatus = PipelineStatusEnum.success
- logInfo(s"building pipeline has finished successfully.")
+ logInfo(s"Building pipeline has finished successfully.")
watcher.onFinish(snapshot, result)
result
case Failure(cause) =>
pipeStatus = PipelineStatusEnum.failure
error = PipeError.of(cause.getMessage, cause)
// log and print error trace stack
- logError(s"building pipeline has failed.", cause)
+ logError(s"Building pipeline has failed.", cause)
val result = ErrorResult()
watcher.onFinish(snapshot, result)
result
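
A toy sketch of the execStep pattern these log lines belong to: each step runs inside Try, and every outcome gets a uniformly worded, capitalised message (the repository's status bookkeeping and watcher callbacks are elided):

  import scala.util.{Failure, Success, Try}

  object StepRunner {
    def execStep[A](seq: Int, total: Int)(step: => A): Option[A] =
      Try(step) match {
        case Success(result) =>
          println(s"Building pipeline step[$seq/$total] success")
          Some(result)
        case Failure(cause) =>
          println(s"Building pipeline step[$seq/$total] failure => ${cause.getMessage}")
          None
      }

    def main(args: Array[String]): Unit =
      execStep(1, 8) { "workspace ready" }.foreach(println)
  }
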
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
index 6caba3e35..ba8af634b 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala
@@ -67,7 +67,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(1) {
val buildWorkspace =
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
LfsOperator.mkCleanDirs(buildWorkspace)
- logInfo(s"recreate building workspace: $buildWorkspace")
+ logInfo(s"Recreate building workspace: $buildWorkspace")
buildWorkspace
}.getOrElse(throw getError.exception)
@@ -80,7 +80,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(2) {
val podTemplateFiles =
PodTemplateTool.preparePodTemplateFiles(buildWorkspace,
podTemplate).tmplFiles
- logInfo(s"export flink podTemplates:
${podTemplateFiles.values.mkString(",")}")
+ logInfo(s"Export flink podTemplates:
${podTemplateFiles.values.mkString(",")}")
podTemplateFiles
}.getOrElse(throw getError.exception)
}
@@ -96,7 +96,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
}
val shadedJar =
MavenTool.buildFatJar(request.mainClass, request.providedLibs,
shadedJarOutputPath)
- logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+ logInfo(s"Output shaded flink job jar: ${shadedJar.getAbsolutePath}")
shadedJar -> extJarLibs
}.getOrElse(throw getError.exception)
@@ -120,14 +120,14 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
}
val dockerFile = dockerFileTemplate.writeDockerfile
logInfo(
- s"output flink dockerfile: ${dockerFile.getAbsolutePath}, content:
\n${dockerFileTemplate.offerDockerfileContent}")
+ s"Output flink dockerfile: ${dockerFile.getAbsolutePath}, content:
\n${dockerFileTemplate.offerDockerfileContent}")
dockerFile -> dockerFileTemplate
}.getOrElse(throw getError.exception)
val dockerConf = request.dockerConfig
val baseImageTag = request.flinkBaseImage.trim
val pushImageTag = {
- val expectedImageTag =
s"streamparkflinkjob-${request.k8sNamespace}-${request.clusterId}"
+ val expectedImageTag = s"streampark
flinkjob-${request.k8sNamespace}-${request.clusterId}"
compileTag(expectedImageTag, dockerConf.registerAddress,
dockerConf.imageNamespace)
}
@@ -155,8 +155,8 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
})
pullCmdCallback.awaitCompletion
- logInfo(s"already pulled docker image from remote register,
imageTag=$baseImageTag")
- }(err => throw new Exception(s"pull docker image failed,
imageTag=$baseImageTag", err))
+ logInfo(s"Already pulled docker image from remote register,
imageTag=$baseImageTag")
+ }(err => throw new Exception(s"Pull docker image failed,
imageTag=$baseImageTag", err))
}.getOrElse(throw getError.exception)
// Step-6: build flink image
@@ -178,8 +178,8 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
})
val imageId = buildCmdCallback.awaitImageId
- logInfo(s"built docker image, imageId=$imageId,
imageTag=$pushImageTag")
- }(err => throw new Exception(s"build docker image failed.
tag=$pushImageTag", err))
+ logInfo(s"Built docker image, imageId=$imageId,
imageTag=$pushImageTag")
+ }(err => throw new Exception(s"Build docker image failed.
tag=$pushImageTag", err))
}.getOrElse(throw getError.exception)
// Step-7: push flink image
@@ -198,8 +198,8 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
})
pushCmdCallback.awaitCompletion
- logInfo(s"already pushed docker image, imageTag=$pushImageTag")
- }(err => throw new Exception(s"push docker image failed.
tag=$pushImageTag", err))
+ logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
+ }(err => throw new Exception(s"Push docker image failed.
tag=$pushImageTag", err))
}.getOrElse(throw getError.exception)
// Step-8: init build workspace of ingress
@@ -211,7 +211,7 @@ class FlinkK8sApplicationBuildPipeline(request:
FlinkK8sApplicationBuildRequest)
execStep(8) {
val ingressOutputPath =
IngressController.prepareIngressTemplateFiles(buildWorkspace,
request.ingressTemplate)
- logInfo(s"export flink ingress: $ingressOutputPath")
+ logInfo(s"Export flink ingress: $ingressOutputPath")
ingressOutputPath
}.getOrElse(throw getError.exception)
}
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
index 5e086e517..544e18e41 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipelineV2.scala
@@ -41,7 +41,7 @@ class FlinkK8sApplicationBuildPipelineV2(request:
FlinkK8sApplicationBuildReques
execStep(1) {
val buildWorkspace =
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
LfsOperator.mkCleanDirs(buildWorkspace)
- logInfo(s"recreate building workspace: $buildWorkspace")
+ logInfo(s"Recreate building workspace: $buildWorkspace")
buildWorkspace
}.getOrElse(throw getError.exception)
@@ -57,7 +57,7 @@ class FlinkK8sApplicationBuildPipelineV2(request:
FlinkK8sApplicationBuildReques
}
val shadedJar =
MavenTool.buildFatJar(request.mainClass, request.providedLibs,
shadedJarOutputPath)
- logInfo(s"output shaded flink job jar: ${shadedJar.getAbsolutePath}")
+ logInfo(s"Output shaded flink job jar: ${shadedJar.getAbsolutePath}")
shadedJar -> extJarLibs
}.getOrElse(throw getError.exception)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
index 9b86e9b5f..5d6eb5696 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sSessionBuildPipeline.scala
@@ -38,7 +38,7 @@ class FlinkK8sSessionBuildPipeline(request:
FlinkK8sSessionBuildRequest) extends
execStep(1) {
val buildWorkspace =
s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
LfsOperator.mkCleanDirs(buildWorkspace)
- logInfo(s"recreate building workspace: $buildWorkspace")
+ logInfo(s"Recreate building workspace: $buildWorkspace")
buildWorkspace
}.getOrElse(throw getError.exception)
@@ -50,7 +50,7 @@ class FlinkK8sSessionBuildPipeline(request:
FlinkK8sSessionBuildRequest) extends
request.mainClass,
request.providedLibs,
request.getShadedJarPath(buildWorkspace))
- logInfo(s"output shaded flink job jar: ${output.getAbsolutePath}")
+ logInfo(s"Output shaded flink job jar: ${output.getAbsolutePath}")
output
}.getOrElse(throw getError.exception)
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
index fbd741070..11fd54a78 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkRemoteBuildPipeline.scala
@@ -44,7 +44,7 @@ class FlinkRemoteBuildPipeline(request:
FlinkRemotePerJobBuildRequest) extends B
} else {
execStep(1) {
LfsOperator.mkCleanDirs(request.workspace)
- logInfo(s"recreate building workspace: ${request.workspace}")
+ logInfo(s"Recreate building workspace: ${request.workspace}")
}.getOrElse(throw getError.exception)
// build flink job shaded jar
val shadedJar =
diff --git
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
index e804c834d..69ea6bf60 100644
---
a/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
+++
b/streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkYarnApplicationBuildPipeline.scala
@@ -52,7 +52,7 @@ class FlinkYarnApplicationBuildPipeline(request:
FlinkYarnApplicationBuildReques
HdfsOperator.mkCleanDirs(request.yarnProvidedPath)
case _ =>
}
- logInfo(s"recreate building workspace: ${request.yarnProvidedPath}")
+ logInfo(s"Recreate building workspace: ${request.yarnProvidedPath}")
}.getOrElse(throw getError.exception)
val mavenJars =
diff --git
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
index 44294d756..2a8050f35 100644
---
a/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
+++
b/streampark-flink/streampark-flink-proxy/src/main/scala/org/apache/streampark/flink/proxy/FlinkShimsProxy.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.proxy
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.conf.{ConfigConst, FlinkVersion}
import org.apache.streampark.common.util.{ClassLoaderUtils, Logger, Utils}
import org.apache.streampark.common.util.ImplicitsUtils._
@@ -78,7 +79,7 @@ object FlinkShimsProxy extends Logger {
// need to load all flink-table dependencies compatible with different
versions
def getVerifySqlLibClassLoader(flinkVersion: FlinkVersion): ClassLoader = {
- logInfo(s"add verify sql lib,flink version: $flinkVersion")
+ logInfo(s"Add verify sql lib,flink version: $flinkVersion")
VERIFY_SQL_CLASS_LOADER_CACHE.getOrElseUpdate(
s"${flinkVersion.fullVersion}", {
val getFlinkTable: File => Boolean =
_.getName.startsWith("flink-table")
@@ -123,20 +124,20 @@ object FlinkShimsProxy extends Logger {
.foreach(
(jar: File) => {
val jarName = jar.getName
- if (jarName.endsWith(".jar")) {
+ if (jarName.endsWith(Constant.JAR_SUFFIX)) {
if (jarName.startsWith(FLINK_SHIMS_PREFIX)) {
val prefixVer =
s"$FLINK_SHIMS_PREFIX-${majorVersion}_$scalaVersion"
if (jarName.startsWith(prefixVer)) {
addShimUrl(jar)
- logInfo(s"include flink shims jar lib: $jarName")
+ logInfo(s"Include flink shims jar lib: $jarName")
}
} else {
if (INCLUDE_PATTERN.matcher(jarName).matches()) {
addShimUrl(jar)
- logInfo(s"include jar lib: $jarName")
+ logInfo(s"Include jar lib: $jarName")
} else if (jarName.matches(s"^streampark-.*_$scalaVersion.*$$"))
{
addShimUrl(jar)
- logInfo(s"include streampark lib: $jarName")
+ logInfo(s"Include streampark lib: $jarName")
}
}
}
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
index 26c31f928..417d00adb 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/core/FlinkSqlValidator.scala
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.flink.core
+import org.apache.streampark.common.Constant
import org.apache.streampark.common.enums.FlinkSqlValidationFailedType
import org.apache.streampark.common.util.{ExceptionUtils, Logger, Utils}
import org.apache.streampark.flink.core.SqlCommand._
@@ -83,7 +84,7 @@ object FlinkSqlValidator extends Logger {
.getOrElse(Class.forName(FLINK113_PLUS_CALCITE_PARSER_CLASS))
sqlDialect.toUpperCase() match {
case "HIVE" =>
- case "DEFAULT" =>
+ case Constant.DEFAULT =>
val parser = calciteClass
.getConstructor(Array(classOf[Config]): _*)
.newInstance(sqlParserConfigMap(sqlDialect.toUpperCase()))
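
One caveat worth flagging on this hunk: sqlDialect.toUpperCase() is matched against Constant.DEFAULT, so the branch fires only if that constant is itself the uppercase string "DEFAULT"; if it is lowercase "default" (as its use as a namespace default elsewhere in this commit suggests), the original case "DEFAULT" behavior is silently lost. The pattern itself is legal because Constant.DEFAULT is a stable identifier, compared by equality rather than bound:

  object DialectMatch {
    val DEFAULT = "DEFAULT" // must be uppercase for the match below to fire

    def parserFor(sqlDialect: String): String =
      sqlDialect.toUpperCase match {
        case "HIVE"  => "hive parser"
        case DEFAULT => "calcite parser" // stable-identifier pattern, not a binder
        case other   => s"unsupported dialect: $other"
      }

    def main(args: Array[String]): Unit = println(parserFor("default"))
  }
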
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
index b2f5a6eb7..0652a2e3d 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/pom.xml
@@ -94,6 +94,11 @@
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-common_${scala.binary.version}</artifactId>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
index a6e516657..6a34fa492 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/FactoryUtil.java
@@ -17,6 +17,7 @@
package org.apache.streampark.gateway.factories;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.gateway.ConfigOption;
import org.apache.streampark.gateway.exception.ValidationException;
@@ -30,7 +31,7 @@ import java.util.stream.Collectors;
/** Factory utils for {@link Factory}. */
public class FactoryUtil {
- private static final String DEFAULT_IDENTIFIER = "default";
+ private static final String DEFAULT_IDENTIFIER = Constant.DEFAULT;
private static final Logger LOG = LoggerFactory.getLogger(FactoryUtil.class);
public static final ConfigOption<String> SQL_GATEWAY_SERVICE_TYPE =
ConfigOption.key("streampark.sql-gateway.service")
diff --git
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
index a219739bb..e3e1f4464 100644
---
a/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
+++
b/streampark-flink/streampark-flink-sql-gateway/streampark-flink-sql-gateway-base/src/main/java/org/apache/streampark/gateway/factories/SqlGatewayServiceFactoryUtils.java
@@ -17,6 +17,7 @@
package org.apache.streampark.gateway.factories;
+import org.apache.streampark.common.Constant;
import org.apache.streampark.gateway.ConfigOption;
import org.apache.streampark.gateway.exception.ValidationException;
import org.apache.streampark.gateway.service.SqlGatewayService;
@@ -56,7 +57,7 @@ public class SqlGatewayServiceFactoryUtils {
"Service options do not contain an option key '%s'
for discovering an service.",
SQL_GATEWAY_SERVICE_TYPE.getKey())));
- List<String> identifiers = Arrays.asList(identifiersStr.split(";"));
+ List<String> identifiers =
Arrays.asList(identifiersStr.split(Constant.SEMICOLON));
if (identifiers.isEmpty()) {
throw new ValidationException(
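
A closing note on the identifier parsing above, with Constant.SEMICOLON assumed to be ";": Java's String.split returns a one-element array for an empty input but an empty array for input made of separators only, so the isEmpty guard catches ";" but not "". A sketch of the edge cases:

  object IdentifierSplit {
    val SEMICOLON = ";" // assumed value of Constant.SEMICOLON

    def main(args: Array[String]): Unit = {
      println("hive;flink".split(SEMICOLON).length) // 2
      println("".split(SEMICOLON).length)           // 1 -- one empty string, isEmpty stays false
      println(";".split(SEMICOLON).length)          // 0 -- only this shape trips the guard
    }
  }
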