This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch dev-2.1.6 in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
commit 0ceaf1a6a047d3c2041fcb056a9a9a05059c99d8 Author: benjobs <[email protected]> AuthorDate: Fri Jan 24 16:10:46 2025 +0800 [Improve] jdbc config improvement --- .../main/scala/org/apache/streampark/common/util/ConfigUtils.scala | 2 +- .../apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java | 5 +++-- .../streampark/flink/connector/jdbc/source/JdbcJavaSource.java | 5 +++-- .../org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala | 2 +- 4 files changed, 8 insertions(+), 6 deletions(-) diff --git a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala index 8e6dcf671..cc8b30e9d 100644 --- a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala +++ b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala @@ -77,7 +77,7 @@ object ConfigUtils { * @param alias * @return */ - def getJdbcConf(parameter: JavaMap[String, String], alias: String = ""): Properties = { + def getJdbcProperties(parameter: JavaMap[String, String], alias: String = ""): Properties = { val prefix = alias match { case "" | null => KEY_JDBC_PREFIX case other => s"$KEY_JDBC_PREFIX$other".replaceFirst("\\.+$|$", ".") diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java index 86fca0388..8f9634978 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/sink/JdbcJavaSink.java @@ -56,8 +56,9 @@ public class JdbcJavaSink<T> { public DataStreamSink<T> sink(DataStream<T> dataStream) { Utils.notNull(sqlFunc, "transformFunction can not be null"); - this.jdbc = - this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc; + if (this.jdbc == null) { + this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); + } JdbcSinkFunction<T> sinkFun = new JdbcSinkFunction<>(this.jdbc, this.sqlFunc); return dataStream.addSink(sinkFun); } diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java index 69f494f23..e3f36d34c 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java @@ -56,8 +56,9 @@ public class JdbcJavaSource<T> { Utils.notNull(queryFunction, "queryFunction must not be null"); Utils.notNull(resultFunction, "resultFunction must not be null"); - this.jdbc = - this.jdbc == null ? ConfigUtils.getJdbcConf(context.parameter().toMap(), alias) : this.jdbc; + if (this.jdbc == null) { + this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias); + } JdbcSourceFunction<T> sourceFunction = new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, null); return context.getJavaEnv().addSource(sourceFunction); diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala index 7d157c0db..1ea25d24a 100644 --- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala +++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/sink/JdbcSink.scala @@ -72,7 +72,7 @@ class JdbcSink( * @return */ def sink[T](stream: DataStream[T])(implicit toSQLFn: T => String): DataStreamSink[T] = { - val prop = ConfigUtils.getJdbcConf(ctx.parameter.toMap, alias) + val prop = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias) val semantic = Semantic.of(prop.getProperty(KEY_SEMANTIC, Semantic.NONE.name())) val sink = semantic match { case Semantic.EXACTLY_ONCE =>
