This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new 1143f6458 [Bug] Scala TypeInformation type conversion bug fixed.
1143f6458 is described below

commit 1143f6458af407cef672736dcfff1570d869b1f4
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 18:11:46 2025 +0800

    [Bug] Scala TypeInformation type conversion bug fixed.
---
 .../console/core/controller/AlertController.java   |  2 +-
 .../src/test/application.yml                       |  4 +-
 .../quickstart/connector/ClickhouseJavaApp.java    | 44 ++++++-------
 .../flink/quickstart/connector/DorisJavaApp.java   | 28 ++++----
 .../flink/quickstart/connector/HttpJavaApp.java    | 19 +++---
 .../flink/quickstart/connector/KafkaJavaApp.java   | 57 ++++++++--------
 .../flink/quickstart/connector/MySQLJavaApp.java   | 71 ++++++++++----------
 .../flink/quickstart/connector/bean/Behavior.java  | 14 ++--
 .../flink/quickstart/connector/bean/Entity.java    | 23 +++----
 .../flink/quickstart/connector/bean/LogBean.java   | 10 +--
 .../flink/quickstart/connector/bean/Order.java     | 12 ++--
 .../quickstart/connector/ClickHouseSinkApp.scala   |  7 +-
 .../flink/quickstart/connector/ES7SinkApp.scala    | 11 ++--
 .../quickstart/connector/HBaseRequestApp.scala     | 22 ++++---
 .../flink/quickstart/connector/HBaseSinkApp.scala  | 18 ++---
 .../quickstart/connector/HBaseSourceApp.scala      | 68 ++++++++++---------
 .../flink/quickstart/connector/HttpSinkApp.scala   |  8 +--
 .../quickstart/connector/InfluxDBSinkApp.scala     | 26 +++-----
 .../flink/quickstart/connector/JdbcSinkApp.scala   | 37 ++++++-----
 .../flink/quickstart/connector/KafkaSinkApp.scala  | 17 ++---
 .../quickstart/connector/KafkaSourceApp.scala      |  6 +-
 .../quickstart/connector/MongoSourceApp.scala      | 39 ++++++-----
 .../flink/quickstart/connector/MyDataSource.scala  |  2 +-
 .../quickstart/connector/MySQLSourceApp.scala      | 17 +++--
 .../flink/quickstart/connector/SideOutApp.scala    | 76 +++++++++++++---------
 .../connector/hbase/source/HBaseJavaSource.java    | 57 +++++++++++-----
 .../flink/connector/hbase/source/HBaseSource.scala | 25 +++++--
 .../connector/jdbc/source/JdbcJavaSource.java      | 43 +++++++-----
 .../connector/mongo/source/MongoJavaSource.java    | 56 ++++++++++------
 29 files changed, 459 insertions(+), 360 deletions(-)

diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
index 9c4965ff1..1de322c69 100644
--- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
+++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
@@ -93,7 +93,7 @@ public class AlertController {
 
   @DeleteMapping("/delete")
   public RestResponse deleteAlertConfig(
-      @RequestParam("id") @NotNull(message = "config id must be not null") Long id) {
+      @RequestParam("id") @NotNull(message = "config id must not be null") Long id) {
     boolean result = alertConfigService.deleteById(id);
     return RestResponse.success(result);
   }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/application.yml b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
index 29e1c7333..2462834b8 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
+++ b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
@@ -25,7 +25,7 @@ flink:
     pipeline.name: streampark-quickstartApp
     yarn.application.queue:
     taskmanager.numberOfTaskSlots: 1
-    parallelism.default: 2
+    parallelism.default: 1
     jobmanager.memory:
       flink.size:
       heap.size:
@@ -51,7 +51,7 @@ flink:
     execution:
       checkpointing:
         mode: EXACTLY_ONCE
-        interval: 30s
+        interval: 10s
         timeout: 10min
         unaligned: false
         externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
index 4ba8fd674..8370a1999 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
@@ -20,32 +20,32 @@ import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
-/**
- * @author benjobs
- */
+/** @author benjobs */
 public class ClickhouseJavaApp {
 
-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
-
-        //2) async high-performance write
-        new ClickHouseSink(context).asyncSink(source, value ->
-                String.format("insert into test.orders(userId, siteId) values (%d,%d)", value.userId, value.siteId)
-        ).setParallelism(1);
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
 
-        //3) jdbc write
-        /**
-         *
-         * new ClickHouseSink(context).jdbcSink(source, bean ->
-         *      String.format("insert into test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId)
-         * ).setParallelism(1);
-         *
-         */
-        context.start();
-    }
+    // 2) async high-performance write
+    new ClickHouseSink(context)
+        .asyncSink(
+            source,
+            value ->
+                String.format(
+                    "insert into test.orders(userId, siteId) values (%d,%d)",
+                    value.userId, value.siteId))
+        .setParallelism(1);
 
+    // 3) jdbc write
+    /**
+     * new ClickHouseSink(context).jdbcSink(source, bean -> String.format("insert into
+     * test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId) ).setParallelism(1);
+     */
+    context.start();
+  }
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
index 551a28583..2d8091199 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
@@ -17,28 +17,28 @@
 package org.apache.streampark.flink.quickstart.connector;
 
 import org.apache.streampark.flink.connector.doris.sink.DorisSink;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
 import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
-/**
- * @author wudi
- **/
+/** @author wudi */
 public class DorisJavaApp {
 
-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStream<String> source = new KafkaJavaSource<String>(context)
-                .getDataStream()
-                .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
-                .returns(String.class);
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStream<String> source =
+        new KafkaJavaSource<String>(context)
+            .getDataStream()
+            .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
+            .returns(String.class);
 
-        new DorisSink<String>(context).sink(source);
+    new DorisSink<String>(context).sink(source);
 
-        context.start();
-    }
+    context.start();
+  }
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
index 81bfaa338..643d6840b 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
@@ -20,18 +20,17 @@ import org.apache.streampark.flink.connector.http.sink.HttpSink;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
-/**
- * @author wudi
- **/
+/** @author wudi */
 public class HttpJavaApp {
 
-    public static void main(String[] args) {
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-        StreamingContext context = new StreamingContext(envConfig);
-        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
-        new HttpSink(context).get(source.map(x -> String.format("http://www.qq.com?id=%d", x.userId)));
-        context.start();
-    }
+  public static void main(String[] args) {
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+    StreamingContext context = new StreamingContext(envConfig);
+    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new MyDataSource());
+    new HttpSink(context).get(source.map(x -> String.format("http://www.qq.com?id=%d", x.userId)));
+    context.start();
+  }
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
index e45dadf3c..740ca08d8 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
@@ -17,42 +17,45 @@
 package org.apache.streampark.flink.quickstart.connector;
 
 import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
 import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
 import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
+
 import org.apache.flink.api.common.functions.MapFunction;
 import org.apache.flink.api.common.serialization.SerializationSchema;
 import org.apache.flink.streaming.api.datastream.DataStream;
 
-/**
- * @author benjobs
- */
+/** @author benjobs */
 public class KafkaJavaApp {
 
-    public static void main(String[] args) {
-
-        StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment, parameterTool) -> {
-            //environment.getConfig().enableForceAvro();
-            System.out.println("environment argument set...");
-        });
-
-        StreamingContext context = new StreamingContext(javaConfig);
-
-        //1) read data from kafka
-        DataStream<Behavior> source = new KafkaJavaSource<String>(context)
-                .getDataStream()
-                .map((MapFunction<KafkaRecord<String>, Behavior>) value -> JsonUtils.read(value, Behavior.class));
-
-
-        // 2) write the data to another kafka topic
-        new KafkaJavaSink<Behavior>(context)
-                .serializer((SerializationSchema<Behavior>) element -> JsonUtils.write(element).getBytes())
-                .sink(source);
-
-        context.start();
-    }
-
+  public static void main(String[] args) {
+
+    StreamEnvConfig javaConfig =
+        new StreamEnvConfig(
+            args,
+            (environment, parameterTool) -> {
+              // environment.getConfig().enableForceAvro();
+              System.out.println("environment argument set...");
+            });
+
+    StreamingContext context = new StreamingContext(javaConfig);
+
+    // 1) read data from kafka
+    DataStream<Behavior> source =
+        new KafkaJavaSource<String>(context)
+            .getDataStream()
+            .map(
+                (MapFunction<KafkaRecord<String>, Behavior>)
+                    value -> JsonUtils.read(value, Behavior.class));
+
+    // 2) write the data to another kafka topic
+    new KafkaJavaSink<Behavior>(context)
+        .serializer((SerializationSchema<Behavior>) element -> JsonUtils.write(element).getBytes())
+        .sink(source);
+
+    context.start();
+  }
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 97611b0d1..9c86f1b0e 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,7 +16,6 @@
  */
 package org.apache.streampark.flink.quickstart.connector;
 
-import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.streampark.flink.connector.function.SQLQueryFunction;
 import org.apache.streampark.flink.connector.function.SQLResultFunction;
 import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
@@ -24,47 +23,51 @@ import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 import org.apache.streampark.flink.quickstart.connector.bean.Order;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.List;
 
 public class MySQLJavaApp {
 
-    public static void main(String[] args) {
-
-        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+  public static void main(String[] args) {
 
-        StreamingContext context = new StreamingContext(envConfig);
+    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
 
-        //read from the MySQL data source
-        new JdbcJavaSource<Order>(context)
-                .getDataStream(
-                        (SQLQueryFunction<Order>) lastOne -> {
-                            //extract once every 5 seconds
-                            Thread.sleep(3000);
-                            Serializable lastOffset = lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();
-                            return String.format(
-                                    "select * from t_order " +
-                                            "where timestamp > '%s' " +
-                                            "order by timestamp asc ",
-                                    lastOffset
-                            );
-                        },
-                        (SQLResultFunction<Order>) map -> {
-                            List<Order> result = new ArrayList<>();
-                            map.forEach(item -> {
-                                Order order = new Order();
-                                order.setOrderId(item.get("order_id").toString());
-                                order.setMarketId(item.get("market_id").toString());
-                                order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
-                                result.add(order);
-                            });
-                            return result;
-                        })
-                .returns(TypeInformation.of(Order.class))
-                .print("jdbc source: >>>>>");
+    StreamingContext context = new StreamingContext(envConfig);
 
-        context.start();
+    // read from the MySQL data source
+    new JdbcJavaSource<Order>(context, Order.class)
+        .getDataStream(
+            (SQLQueryFunction<Order>)
+                lastOne -> {
+                  // extract once every 5 seconds
+                  Thread.sleep(3000);
+                  Serializable lastOffset =
+                      lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();
+                  return String.format(
+                      "select * from t_order "
+                          + "where timestamp > '%s' "
+                          + "order by timestamp asc ",
+                      lastOffset);
+                },
+            (SQLResultFunction<Order>)
+                map -> {
+                  List<Order> result = new ArrayList<>();
+                  map.forEach(
+                      item -> {
+                        Order order = new Order();
+                        order.setOrderId(item.get("order_id").toString());
+                        order.setMarketId(item.get("market_id").toString());
+                        order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
+                        result.add(order);
+                      });
+                  return result;
+                })
+        .returns(TypeInformation.of(Order.class))
+        .print("jdbc source: >>>>>");
 
-    }
+    context.start();
+  }
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
index 6bfbb3a33..d01165370 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
@@ -18,14 +18,12 @@ package org.apache.streampark.flink.quickstart.connector.bean;
 
 import lombok.Data;
 
-/**
- * @author benjobs
- */
+/** @author benjobs */
 @Data
 public class Behavior {
-    private String user_id;
-    private Long item_id;
-    private Long category_id;
-    private String behavior;
-    private Long ts;
+  private String user_id;
+  private Long item_id;
+  private Long category_id;
+  private String behavior;
+  private Long ts;
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
index a5b5b3282..09653fe2a 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
@@ -16,20 +16,15 @@
  */
 package org.apache.streampark.flink.quickstart.connector.bean;
 
-
-/**
- * @author benjobs
- */
-
+/** @author benjobs */
 public class Entity {
 
-    public Long userId;
-    public Long orderId;
-    public Long siteId;
-    public Long cityId;
-    public Integer orderStatus;
-    public Double price;
-    public Integer quantity;
-    public Long timestamp;
-
+  public Long userId;
+  public Long orderId;
+  public Long siteId;
+  public Long cityId;
+  public Integer orderStatus;
+  public Double price;
+  public Integer quantity;
+  public Long timestamp;
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
index c0114d2c0..b143d70e3 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
@@ -22,9 +22,9 @@ import java.io.Serializable;
 
 @Data
 public class LogBean implements Serializable {
-    private String platenum;
-    private String cardType;
-    private Long inTime;
-    private Long outTime;
-    private String controlid;
+  private String platenum;
+  private String cardType;
+  private Long inTime;
+  private Long outTime;
+  private String controlid;
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
index c561a1b52..8f98f4d67 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
+++ b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
@@ -18,10 +18,12 @@ package org.apache.streampark.flink.quickstart.connector.bean;
 
 import lombok.Data;
 
+import java.io.Serializable;
+
 @Data
-public class Order {
-    private String orderId;
-    private String marketId;
-    private Double price;
-    private Long timestamp;
+public class Order implements Serializable {
+  private String orderId;
+  private String marketId;
+  private Double price;
+  private Long timestamp;
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
index 79c0950af..db70be422 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
 import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
 import org.apache.streampark.flink.core.scala.FlinkStreaming
 import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 object ClickHouseSinkApp extends FlinkStreaming {
@@ -42,12 +43,12 @@ object ClickHouseSinkApp extends FlinkStreaming {
     val source = context.addSource(new MyDataSource)
 
     // 2) high-performance async write
-    ClickHouseSink().asyncSink(source)(x => {s"insert into test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
+    ClickHouseSink().asyncSink(source)(
+      x => { s"insert into test.orders(userId,siteId) values (${x.userId},${x.siteId})" })
 
-    //3) jdbc write
+    // 3) jdbc write
     // ClickHouseSink().jdbcSink(source)(x => {s"insert into test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
 
-
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
index fb9d80038..190c0eca6 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
@@ -16,12 +16,13 @@
  */
 package org.apache.streampark.flink.quickstart.connector
 
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
 import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
 import org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
 import org.apache.streampark.flink.core.scala.FlinkStreaming
 import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
 import org.elasticsearch.action.index.IndexRequest
 import org.json4s.jackson.JsonMethods
 
@@ -33,9 +34,9 @@ object ES7SinkApp extends FlinkStreaming {
     val source: DataStream[Entity] = context.addSource(new MyDataSource)
 
     implicit def indexedSeq(x: Entity): IndexRequest = ElasticsearchUtils.indexRequest(
-        "flink_order",
-        s"${x.orderId}_${x.timestamp}",
-        JsonMethods.mapper.writeValueAsString(x)
+      "flink_order",
+      s"${x.orderId}_${x.timestamp}",
+      JsonMethods.mapper.writeValueAsString(x)
     )
 
     ES7Sink().sink[Entity](source)
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
index e41c62ff7..c3e49b36f 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.util.ConfigUtils
 import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
 import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.hadoop.hbase.client.Get
 
@@ -27,22 +28,27 @@ object HBaseRequestApp extends FlinkStreaming {
 
   implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])
 
-  implicit val reqType: TypeInformation[(String, Boolean)] = TypeInformation.of(classOf[(String, Boolean)])
+  implicit val reqType: TypeInformation[(String, Boolean)] =
+    TypeInformation.of(classOf[(String, Boolean)])
 
   override def handle(): Unit = {
 
     implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-    //one topic
+    // one topic
     val source = context.fromCollection(Seq("123456", "1111", "222"))
 
     source.print("source:>>>")
 
-    HBaseRequest(source).requestOrdered[(String, Boolean)](x => {
-      new HBaseQuery("person", new Get(x.getBytes()))
-    }, timeout = 5000, resultFunc = (a, r) => {
-      a -> !r.isEmpty
-    }).print(" check.... ")
-
+    HBaseRequest(source)
+      .requestOrdered[(String, Boolean)](
+        x => {
+          new HBaseQuery("person", new Get(x.getBytes()))
+        },
+        timeout = 5000,
+        resultFunc = (a, r) => {
+          a -> !r.isEmpty
+        })
+      .print(" check.... ")
 
   }
 
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
index 0a4695590..11fa2d484 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
@@ -20,11 +20,13 @@ import org.apache.streampark.common.util.ConfigUtils
 import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat, HBaseSink}
 import org.apache.streampark.flink.core.scala.FlinkStreaming
 import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.hadoop.hbase.client.{Mutation, Put}
 import org.apache.hadoop.hbase.util.Bytes
 
 import java.util.{Collections, Random}
+
 import scala.language.implicitConversions
 
 object HBaseSinkApp extends FlinkStreaming {
@@ -36,9 +38,10 @@ object HBaseSinkApp extends FlinkStreaming {
     val source = context.addSource(new MyDataSource)
     val random = new Random()
 
-    //define the conversion rule...
+    // define the conversion rule...
     implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
-      val put = new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), entity.timestamp)
+      val put =
+        new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), entity.timestamp)
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(entity.cityId))
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"), Bytes.toBytes(entity.orderId))
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"), Bytes.toBytes(entity.orderStatus))
@@ -46,18 +49,17 @@ object HBaseSinkApp extends FlinkStreaming {
       put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"), Bytes.toBytes(entity.siteId))
       Collections.singleton(put)
     }
-    //source ===> trans ===> sink
+    // source ===> trans ===> sink
 
-    //1) insert option 1
+    // 1) insert option 1
     HBaseSink().sink[Entity](source, "order")
 
-    //2) insert option 2
-    //1. specify the HBase config file
+    // 2) insert option 2
+    // 1. specify the HBase config file
     val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-    //2. insert...
+    // 2. insert...
     source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))
 
-
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
index 5e2a9b5c4..3d56679d7 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -21,6 +21,7 @@ import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
 import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
 import org.apache.streampark.flink.connector.hbase.source.HBaseSource
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.hadoop.hbase.CellUtil
 import org.apache.hadoop.hbase.client.{Get, Scan}
@@ -36,38 +37,47 @@ object HBaseSourceApp extends FlinkStreaming {
 
     implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
 
-    val id = HBaseSource().getDataStream[String](query => {
-      Thread.sleep(10)
-      if (query == null) {
-        new HBaseQuery("person", new Scan())
-      } else {
-      //TODO get the offset from the previous record to decide the next query's condition...
-        new HBaseQuery("person", new Scan())
-      }
-    }, r => new String(r.getRow), null)
+    val id = HBaseSource().getDataStream[String](
+      query => {
+        Thread.sleep(10)
+        if (query == null) {
+          new HBaseQuery("person", new Scan())
+        } else {
+          // TODO get the offset from the previous record to decide the next query's condition...
+          new HBaseQuery("person", new Scan())
+        }
+      },
+      r => new String(r.getRow),
+      null
+    )
 
-    HBaseRequest(id).requestOrdered(x => {
-      new HBaseQuery("person", new Get(x.getBytes()))
-    }, (a, r) => {
-      val map = new util.HashMap[String, String]()
-      val cellScanner = r.cellScanner()
-      while (cellScanner.advance()) {
-        val cell = cellScanner.current()
-        val q = Bytes.toString(CellUtil.cloneQualifier(cell))
-        val (name, v) = q.split("_") match {
-          case Array(_type, name) =>
-            _type match {
-              case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
-              case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
-              case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
-              case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
+    HBaseRequest(id)
+      .requestOrdered(
+        x => {
+          new HBaseQuery("person", new Get(x.getBytes()))
+        },
+        (a, r) => {
+          val map = new util.HashMap[String, String]()
+          val cellScanner = r.cellScanner()
+          while (cellScanner.advance()) {
+            val cell = cellScanner.current()
+            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+            val (name, v) = q.split("_") match {
+              case Array(_type, name) =>
+                _type match {
+                  case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
+                  case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
+                  case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
+                  case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
+                }
+              case _ =>
             }
-          case _ =>
+            map.put(name.toString, v.toString)
+          }
+          map.toString
         }
-        map.put(name.toString, v.toString)
-      }
-      map.toString
-    }).print("Async")
+      )
+      .print("Async")
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
index 763daf92d..80b238798 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
 import org.apache.streampark.flink.connector.http.sink.HttpSink
 import org.apache.streampark.flink.core.scala.FlinkStreaming
 import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 object HttpSinkApp extends FlinkStreaming {
@@ -28,10 +29,9 @@ object HttpSinkApp extends FlinkStreaming {
 
   override def handle(): Unit = {
 
-    /**
-     * source
-     */
-    val source = context.addSource(new MyDataSource)
+    /** source */
+    val source = context
+      .addSource(new MyDataSource)
      .map(x => s"http://www.qq.com?id=${x.userId}")
 
     // sink
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
index f1c289604..0f18055e0 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
@@ -19,14 +19,13 @@ package org.apache.streampark.flink.quickstart.connector
 import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
 import org.apache.streampark.flink.connector.influx.sink.InfluxSink
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 
 import scala.util.Random
 
-/**
- * side-output stream
- */
+/** side-output stream */
 object InfluxDBSinkApp extends FlinkStreaming {
 
   implicit val entityType: TypeInformation[Weather] = TypeInformation.of(classOf[Weather])
@@ -34,14 +33,17 @@ object InfluxDBSinkApp extends FlinkStreaming {
   override def handle(): Unit = {
     val source = context.addSource(new WeatherSource())
 
-    //weather,altitude=1000,area=北 temperature=11,humidity=-4
+    // weather,altitude=1000,area=北 temperature=11,humidity=-4
 
     implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
       "mydb",
       "test",
       "autogen",
       (x: Weather) => Map("altitude" -> x.altitude.toString, "area" -> x.area),
-      (x: Weather) => Map("temperature" -> x.temperature.asInstanceOf[Object], "humidity" -> x.humidity.asInstanceOf[Object])
+      (x: Weather) =>
+        Map(
+          "temperature" -> x.temperature.asInstanceOf[Object],
+          "humidity" -> x.humidity.asInstanceOf[Object])
     )
 
     InfluxSink().sink(source, "mydb")
@@ -50,17 +52,8 @@ object InfluxDBSinkApp extends FlinkStreaming {
 
 }
 
-/**
- *
- * temperature
- * humidity
- * area
- * altitude
- */
-case class Weather(temperature: Long,
-                   humidity: Long,
-                   area: String,
-                   altitude: Long)
+/** temperature, humidity, area, altitude */
+case class Weather(temperature: Long, humidity: Long, area: String, altitude: Long)
 
 class WeatherSource extends SourceFunction[Weather] {
 
@@ -82,4 +75,3 @@ class WeatherSource extends SourceFunction[Weather] {
   }
 
 }
-
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
index 01a792474..cb4c993ad 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
 import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
 import org.apache.streampark.flink.connector.kafka.source.KafkaSource
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 object JdbcSinkApp extends FlinkStreaming {
@@ -27,28 +28,30 @@ object JdbcSinkApp extends FlinkStreaming {
 
   override def handle(): Unit = {
 
-    /**
-     * Read data from kafka. The data is digits or letters, read one record at a time.
-     */
-    val source = KafkaSource().getDataStream[String]()
+    /** Read data from kafka. The data is digits or letters, read one record at a time. */
+    val source = KafkaSource()
+      .getDataStream[String]()
       .uid("kfkSource1")
       .name("kfkSource1")
-      .map(x => {
-        x.value
-      })
-
+      .map(
+        x => {
+          x.value
+        })
 
     /**
-     * Suppose there is an orders table with one field, id, of type int.
-     * Trigger an exception during insert:
-     * 1) Normal case: when the content read from kafka is all digits, the insert succeeds and the kafka consumer offset is updated.
-     * e.g. the current kafka size is 20; manually enter 10 digits, the size becomes 30, the 10 digits are written to MySQL, and the kafka offset is updated
-     * 2) Error case: non-numeric content read from kafka makes the insert fail, and the kafka consumer offset rolls back
-     * e.g. the current kafka size is 30 and the offset is 30; manually enter 1 letter, the size becomes 31, the MySQL write fails, and the kafka offset stays 30 and is not updated.
+     * Suppose there is an orders table with one field, id, of type int. Trigger an exception during insert: 1) Normal case:
+     * when the content read from kafka is all digits, the insert succeeds and the kafka consumer offset is updated. e.g. the current kafka
+     * size is 20; manually enter 10 digits, the size becomes 30, the 10 digits are written to MySQL, and the kafka offset is updated. 2) Error case:
+     * non-numeric content read from kafka makes the insert fail, and the kafka consumer offset rolls back. e.g. the current kafka size is 30 and the offset is 30;
+     * manually enter 1 letter, the size becomes 31, the MySQL write fails, and the kafka offset stays 30 and is not updated.
      */
-    JdbcSink(parallelism = 5).sink[String](source)(x => {
-      s"insert into orders(id,timestamp) values('$x',${System.currentTimeMillis()})"
-    }).uid("mysqlSink").name("mysqlSink")
+    JdbcSink(parallelism = 5)
+      .sink[String](source)(
+        x => {
+          s"insert into orders(id,timestamp) values('$x',${System.currentTimeMillis()})"
+        })
+      .uid("mysqlSink")
+      .name("mysqlSink")
 
   }
 
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
index b4e82ab69..af8aa55aa 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
@@ -1,5 +1,5 @@
 /*
-  * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
  * contributor license agreements.  See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
@@ -18,12 +18,12 @@ package org.apache.streampark.flink.quickstart.connector
 
 import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 
 import scala.util.Random
 
-
 object KafkaSinkApp extends FlinkStreaming {
 
   implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])
@@ -38,12 +38,7 @@ object KafkaSinkApp extends FlinkStreaming {
 
 }
 
-
-case class Behavior(user_id: String,
-                    item_id: Long,
-                    category_id: Long,
-                    behavior: String,
-                    ts: Long) {
+case class Behavior(user_id: String, item_id: Long, category_id: Long, behavior: String, ts: Long) {
   override def toString: String = {
     s"""
        |{
@@ -57,7 +52,6 @@ case class Behavior(user_id: String,
   }
 }
 
-
 class BehaviorSource extends SourceFunction[Behavior] {
   private[this] var isRunning = true
 
@@ -74,11 +68,10 @@ class BehaviorSource extends SourceFunction[Behavior] {
       val item_id = random.nextInt(100)
       val category_id = random.nextInt(20)
       val behavior = seq(random.nextInt(5))
-      val order = Behavior(user_id.toString, item_id, category_id, behavior, System.currentTimeMillis())
+      val order =
+        Behavior(user_id.toString, item_id, category_id, behavior, System.currentTimeMillis())
       ctx.collect(order)
     }
   }
 
 }
-
-
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
index e9e3eaa08..651605678 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
@@ -16,14 +16,16 @@
  */
 package org.apache.streampark.flink.quickstart.connector
 
-import org.apache.flink.streaming.api.scala._
 import org.apache.streampark.flink.connector.kafka.source.KafkaSource
 import org.apache.streampark.flink.core.scala.FlinkStreaming
 
+import org.apache.flink.streaming.api.scala._
+
 object KafkaSourceApp extends FlinkStreaming {
   override def handle(): Unit = {
 
-    KafkaSource().getDataStream[String]()
+    KafkaSource()
+      .getDataStream[String]()
       .map(_.value)
       .print()
 
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
index c0db74056..4329758ff 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -16,13 +16,15 @@
  */
 package org.apache.streampark.flink.quickstart.connector
 
-import com.mongodb.BasicDBObject
 import org.apache.streampark.common.util.{DateUtils, JsonUtils}
 import org.apache.streampark.flink.connector.mongo.source.MongoSource
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
+import com.mongodb.BasicDBObject
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 import java.util.Properties
+
 import scala.collection.JavaConversions._
 
 object MongoSourceApp extends FlinkStreaming {
@@ -32,21 +34,26 @@ object MongoSourceApp extends FlinkStreaming {
   override def handle(): Unit = {
     implicit val prop: Properties = context.parameter.getProperties
     val source = MongoSource()
-    source.getDataStream[String](
-      "shop",
-      (a, d) => {
-        Thread.sleep(1000)
-        /**
-         * Extract the offset from the previous record as the condition for the next query; if the offset is null, this is the first query and a default offset must be specified
-         */
-        val offset = if (a == null) "2019-09-27 00:00:00" else {
-          JsonUtils.read[Map[String, _]](a).get("updateTime").toString
-        }
-        val cond = new BasicDBObject().append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
-        d.find(cond)
-      },
-      _.toList.map(_.toJson()), null
-    ).print()
+    source
+      .getDataStream[String](
+        "shop",
+        (a, d) => {
+          Thread.sleep(1000)
+
+          /** Extract the offset from the previous record as the condition for the next query; if the offset is null, this is the first query and a default offset must be specified */
+          val offset =
+            if (a == null) "2019-09-27 00:00:00"
+            else {
+              JsonUtils.read[Map[String, _]](a).get("updateTime").toString
+            }
+          val cond = new BasicDBObject()
+            .append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
+          d.find(cond)
+        },
+        _.toList.map(_.toJson()),
+        null
+      )
+      .print()
   }
 
 }
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
index af84ea674..5348d104d 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
@@ -17,11 +17,11 @@
 package org.apache.streampark.flink.quickstart.connector
 
 import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
 import org.apache.flink.streaming.api.functions.source.SourceFunction
 
 import scala.util.Random
 
-
 class MyDataSource extends SourceFunction[Entity] {
 
   private[this] var isRunning = true
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
index 5ab968edb..dcc28ffd1 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.quickstart.connector
 
 import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 
 object MySQLSourceApp extends FlinkStreaming {
@@ -26,12 +27,16 @@ object MySQLSourceApp extends FlinkStreaming {
 
   override def handle(): Unit = {
 
-    JdbcSource().getDataStream[Order](lastOne => {
-      val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp
-      s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
-    },
-      _.map(x => new Order(x("market_id").toString, x("timestamp").toString)), null
-    ).print()
+    JdbcSource()
+      .getDataStream[Order](
+        lastOne => {
+          val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp
+          s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc "
+        },
+        _.map(x => new Order(x("market_id").toString, x("timestamp").toString)),
+        null
+      )
+      .print()
 
   }
 
diff --git a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
index 179c57f7d..24180b834 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
+++ b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
@@ -17,6 +17,7 @@
 package org.apache.streampark.flink.quickstart.connector
 
 import org.apache.streampark.flink.core.scala.FlinkStreaming
+
 import org.apache.flink.api.common.typeinfo.TypeInformation
 import org.apache.flink.streaming.api.functions.ProcessFunction
 import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -25,9 +26,7 @@ import org.apache.flink.util.Collector
 
 import scala.util.Random
 
-/**
- * side-output stream
- */
+/** side-output stream */
 object SideOutApp extends FlinkStreaming {
 
   implicit val entityType: TypeInformation[SideEntry] = TypeInformation.of(classOf[SideEntry])
@@ -35,14 +34,14 @@ object SideOutApp extends FlinkStreaming {
   override def handle(): Unit = {
     val source = context.addSource(new SideSource())
 
-    /**
-     * side-output stream...
-     * official style: set up the side output
-     */
+    /** side-output stream... official style: set up the side output */
     val side1 = source.process(new ProcessFunction[SideEntry, SideEntry] {
       val tag = new OutputTag[SideEntry]("flink")
 
-      override def processElement(value: SideEntry, ctx: ProcessFunction[SideEntry, SideEntry]#Context, out: Collector[SideEntry]): Unit = {
+      override def processElement(
+          value: SideEntry,
+          ctx: ProcessFunction[SideEntry, SideEntry]#Context,
+          out: Collector[SideEntry]): Unit = {
         if (value.userId < 100) {
           ctx.output(tag, value)
         } else {
@@ -51,7 +50,7 @@ object SideOutApp extends FlinkStreaming {
       }
     })
 
-    //official style: get the side output
+    // official style: get the side output
     side1.getSideOutput(new OutputTag[SideEntry]("flink")).print("flink:========>")
 
   }
@@ -59,26 +58,35 @@ object SideOutApp extends FlinkStreaming {
 }
 
 /**
- *
- * @param userId      : user ID
- * @param orderId     : order ID
- * @param siteId      : site ID
- * @param cityId      : city ID
- * @param orderStatus : order status (1: placed, 0: returned)
- * @param isNewOrder  : whether this is a first order
- * @param price       : unit price
- * @param quantity    : order quantity
- * @param timestamp   : order time
+ * @param userId
+ *   : user ID
+ * @param orderId
+ *   : order ID
+ * @param siteId
+ *   : site ID
+ * @param cityId
+ *   : city ID
+ * @param orderStatus
+ *   : order status (1: placed, 0: returned)
+ * @param isNewOrder
+ *   : whether this is a first order
+ * @param price
+ *   : unit price
+ * @param quantity
+ *   : order quantity
+ * @param timestamp
+ *   : order time
  */
-case class SideEntry(userId: Long,
-                     orderId: Long,
-                     siteId: Long,
-                     cityId: Long,
-                     orderStatus: Int,
-                     isNewOrder: Int,
-                     price: Double,
-                     quantity: Int,
-                     timestamp: Long)
+case class SideEntry(
+    userId: Long,
+    orderId: Long,
+    siteId: Long,
+    cityId: Long,
+    orderStatus: Int,
+    isNewOrder: Int,
+    price: Double,
+    quantity: Int,
+    timestamp: Long)
 
 class SideSource extends SourceFunction[SideEntry] {
 
@@ -96,10 +104,18 @@ class SideSource extends SourceFunction[SideEntry] {
       val isNew = random.nextInt(1)
       val price = random.nextDouble()
       val quantity = new Random(10).nextInt()
-      val order = SideEntry(userId, orderId, siteId = 1, cityId = 1, status, isNew, price, quantity, System.currentTimeMillis)
+      val order = SideEntry(
+        userId,
+        orderId,
+        siteId = 1,
+        cityId = 1,
+        status,
+        isNew,
+        price,
+        quantity,
+        System.currentTimeMillis)
       ctx.collect(order)
     }
   }
 
 }
-
diff --git a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index ca19fd9e5..4eccc4490 100644
--- a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -17,46 +17,69 @@
 
 package org.apache.streampark.flink.connector.hbase.source;
 
-import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.ConfigUtils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
 import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
 import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
 import java.util.Properties;
 
 public class HBaseJavaSource<T> {
   private final StreamingContext context;
-  private final Properties property;
+  private Properties property;
+  private String alias;
+  private final TypeInformation<T> typeInformation;
 
-  public HBaseJavaSource(StreamingContext context, Properties property) {
+  public HBaseJavaSource(StreamingContext context, Class<T> typeInfo) {
+    if (context == null) {
+      throw new NullPointerException("context must not be null");
+    }
+    if (typeInfo == null) {
+      throw new NullPointerException("typeInfo must not be null");
+    }
     this.context = context;
+    this.typeInformation = TypeInformation.of(typeInfo);
+  }
+
+  public HBaseJavaSource<T> property(Properties property) {
     this.property = property;
+    return this;
   }
 
-    public DataStreamSource<T> getDataStream(
-        HBaseQueryFunction<T> queryFunction,
-        HBaseResultFunction<T> resultFunction) {
-      return getDataStream(queryFunction, resultFunction, null);
-    }
+  public HBaseJavaSource<T> alias(String alias) {
+    this.alias = alias;
+    return this;
+  }
+
+  public DataStreamSource<T> getDataStream(
+      HBaseQueryFunction<T> queryFunction, HBaseResultFunction<T> resultFunction) {
+    return getDataStream(queryFunction, resultFunction, null);
+  }
 
   public DataStreamSource<T> getDataStream(
       HBaseQueryFunction<T> queryFunction,
       HBaseResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
-    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
-    HBaseSourceFunction<T> sourceFunction = new HBaseSourceFunction<>(
-        property,
-        queryFunction,
-        resultFunction,
-        runningFunc,
-        null
-    );
+    if (queryFunction == null) {
+      throw new NullPointerException("HBaseJavaSource error: queryFunction must not be null");
+    }
+    if (resultFunction == null) {
+      throw new NullPointerException("HBaseJavaSource error: resultFunction must not be null");
+    }
+
+    if (this.property == null) {
+      this.property = ConfigUtils.getHBaseConfig(context.parameter().toMap(), alias);
+    }
+
+    HBaseSourceFunction<T> sourceFunction =
+        new HBaseSourceFunction<>(
+            property, queryFunction, resultFunction, runningFunc, typeInformation);
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
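
A minimal call-site sketch of the reworked HBaseJavaSource builder API, for
reference only (not part of this commit). The query/result functions are taken
as parameters here because their functional shapes are not visible in this
diff, and the "hbase1" alias is a made-up name.

    import org.apache.flink.streaming.api.datastream.DataStreamSource;

    import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
    import org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
    import org.apache.streampark.flink.connector.hbase.source.HBaseJavaSource;
    import org.apache.streampark.flink.core.scala.StreamingContext;

    public final class HBaseSourceSketch {

      // Type info now comes from the Class<T> argument instead of a null
      // TypeInformation, and the HBase config is resolved from the context
      // parameters (optionally by alias) when no property(...) is supplied.
      public static <T> DataStreamSource<T> read(
          StreamingContext context,
          Class<T> type,
          HBaseQueryFunction<T> query,
          HBaseResultFunction<T> result) {
        return new HBaseJavaSource<>(context, type)
            .alias("hbase1") // hypothetical alias; omit to use the default config
            .getDataStream(query, result); // two-arg overload, runningFunc = null
      }
    }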
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index 726ff74ee..50bfc4ea6 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -17,7 +17,7 @@
 
 package org.apache.streampark.flink.connector.hbase.source
 
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ConfigUtils
 import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
 import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction
 import org.apache.streampark.flink.core.scala.StreamingContext
@@ -32,8 +32,8 @@ import scala.annotation.meta.param
 
 object HBaseSource {
 
-  def apply(@(transient @param) property: Properties = new Properties())(implicit
-      ctx: StreamingContext): HBaseSource = new HBaseSource(ctx, property)
+  def apply(@(transient @param) alias: String = "", properties: Properties = new Properties())(
+      implicit ctx: StreamingContext): HBaseSource = new HBaseSource(ctx, alias, properties)
 
 }
 
@@ -45,14 +45,25 @@ object HBaseSource {
  */
 class HBaseSource(
     @(transient @param) val ctx: StreamingContext,
-    property: Properties = new Properties()) {
+    alias: String,
+    property: Properties) {
 
   def getDataStream[R: TypeInformation](
       query: R => HBaseQuery,
       func: Result => R,
-      running: Unit => Boolean)(implicit prop: Properties = new Properties()): DataStream[R] = {
-    Utils.copyProperties(property, prop)
-    val hBaseFunc = new HBaseSourceFunction[R](prop, query, func, running)
+      running: Unit => Boolean): DataStream[R] = {
+
+    if (query == null) {
+      throw new NullPointerException("getDataStream error: query function must not be null")
+    }
+    if (func == null) {
+      throw new NullPointerException("getDataStream error: result function must not be null")
+    }
+    val hbaseConf = ConfigUtils.getHBaseConfig(ctx.parameter.toMap, alias)
+    if (property != null) {
+      hbaseConf.putAll(property)
+    }
+    val hBaseFunc = new HBaseSourceFunction[R](hbaseConf, query, func, running)
     ctx.addSource(hBaseFunc)
   }
 
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index d611de227..fdb6b8c50 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -17,7 +17,6 @@
 
 package org.apache.streampark.flink.connector.jdbc.source;
 
-import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.common.util.ConfigUtils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.function.SQLQueryFunction;
@@ -25,6 +24,7 @@ import org.apache.streampark.flink.connector.function.SQLResultFunction;
 import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
 import java.util.Properties;
@@ -32,11 +32,20 @@ import java.util.Properties;
 public class JdbcJavaSource<T> {
 
   private final StreamingContext context;
+  private final TypeInformation<T> typeInformation;
+
   private Properties jdbc;
   private String alias = null;
 
-  public JdbcJavaSource(StreamingContext context) {
+  public JdbcJavaSource(StreamingContext context, Class<T> typeInfo) {
+    if (typeInfo == null) {
+      throw new NullPointerException("typeInfo must not be null");
+    }
+    if (context == null) {
+      throw new NullPointerException("context must not be null");
+    }
     this.context = context;
+    this.typeInformation = TypeInformation.of(typeInfo);
   }
 
   public JdbcJavaSource<T> jdbc(Properties jdbc) {
@@ -49,29 +58,31 @@ public class JdbcJavaSource<T> {
     return this;
   }
 
-    public DataStreamSource<T> getDataStream(
-        SQLQueryFunction<T> queryFunction,
-        SQLResultFunction<T> resultFunction) {
-        return getDataStream(queryFunction, resultFunction, null);
-    }
+  public DataStreamSource<T> getDataStream(
+      SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
+    return getDataStream(queryFunction, resultFunction, null);
+  }
 
   public DataStreamSource<T> getDataStream(
       SQLQueryFunction<T> queryFunction,
       SQLResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
-    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
+    if (queryFunction == null) {
+      throw new NullPointerException(
+          "JdbcJavaSource getDataStream error: SQLQueryFunction must not be 
null");
+    }
+    if (resultFunction == null) {
+      throw new NullPointerException(
+          "JdbcJavaSource getDataStream error: SQLResultFunction must not be 
null");
+    }
+
     if (this.jdbc == null) {
       this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias);
     }
-    JdbcSourceFunction<T> sourceFunction = new JdbcSourceFunction<>(
-            jdbc,
-            queryFunction,
-            resultFunction,
-            runningFunc,
-            null
-        );
+
+    JdbcSourceFunction<T> sourceFunction =
+        new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, runningFunc, typeInformation);
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
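
For comparison, a sketch of the new JdbcJavaSource call site (not part of this
commit). The lambda shapes are assumptions: SQLQueryFunction is taken as "last
emitted element -> next SQL string" and SQLResultFunction as "iterable of row
maps -> iterable of elements"; "t_user" and "name" are made-up identifiers.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.streaming.api.datastream.DataStreamSource;

    import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
    import org.apache.streampark.flink.core.scala.StreamingContext;

    public final class JdbcSourceSketch {

      public static DataStreamSource<String> readNames(StreamingContext context) {
        // No jdbc(...) call here: per the hunk above, the connection properties
        // then fall back to ConfigUtils.getJdbcProperties(context.parameter().toMap(), alias).
        return new JdbcJavaSource<>(context, String.class)
            .getDataStream(
                // assumed shape: last element -> SQL to run next
                last -> "select name from t_user limit 1000",
                // assumed shape: result rows as maps -> emitted elements
                rows -> {
                  List<String> names = new ArrayList<>();
                  rows.forEach(row -> names.add(String.valueOf(row.get("name"))));
                  return names;
                });
      }
    }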
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 9d8e3691f..41b230cc9 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -17,32 +17,44 @@
 
 package org.apache.streampark.flink.connector.mongo.source;
 
-import org.apache.streampark.common.util.AssertUtils;
 import org.apache.streampark.flink.connector.function.RunningFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
 import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
 import org.apache.streampark.flink.connector.mongo.internal.MongoSourceFunction;
 import org.apache.streampark.flink.core.scala.StreamingContext;
 
+import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.streaming.api.datastream.DataStreamSource;
 
 import java.util.Properties;
 
 public class MongoJavaSource<T> {
   private final StreamingContext context;
-  private final Properties property;
+  private Properties property;
+  private String alias;
+  private final TypeInformation<T> typeInformation;
 
-  public MongoJavaSource(StreamingContext context, Properties property) {
+  public MongoJavaSource(StreamingContext context, Class<T> typeInfo) {
     this.context = context;
+    this.typeInformation = TypeInformation.of(typeInfo);
+  }
+
+  public MongoJavaSource<T> property(Properties property) {
     this.property = property;
+    return this;
   }
 
-    public DataStreamSource<T> getDataStream(
-        String collectionName,
-        MongoQueryFunction<T> queryFunction,
-        MongoResultFunction<T> resultFunction) {
-      return getDataStream(collectionName, queryFunction, resultFunction, null);
-    }
+  public MongoJavaSource<T> alias(String alias) {
+    this.alias = alias;
+    return this;
+  }
+
+  public DataStreamSource<T> getDataStream(
+      String collectionName,
+      MongoQueryFunction<T> queryFunction,
+      MongoResultFunction<T> resultFunction) {
+    return getDataStream(collectionName, queryFunction, resultFunction, null);
+  }
 
   public DataStreamSource<T> getDataStream(
       String collectionName,
@@ -50,17 +62,21 @@ public class MongoJavaSource<T> {
       MongoResultFunction<T> resultFunction,
       RunningFunction runningFunc) {
 
-    AssertUtils.notNull(collectionName, "collectionName must not be null");
-    AssertUtils.notNull(queryFunction, "queryFunction must not be null");
-    AssertUtils.notNull(resultFunction, "resultFunction must not be null");
-    MongoSourceFunction<T> sourceFunction = new MongoSourceFunction<>(
-            collectionName,
-            property,
-            queryFunction,
-            resultFunction,
-            runningFunc,
-            null
-        );
+    if (collectionName == null) {
+      throw new NullPointerException("MongoJavaSource error: collectionName must not be null");
+    }
+
+    if (queryFunction == null) {
+      throw new NullPointerException("MongoJavaSource error: queryFunction must not be null");
+    }
+
+    if (resultFunction == null) {
+      throw new NullPointerException("MongoJavaSource error: resultFunction must not be null");
+    }
+
+    MongoSourceFunction<T> sourceFunction =
+        new MongoSourceFunction<>(
+            collectionName, property, queryFunction, resultFunction, runningFunc, typeInformation);
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
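
And the matching MongoJavaSource sketch (not part of this commit). As with the
HBase example, the query/result functions are taken as parameters because
their exact functional shapes are not visible in this diff; "t_order" and
"mongo1" are illustrative names.

    import org.apache.flink.streaming.api.datastream.DataStreamSource;

    import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
    import org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
    import org.apache.streampark.flink.connector.mongo.source.MongoJavaSource;
    import org.apache.streampark.flink.core.scala.StreamingContext;

    public final class MongoSourceSketch {

      public static <T> DataStreamSource<T> read(
          StreamingContext context,
          Class<T> type,
          MongoQueryFunction<T> query,
          MongoResultFunction<T> result) {
        return new MongoJavaSource<>(context, type)
            .alias("mongo1") // hypothetical alias; property(...) sets explicit props
            .getDataStream("t_order", query, result); // runningFunc defaults to null
      }
    }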
