This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new eb7176408 [Improve] add flink jdbc|mongo|hbase datastream-connector 
test case
eb7176408 is described below

commit eb717640847ad8c41e659c0c1936364f20738f5e
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 25 09:16:35 2025 +0800

    [Improve] add flink jdbc|mongo|hbase datastream-connector test case
---
 .../streampark/common/util/ConfigUtils.scala       |   2 +-
 streampark-flink/pom.xml                           |   1 +
 .../client/tool/FlinkSessionClientHelper.scala     |   7 +-
 .../streampark-flink-connector-test/pom.xml        | 267 +++++++++++++++++++++
 .../src/test/application.yml                       | 267 +++++++++++++++++++++
 .../quickstart/connector/ClickhouseJavaApp.java    |  51 ++++
 .../flink/quickstart/connector/DorisJavaApp.java   |  44 ++++
 .../flink/quickstart/connector/HttpJavaApp.java    |  37 +++
 .../flink/quickstart/connector/KafkaJavaApp.java   |  58 +++++
 .../flink/quickstart/connector/MySQLJavaApp.java   |  70 ++++++
 .../flink/quickstart/connector/bean/Behavior.java  |  31 +++
 .../flink/quickstart/connector/bean/Entity.java    |  35 +++
 .../flink/quickstart/connector/bean/LogBean.java   |  30 +++
 .../flink/quickstart/connector/bean/Order.java     |  27 +++
 .../src/test/resources/logback.xml                 | 142 +++++++++++
 .../quickstart/connector/ClickHouseSinkApp.scala   |  53 ++++
 .../flink/quickstart/connector/ES7SinkApp.scala    |  43 ++++
 .../quickstart/connector/HBaseRequestApp.scala     |  49 ++++
 .../flink/quickstart/connector/HBaseSinkApp.scala  |  63 +++++
 .../quickstart/connector/HBaseSourceApp.scala      |  73 ++++++
 .../flink/quickstart/connector/HttpSinkApp.scala   |  42 ++++
 .../quickstart/connector/InfluxDBSinkApp.scala     |  85 +++++++
 .../flink/quickstart/connector/JdbcSinkApp.scala   |  55 +++++
 .../flink/quickstart/connector/KafkaSinkApp.scala  |  84 +++++++
 .../quickstart/connector/KafkaSourceApp.scala      |  32 +++
 .../quickstart/connector/MongoSourceApp.scala      |  52 ++++
 .../flink/quickstart/connector/MyDataSource.scala  |  53 ++++
 .../quickstart/connector/MySQLSourceApp.scala      |  40 +++
 .../flink/quickstart/connector/SideOutApp.scala    | 105 ++++++++
 .../connector/hbase/source/HBaseJavaSource.java    |  15 +-
 .../hbase/internal/HBaseSourceFunction.scala       |   3 +-
 .../connector/jdbc/source/JdbcJavaSource.java      |  15 +-
 .../jdbc/internal/JdbcSourceFunction.scala         |  30 ++-
 .../connector/mongo/source/MongoJavaSource.java    |  18 +-
 .../mongo/internal/MongoSourceFunction.scala       |  27 ++-
 .../flink/kubernetes/KubernetesRetriever.scala     |  12 +-
 .../watcher/FlinkCheckpointWatcher.scala           |  13 +-
 .../kubernetes/watcher/FlinkMetricsWatcher.scala   |  15 +-
 .../flink/kubernetes/watcher/FlinkWatcher.scala    |   4 +-
 .../apache/streampark/flink/util/FlinkUtils.scala  |   9 +-
 40 files changed, 2005 insertions(+), 54 deletions(-)

diff --git 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index cc8b30e9d..f93ca5461 100644
--- 
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++ 
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
 import scala.collection.immutable.{Map => ScalaMap}
 import scala.util.Try
 
