This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new 9c60db249 [Improve] jdbc-connector filterFunction improvement
9c60db249 is described below

commit 9c60db24902494ed5a554655a264e1e9d77d2e76
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 20:47:16 2025 +0800

    [Improve] jdbc-connector filterFunction improvement
---
 .../apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala  | 3 +--
 .../apache/streampark/flink/quickstart/connector/MongoSourceApp.scala  | 3 +--
 .../apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala  | 3 +--
 .../apache/streampark/flink/connector/hbase/source/HBaseSource.scala   | 2 +-
 .../org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala | 2 +-
 .../apache/streampark/flink/connector/mongo/source/MongoSource.scala   | 2 +-
 6 files changed, 6 insertions(+), 9 deletions(-)

diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
index 3d56679d7..67e072d2e 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -47,8 +47,7 @@ object HBaseSourceApp extends FlinkStreaming {
           new HBaseQuery("person", new Scan())
         }
       },
-      r => new String(r.getRow),
-      null
+      r => new String(r.getRow)
     )
 
     HBaseRequest(id)
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
index 4329758ff..37873a964 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -50,8 +50,7 @@ object MongoSourceApp extends FlinkStreaming {
             .append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
           d.find(cond)
         },
-        _.toList.map(_.toJson()),
-        null
+        _.toList.map(_.toJson())
       )
       .print()
   }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
index dcc28ffd1..3b7447a67 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -33,8 +33,7 @@ object MySQLSourceApp extends FlinkStreaming {
           val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp
           s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
         },
-        _.map(x => new Order(x("market_id").toString, x("timestamp").toString)),
-        null
+        _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
       )
       .print()
 
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index fa2265ed5..32f8de5bb 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -51,7 +51,7 @@ class HBaseSource(
   def getDataStream[R: TypeInformation](
       query: R => HBaseQuery,
       func: Result => R,
-      running: R => Boolean): DataStream[R] = {
+      running: R => Boolean = null): DataStream[R] = {
 
     if (query == null) {
       throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
index 458971bc9..00e3d6663 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
@@ -50,7 +50,7 @@ class JdbcSource(
   def getDataStream[R: TypeInformation](
       sqlFun: R => String,
       func: Iterable[Map[String, _]] => Iterable[R],
-      filter: R => Boolean): DataStream[R] = {
+      filter: R => Boolean = null): DataStream[R] = {
     val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
     if (property != null) {
       jdbc.putAll(property)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
index 6cfdfcb2f..21d5a6c08 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
@@ -45,7 +45,7 @@ class MongoSource(
       collection: String,
       queryFun: (R, MongoCollection[Document]) => FindIterable[Document],
       resultFun: MongoCursor[Document] => List[R],
-      filter: R => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
+      filter: R => Boolean = null)(implicit prop: Properties = new Properties()): DataStream[R] = {
 
     Utils.copyProperties(property, prop)
     val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun, resultFun, filter)

Reply via email to