This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new 0ed5a0f11 [Improve] jdbc-datastream-connector filterFunction
improvements
0ed5a0f11 is described below
commit 0ed5a0f11841201e478f8799530e3e882215aa38
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 20:44:02 2025 +0800
[Improve] jdbc-datastream-connector filterFunction improvements
---
.../flink/quickstart/connector/MySQLJavaApp.java | 8 +-
.../{RunningFunction.java => FilterFunction.java} | 10 +--
.../{SQLQueryFunction.java => QueryFunction.java} | 2 +-
...{SQLResultFunction.java => ResultFunction.java} | 2 +-
.../connector/hbase/source/HBaseJavaSource.java | 6 +-
.../hbase/internal/HBaseSourceFunction.scala | 95 +++++++++++-----------
.../flink/connector/hbase/source/HBaseSource.scala | 2 +-
.../connector/jdbc/source/JdbcJavaSource.java | 18 ++--
.../jdbc/internal/JdbcSourceFunction.scala | 77 +++++++++---------
.../flink/connector/jdbc/source/JdbcSource.scala | 7 +-
.../connector/mongo/source/MongoJavaSource.java | 4 +-
.../mongo/internal/MongoSourceFunction.scala | 60 +++++++-------
.../flink/connector/mongo/source/MongoSource.scala | 4 +-
13 files changed, 146 insertions(+), 149 deletions(-)
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 4248843af..a11772ef9 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,8 +16,8 @@
*/
package org.apache.streampark.flink.quickstart.connector;
-import org.apache.streampark.flink.connector.function.SQLQueryFunction;
-import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.function.QueryFunction;
+import org.apache.streampark.flink.connector.function.ResultFunction;
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -40,7 +40,7 @@ public class MySQLJavaApp {
// 读取MySQL数据源
new JdbcJavaSource<>(context, Order.class)
.getDataStream(
- (SQLQueryFunction<Order>)
+ (QueryFunction<Order>)
lastOne -> {
// 5秒抽取一次
Thread.sleep(3000);
@@ -52,7 +52,7 @@ public class MySQLJavaApp {
+ "order by timestamp asc ",
lastOffset);
},
- (SQLResultFunction<Order>)
+ (ResultFunction<Order>)
map -> {
List<Order> result = new ArrayList<>();
map.forEach(
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
similarity index 86%
rename from
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java
rename to
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
index bfc0eb69f..1cc96e2d2 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
@@ -20,12 +20,8 @@ package org.apache.streampark.flink.connector.function;
import java.io.Serializable;
@FunctionalInterface
-public interface RunningFunction extends Serializable {
+public interface FilterFunction<T> extends Serializable {
- /**
- * Is it running...
- *
- * @return Boolean: isRunning
- */
- Boolean running();
+ /** Decides whether the given record should be emitted downstream; return true to keep it. */
+ Boolean filter(T t);
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
similarity index 94%
rename from
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
rename to
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
index fc3c86bec..5dae375ad 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.function;
import java.io.Serializable;
@FunctionalInterface
-public interface SQLQueryFunction<T> extends Serializable {
+public interface QueryFunction<T> extends Serializable {
/**
* Get the SQL to query
*
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
similarity index 95%
rename from
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java
rename to
streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
index 552bd8010..cff6620e0 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
import java.util.Map;
@FunctionalInterface
-public interface SQLResultFunction<T> extends Serializable {
+public interface ResultFunction<T> extends Serializable {
/**
* The result of the search is returned as a Map, and the user can convert
it into an object.
*
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index 4eccc4490..771a8bb54 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -18,7 +18,7 @@
package org.apache.streampark.flink.connector.hbase.source;
import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.flink.connector.function.RunningFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
import
org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
import
org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
@@ -64,7 +64,7 @@ public class HBaseJavaSource<T> {
public DataStreamSource<T> getDataStream(
HBaseQueryFunction<T> queryFunction,
HBaseResultFunction<T> resultFunction,
- RunningFunction runningFunc) {
+ FilterFunction<T> filterFunction) {
if (queryFunction == null) {
throw new NullPointerException("HBaseJavaSource error: query function
cannot be null");
@@ -79,7 +79,7 @@ public class HBaseJavaSource<T> {
HBaseSourceFunction<T> sourceFunction =
new HBaseSourceFunction<>(
- property, queryFunction, resultFunction, runningFunc,
typeInformation);
+ property, queryFunction, resultFunction, filterFunction,
typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index 04e82941e..06e9dc865 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.hbase.internal
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.connector.function.RunningFunction
+import org.apache.streampark.flink.connector.function.FilterFunction
import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
import
org.apache.streampark.flink.connector.hbase.function.{HBaseQueryFunction,
HBaseResultFunction}
import org.apache.streampark.flink.util.FlinkUtils
@@ -47,8 +47,12 @@ class HBaseSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala,
with Logger {
@volatile private[this] var running = true
- private[this] var scalaRunningFunc: Unit => Boolean = _
- private[this] var javaRunningFunc: RunningFunction = _
+ private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+ private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+
+ /** default filter: accepts every record */
+ override def filter(t: R): lang.Boolean = true
+ }
@transient private[this] var table: Table = _
@@ -69,13 +73,14 @@ class HBaseSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala,
prop: Properties,
queryFunc: R => HBaseQuery,
resultFunc: Result => R,
- runningFunc: Unit => Boolean) = {
+ filter: R => Boolean) = {
this(ApiType.scala, prop)
this.scalaQueryFunc = queryFunc
this.scalaResultFunc = resultFunc
- this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
-
+ if (filter != null) {
+ this.scalaFilterFunc = filter
+ }
}
// for JAVA
@@ -83,18 +88,14 @@ class HBaseSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala,
prop: Properties,
queryFunc: HBaseQueryFunction[R],
resultFunc: HBaseResultFunction[R],
- runningFunc: RunningFunction) {
+ filter: FilterFunction[R]) {
this(ApiType.java, prop)
this.javaQueryFunc = queryFunc
this.javaResultFunc = resultFunc
- this.javaRunningFunc =
- if (runningFunc != null) runningFunc
- else
- new RunningFunction {
- override def running(): lang.Boolean = true
- }
-
+ if (filter != null) {
+ this.javaFilterFunc = filter
+ }
}
@throws[Exception]
@@ -106,40 +107,42 @@ class HBaseSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala,
while (this.running) {
apiType match {
case ApiType.scala =>
- if (scalaRunningFunc()) {
- ctx.getCheckpointLock.synchronized {
- // Returns the query object of the last (or recovered from
checkpoint) query to the user, and the user constructs the conditions for the
next query based on this.
- query = scalaQueryFunc(last)
- require(
- query != null && query.getTable != null,
- "[StreamPark] HBaseSource query and query's param table must
not be null ")
- table = query.getTable(prop)
- table
- .getScanner(query)
- .foreach(
- x => {
- last = scalaResultFunc(x)
- ctx.collectWithTimestamp(last, System.currentTimeMillis())
- })
- }
+ ctx.getCheckpointLock.synchronized {
+ // Returns the query object of the last (or recovered from
checkpoint) query to the user, and the user constructs the conditions for the
next query based on this.
+ query = scalaQueryFunc(last)
+ require(
+ query != null && query.getTable != null,
+ "[StreamPark] HBaseSource query and query's param table must not
be null ")
+ table = query.getTable(prop)
+ table
+ .getScanner(query)
+ .foreach(
+ x => {
+ val r = scalaResultFunc(x)
+ if (scalaFilterFunc(r)) {
+ last = r
+ ctx.collectWithTimestamp(r, System.currentTimeMillis())
+ }
+ })
}
case ApiType.java =>
- if (javaRunningFunc.running()) {
- ctx.getCheckpointLock.synchronized {
- // Returns the query object of the last (or recovered from
checkpoint) query to the user, and the user constructs the conditions for the
next query based on this.
- query = javaQueryFunc.query(last)
- require(
- query != null && query.getTable != null,
- "[StreamPark] HBaseSource query and query's param table must
not be null ")
- table = query.getTable(prop)
- table
- .getScanner(query)
- .foreach(
- x => {
- last = javaResultFunc.result(x)
- ctx.collectWithTimestamp(last, System.currentTimeMillis())
- })
- }
+ ctx.getCheckpointLock.synchronized {
+ // Returns the query object of the last (or recovered from
checkpoint) query to the user, and the user constructs the conditions for the
next query based on this.
+ query = javaQueryFunc.query(last)
+ require(
+ query != null && query.getTable != null,
+ "[StreamPark] HBaseSource query and query's param table must not
be null ")
+ table = query.getTable(prop)
+ table
+ .getScanner(query)
+ .foreach(
+ x => {
+ val r = javaResultFunc.result(x)
+ if (javaFilterFunc.filter(r)) {
+ last = r
+ ctx.collectWithTimestamp(r, System.currentTimeMillis())
+ }
+ })
}
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index 50bfc4ea6..fa2265ed5 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -51,7 +51,7 @@ class HBaseSource(
def getDataStream[R: TypeInformation](
query: R => HBaseQuery,
func: Result => R,
- running: Unit => Boolean): DataStream[R] = {
+ running: R => Boolean): DataStream[R] = {
if (query == null) {
throw new NullPointerException("getDataStream error, SQLQueryFunction
must not be null")
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index fdb6b8c50..c00413335 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -18,9 +18,9 @@
package org.apache.streampark.flink.connector.jdbc.source;
import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.flink.connector.function.RunningFunction;
-import org.apache.streampark.flink.connector.function.SQLQueryFunction;
-import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
+import org.apache.streampark.flink.connector.function.QueryFunction;
+import org.apache.streampark.flink.connector.function.ResultFunction;
import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -59,22 +59,20 @@ public class JdbcJavaSource<T> {
}
public DataStreamSource<T> getDataStream(
- SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
+ QueryFunction<T> queryFunction, ResultFunction<T> resultFunction) {
return getDataStream(queryFunction, resultFunction, null);
}
public DataStreamSource<T> getDataStream(
- SQLQueryFunction<T> queryFunction,
- SQLResultFunction<T> resultFunction,
- RunningFunction runningFunc) {
+ QueryFunction<T> queryFunction, ResultFunction<T> resultFunction,
FilterFunction<T> filter) {
if (queryFunction == null) {
throw new NullPointerException(
- "JdbcJavaSource getDataStream error: SQLQueryFunction must not be
null");
+ "JdbcJavaSource getDataStream error: QueryFunction must not be
null");
}
if (resultFunction == null) {
throw new NullPointerException(
- "JdbcJavaSource getDataStream error: SQLResultFunction must not be
null");
+ "JdbcJavaSource getDataStream error: ResultFunction must not be
null");
}
if (this.jdbc == null) {
@@ -82,7 +80,7 @@ public class JdbcJavaSource<T> {
}
JdbcSourceFunction<T> sourceFunction =
- new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction,
runningFunc, typeInformation);
+ new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, filter,
typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
index 8429a1135..0428aaa5b 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.jdbc.internal
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.{JdbcUtils, Logger}
-import org.apache.streampark.flink.connector.function.{RunningFunction,
SQLQueryFunction, SQLResultFunction}
+import org.apache.streampark.flink.connector.function.{FilterFunction,
QueryFunction, ResultFunction}
import org.apache.streampark.flink.util.FlinkUtils
import org.apache.flink.api.common.state.ListState
@@ -45,15 +45,16 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
with Logger {
@volatile private[this] var running = true
- private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
- private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
- override def running(): lang.Boolean = true
- }
- private[this] var scalaSqlFunc: R => String = _
+ private[this] var scalaQueryFunc: R => String = _
private[this] var scalaResultFunc: Function[Iterable[Map[String, _]],
Iterable[R]] = _
- private[this] var javaSqlFunc: SQLQueryFunction[R] = _
- private[this] var javaResultFunc: SQLResultFunction[R] = _
+ private[this] var javaQueryFunc: QueryFunction[R] = _
+ private[this] var javaResultFunc: ResultFunction[R] = _
+ private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+ private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+ override def filter(r: R): lang.Boolean = true
+ }
+
@transient private var unionOffsetStates: ListState[R] = null
private val OFFSETS_STATE_NAME: String = "jdbc-source-query-states"
@@ -64,28 +65,28 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
jdbc: Properties,
sqlFunc: R => String,
resultFunc: Iterable[Map[String, _]] => Iterable[R],
- runningFunc: Unit => Boolean) = {
+ filterFunc: R => Boolean) = {
this(ApiType.scala, jdbc)
- this.scalaSqlFunc = sqlFunc
+ this.scalaQueryFunc = sqlFunc
this.scalaResultFunc = resultFunc
- if (runningFunc != null) {
- this.scalaRunningFunc = runningFunc
+ if (filterFunc != null) {
+ this.scalaFilterFunc = filterFunc
}
}
// for JAVA
def this(
jdbc: Properties,
- javaSqlFunc: SQLQueryFunction[R],
- javaResultFunc: SQLResultFunction[R],
- runningFunc: RunningFunction) {
+ javaQueryFunc: QueryFunction[R],
+ javaResultFunc: ResultFunction[R],
+ filterFunc: FilterFunction[R]) {
this(ApiType.java, jdbc)
- this.javaSqlFunc = javaSqlFunc
+ this.javaQueryFunc = javaQueryFunc
this.javaResultFunc = javaResultFunc
- if (runningFunc != null) {
- this.javaRunningFunc = runningFunc
+ if (filterFunc != null) {
+ this.javaFilterFunc = filterFunc
}
}
@@ -94,30 +95,30 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
while (this.running) {
apiType match {
case ApiType.scala =>
- if (scalaRunningFunc()) {
- ctx.getCheckpointLock.synchronized {
- val sql = scalaSqlFunc(last)
- val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
- scalaResultFunc(result).foreach(
- x => {
+ ctx.getCheckpointLock.synchronized {
+ val sql = scalaQueryFunc(last)
+ val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
+ scalaResultFunc(result).foreach(
+ x => {
+ if (scalaFilterFunc(x)) {
last = x
ctx.collectWithTimestamp(last, System.currentTimeMillis())
- })
- }
+ }
+ })
}
case ApiType.java =>
- if (javaRunningFunc.running()) {
- ctx.getCheckpointLock.synchronized {
- val sql = javaSqlFunc.query(last)
- val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
- javaResultFunc
- .result(result.map(_.asJava))
- .foreach(
- x => {
+ ctx.getCheckpointLock.synchronized {
+ val sql = javaQueryFunc.query(last)
+ val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
+ javaResultFunc
+ .result(result.map(_.asJava))
+ .foreach(
+ x => {
+ if (javaFilterFunc.filter(x)) {
last = x
ctx.collectWithTimestamp(last, System.currentTimeMillis())
- })
- }
+ }
+ })
}
}
}
@@ -137,7 +138,7 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
}
override def initializeState(context: FunctionInitializationContext): Unit =
{
- logInfo("JdbcSource snapshotState initialize")
+ logDebug("JdbcSource snapshotState initialize")
unionOffsetStates = FlinkUtils
.getUnionListState[R](context, getRuntimeContext.getExecutionConfig,
OFFSETS_STATE_NAME)
Try(unionOffsetStates.get.head) match {
@@ -147,7 +148,7 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
}
override def notifyCheckpointComplete(checkpointId: Long): Unit = {
- logInfo(s"JdbcSource checkpointComplete: $checkpointId")
+ logDebug(s"JdbcSource checkpointComplete: $checkpointId")
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
index 6186711ae..458971bc9 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
@@ -33,7 +33,6 @@ object JdbcSource {
def apply(alias: String = "", properties: Properties = new
Properties())(implicit
ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, alias,
properties) {}
-
}
class JdbcSource(
@@ -50,13 +49,13 @@ class JdbcSource(
*/
def getDataStream[R: TypeInformation](
sqlFun: R => String,
- fun: Iterable[Map[String, _]] => Iterable[R],
- running: Unit => Boolean): DataStream[R] = {
+ func: Iterable[Map[String, _]] => Iterable[R],
+ filter: R => Boolean): DataStream[R] = {
val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
if (property != null) {
jdbc.putAll(property)
}
- val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, fun, running)
+ val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, func, filter)
ctx.addSource(mysqlFun)
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 41b230cc9..b2bc210bc 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.mongo.source;
-import org.apache.streampark.flink.connector.function.RunningFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
import
org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
import
org.apache.streampark.flink.connector.mongo.internal.MongoSourceFunction;
@@ -60,7 +60,7 @@ public class MongoJavaSource<T> {
String collectionName,
MongoQueryFunction<T> queryFunction,
MongoResultFunction<T> resultFunction,
- RunningFunction runningFunc) {
+ FilterFunction<T> runningFunc) {
if (collectionName == null) {
throw new NullPointerException("MongoJavaSource error: collectionName
must not be null");
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
index eea0c93c0..8b6a91ab7 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.mongo.internal
import org.apache.streampark.common.enums.ApiType
import org.apache.streampark.common.enums.ApiType.ApiType
import org.apache.streampark.common.util.{Logger, MongoConfig}
-import org.apache.streampark.flink.connector.function.RunningFunction
+import org.apache.streampark.flink.connector.function.FilterFunction
import
org.apache.streampark.flink.connector.mongo.function.{MongoQueryFunction,
MongoResultFunction}
import org.apache.streampark.flink.util.FlinkUtils
@@ -51,9 +51,9 @@ class MongoSourceFunction[R: TypeInformation](
with Logger {
@volatile private[this] var running = true
- private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
- private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
- override def running(): lang.Boolean = true
+ private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+ private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+ override def filter(r: R): lang.Boolean = true
}
var client: MongoClient = _
@@ -75,13 +75,13 @@ class MongoSourceFunction[R: TypeInformation](
prop: Properties,
scalaQueryFunc: (R, MongoCollection[Document]) => FindIterable[Document],
scalaResultFunc: MongoCursor[Document] => List[R],
- runningFunc: Unit => Boolean) = {
+ filter: R => Boolean) = {
this(ApiType.scala, prop, collectionName)
this.scalaQueryFunc = scalaQueryFunc
this.scalaResultFunc = scalaResultFunc
- if (runningFunc != null) {
- this.scalaRunningFunc = runningFunc
+ if (filter != null) {
+ this.scalaFilterFunc = filter
}
}
@@ -91,13 +91,13 @@ class MongoSourceFunction[R: TypeInformation](
prop: Properties,
queryFunc: MongoQueryFunction[R],
resultFunc: MongoResultFunction[R],
- runningFunc: RunningFunction) {
+ filter: FilterFunction[R]) {
this(ApiType.java, prop, collectionName)
this.javaQueryFunc = queryFunc
this.javaResultFunc = resultFunc
- if (runningFunc != null) {
- this.javaRunningFunc = runningFunc
+ if (filter != null) {
+ this.javaFilterFunc = filter
}
}
@@ -115,31 +115,31 @@ class MongoSourceFunction[R: TypeInformation](
while (this.running) {
apiType match {
case ApiType.scala =>
- if (scalaRunningFunc()) {
- ctx.getCheckpointLock.synchronized {
- val find = scalaQueryFunc(last, mongoCollection)
- if (find != null) {
- scalaResultFunc(find.iterator).foreach(
- x => {
+ ctx.getCheckpointLock.synchronized {
+ val find = scalaQueryFunc(last, mongoCollection)
+ if (find != null) {
+ scalaResultFunc(find.iterator).foreach(
+ x => {
+ if (scalaFilterFunc(x)) {
last = x
ctx.collectWithTimestamp(last, System.currentTimeMillis())
- })
- }
+ }
+ })
}
}
case ApiType.java =>
- if (javaRunningFunc.running()) {
- ctx.getCheckpointLock.synchronized {
- val find = javaQueryFunc.query(last, mongoCollection)
- if (find != null) {
- javaResultFunc
- .result(find.iterator)
- .foreach(
- x => {
+ ctx.getCheckpointLock.synchronized {
+ val find = javaQueryFunc.query(last, mongoCollection)
+ if (find != null) {
+ javaResultFunc
+ .result(find.iterator)
+ .foreach(
+ x => {
+ if (javaFilterFunc.filter(x)) {
last = x
ctx.collectWithTimestamp(last,
System.currentTimeMillis())
- })
- }
+ }
+ })
}
}
}
@@ -164,7 +164,7 @@ class MongoSourceFunction[R: TypeInformation](
override def initializeState(context: FunctionInitializationContext): Unit =
{
// restore from checkpoint
- logInfo("MongoSource snapshotState initialize")
+ logDebug("MongoSource snapshotState initialize")
state = FlinkUtils
.getUnionListState[R](context, getRuntimeContext.getExecutionConfig,
OFFSETS_STATE_NAME)
Try(state.get().head) match {
@@ -174,7 +174,7 @@ class MongoSourceFunction[R: TypeInformation](
}
override def notifyCheckpointComplete(checkpointId: Long): Unit = {
- logInfo(s"MongoSource checkpointComplete: $checkpointId")
+ logDebug(s"MongoSource checkpointComplete: $checkpointId")
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
index 9856b49b2..6cfdfcb2f 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
@@ -45,10 +45,10 @@ class MongoSource(
collection: String,
queryFun: (R, MongoCollection[Document]) => FindIterable[Document],
resultFun: MongoCursor[Document] => List[R],
- running: Unit => Boolean)(implicit prop: Properties = new Properties()):
DataStream[R] = {
+ filter: R => Boolean)(implicit prop: Properties = new Properties()):
DataStream[R] = {
Utils.copyProperties(property, prop)
- val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun,
resultFun, running)
+ val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun,
resultFun, filter)
ctx.addSource(mongoFun)
}