-object ConfigUtils {
+private[streampark] object ConfigUtils {
 
   def getConf(parameter: JavaMap[String, String], prefix: String = "", addfix: 
String = "")(implicit
       alias: String = ""): Properties = {
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index fb00fe5db..692adb640 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -32,6 +32,7 @@
         <module>streampark-flink-shims</module>
         <module>streampark-flink-core</module>
         <module>streampark-flink-connector</module>
+        <module>streampark-flink-connector-test</module>
         <module>streampark-flink-sqlclient</module>
         <module>streampark-flink-udf</module>
         <module>streampark-flink-client</module>
diff --git 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 1fc86c789..51da01f88 100644
--- 
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++ 
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,6 +17,8 @@
 
 package org.apache.streampark.flink.client.tool
 
+import org.apache.streampark.common.util.Logger
+
 import org.apache.flink.client.deployment.application.ApplicationConfiguration
 import org.apache.flink.configuration.{Configuration, CoreOptions}
 import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
@@ -25,7 +27,6 @@ import org.apache.hc.client5.http.fluent.Request
 import org.apache.hc.core5.http.ContentType
 import org.apache.hc.core5.http.io.entity.StringEntity
 import org.apache.hc.core5.util.Timeout
-import org.apache.streampark.common.util.Logger
 import org.json4s.DefaultFormats
 import org.json4s.jackson.JsonMethods._
 import org.json4s.jackson.Serialization
@@ -33,13 +34,15 @@ import org.json4s.jackson.Serialization
 import java.io.File
 import java.nio.charset.StandardCharsets
 import java.time.Duration
+
 import scala.collection.JavaConversions._
 import scala.util.{Failure, Success, Try}
 
 object FlinkSessionSubmitHelper extends Logger {
 
   // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+  private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
 
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
   private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(30000L)
diff --git a/streampark-flink/streampark-flink-connector-test/pom.xml 
b/streampark-flink/streampark-flink-connector-test/pom.xml
new file mode 100644
index 000000000..7a53ccc47
--- /dev/null
+++ b/streampark-flink/streampark-flink-connector-test/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0" 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>org.apache.streampark</groupId>
+        <artifactId>streampark-flink-connector</artifactId>
+        <version>2.1.6</version>
+    </parent>
+    <artifactId>streampark-flink-connector-test</artifactId>
+    <name>StreamPark-quickstart: connector Test</name>
+
+    <properties>
+        <streampark.flink.shims.version>1.14</streampark.flink.shims.version>
+        <flink112.version>1.12.0</flink112.version>
+        <flink113.version>1.13.0</flink113.version>
+        <flink114.version>1.14.0</flink114.version>
+        <flink115.version>1.15.0</flink115.version>
+        <flink116.version>1.16.0</flink116.version>
+        <flink117.version>1.17.0</flink117.version>
+        <flink118.version>1.18.0</flink118.version>
+        <flink119.version>1.19.0</flink119.version>
+    </properties>
+
+    <dependencies>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            <artifactId>streampark-common_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-shims_flink-${streampark.flink.shims.version}_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-clickhouse_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-influx_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-http_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-hbase_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-doris_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>com.mysql</groupId>
+            <artifactId>mysql-connector-j</artifactId>
+            <version>8.0.33</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-mongo_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
+            <version>3.6.7</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.json4s</groupId>
+            <artifactId>json4s-native_2.11</artifactId>
+            <version>3.6.7</version>
+        </dependency>
+
+        <!--flink base-->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-core</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-scala_${scala.binary.version}</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-clients_${scala.binary.version}</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.twitter</groupId>
+            <artifactId>chill-thrift</artifactId>
+            <version>0.7.6</version>
+            <!-- exclusions for dependency conversion -->
+            <exclusions>
+                <exclusion>
+                    <groupId>com.esotericsoftware.kryo</groupId>
+                    <artifactId>kryo</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <!-- libthrift is required by chill-thrift -->
+        <dependency>
+            <groupId>org.apache.thrift</groupId>
+            <artifactId>libthrift</artifactId>
+            <version>0.14.0</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>javax.servlet</groupId>
+                    <artifactId>servlet-api</artifactId>
+                </exclusion>
+                <exclusion>
+                    <groupId>org.apache.httpcomponents</groupId>
+                    <artifactId>httpclient</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>org.projectlombok</groupId>
+            <artifactId>lombok</artifactId>
+            <version>1.18.20</version>
+            <scope>provided</scope>
+        </dependency>
+
+        <!-- Kafka里面的消息采用Json格式 -->
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-json</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-csv</artifactId>
+            <version>${flink114.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>flink-connector-mysql-cdc</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>com.alibaba.ververica</groupId>
+            <artifactId>flink-format-changelog-json</artifactId>
+            <version>1.0.0</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.streampark</groupId>
+            
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+    </dependencies>
+
+    <build>
+        <plugins>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-shade-plugin</artifactId>
+                <version>3.2.4</version>
+            </plugin>
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <version>3.1.1</version>
+            </plugin>
+        </plugins>
+    </build>
+
+</project>
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/application.yml 
b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
new file mode 100644
index 000000000..29e1c7333
--- /dev/null
+++ b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+flink:
+  option:
+    target: yarn-per-job
+    detached:
+    shutdownOnAttachedExit:
+    jobmanager:
+  property: #@see: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
+    $internal.application.main: 
org.apache.streampark.flink.quickstart.QuickStartApp
+    pipeline.name: streampark-quickstartApp
+    yarn.application.queue:
+    taskmanager.numberOfTaskSlots: 1
+    parallelism.default: 2
+    jobmanager.memory:
+      flink.size:
+      heap.size:
+      jvm-metaspace.size:
+      jvm-overhead.max:
+      off-heap.size:
+      process.size:
+    taskmanager.memory:
+      flink.size:
+      framework.heap.size:
+      framework.off-heap.size:
+      managed.size:
+      process.size:
+      task.heap.size:
+      task.off-heap.size:
+      jvm-metaspace.size:
+      jvm-overhead.max:
+      jvm-overhead.min:
+      managed.fraction: 0.4
+    pipeline:
+      auto-watermark-interval: 200ms
+    # checkpoint
+    execution:
+      checkpointing:
+        mode: EXACTLY_ONCE
+        interval: 30s
+        timeout: 10min
+        unaligned: false
+        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+    # state backend
+    state:
+      backend: rocksdb # Special note: flink1.12 optional configuration 
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration 
('hashmap', 'rocksdb'),
+      backend.incremental: true
+      checkpoint-storage: filesystem
+      savepoints.dir: file:///tmp/chkdir
+      checkpoints.dir: file:///tmp/chkdir
+    # restart strategy
+    restart-strategy: fixed-delay  # Restart strategy 
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
+    restart-strategy.fixed-delay:
+      attempts: 3
+      delay: 50000
+    restart-strategy.failure-rate:
+      max-failures-per-interval:
+      failure-rate-interval:
+      delay:
+  # table
+  table:
+    table.local-time-zone: default # @see 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
+
+app: # user's parameter
+  # kafka source config....
+  kafka.source:
+    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
+    topic: test_user
+    group.id: flink_02
+    auto.offset.reset: earliest
+      #enable.auto.commit: true
+      #start.from:
+    #timestamp: 1591286400000 #指定timestamp,针对所有的topic生效
+    #offset: # 给每个topic的partition指定offset
+    #topic: kafka01,kafka02
+    #kafka01: 0:182,1:183,2:182 #分区0从182开始消费,分区1从183...
+    #kafka02: 0:182,1:183,2:182
+
+  # kafka sink config....
+  kafka.sink:
+    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
+    topic: test_user
+    transaction.timeout.ms: 1000
+    semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
+    batch.size: 1
+
+
+  # jdbc config...
+  jdbc:
+    semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
+    driverClassName: com.mysql.cj.jdbc.Driver
+    jdbcUrl: 
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
+    username: root
+    password: 123456
+
+  # influx config...
+  influx:
+    mydb:
+      jdbcUrl: http://test9:8086
+      #username: admin
+      #password: admin
+
+  # hbase
+  hbase:
+    zookeeper.quorum: test1,test2,test6
+    zookeeper.property.clientPort: 2181
+    zookeeper.session.timeout: 1200000
+    rpc.timeout: 5000
+    client.pause: 20
+
+  ##clickhouse  jdbc同步写入配置
+  #clickhouse:
+  #  sink:
+  #    #    写入节点地址
+  #    jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
+  #    socketTimeout: 3000000
+  #    database: test
+  #    user: $user
+  #    password: $password
+  #    #    写结果表及对应的字段,全部可不写字段
+  #    targetTable: orders(userId,siteId)
+  #    batch:
+  #      size: 1
+  #      delaytime: 6000000
+
+  clickhouse:
+    sink:
+      hosts: 127.0.0.1:8123,192.168.1.2:8123
+      socketTimeout: 3000000
+      database: test
+      user: $user
+      password: $password
+      targetTable: test.orders(userId,siteId)
+      batch:
+        size: 1
+        delaytime: 60000
+      threshold:
+        bufferSize: 10
+        #      异步写入的并发数
+        numWriters: 4
+        #      缓存队列大小
+        queueCapacity: 100
+        delayTime: 10
+        requestTimeout: 600
+        retries: 1
+        #      成功响应码
+        successCode: 200
+      failover:
+        table: chfailover
+        #      达到失败最大写入次数后,数据备份的组件
+        storage: kafka #kafka|mysql|hbase|hdfs
+        mysql:
+          driverClassName: com.mysql.cj.jdbc.Driver
+          jdbcUrl: 
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
+          username: user
+          password: pass
+        kafka:
+          bootstrap.servers: localhost:9092
+          topic: test1
+          group.id: user_01
+          auto.offset.reset: latest
+        hbase:
+        hdfs:
+          path: /data/chfailover
+          namenode: hdfs://localhost:8020
+          user: hdfs
+
+  #http sink 配置
+  http.sink:
+    threshold:
+      numWriters: 3
+      queueCapacity: 10000 
#队列最大容量,视单条记录大小而自行估量队列大小,如值太大,上游数据源来的太快,下游写入数据跟不上可能会OOM.
+      timeout: 100 #发送http请求的超时时间
+      retries: 3 #发送失败时的最大重试次数
+      successCode: 200 #发送成功状态码,这里可以有多个值,用","号分隔
+    failover:
+      table: record
+      storage: mysql #kafka,hbase,hdfs
+      jdbc: # 保存类型为MySQL,将失败的数据保存到MySQL
+        jdbcUrl: jdbc:mysql://localhost:3306/test
+        username: root
+        password: 123456
+      kafka:
+        topic: bigdata
+        bootstrap.servers: localhost:9093
+      hbase:
+        zookeeper.quorum: localhost
+        zookeeper.property.clientPort: 2181
+      hdfs:
+        namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: 
hdfs://hadoop:8020 , hdfs://hadoop:9000
+        user: benjobs # user
+        path: /clickhouse/failover # save path
+        format: yyyy-MM-dd
+
+  #redis sink 配置
+  redis.sink:
+    #  masterName: master 哨兵模式参数
+    #  host: 192.168.0.1:6379, 192.168.0.3:6379 哨兵模式参数
+    host: 127.0.0.1
+    port: 6379
+    database: 2
+    connectType: jedisPool #可选参数:jedisPool(默认)|sentinel
+
+  es.sink:
+    #  必填参数,多个节点使用 host1:port, host2:port,
+    host: localhost:9200
+  #  选填参数
+  #  es:
+  #    disableFlushOnCheckpoint: false #是否
+  #    auth:
+  #    user:
+  #      password:
+  #    rest:
+  #      max.retry.timeout:
+  #      path.prefix:
+  #      content.type:
+  #    connect:
+  #      request.timeout:
+  #      timeout:
+  #    cluster.name: elasticsearch
+  #  client.transport.sniff:
+  #  bulk.flush.:
+
+sql:
+  my_flinksql: |
+    CREATE TABLE datagen (
+      f_sequence INT,
+      f_random INT,
+      f_random_str STRING,
+      ts AS localtimestamp,
+      WATERMARK FOR ts AS ts
+    ) WITH (
+      'connector' = 'datagen',
+      -- optional options --
+      'rows-per-second'='5', --这个是注释--
+      'fields.f_sequence.kind'='sequence',
+      'fields.f_sequence.start'='1',
+      'fields.f_sequence.end'='1000',
+      'fields.f_random.min'='1',
+      'fields.f_random.max'='1000',
+      'fields.f_random_str.length'='10'
+    );
+
+    CREATE TABLE print_table (
+      f_sequence INT,
+      f_random INT,
+      f_random_str STRING
+      ) WITH (
+      'connector' = 'print'
+    );
+
+    INSERT INTO print_table select f_sequence,f_random,f_random_str from 
datagen;
+
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
new file mode 100644
index 000000000..4ba8fd674
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+/**
+ * @author benjobs
+ */
+public class ClickhouseJavaApp {
+
+    public static void main(String[] args) {
+        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+        StreamingContext context = new StreamingContext(envConfig);
+        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new 
MyDataSource());
+
+        //2) async高性能写入
+        new ClickHouseSink(context).asyncSink(source, value ->
+                String.format("insert into test.orders(userId, siteId) values 
(%d,%d)", value.userId, value.siteId)
+        ).setParallelism(1);
+
+        //3) jdbc方式写入
+        /**
+         *
+         * new ClickHouseSink(context).jdbcSink(source, bean ->
+         *      String.format("insert into test.orders(userId, siteId) values 
(%d,%d)", bean.userId, bean.siteId)
+         * ).setParallelism(1);
+         *
+         */
+        context.start();
+    }
+
+}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
new file mode 100644
index 000000000..551a28583
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.doris.sink.DorisSink;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * @author wudi
+ **/
+public class DorisJavaApp {
+
+    public static void main(String[] args) {
+        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+        StreamingContext context = new StreamingContext(envConfig);
+        DataStream<String> source = new KafkaJavaSource<String>(context)
+                .getDataStream()
+                .map((MapFunction<KafkaRecord<String>, String>) 
KafkaRecord::value)
+                .returns(String.class);
+
+        new DorisSink<String>(context).sink(source);
+
+        context.start();
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
new file mode 100644
index 000000000..81bfaa338
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.http.sink.HttpSink;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+/**
+ * @author wudi
+ **/
+public class HttpJavaApp {
+
+    public static void main(String[] args) {
+        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+        StreamingContext context = new StreamingContext(envConfig);
+        DataStreamSource<Entity> source = context.getJavaEnv().addSource(new 
MyDataSource());
+        new HttpSink(context).get(source.map(x -> 
String.format("http://www.qq.com?id=%d";, x.userId)));
+        context.start();
+    }
+}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
new file mode 100644
index 000000000..e45dadf3c
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * @author benjobs
+ */
+public class KafkaJavaApp {
+
+    public static void main(String[] args) {
+
+        StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment, 
parameterTool) -> {
+            //environment.getConfig().enableForceAvro();
+            System.out.println("environment argument set...");
+        });
+
+        StreamingContext context = new StreamingContext(javaConfig);
+
+        //1) 从 kafka 中读取数据
+        DataStream<Behavior> source = new KafkaJavaSource<String>(context)
+                .getDataStream()
+                .map((MapFunction<KafkaRecord<String>, Behavior>) value -> 
JsonUtils.read(value, Behavior.class));
+
+
+        // 2) 将数据写入其他 kafka 主题
+        new KafkaJavaSink<Behavior>(context)
+                .serializer((SerializationSchema<Behavior>) element -> 
JsonUtils.write(element).getBytes())
+                .sink(source);
+
+        context.start();
+    }
+
+}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
new file mode 100644
index 000000000..97611b0d1
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.streampark.flink.connector.function.SQLQueryFunction;
+import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Order;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
/**
 * Demo job: polls MySQL incrementally with the StreamPark JDBC source. Each query
 * uses the previously emitted row's timestamp as the next read offset, so only
 * new rows are fetched on every cycle.
 */
public class MySQLJavaApp {

    public static void main(String[] args) {

        StreamEnvConfig envConfig = new StreamEnvConfig(args, null);

        StreamingContext context = new StreamingContext(envConfig);

        // Read from the MySQL source.
        new JdbcJavaSource<Order>(context)
                .getDataStream(
                        (SQLQueryFunction<Order>) lastOne -> {
                            // Poll every 3 seconds. (The original comment said "every 5s",
                            // but the code sleeps 3000 ms — the code wins.)
                            Thread.sleep(3000);
                            // First run starts from a fixed datetime literal; later runs resume
                            // from the last row's timestamp.
                            // NOTE(review): the initial offset is a datetime string while
                            // getTimestamp() is a Long — confirm both compare correctly against
                            // the t_order.timestamp column type.
                            Serializable lastOffset = lastOne == null ? "2020-10-10 23:00:00" : lastOne.getTimestamp();
                            return String.format(
                                    "select * from t_order " +
                                            "where timestamp > '%s' " +
                                            "order by timestamp asc ",
                                    lastOffset
                            );
                        },
                        (SQLResultFunction<Order>) map -> {
                            // Map each result row (column name -> value) onto an Order bean.
                            List<Order> result = new ArrayList<>();
                            map.forEach(item -> {
                                Order order = new Order();
                                order.setOrderId(item.get("order_id").toString());
                                order.setMarketId(item.get("market_id").toString());
                                order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
                                result.add(order);
                            });
                            return result;
                        })
                .returns(TypeInformation.of(Order.class))
                .print("jdbc source: >>>>>");

        context.start();

    }
}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
new file mode 100644
index 000000000..6bfbb3a33
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
/**
 * User behavior event decoded from the Kafka JSON payload (see KafkaJavaApp).
 *
 * Field names are deliberately snake_case so they match the JSON keys of the
 * source topic; lombok's {@code @Data} generates getters/setters/equals/hashCode.
 *
 * @author benjobs
 */
@Data
public class Behavior {
    // JSON key "user_id"
    private String user_id;
    private Long item_id;
    private Long category_id;
    // behavior type — value set not visible here; TODO confirm against the producer
    private String behavior;
    // event timestamp — unit (seconds vs millis) not visible here; confirm with producer
    private Long ts;
}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
new file mode 100644
index 000000000..a5b5b3282
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+
/**
 * Simple order event used by the connector test apps (ClickHouse/ES/HBase/HTTP sinks).
 * Plain public mutable fields (no lombok); instances are produced by MyDataSource.
 *
 * @author benjobs
 */

public class Entity {

    public Long userId;
    public Long orderId;
    public Long siteId;
    public Long cityId;
    public Integer orderStatus;
    public Double price;
    public Integer quantity;
    // event time — used as the HBase cell timestamp in HBaseSinkApp; presumably
    // epoch millis, TODO confirm
    public Long timestamp;

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
new file mode 100644
index 000000000..c0114d2c0
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
/**
 * Log record bean. Field names suggest a vehicle access log (plate number,
 * card type, entry/exit times) — not referenced by the apps visible in this
 * chunk, so semantics are unverified here.
 */
@Data
public class LogBean implements Serializable {
    private String platenum;
    private String cardType;
    private Long inTime;
    private Long outTime;
    private String controlid;
}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
new file mode 100644
index 000000000..c561a1b52
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
/**
 * Row bean for the t_order table polled by MySQLJavaApp; {@code timestamp}
 * doubles as the incremental-read offset for the JDBC source.
 */
@Data
public class Order {
    private String orderId;
    private String marketId;
    // present on the bean but not populated by MySQLJavaApp's row mapper
    private Double price;
    private Long timestamp;
}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
 
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
new file mode 100755
index 000000000..537cb762c
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+       https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+
+-->
+<configuration>
+    <!-- 日志文件存储路径 -->
+    <property name="LOG_HOME" value="logs/"/>
+    <property name="FILE_SIZE" value="50MB"/>
+    <property name="MAX_HISTORY" value="100"/>
+    <timestamp key="DATE_TIME" datePattern="yyyy-MM-dd HH:mm:ss"/>
+
+    <property name="log.colorPattern"
+              value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) | 
%boldYellow(%thread) | %boldGreen(%logger) | %msg%n"/>
+    <property name="log.pattern"
+              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread] 
%-5level %logger{36} - %msg%n"/>
+
+    <!-- 控制台打印 -->
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.colorPattern}</pattern>
+        </encoder>
+    </appender>
+    <!-- ERROR 输入到文件,按日期和文件大小 -->
+    <appender name="ERROR" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>ERROR</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}%d/error.%i.log</fileNamePattern>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${FILE_SIZE}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+    </appender>
+
+    <!-- WARN 输入到文件,按日期和文件大小 -->
+    <appender name="WARN" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>WARN</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}%d/warn.%i.log</fileNamePattern>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${FILE_SIZE}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+    </appender>
+
+    <!-- INFO 输入到文件,按日期和文件大小 -->
+    <appender name="INFO" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>INFO</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}%d/info.%i.log</fileNamePattern>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${FILE_SIZE}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+    </appender>
+    <!-- DEBUG 输入到文件,按日期和文件大小 -->
+    <appender name="DEBUG" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>DEBUG</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}%d/debug.%i.log</fileNamePattern>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy
+                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${FILE_SIZE}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+    </appender>
+    <!-- TRACE 输入到文件,按日期和文件大小 -->
+    <appender name="TRACE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
+        <encoder charset="utf-8">
+            <pattern>${log.pattern}</pattern>
+        </encoder>
+        <filter class="ch.qos.logback.classic.filter.LevelFilter">
+            <level>TRACE</level>
+            <onMatch>ACCEPT</onMatch>
+            <onMismatch>DENY</onMismatch>
+        </filter>
+        <rollingPolicy
+                class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+            <fileNamePattern>${LOG_HOME}%d/trace.%i.log</fileNamePattern>
+            <maxHistory>${MAX_HISTORY}</maxHistory>
+            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+                <maxFileSize>${FILE_SIZE}</maxFileSize>
+            </timeBasedFileNamingAndTriggeringPolicy>
+        </rollingPolicy>
+    </appender>
+
+    <!-- Logger 根目录 -->
+    <root level="INFO">
+        <appender-ref ref="STDOUT"/>
+        <appender-ref ref="DEBUG"/>
+        <appender-ref ref="ERROR"/>
+        <appender-ref ref="WARN"/>
+        <appender-ref ref="INFO"/>
+        <appender-ref ref="TRACE"/>
+    </root>
+</configuration>
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
new file mode 100644
index 000000000..79c0950af
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
object ClickHouseSinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Entity] = TypeInformation.of(classOf[Entity])

  override def handle(): Unit = {
    // Assumes the following table already exists in ClickHouse.
    val createTable =
      """
        |create TABLE test.orders(
        |userId UInt16,
        |siteId UInt8,
        |timestamp UInt16
        |)ENGINE = TinyLog;
        |""".stripMargin

    println(createTable)

    // 1) attach the test data source
    val source = context.addSource(new MyDataSource)

    // 2) high-throughput asynchronous writes
    val toInsertSql = (x: Entity) => s"insert into test.orders(userId,siteId) values (${x.userId},${x.siteId})"
    ClickHouseSink().asyncSink(source)(toInsertSql)

    // 3) plain JDBC writes (alternative)
    // ClickHouseSink().jdbcSink(source)(toInsertSql)

  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
new file mode 100644
index 000000000..fb9d80038
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
+import 
org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.elasticsearch.action.index.IndexRequest
+import org.json4s.jackson.JsonMethods
+
object ES7SinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Entity] = TypeInformation.of(classOf[Entity])

  override def handle(): Unit = {
    val source: DataStream[Entity] = context.addSource(new MyDataSource)

    // Conversion picked up implicitly by the sink: serialize each Entity as JSON
    // into index "flink_order", with "<orderId>_<timestamp>" as the document id.
    implicit def toIndexRequest(x: Entity): IndexRequest =
      ElasticsearchUtils.indexRequest(
        "flink_order",
        s"${x.orderId}_${x.timestamp}",
        JsonMethods.mapper.writeValueAsString(x))

    ES7Sink().sink[Entity](source)
  }
}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
new file mode 100644
index 000000000..e41c62ff7
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
+import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.client.Get
+
object HBaseRequestApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  implicit val reqType: TypeInformation[(String, Boolean)] = TypeInformation.of(classOf[(String, Boolean)])

  override def handle(): Unit = {

    // HBase connection settings are resolved from the job parameters.
    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)

    // A fixed set of row keys to probe.
    val rowKeys = Seq("123456", "1111", "222")
    val source = context.fromCollection(rowKeys)

    source.print("source:>>>")

    // For each incoming key, issue an ordered GET against table "person"
    // and emit (key, rowExists).
    HBaseRequest(source)
      .requestOrdered[(String, Boolean)](
        key => new HBaseQuery("person", new Get(key.getBytes())),
        timeout = 5000,
        resultFunc = (key, result) => key -> !result.isEmpty)
      .print(" check.... ")

  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
