This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new 9c60db249 [Improve] jdbc-connector filterFunction improvement
9c60db249 is described below
commit 9c60db24902494ed5a554655a264e1e9d77d2e76
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 20:47:16 2025 +0800
[Improve] jdbc-connector filterFunction improvement
---
.../apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala | 3 +--
.../apache/streampark/flink/quickstart/connector/MongoSourceApp.scala | 3 +--
.../apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala | 3 +--
.../apache/streampark/flink/connector/hbase/source/HBaseSource.scala | 2 +-
.../org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala | 2 +-
.../apache/streampark/flink/connector/mongo/source/MongoSource.scala | 2 +-
6 files changed, 6 insertions(+), 9 deletions(-)
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
index 3d56679d7..67e072d2e 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -47,8 +47,7 @@ object HBaseSourceApp extends FlinkStreaming {
new HBaseQuery("person", new Scan())
}
},
- r => new String(r.getRow),
- null
+ r => new String(r.getRow)
)
HBaseRequest(id)
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
index 4329758ff..37873a964 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -50,8 +50,7 @@ object MongoSourceApp extends FlinkStreaming {
.append("updateTime", new BasicDBObject("$gte",
DateUtils.parse(offset)))
d.find(cond)
},
- _.toList.map(_.toJson()),
- null
+ _.toList.map(_.toJson())
)
.print()
}
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
index dcc28ffd1..3b7447a67 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -33,8 +33,7 @@ object MySQLSourceApp extends FlinkStreaming {
val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp
s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
},
- _.map(x => new Order(x("market_id").toString, x("timestamp").toString)),
- null
+ _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
)
.print()
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index fa2265ed5..32f8de5bb 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -51,7 +51,7 @@ class HBaseSource(
def getDataStream[R: TypeInformation](
query: R => HBaseQuery,
func: Result => R,
- running: R => Boolean): DataStream[R] = {
+ running: R => Boolean = null): DataStream[R] = {
if (query == null) {
throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
index 458971bc9..00e3d6663 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
@@ -50,7 +50,7 @@ class JdbcSource(
def getDataStream[R: TypeInformation](
sqlFun: R => String,
func: Iterable[Map[String, _]] => Iterable[R],
- filter: R => Boolean): DataStream[R] = {
+ filter: R => Boolean = null): DataStream[R] = {
val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
if (property != null) {
jdbc.putAll(property)
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
index 6cfdfcb2f..21d5a6c08 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
@@ -45,7 +45,7 @@ class MongoSource(
collection: String,
queryFun: (R, MongoCollection[Document]) => FindIterable[Document],
resultFun: MongoCursor[Document] => List[R],
- filter: R => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
+ filter: R => Boolean = null)(implicit prop: Properties = new Properties()): DataStream[R] = {
Utils.copyProperties(property, prop)
val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun,
resultFun, filter)