This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git

commit 0ceaf1a6a047d3c2041fcb056a9a9a05059c99d8
Author: benjobs <[email protected]>
AuthorDate: Fri Jan 24 16:10:46 2025 +0800

    [Improve] jdbc config improvement
---
 .../main/scala/org/apache/streampark/common/util/ConfigUtils.scala   | 2 +-
 .../apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java    | 5 +++--
 .../streampark/flink/connector/jdbc/source/JdbcJavaSource.java       | 5 +++--
 .../org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala   | 2 +-
 4 files changed, 8 insertions(+), 6 deletions(-)

diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index 8e6dcf671..cc8b30e9d 100644
--- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -77,7 +77,7 @@ object ConfigUtils {
    * @param alias
    * @return
    */
-  def getJdbcConf(parameter: JavaMap[String, String], alias: String = ""): Properties = {
+  def getJdbcProperties(parameter: JavaMap[String, String], alias: String = ""): Properties = {
     val prefix = alias match {
       case "" | null => KEY_JDBC_PREFIX
       case other => s"$KEY_JDBC_PREFIX$other".replaceFirst("\\.+$|$", ".")
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
index 86fca0388..8f9634978 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java
@@ -56,8 +56,9 @@ public class JdbcJavaSink<T> {
 
   public DataStreamSink<T> sink(DataStream<T> dataStream) {
     Utils.notNull(sqlFunc, "transformFunction can not be null");
-    this.jdbc =
-        this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
+    if (this.jdbc == null) {
+      this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
+    }
     JdbcSinkFunction<T> sinkFun = new JdbcSinkFunction<>(this.jdbc, this.sqlFunc);
     return dataStream.addSink(sinkFun);
   }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index 69f494f23..e3f36d34c 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -56,8 +56,9 @@ public class JdbcJavaSource<T> {
 
     Utils.notNull(queryFunction, "queryFunction must not be null");
     Utils.notNull(resultFunction, "resultFunction must not be null");
-    this.jdbc =
-        this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc;
+    if (this.jdbc == null) {
+      this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
+    }
     JdbcSourceFunction<T> sourceFunction =
         new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, null);
     return context.getJavaEnv().addSource(sourceFunction);
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
index 7d157c0db..1ea25d24a 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala
@@ -72,7 +72,7 @@ class JdbcSink(
    * @return
    */
   def sink[T](stream: DataStream[T])(implicit toSQLFn: T => String): DataStreamSink[T] = {
-    val prop = ConfigUtils.getJdbcConf(ctx.parameter.toMap, alias)
+    val prop = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
     val semantic = Semantic.of(prop.getProperty(KEY_SEMANTIC, Semantic.NONE.name()))
     val sink = semantic match {
       case Semantic.EXACTLY_ONCE =>

Reply via email to