new file mode 100644
index 000000000..0a4695590
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat, 
HBaseSink}
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.client.{Mutation, Put}
+import org.apache.hadoop.hbase.util.Bytes
+
+import java.util.{Collections, Random}
+import scala.language.implicitConversions
+
object HBaseSinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Entity] = TypeInformation.of(classOf[Entity])
  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  override def handle(): Unit = {
    val source = context.addSource(new MyDataSource)
    val random = new Random()

    // Conversion rule: how one Entity becomes HBase mutations.
    // Row key = nanoTime plus a random offset (spreads writes, avoids collisions);
    // the cell timestamp is the entity's own timestamp field.
    implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
      val put = new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), entity.timestamp)
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), Bytes.toBytes(entity.cityId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"), Bytes.toBytes(entity.orderId))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"), Bytes.toBytes(entity.orderStatus))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"), Bytes.toBytes(entity.quantity))
      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"), Bytes.toBytes(entity.siteId))
      Collections.singleton(put)
    }
    // source ===> trans ===> sink

    // 1) insert option 1: StreamPark HBaseSink into table "order"
    HBaseSink().sink[Entity](source, "order")

    // 2) insert option 2: via an OutputFormat
    // 2.1 resolve the HBase configuration from the job parameters
    val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
    // 2.2 write the stream through HBaseOutputFormat
    // NOTE(review): both sinks are active, so every record is written twice —
    // presumably intentional for this demo; confirm before reuse.
    source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))


  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
