This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new 0ed5a0f11 [Improve] jdbc-datastream-connector filterFunction improvements
0ed5a0f11 is described below

commit 0ed5a0f11841201e478f8799530e3e882215aa38
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 20:44:02 2025 +0800

    [Improve] jdbc-datastream-connector filterFunction improvements
---
 .../flink/quickstart/connector/MySQLJavaApp.java   |  8 +-
 .../{RunningFunction.java => FilterFunction.java}  | 10 +--
 .../{SQLQueryFunction.java => QueryFunction.java}  |  2 +-
 ...{SQLResultFunction.java => ResultFunction.java} |  2 +-
 .../connector/hbase/source/HBaseJavaSource.java    |  6 +-
 .../hbase/internal/HBaseSourceFunction.scala       | 95 +++++++++++-----------
 .../flink/connector/hbase/source/HBaseSource.scala |  2 +-
 .../connector/jdbc/source/JdbcJavaSource.java      | 18 ++--
 .../jdbc/internal/JdbcSourceFunction.scala         | 77 +++++++++---------
 .../flink/connector/jdbc/source/JdbcSource.scala   |  7 +-
 .../connector/mongo/source/MongoJavaSource.java    |  4 +-
 .../mongo/internal/MongoSourceFunction.scala       | 60 +++++++-------
 .../flink/connector/mongo/source/MongoSource.scala |  4 +-
 13 files changed, 146 insertions(+), 149 deletions(-)

diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 4248843af..a11772ef9 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,8 +16,8 @@
  */
 package org.apache.streampark.flink.quickstart.connector;
 
-import org.apache.streampark.flink.connector.function.SQLQueryFunction;
-import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.function.QueryFunction;
+import org.apache.streampark.flink.connector.function.ResultFunction;
 import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -40,7 +40,7 @@ public class MySQLJavaApp {
     // read data from the MySQL source
     new JdbcJavaSource<>(context, Order.class)
         .getDataStream(
-            (SQLQueryFunction<Order>)
+            (QueryFunction<Order>)
                 lastOne -> {
                   // poll once every 3 seconds
                   Thread.sleep(3000);
@@ -52,7 +52,7 @@ public class MySQLJavaApp {
                           + "order by timestamp asc ",
                       lastOffset);
                 },
-            (SQLResultFunction<Order>)
+            (ResultFunction<Order>)
                 map -> {
                   List<Order> result = new ArrayList<>();
                   map.forEach(
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
similarity index 86%
rename from streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java
rename to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
index bfc0eb69f..1cc96e2d2 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/RunningFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/FilterFunction.java
@@ -20,12 +20,8 @@ package org.apache.streampark.flink.connector.function;
 import java.io.Serializable;
 
 @FunctionalInterface
-public interface RunningFunction extends Serializable {
+public interface FilterFunction<T> extends Serializable {
 
-  /**
-   * Is it running...
-   *
-   * @return Boolean: isRunning
-   */
-  Boolean running();
+  /** Decides whether the given element should be emitted downstream. */
+  Boolean filter(T t);
 }
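
For reference, the renamed FilterFunction is a functional interface, so call
sites can pass the predicate as a lambda; records for which it returns false
are dropped by the source before they enter the stream, and passing null keeps
the old accept-everything behavior (the source functions fall back to a default
filter that always returns true). A minimal sketch, assuming a hypothetical
Order type with an amount field:

    // Hypothetical example: emit only orders above a given amount.
    FilterFunction<Order> onlyLargeOrders = order -> order.getAmount() > 100;
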
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
similarity index 94%
rename from streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
rename to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
index fc3c86bec..5dae375ad 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLQueryFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/QueryFunction.java
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.function;
 import java.io.Serializable;
 
 @FunctionalInterface
-public interface SQLQueryFunction<T> extends Serializable {
+public interface QueryFunction<T> extends Serializable {
   /**
    * Get the SQL to query
    *
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
similarity index 95%
rename from streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java
rename to streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
index 552bd8010..cff6620e0 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/SQLResultFunction.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-base/src/main/java/org/apache/streampark/flink/connector/function/ResultFunction.java
@@ -21,7 +21,7 @@ import java.io.Serializable;
 import java.util.Map;
 
 @FunctionalInterface
-public interface SQLResultFunction<T> extends Serializable {
+public interface ResultFunction<T> extends Serializable {
   /**
   * The result of the search is returned as a Map, and the user can convert it into an object.
    *
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index 4eccc4490..771a8bb54 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -18,7 +18,7 @@
 package org.apache.streampark.flink.connector.hbase.source;
 
 import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.flink.connector.function.RunningFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
 import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
@@ -64,7 +64,7 @@ public class HBaseJavaSource<T> {
   public DataStreamSource<T> getDataStream(
       HBaseQueryFunction<T> queryFunction,
       HBaseResultFunction<T> resultFunction,
-      RunningFunction runningFunc) {
+      FilterFunction<T> filterFunction) {
 
     if (queryFunction == null) {
       throw new NullPointerException("HBaseJavaSource error: query function cannot be null");
@@ -79,7 +79,7 @@ public class HBaseJavaSource<T> {
 
     HBaseSourceFunction<T> sourceFunction =
         new HBaseSourceFunction<>(
-            property, queryFunction, resultFunction, runningFunc, typeInformation);
+            property, queryFunction, resultFunction, filterFunction, typeInformation);
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
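
The HBase Java source now takes the same per-record FilterFunction in place of
the old RunningFunction flag. A hedged sketch of the new call shape, where the
Person type and the buildNextQuery/parsePerson helpers are hypothetical:

    // Query from the last emitted row, map HBase Results, then drop nulls.
    HBaseJavaSource<Person> source = ...;
    source.getDataStream(
        last -> buildNextQuery(last),    // HBaseQueryFunction<Person>: builds the next HBaseQuery
        result -> parsePerson(result),   // HBaseResultFunction<Person>: Result -> Person
        person -> person != null);       // FilterFunction<Person>: skip unparseable rows
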
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index 04e82941e..06e9dc865 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.hbase.internal
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.Logger
-import org.apache.streampark.flink.connector.function.RunningFunction
+import org.apache.streampark.flink.connector.function.FilterFunction
 import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
 import org.apache.streampark.flink.connector.hbase.function.{HBaseQueryFunction, HBaseResultFunction}
 import org.apache.streampark.flink.util.FlinkUtils
@@ -47,8 +47,12 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
   with Logger {
 
   @volatile private[this] var running = true
-  private[this] var scalaRunningFunc: Unit => Boolean = _
-  private[this] var javaRunningFunc: RunningFunction = _
+  private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+  private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+
+    /** Default filter: accepts every element. */
+    override def filter(t: R): lang.Boolean = true
+  }
 
   @transient private[this] var table: Table = _
 
@@ -69,13 +73,14 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
       prop: Properties,
       queryFunc: R => HBaseQuery,
       resultFunc: Result => R,
-      runningFunc: Unit => Boolean) = {
+      filter: R => Boolean) = {
 
     this(ApiType.scala, prop)
     this.scalaQueryFunc = queryFunc
     this.scalaResultFunc = resultFunc
-    this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
-
+    if (filter != null) {
+      this.scalaFilterFunc = filter
+    }
   }
 
   // for JAVA
@@ -83,18 +88,14 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
       prop: Properties,
       queryFunc: HBaseQueryFunction[R],
       resultFunc: HBaseResultFunction[R],
-      runningFunc: RunningFunction) {
+      filter: FilterFunction[R]) {
 
     this(ApiType.java, prop)
     this.javaQueryFunc = queryFunc
     this.javaResultFunc = resultFunc
-    this.javaRunningFunc =
-      if (runningFunc != null) runningFunc
-      else
-        new RunningFunction {
-          override def running(): lang.Boolean = true
-        }
-
+    if (filter != null) {
+      this.javaFilterFunc = filter
+    }
   }
 
   @throws[Exception]
@@ -106,40 +107,42 @@ class HBaseSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala,
     while (this.running) {
       apiType match {
         case ApiType.scala =>
-          if (scalaRunningFunc()) {
-            ctx.getCheckpointLock.synchronized {
-              // Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
-              query = scalaQueryFunc(last)
-              require(
-                query != null && query.getTable != null,
-                "[StreamPark] HBaseSource query and query's param table must 
not be null ")
-              table = query.getTable(prop)
-              table
-                .getScanner(query)
-                .foreach(
-                  x => {
-                    last = scalaResultFunc(x)
-                    ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                  })
-            }
+          ctx.getCheckpointLock.synchronized {
+            // Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
+            query = scalaQueryFunc(last)
+            require(
+              query != null && query.getTable != null,
+              "[StreamPark] HBaseSource query and query's param table must not 
be null ")
+            table = query.getTable(prop)
+            table
+              .getScanner(query)
+              .foreach(
+                x => {
+                  val r = scalaResultFunc(x)
+                  if (scalaFilterFunc(r)) {
+                    last = r
+                    ctx.collectWithTimestamp(r, System.currentTimeMillis())
+                  }
+                })
           }
         case ApiType.java =>
-          if (javaRunningFunc.running()) {
-            ctx.getCheckpointLock.synchronized {
-              // Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
-              query = javaQueryFunc.query(last)
-              require(
-                query != null && query.getTable != null,
-                "[StreamPark] HBaseSource query and query's param table must 
not be null ")
-              table = query.getTable(prop)
-              table
-                .getScanner(query)
-                .foreach(
-                  x => {
-                    last = javaResultFunc.result(x)
-                    ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                  })
-            }
+          ctx.getCheckpointLock.synchronized {
+            // Returns the query object of the last (or recovered from checkpoint) query to the user, and the user constructs the conditions for the next query based on this.
+            query = javaQueryFunc.query(last)
+            require(
+              query != null && query.getTable != null,
+              "[StreamPark] HBaseSource query and query's param table must not 
be null ")
+            table = query.getTable(prop)
+            table
+              .getScanner(query)
+              .foreach(
+                x => {
+                  val r = javaResultFunc.result(x)
+                  if (javaFilterFunc.filter(r)) {
+                    last = r
+                    ctx.collectWithTimestamp(r, System.currentTimeMillis())
+                  }
+                })
           }
       }
     }
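
Net effect of the hunk above: the old RunningFunction only gated whole polling
cycles, while the new FilterFunction is evaluated per record inside the
checkpoint lock, and the saved offset (last) only advances for records that
pass the filter.
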
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index 50bfc4ea6..fa2265ed5 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -51,7 +51,7 @@ class HBaseSource(
   def getDataStream[R: TypeInformation](
       query: R => HBaseQuery,
       func: Result => R,
-      running: Unit => Boolean): DataStream[R] = {
+      running: R => Boolean): DataStream[R] = {
 
     if (query == null) {
       throw new NullPointerException("getDataStream error, SQLQueryFunction must not be null")
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index fdb6b8c50..c00413335 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -18,9 +18,9 @@
 package org.apache.streampark.flink.connector.jdbc.source;
 
 import org.apache.streampark.common.util.ConfigUtils;
-import org.apache.streampark.flink.connector.function.RunningFunction;
-import org.apache.streampark.flink.connector.function.SQLQueryFunction;
-import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
+import org.apache.streampark.flink.connector.function.QueryFunction;
+import org.apache.streampark.flink.connector.function.ResultFunction;
 import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 
@@ -59,22 +59,20 @@ public class JdbcJavaSource<T> {
   }
 
   public DataStreamSource<T> getDataStream(
-      SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
+      QueryFunction<T> queryFunction, ResultFunction<T> resultFunction) {
     return getDataStream(queryFunction, resultFunction, null);
   }
 
   public DataStreamSource<T> getDataStream(
-      SQLQueryFunction<T> queryFunction,
-      SQLResultFunction<T> resultFunction,
-      RunningFunction runningFunc) {
+      QueryFunction<T> queryFunction, ResultFunction<T> resultFunction, FilterFunction<T> filter) {
 
     if (queryFunction == null) {
       throw new NullPointerException(
-          "JdbcJavaSource getDataStream error: SQLQueryFunction must not be 
null");
+          "JdbcJavaSource getDataStream error: QueryFunction must not be 
null");
     }
     if (resultFunction == null) {
       throw new NullPointerException(
-          "JdbcJavaSource getDataStream error: SQLResultFunction must not be 
null");
+          "JdbcJavaSource getDataStream error: ResultFunction must not be 
null");
     }
 
     if (this.jdbc == null) {
@@ -82,7 +80,7 @@ public class JdbcJavaSource<T> {
     }
 
     JdbcSourceFunction<T> sourceFunction =
-        new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, typeInformation);
+        new JdbcSourceFunction<T>(jdbc, queryFunction, resultFunction, filter, typeInformation);
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
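
With the extra parameter, a call site like MySQLJavaApp above can also drop
rows before they reach the stream. A minimal sketch reusing the test app's
Order type; the SQL, the toOrders mapper, and the null check are hypothetical:

    new JdbcJavaSource<>(context, Order.class)
        .getDataStream(
            (QueryFunction<Order>) lastOne -> "select * from t_order",  // hypothetical query
            (ResultFunction<Order>) map -> toOrders(map),               // hypothetical row mapper
            (FilterFunction<Order>) order -> order != null);            // drop unmapped rows
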
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
index 8429a1135..0428aaa5b 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.jdbc.internal
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.{JdbcUtils, Logger}
-import org.apache.streampark.flink.connector.function.{RunningFunction, SQLQueryFunction, SQLResultFunction}
+import org.apache.streampark.flink.connector.function.{FilterFunction, QueryFunction, ResultFunction}
 import org.apache.streampark.flink.util.FlinkUtils
 
 import org.apache.flink.api.common.state.ListState
@@ -45,15 +45,16 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, j
   with Logger {
 
   @volatile private[this] var running = true
-  private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
-  private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
-    override def running(): lang.Boolean = true
-  }
 
-  private[this] var scalaSqlFunc: R => String = _
+  private[this] var scalaQueryFunc: R => String = _
   private[this] var scalaResultFunc: Function[Iterable[Map[String, _]], Iterable[R]] = _
-  private[this] var javaSqlFunc: SQLQueryFunction[R] = _
-  private[this] var javaResultFunc: SQLResultFunction[R] = _
+  private[this] var javaQueryFunc: QueryFunction[R] = _
+  private[this] var javaResultFunc: ResultFunction[R] = _
+  private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+  private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+    override def filter(r: R): lang.Boolean = true
+  }
+
   @transient private var unionOffsetStates: ListState[R] = null
 
   private val OFFSETS_STATE_NAME: String = "jdbc-source-query-states"
@@ -64,28 +65,28 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, j
       jdbc: Properties,
       sqlFunc: R => String,
       resultFunc: Iterable[Map[String, _]] => Iterable[R],
-      runningFunc: Unit => Boolean) = {
+      filterFunc: R => Boolean) = {
 
     this(ApiType.scala, jdbc)
-    this.scalaSqlFunc = sqlFunc
+    this.scalaQueryFunc = sqlFunc
     this.scalaResultFunc = resultFunc
-    if (runningFunc != null) {
-      this.scalaRunningFunc = runningFunc
+    if (filterFunc != null) {
+      this.scalaFilterFunc = filterFunc
     }
   }
 
   // for JAVA
   def this(
       jdbc: Properties,
-      javaSqlFunc: SQLQueryFunction[R],
-      javaResultFunc: SQLResultFunction[R],
-      runningFunc: RunningFunction) {
+      javaQueryFunc: QueryFunction[R],
+      javaResultFunc: ResultFunction[R],
+      filterFunc: FilterFunction[R]) {
 
     this(ApiType.java, jdbc)
-    this.javaSqlFunc = javaSqlFunc
+    this.javaQueryFunc = javaQueryFunc
     this.javaResultFunc = javaResultFunc
-    if (runningFunc != null) {
-      this.javaRunningFunc = runningFunc
+    if (filterFunc != null) {
+      this.javaFilterFunc = filterFunc
     }
   }
 
@@ -94,30 +95,30 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, j
     while (this.running) {
       apiType match {
         case ApiType.scala =>
-          if (scalaRunningFunc()) {
-            ctx.getCheckpointLock.synchronized {
-              val sql = scalaSqlFunc(last)
-              val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
-              scalaResultFunc(result).foreach(
-                x => {
+          ctx.getCheckpointLock.synchronized {
+            val sql = scalaQueryFunc(last)
+            val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
+            scalaResultFunc(result).foreach(
+              x => {
+                if (scalaFilterFunc(x)) {
                   last = x
                   ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                })
-            }
+                }
+              })
           }
         case ApiType.java =>
-          if (javaRunningFunc.running()) {
-            ctx.getCheckpointLock.synchronized {
-              val sql = javaSqlFunc.query(last)
-              val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
-              javaResultFunc
-                .result(result.map(_.asJava))
-                .foreach(
-                  x => {
+          ctx.getCheckpointLock.synchronized {
+            val sql = javaQueryFunc.query(last)
+            val result: List[Map[String, _]] = JdbcUtils.select(sql)(jdbc)
+            javaResultFunc
+              .result(result.map(_.asJava))
+              .foreach(
+                x => {
+                  if (javaFilterFunc.filter(x)) {
                     last = x
                     ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                  })
-            }
+                  }
+                })
           }
       }
     }
@@ -137,7 +138,7 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, j
   }
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
-    logInfo("JdbcSource snapshotState initialize")
+    logDebug("JdbcSource snapshotState initialize")
     unionOffsetStates = FlinkUtils
       .getUnionListState[R](context, getRuntimeContext.getExecutionConfig, OFFSETS_STATE_NAME)
     Try(unionOffsetStates.get.head) match {
@@ -147,7 +148,7 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType = ApiType.scala, j
   }
 
   override def notifyCheckpointComplete(checkpointId: Long): Unit = {
-    logInfo(s"JdbcSource checkpointComplete: $checkpointId")
+    logDebug(s"JdbcSource checkpointComplete: $checkpointId")
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
index 6186711ae..458971bc9 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/source/JdbcSource.scala
@@ -33,7 +33,6 @@ object JdbcSource {
 
   def apply(alias: String = "", properties: Properties = new Properties())(implicit
       ctx: StreamingContext): JdbcSource = new JdbcSource(ctx, alias, properties) {}
-
 }
 
 class JdbcSource(
@@ -50,13 +49,13 @@ class JdbcSource(
    */
   def getDataStream[R: TypeInformation](
       sqlFun: R => String,
-      fun: Iterable[Map[String, _]] => Iterable[R],
-      running: Unit => Boolean): DataStream[R] = {
+      func: Iterable[Map[String, _]] => Iterable[R],
+      filter: R => Boolean): DataStream[R] = {
     val jdbc = ConfigUtils.getJdbcProperties(ctx.parameter.toMap, alias)
     if (property != null) {
       jdbc.putAll(property)
     }
-    val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, fun, running)
+    val mysqlFun = new JdbcSourceFunction[R](jdbc, sqlFun, func, filter)
     ctx.addSource(mysqlFun)
   }
 
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 41b230cc9..b2bc210bc 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.mongo.source;
 
-import org.apache.streampark.flink.connector.function.RunningFunction;
+import org.apache.streampark.flink.connector.function.FilterFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
 import org.apache.streampark.flink.connector.mongo.internal.MongoSourceFunction;
@@ -60,7 +60,7 @@ public class MongoJavaSource<T> {
       String collectionName,
       MongoQueryFunction<T> queryFunction,
       MongoResultFunction<T> resultFunction,
-      RunningFunction runningFunc) {
+      FilterFunction<T> runningFunc) {
 
     if (collectionName == null) {
       throw new NullPointerException("MongoJavaSource error: collectionName must not be null");
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
index eea0c93c0..8b6a91ab7 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
@@ -20,7 +20,7 @@ package org.apache.streampark.flink.connector.mongo.internal
 import org.apache.streampark.common.enums.ApiType
 import org.apache.streampark.common.enums.ApiType.ApiType
 import org.apache.streampark.common.util.{Logger, MongoConfig}
-import org.apache.streampark.flink.connector.function.RunningFunction
+import org.apache.streampark.flink.connector.function.FilterFunction
 import org.apache.streampark.flink.connector.mongo.function.{MongoQueryFunction, MongoResultFunction}
 import org.apache.streampark.flink.util.FlinkUtils
 
@@ -51,9 +51,9 @@ class MongoSourceFunction[R: TypeInformation](
   with Logger {
 
   @volatile private[this] var running = true
-  private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
-  private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
-    override def running(): lang.Boolean = true
+  private[this] var scalaFilterFunc: R => Boolean = (_: R) => true
+  private[this] var javaFilterFunc: FilterFunction[R] = new FilterFunction[R] {
+    override def filter(r: R): lang.Boolean = true
   }
 
   var client: MongoClient = _
@@ -75,13 +75,13 @@ class MongoSourceFunction[R: TypeInformation](
       prop: Properties,
       scalaQueryFunc: (R, MongoCollection[Document]) => FindIterable[Document],
       scalaResultFunc: MongoCursor[Document] => List[R],
-      runningFunc: Unit => Boolean) = {
+      filter: R => Boolean) = {
 
     this(ApiType.scala, prop, collectionName)
     this.scalaQueryFunc = scalaQueryFunc
     this.scalaResultFunc = scalaResultFunc
-    if (runningFunc != null) {
-      this.scalaRunningFunc = runningFunc
+    if (filter != null) {
+      this.scalaFilterFunc = filter
     }
   }
 
@@ -91,13 +91,13 @@ class MongoSourceFunction[R: TypeInformation](
       prop: Properties,
       queryFunc: MongoQueryFunction[R],
       resultFunc: MongoResultFunction[R],
-      runningFunc: RunningFunction) {
+      filter: FilterFunction[R]) {
 
     this(ApiType.java, prop, collectionName)
     this.javaQueryFunc = queryFunc
     this.javaResultFunc = resultFunc
-    if (runningFunc != null) {
-      this.javaRunningFunc = runningFunc
+    if (filter != null) {
+      this.javaFilterFunc = filter
     }
   }
 
@@ -115,31 +115,31 @@ class MongoSourceFunction[R: TypeInformation](
     while (this.running) {
       apiType match {
         case ApiType.scala =>
-          if (scalaRunningFunc()) {
-            ctx.getCheckpointLock.synchronized {
-              val find = scalaQueryFunc(last, mongoCollection)
-              if (find != null) {
-                scalaResultFunc(find.iterator).foreach(
-                  x => {
+          ctx.getCheckpointLock.synchronized {
+            val find = scalaQueryFunc(last, mongoCollection)
+            if (find != null) {
+              scalaResultFunc(find.iterator).foreach(
+                x => {
+                  if (scalaFilterFunc(x)) {
                     last = x
                     ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                  })
-              }
+                  }
+                })
             }
           }
         case ApiType.java =>
-          if (javaRunningFunc.running()) {
-            ctx.getCheckpointLock.synchronized {
-              val find = javaQueryFunc.query(last, mongoCollection)
-              if (find != null) {
-                javaResultFunc
-                  .result(find.iterator)
-                  .foreach(
-                    x => {
+          ctx.getCheckpointLock.synchronized {
+            val find = javaQueryFunc.query(last, mongoCollection)
+            if (find != null) {
+              javaResultFunc
+                .result(find.iterator)
+                .foreach(
+                  x => {
+                    if (javaFilterFunc.filter(x)) {
                       last = x
                       ctx.collectWithTimestamp(last, System.currentTimeMillis())
-                    })
-              }
+                    }
+                  })
             }
           }
       }
@@ -164,7 +164,7 @@ class MongoSourceFunction[R: TypeInformation](
 
   override def initializeState(context: FunctionInitializationContext): Unit = {
     // restore from checkpoint
-    logInfo("MongoSource snapshotState initialize")
+    logDebug("MongoSource snapshotState initialize")
     state = FlinkUtils
       .getUnionListState[R](context, getRuntimeContext.getExecutionConfig, OFFSETS_STATE_NAME)
     Try(state.get().head) match {
@@ -174,7 +174,7 @@ class MongoSourceFunction[R: TypeInformation](
   }
 
   override def notifyCheckpointComplete(checkpointId: Long): Unit = {
-    logInfo(s"MongoSource checkpointComplete: $checkpointId")
+    logDebug(s"MongoSource checkpointComplete: $checkpointId")
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
index 9856b49b2..6cfdfcb2f 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/source/MongoSource.scala
@@ -45,10 +45,10 @@ class MongoSource(
       collection: String,
       queryFun: (R, MongoCollection[Document]) => FindIterable[Document],
       resultFun: MongoCursor[Document] => List[R],
-      running: Unit => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
+      filter: R => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
 
     Utils.copyProperties(property, prop)
-    val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun, resultFun, running)
+    val mongoFun = new MongoSourceFunction[R](collection, prop, queryFun, resultFun, filter)
     ctx.addSource(mongoFun)
   }
 