new file mode 100644
index 000000000..5e2a9b5c4
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
+import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
+import org.apache.streampark.flink.connector.hbase.source.HBaseSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.CellUtil
+import org.apache.hadoop.hbase.client.{Get, Scan}
+import org.apache.hadoop.hbase.util.Bytes
+
+import java.util
+
object HBaseSourceApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  /**
   * Reads row keys from the HBase table "person", then asynchronously fetches each row
   * and renders its cells as a name -> value map string.
   */
  override def handle(): Unit = {

    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)

    val id = HBaseSource().getDataStream[String](query => {
      Thread.sleep(10)
      if (query == null) {
        new HBaseQuery("person", new Scan())
      } else {
        // TODO: derive the next scan's start point from the previous record (resume offset)...
        new HBaseQuery("person", new Scan())
      }
    }, r => new String(r.getRow), null)

    HBaseRequest(id).requestOrdered(x => {
      new HBaseQuery("person", new Get(x.getBytes()))
    }, (a, r) => {
      val map = new util.HashMap[String, String]()
      val cellScanner = r.cellScanner()
      while (cellScanner.advance()) {
        val cell = cellScanner.current()
        // Qualifier convention: "<type>_<name>", e.g. "i_age" is an int column named "age".
        val q = Bytes.toString(CellUtil.cloneQualifier(cell))
        val value = CellUtil.cloneValue(cell)
        // BUGFIX: the original default arm returned Unit, so any qualifier without "_"
        // (or with an unknown type tag) blew up with a runtime MatchError.
        // Both matches are now total: unknown cells fall back to the raw string value.
        val (name, v) = q.split("_") match {
          case Array("i", name) => name -> Bytes.toInt(value)
          case Array("s", name) => name -> Bytes.toString(value)
          case Array("d", name) => name -> Bytes.toDouble(value)
          case Array("f", name) => name -> Bytes.toFloat(value)
          case _ => q -> Bytes.toString(value)
        }
        map.put(name, v.toString)
      }
      map.toString
    }).print("Async")
  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
new file mode 100644
index 000000000..763daf92d
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.http.sink.HttpSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
object HttpSinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Entity] = TypeInformation.of(classOf[Entity])
  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  /** Turns each generated Entity into a GET url and fires it through the HTTP sink. */
  override def handle(): Unit = {

    // source: synthetic entities rendered as request urls
    val urls = context
      .addSource(new MyDataSource)
      .map(x => s"http://www.qq.com?id=${x.userId}")

    // sink: issue one GET per record
    new HttpSink(context).get(urls)

  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
new file mode 100644
index 000000000..f1c289604
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
+import org.apache.streampark.flink.connector.influx.sink.InfluxSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
/**
 * Writes randomly generated weather samples into InfluxDB
 * (measurement "test" in database "mydb", retention policy "autogen").
 */
object InfluxDBSinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Weather] = TypeInformation.of(classOf[Weather])

  override def handle(): Unit = {
    val samples = context.addSource(new WeatherSource())

    // Line-protocol shape: weather,altitude=1000,area=北 temperature=11,humidity=-4
    // tags  : altitude, area
    // fields: temperature, humidity
    implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
      "mydb",
      "test",
      "autogen",
      (x: Weather) => Map("altitude" -> x.altitude.toString, "area" -> x.area),
      (x: Weather) => Map("temperature" -> x.temperature.asInstanceOf[Object], "humidity" -> x.humidity.asInstanceOf[Object])
    )

    InfluxSink().sink(samples, "mydb")

  }

}
+
/**
 * A single weather sample.
 *
 * @param temperature temperature reading
 * @param humidity    humidity reading
 * @param area        region label
 * @param altitude    altitude reading
 */
case class Weather(temperature: Long,
                   humidity: Long,
                   area: String,
                   altitude: Long)
+
/** Endless random weather generator; emits as fast as the task runs until cancelled. */
class WeatherSource extends SourceFunction[Weather] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  override def run(ctx: SourceFunction.SourceContext[Weather]): Unit = {
    // candidate region labels (hoisted out of the loop; picked uniformly at random)
    val areas = List("北", "上", "广", "深")
    while (isRunning) {
      ctx.collect(
        Weather(
          temperature = random.nextInt(100),
          humidity = random.nextInt(30),
          area = areas(random.nextInt(4)),
          altitude = random.nextInt(10000)))
    }
  }

}
+
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
new file mode 100644
index 000000000..01a792474
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
+import org.apache.streampark.flink.connector.kafka.source.KafkaSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
object JdbcSinkApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  override def handle(): Unit = {

    // Read from Kafka one record at a time; payloads are digits or letters.
    val records = KafkaSource()
      .getDataStream[String]()
      .uid("kfkSource1")
      .name("kfkSource1")
      .map(_.value)

    /**
     * Target table `orders` has an int `id` column, which lets us provoke failures:
     * 1) happy path: numeric payloads insert fine and the Kafka consumer offset advances
     *    (e.g. size 20 + 10 digits typed -> size 30, all 10 rows land in MySQL, offset moves);
     * 2) failure path: a non-numeric payload fails the insert and the offset rolls back
     *    (e.g. size 30/offset 30 + 1 letter -> size 31, insert errors, offset stays at 30).
     */
    JdbcSink(parallelism = 5)
      .sink[String](records)(v => {
        s"insert into orders(id,timestamp) values('$v',${System.currentTimeMillis()})"
      })
      .uid("mysqlSink")
      .name("mysqlSink")

  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
new file mode 100644
index 000000000..b4e82ab69
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
+
object KafkaSinkApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])
  implicit val entityType: TypeInformation[Behavior] = TypeInformation.of(classOf[Behavior])

  /** Generates user behaviors, echoes each record to stdout and pushes it to Kafka. */
  override def handle(): Unit = {
    val behaviors = context
      .addSource[Behavior](new BehaviorSource())
      .map(_.toString)
    behaviors.print()
    KafkaSink().sink(behaviors)
  }

}
+
+
/**
 * One user-behavior event as produced by [[BehaviorSource]].
 *
 * @param user_id     acting user's id
 * @param item_id     item acted upon
 * @param category_id item category
 * @param behavior    action label (e.g. "view", "click")
 * @param ts          event time in epoch millis
 */
case class Behavior(user_id: String,
                    item_id: Long,
                    category_id: Long,
                    behavior: String,
                    ts: Long) {

  /** Multi-line, brace-wrapped key:value rendering (kept identical to the original stripMargin form). */
  override def toString: String = {
    val fields = Seq(
      s"user_id:$user_id,",
      s"item_id:$item_id,",
      s"category_id:$category_id,",
      s"behavior:$behavior,",
      s"ts:$ts")
    (Seq("", "{") ++ fields ++ Seq("}", "")).mkString("\n")
  }
}
+
+
/** Emits up to ~10,001 random behavior events, then stops (or earlier if cancelled). */
class BehaviorSource extends SourceFunction[Behavior] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()
  var index = 0

  override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
    val actions = Seq("view", "click", "search", "buy", "share")
    while (isRunning && index <= 10000) {
      index += 1
      ctx.collect(
        Behavior(
          user_id = random.nextInt(1000).toString,
          item_id = random.nextInt(100),
          category_id = random.nextInt(20),
          behavior = actions(random.nextInt(5)),
          ts = System.currentTimeMillis()))
    }
  }

}
+
+
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
new file mode 100644
index 000000000..e9e3eaa08
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.streampark.flink.connector.kafka.source.KafkaSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+
object KafkaSourceApp extends FlinkStreaming {

  /** Consumes the configured Kafka topic and prints each record's payload. */
  override def handle(): Unit = {
    val records = KafkaSource().getDataStream[String]()
    records.map(r => r.value).print()
  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
new file mode 100644
index 000000000..c0db74056
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import com.mongodb.BasicDBObject
+import org.apache.streampark.common.util.{DateUtils, JsonUtils}
+import org.apache.streampark.flink.connector.mongo.source.MongoSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import java.util.Properties
+import scala.collection.JavaConversions._
+
object MongoSourceApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[String] = TypeInformation.of(classOf[String])

  /** Incrementally polls the Mongo "shop" collection by updateTime and prints each document as JSON. */
  override def handle(): Unit = {
    implicit val prop: Properties = context.parameter.getProperties
    MongoSource().getDataStream[String](
      "shop",
      (last, collection) => {
        Thread.sleep(1000)
        // The previous record carries the offset for the next query; on the very first
        // poll (last == null) fall back to a fixed default offset.
        val offset =
          if (last == null) "2019-09-27 00:00:00"
          else JsonUtils.read[Map[String, _]](last).get("updateTime").toString
        val filter = new BasicDBObject().append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
        collection.find(filter)
      },
      docs => docs.toList.map(_.toJson()),
      null
    ).print()
  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
new file mode 100644
index 000000000..af84ea674
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
+
/** Emits up to ~1,000,001 synthetic Entity records as fast as possible (or until cancelled). */
class MyDataSource extends SourceFunction[Entity] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  var index = 0

  override def run(ctx: SourceFunction.SourceContext[Entity]): Unit = {
    while (isRunning && index <= 1000001) {
      index += 1

      val entity = new Entity()
      entity.userId = System.currentTimeMillis()
      entity.orderId = random.nextInt(100)
      // NOTE(review): nextInt(1) can only ever return 0 — confirm whether nextInt(2) was intended.
      entity.orderStatus = random.nextInt(1)
      entity.price = random.nextDouble()
      // Reuse the shared Random instead of allocating a fresh `new Random()` on every record.
      entity.quantity = random.nextInt(10)
      entity.cityId = 1
      entity.siteId = random.nextInt(20)
      entity.timestamp = System.currentTimeMillis()

      ctx.collect(entity)
    }
  }

}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
new file mode 100644
index 000000000..5ab968edb
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
object MySQLSourceApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Order] = TypeInformation.of(classOf[Order])

  /** Incrementally polls t_order by timestamp and prints each row as an Order. */
  override def handle(): Unit = {

    JdbcSource().getDataStream[Order](
      last => {
        // First poll (last == null) starts from a fixed offset; afterwards resume from the last row.
        val offset = if (last == null) "2020-10-10 23:00:00" else last.timestamp
        s"select * from t_order where timestamp > '$offset' order by timestamp asc "
      },
      rows => rows.map(r => new Order(r("market_id").toString, r("timestamp").toString)),
      null
    ).print()

  }

}
+
+class Order(val marketId: String, val timestamp: String) extends Serializable
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
new file mode 100644
index 000000000..179c57f7d
--- /dev/null
+++ 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
/**
 * Demonstrates Flink side outputs: records with small user ids are diverted
 * to a tagged side stream while the rest stay on the main stream.
 */
object SideOutApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[SideEntry] = TypeInformation.of(classOf[SideEntry])

  override def handle(): Unit = {
    val orders = context.addSource(new SideSource())

    // Standard Flink pattern: route userId < 100 to the "flink" side output,
    // collect everything else on the main stream.
    val mainStream = orders.process(new ProcessFunction[SideEntry, SideEntry] {
      private val tag = new OutputTag[SideEntry]("flink")

      override def processElement(
          value: SideEntry,
          ctx: ProcessFunction[SideEntry, SideEntry]#Context,
          out: Collector[SideEntry]): Unit = {
        if (value.userId < 100) ctx.output(tag, value)
        else out.collect(value)
      }
    })

    // Standard Flink pattern: retrieve the side stream with an equal tag (same id and type).
    mainStream.getSideOutput(new OutputTag[SideEntry]("flink")).print("flink:========>")

  }

}
+
/**
 * One synthetic order event.
 *
 * @param userId      user id
 * @param orderId     order id
 * @param siteId      site id
 * @param cityId      city id
 * @param orderStatus order status (1: placed, 0: refunded)
 * @param isNewOrder  whether this is the user's first order
 * @param price       unit price
 * @param quantity    quantity ordered
 * @param timestamp   order time (epoch millis)
 */
case class SideEntry(userId: Long,
                     orderId: Long,
                     siteId: Long,
                     cityId: Long,
                     orderStatus: Int,
                     isNewOrder: Int,
                     price: Double,
                     quantity: Int,
                     timestamp: Long)
+
/** Emits random SideEntry records until cancelled. */
class SideSource extends SourceFunction[SideEntry] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  override def run(ctx: SourceFunction.SourceContext[SideEntry]): Unit = {
    while (isRunning) {
      val userId = random.nextInt(1000)
      val orderId = random.nextInt(100)
      // NOTE(review): nextInt(1) always yields 0 — confirm whether nextInt(2) was intended.
      val status = random.nextInt(1)
      val isNew = random.nextInt(1)
      val price = random.nextDouble()
      // BUGFIX: `new Random(10).nextInt()` built a fixed-seed Random on every iteration,
      // so every record carried the exact same quantity. Draw 0-9 from the shared Random
      // instead (consistent with MyDataSource's quantity generation).
      val quantity = random.nextInt(10)
      val order = SideEntry(userId, orderId, siteId = 1, cityId = 1, status, isNew, price, quantity, System.currentTimeMillis)
      ctx.collect(order)
    }
  }

}
+
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index b146d281b..ca19fd9e5 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -37,6 +37,12 @@ public class HBaseJavaSource<T> {
     this.property = property;
   }
 
+    public DataStreamSource<T> getDataStream(
+        HBaseQueryFunction<T> queryFunction,
+        HBaseResultFunction<T> resultFunction) {
+      return getDataStream(queryFunction, resultFunction, null);
+    }
+
   public DataStreamSource<T> getDataStream(
       HBaseQueryFunction<T> queryFunction,
       HBaseResultFunction<T> resultFunction,
@@ -44,8 +50,13 @@ public class HBaseJavaSource<T> {
 
     AssertUtils.notNull(queryFunction, "queryFunction must not be null");
     AssertUtils.notNull(resultFunction, "resultFunction must not be null");
-    HBaseSourceFunction<T> sourceFunction =
-        new HBaseSourceFunction<>(property, queryFunction, resultFunction, 
runningFunc, null);
+    HBaseSourceFunction<T> sourceFunction = new HBaseSourceFunction<>(
+        property,
+        queryFunction,
+        resultFunction,
+        runningFunc,
+        null
+    );
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index 8d22ec817..04e82941e 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -168,7 +168,8 @@ class HBaseSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala,
   override def initializeState(context: FunctionInitializationContext): Unit = 
{
     // Recover from checkpoint...
     logInfo("HBaseSource snapshotState initialize")
-    state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
+    state = FlinkUtils
+      .getUnionListState[R](context, getRuntimeContext.getExecutionConfig, 
OFFSETS_STATE_NAME)
     Try(state.get.head) match {
       case Success(q) => last = q
       case _ =>
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index 083f6d44d..d611de227 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -49,6 +49,12 @@ public class JdbcJavaSource<T> {
     return this;
   }
 
+    public DataStreamSource<T> getDataStream(
+        SQLQueryFunction<T> queryFunction,
+        SQLResultFunction<T> resultFunction) {
+        return getDataStream(queryFunction, resultFunction, null);
+    }
+
   public DataStreamSource<T> getDataStream(
       SQLQueryFunction<T> queryFunction,
       SQLResultFunction<T> resultFunction,
@@ -59,8 +65,13 @@ public class JdbcJavaSource<T> {
     if (this.jdbc == null) {
       this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(), 
alias);
     }
-    JdbcSourceFunction<T> sourceFunction =
-        new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction, 
runningFunc, null);
+    JdbcSourceFunction<T> sourceFunction = new JdbcSourceFunction<>(
+            jdbc,
+            queryFunction,
+            resultFunction,
+            runningFunc,
+            null
+        );
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
index b3cc4f710..8429a1135 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
@@ -46,13 +46,16 @@ class JdbcSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala, j
 
   @volatile private[this] var running = true
   private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
-  private[this] var javaRunningFunc: RunningFunction = _
+  private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
+    override def running(): lang.Boolean = true
+  }
 
   private[this] var scalaSqlFunc: R => String = _
   private[this] var scalaResultFunc: Function[Iterable[Map[String, _]], 
Iterable[R]] = _
   private[this] var javaSqlFunc: SQLQueryFunction[R] = _
   private[this] var javaResultFunc: SQLResultFunction[R] = _
-  @transient private var state: ListState[R] = _
+  @transient private var unionOffsetStates: ListState[R] = null
+
   private val OFFSETS_STATE_NAME: String = "jdbc-source-query-states"
   private[this] var last: R = _
 
@@ -66,7 +69,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType 
= ApiType.scala, j
     this(ApiType.scala, jdbc)
     this.scalaSqlFunc = sqlFunc
     this.scalaResultFunc = resultFunc
-    this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
+    if (runningFunc != null) {
+      this.scalaRunningFunc = runningFunc
+    }
   }
 
   // for JAVA
@@ -79,12 +84,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala, j
     this(ApiType.java, jdbc)
     this.javaSqlFunc = javaSqlFunc
     this.javaResultFunc = javaResultFunc
-    this.javaRunningFunc =
-      if (runningFunc != null) runningFunc
-      else
-        new RunningFunction {
-          override def running(): lang.Boolean = true
-        }
+    if (runningFunc != null) {
+      this.javaRunningFunc = runningFunc
+    }
   }
 
   @throws[Exception]
@@ -125,9 +127,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala, j
 
   override def snapshotState(context: FunctionSnapshotContext): Unit = {
     if (running) {
-      state.clear()
+      unionOffsetStates.clear()
       if (last != null) {
-        state.add(last)
+        unionOffsetStates.add(last)
       }
     } else {
       logError("JdbcSource snapshotState called on closed source")
@@ -136,8 +138,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala, j
 
   override def initializeState(context: FunctionInitializationContext): Unit = 
{
     logInfo("JdbcSource snapshotState initialize")
-    state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
-    Try(state.get.head) match {
+    unionOffsetStates = FlinkUtils
+      .getUnionListState[R](context, getRuntimeContext.getExecutionConfig, 
OFFSETS_STATE_NAME)
+    Try(unionOffsetStates.get.head) match {
       case Success(q) => last = q
       case _ =>
     }
@@ -146,4 +149,5 @@ class JdbcSourceFunction[R: TypeInformation](apiType: 
ApiType = ApiType.scala, j
   override def notifyCheckpointComplete(checkpointId: Long): Unit = {
     logInfo(s"JdbcSource checkpointComplete: $checkpointId")
   }
+
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 560de9eb0..9d8e3691f 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -37,6 +37,13 @@ public class MongoJavaSource<T> {
     this.property = property;
   }
 
+    public DataStreamSource<T> getDataStream(
+        String collectionName,
+        MongoQueryFunction<T> queryFunction,
+        MongoResultFunction<T> resultFunction) {
+      return getDataStream(collectionName, queryFunction, resultFunction, 
null);
+    }
+
   public DataStreamSource<T> getDataStream(
       String collectionName,
       MongoQueryFunction<T> queryFunction,
@@ -46,9 +53,14 @@ public class MongoJavaSource<T> {
     AssertUtils.notNull(collectionName, "collectionName must not be null");
     AssertUtils.notNull(queryFunction, "queryFunction must not be null");
     AssertUtils.notNull(resultFunction, "resultFunction must not be null");
-    MongoSourceFunction<T> sourceFunction =
-        new MongoSourceFunction<>(
-            collectionName, property, queryFunction, resultFunction, 
runningFunc, null);
+    MongoSourceFunction<T> sourceFunction = new MongoSourceFunction<>(
+            collectionName,
+            property,
+            queryFunction,
+            resultFunction,
+            runningFunc,
+            null
+        );
     return context.getJavaEnv().addSource(sourceFunction);
   }
 }
diff --git 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
index 6e700992c..eea0c93c0 100644
--- 
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
+++ 
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
@@ -51,11 +51,13 @@ class MongoSourceFunction[R: TypeInformation](
   with Logger {
 
   @volatile private[this] var running = true
-  private[this] var scalaRunningFunc: Unit => Boolean = _
-  private[this] var javaRunningFunc: RunningFunction = _
+  private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
+  private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
+    override def running(): lang.Boolean = true
+  }
 
   var client: MongoClient = _
-  var mongoCollection: MongoCollection[Document] = _
+  private var mongoCollection: MongoCollection[Document] = _
 
   private[this] var scalaQueryFunc: (R, MongoCollection[Document]) => 
FindIterable[Document] = _
   private[this] var scalaResultFunc: MongoCursor[Document] => List[R] = _
@@ -78,7 +80,9 @@ class MongoSourceFunction[R: TypeInformation](
     this(ApiType.scala, prop, collectionName)
     this.scalaQueryFunc = scalaQueryFunc
     this.scalaResultFunc = scalaResultFunc
-    this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
+    if (runningFunc != null) {
+      this.scalaRunningFunc = runningFunc
+    }
   }
 
   // for JAVA
@@ -92,12 +96,9 @@ class MongoSourceFunction[R: TypeInformation](
     this(ApiType.java, prop, collectionName)
     this.javaQueryFunc = queryFunc
     this.javaResultFunc = resultFunc
-    this.javaRunningFunc =
-      if (runningFunc != null) runningFunc
-      else
-        new RunningFunction {
-          override def running(): lang.Boolean = true
-        }
+    if (runningFunc != null) {
+      this.javaRunningFunc = runningFunc
+    }
   }
 
   override def cancel(): Unit = this.running = false
@@ -146,6 +147,7 @@ class MongoSourceFunction[R: TypeInformation](
   }
 
   override def close(): Unit = {
+    this.running = false
     client.close()
   }
 
@@ -163,8 +165,9 @@ class MongoSourceFunction[R: TypeInformation](
   override def initializeState(context: FunctionInitializationContext): Unit = 
{
     // restore from checkpoint
     logInfo("MongoSource snapshotState initialize")
-    state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
-    Try(state.get.head) match {
+    state = FlinkUtils
+      .getUnionListState[R](context, getRuntimeContext.getExecutionConfig, 
OFFSETS_STATE_NAME)
+    Try(state.get().head) match {
       case Success(q) => last = q
       case _ =>
     }
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 8b0cd6908..343de6919 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,6 +17,12 @@
 
 package org.apache.streampark.flink.kubernetes
 
+import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
+import org.apache.streampark.flink.kubernetes.model.ClusterKey
+
 import io.fabric8.kubernetes.client.{DefaultKubernetesClient, 
KubernetesClient, KubernetesClientException}
 import org.apache.flink.client.cli.ClientOptions
 import org.apache.flink.client.deployment.{ClusterClientFactory, 
DefaultClusterClientServiceLoader}
@@ -24,13 +30,9 @@ import org.apache.flink.client.program.ClusterClient
 import org.apache.flink.configuration.{Configuration, DeploymentOptions, 
RestOptions}
 import org.apache.flink.kubernetes.KubernetesClusterDescriptor
 import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
-import org.apache.streampark.common.util.Utils.using
-import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
-import org.apache.streampark.flink.kubernetes.model.ClusterKey
 
 import javax.annotation.Nullable
+
 import scala.collection.JavaConverters._
 import scala.util.{Failure, Success, Try}
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index bd5279441..f2ad03597 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -17,20 +17,23 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
-import org.apache.hc.client5.http.fluent.Request
 import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
 import 
org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
 import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey, 
TrackId}
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
+
+import org.apache.hc.client5.http.fluent.Request
+import org.json4s.{DefaultFormats, JNull}
 import org.json4s.JsonAST.JNothing
 import org.json4s.jackson.JsonMethods.parse
-import org.json4s.{DefaultFormats, JNull}
+
+import javax.annotation.concurrent.ThreadSafe
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import javax.annotation.concurrent.ThreadSafe
-import scala.concurrent.duration.DurationLong
+
 import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
+import scala.concurrent.duration.DurationLong
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 889714a2d..e092557b0 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -17,20 +17,23 @@
 
 package org.apache.streampark.flink.kubernetes.watcher
 
-import org.apache.flink.configuration.{JobManagerOptions, MemorySize, 
TaskManagerOptions}
-import org.apache.hc.client5.http.fluent.Request
 import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
 import 
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
 import org.apache.streampark.flink.kubernetes.model.{ClusterKey, 
FlinkMetricCV, TrackId}
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus, 
FlinkK8sWatchController, MetricWatcherConfig}
-import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.flink.configuration.{JobManagerOptions, MemorySize, 
TaskManagerOptions}
+import org.apache.hc.client5.http.fluent.Request
 import org.json4s.{DefaultFormats, JArray}
+import org.json4s.jackson.JsonMethods.parse
+
+import javax.annotation.concurrent.ThreadSafe
 
 import java.nio.charset.StandardCharsets
 import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import javax.annotation.concurrent.ThreadSafe
-import scala.concurrent.duration.DurationLong
+
 import scala.concurrent.{Await, ExecutionContext, 
ExecutionContextExecutorService, Future}
+import scala.concurrent.duration.DurationLong
 import scala.language.postfixOps
 import scala.util.{Failure, Success, Try}
 
diff --git 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index 6ecb0bbe1..e6f836af5 100644
--- 
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++ 
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -22,12 +22,14 @@ import org.apache.hc.core5.util.Timeout
 import java.time.Duration
 import java.util.concurrent.ScheduledThreadPoolExecutor
 import java.util.concurrent.atomic.AtomicBoolean
+
 import scala.language.implicitConversions
 
 trait FlinkWatcher extends AutoCloseable {
 
   // see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
-  lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+  lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+    Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
 
   // see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
   lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout = 
Timeout.ofMilliseconds(30000L)
diff --git 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 58d38d311..537e1f70a 100644
--- 
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++ 
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -17,8 +17,10 @@
 
 package org.apache.streampark.flink.util
 
+import org.apache.flink.api.common.ExecutionConfig
 import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
 import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
 import org.apache.flink.runtime.state.FunctionInitializationContext
 import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
 import org.apache.flink.util.TimeUtils
@@ -31,9 +33,14 @@ object FlinkUtils {
 
   def getUnionListState[R: TypeInformation](
       context: FunctionInitializationContext,
+      execConfig: ExecutionConfig,
       descriptorName: String): ListState[R] = {
+
     context.getOperatorStateStore.getUnionListState(
-      new ListStateDescriptor(descriptorName, 
implicitly[TypeInformation[R]].getTypeClass))
+      new ListStateDescriptor(
+        descriptorName,
+        new KryoSerializer[R](implicitly[TypeInformation[R]].getTypeClass, 
execConfig))
+    )
   }
 
   def getFlinkDistJar(flinkHome: String): String = {

Reply via email to