This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git


The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
     new d260bb940 [Improve] rat-plugin check failed improvement
d260bb940 is described below

commit d260bb94012fbb3f3fca0d6fcf18048d71e2dc63
Author: benjobs <[email protected]>
AuthorDate: Mon Jan 27 20:01:38 2025 +0800

    [Improve] rat-plugin check failed improvement
---
 streampark-flink/pom.xml                           |   1 -
 .../streampark-flink-connector-test/pom.xml        | 267 ---------------------
 .../src/test/application.yml                       | 267 ---------------------
 .../quickstart/connector/ClickhouseJavaApp.java    |  51 ----
 .../flink/quickstart/connector/DorisJavaApp.java   |  44 ----
 .../flink/quickstart/connector/HttpJavaApp.java    |  36 ---
 .../flink/quickstart/connector/KafkaJavaApp.java   |  61 -----
 .../flink/quickstart/connector/MySQLJavaApp.java   |  73 ------
 .../flink/quickstart/connector/bean/Behavior.java  |  29 ---
 .../flink/quickstart/connector/bean/Entity.java    |  30 ---
 .../flink/quickstart/connector/bean/LogBean.java   |  30 ---
 .../flink/quickstart/connector/bean/Order.java     |  29 ---
 .../src/test/resources/logback.xml                 | 142 -----------
 .../quickstart/connector/ClickHouseSinkApp.scala   |  54 -----
 .../flink/quickstart/connector/ES7SinkApp.scala    |  44 ----
 .../quickstart/connector/HBaseRequestApp.scala     |  55 -----
 .../flink/quickstart/connector/HBaseSinkApp.scala  |  65 -----
 .../quickstart/connector/HBaseSourceApp.scala      |  82 -------
 .../flink/quickstart/connector/HttpSinkApp.scala   |  42 ----
 .../quickstart/connector/InfluxDBSinkApp.scala     |  77 ------
 .../flink/quickstart/connector/JdbcSinkApp.scala   |  58 -----
 .../flink/quickstart/connector/KafkaSinkApp.scala  |  77 ------
 .../quickstart/connector/KafkaSourceApp.scala      |  34 ---
 .../quickstart/connector/MongoSourceApp.scala      |  58 -----
 .../flink/quickstart/connector/MyDataSource.scala  |  53 ----
 .../quickstart/connector/MySQLSourceApp.scala      |  44 ----
 .../flink/quickstart/connector/SideOutApp.scala    | 121 ----------
 27 files changed, 1924 deletions(-)

diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 692adb640..fb00fe5db 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -32,7 +32,6 @@
         <module>streampark-flink-shims</module>
         <module>streampark-flink-core</module>
         <module>streampark-flink-connector</module>
-        <module>streampark-flink-connector-test</module>
         <module>streampark-flink-sqlclient</module>
         <module>streampark-flink-udf</module>
         <module>streampark-flink-client</module>
diff --git a/streampark-flink/streampark-flink-connector-test/pom.xml 
b/streampark-flink/streampark-flink-connector-test/pom.xml
deleted file mode 100644
index 7302c7849..000000000
--- a/streampark-flink/streampark-flink-connector-test/pom.xml
+++ /dev/null
@@ -1,267 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       https://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"; 
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
-         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
-
-    <modelVersion>4.0.0</modelVersion>
-    <parent>
-        <groupId>org.apache.streampark</groupId>
-        <artifactId>streampark-flink</artifactId>
-        <version>2.1.6</version>
-    </parent>
-    <artifactId>streampark-flink-connector-test</artifactId>
-    <name>StreamPark-quickstart: connector Test</name>
-
-    <properties>
-        <streampark.flink.shims.version>1.14</streampark.flink.shims.version>
-        <flink112.version>1.12.0</flink112.version>
-        <flink113.version>1.13.0</flink113.version>
-        <flink114.version>1.14.0</flink114.version>
-        <flink115.version>1.15.0</flink115.version>
-        <flink116.version>1.16.0</flink116.version>
-        <flink117.version>1.17.0</flink117.version>
-        <flink118.version>1.18.0</flink118.version>
-        <flink119.version>1.19.0</flink119.version>
-    </properties>
-
-    <dependencies>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            <artifactId>streampark-common_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-shims_flink-${streampark.flink.shims.version}_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-clickhouse_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-influx_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-http_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-hbase_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-doris_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>com.mysql</groupId>
-            <artifactId>mysql-connector-j</artifactId>
-            <version>8.0.33</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-mongo_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
-            <version>3.6.7</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.json4s</groupId>
-            <artifactId>json4s-native_2.11</artifactId>
-            <version>3.6.7</version>
-        </dependency>
-
-        <!--flink base-->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-core</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-scala_${scala.binary.version}</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-clients_${scala.binary.version}</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.twitter</groupId>
-            <artifactId>chill-thrift</artifactId>
-            <version>0.7.6</version>
-            <!-- exclusions for dependency conversion -->
-            <exclusions>
-                <exclusion>
-                    <groupId>com.esotericsoftware.kryo</groupId>
-                    <artifactId>kryo</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <!-- libthrift is required by chill-thrift -->
-        <dependency>
-            <groupId>org.apache.thrift</groupId>
-            <artifactId>libthrift</artifactId>
-            <version>0.14.0</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>javax.servlet</groupId>
-                    <artifactId>servlet-api</artifactId>
-                </exclusion>
-                <exclusion>
-                    <groupId>org.apache.httpcomponents</groupId>
-                    <artifactId>httpclient</artifactId>
-                </exclusion>
-            </exclusions>
-        </dependency>
-
-        <dependency>
-            <groupId>org.projectlombok</groupId>
-            <artifactId>lombok</artifactId>
-            <version>1.18.20</version>
-            <scope>provided</scope>
-        </dependency>
-
-        <!-- Kafka里面的消息采用Json格式 -->
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-json</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.flink</groupId>
-            <artifactId>flink-csv</artifactId>
-            <version>${flink114.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.alibaba.ververica</groupId>
-            <artifactId>flink-connector-mysql-cdc</artifactId>
-            <version>1.0.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>com.alibaba.ververica</groupId>
-            <artifactId>flink-format-changelog-json</artifactId>
-            <version>1.0.0</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.streampark</groupId>
-            
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
-            <version>${project.version}</version>
-        </dependency>
-
-    </dependencies>
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-shade-plugin</artifactId>
-                <version>3.2.4</version>
-            </plugin>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-assembly-plugin</artifactId>
-                <version>3.1.1</version>
-            </plugin>
-        </plugins>
-    </build>
-
-</project>
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/application.yml 
b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
deleted file mode 100644
index 2462834b8..000000000
--- a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
+++ /dev/null
@@ -1,267 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-flink:
-  option:
-    target: yarn-per-job
-    detached:
-    shutdownOnAttachedExit:
-    jobmanager:
-  property: #@see: 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
-    $internal.application.main: 
org.apache.streampark.flink.quickstart.QuickStartApp
-    pipeline.name: streampark-quickstartApp
-    yarn.application.queue:
-    taskmanager.numberOfTaskSlots: 1
-    parallelism.default: 1
-    jobmanager.memory:
-      flink.size:
-      heap.size:
-      jvm-metaspace.size:
-      jvm-overhead.max:
-      off-heap.size:
-      process.size:
-    taskmanager.memory:
-      flink.size:
-      framework.heap.size:
-      framework.off-heap.size:
-      managed.size:
-      process.size:
-      task.heap.size:
-      task.off-heap.size:
-      jvm-metaspace.size:
-      jvm-overhead.max:
-      jvm-overhead.min:
-      managed.fraction: 0.4
-    pipeline:
-      auto-watermark-interval: 200ms
-    # checkpoint
-    execution:
-      checkpointing:
-        mode: EXACTLY_ONCE
-        interval: 10s
-        timeout: 10min
-        unaligned: false
-        externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
-    # state backend
-    state:
-      backend: rocksdb # Special note: flink1.12 optional configuration 
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration 
('hashmap', 'rocksdb'),
-      backend.incremental: true
-      checkpoint-storage: filesystem
-      savepoints.dir: file:///tmp/chkdir
-      checkpoints.dir: file:///tmp/chkdir
-    # restart strategy
-    restart-strategy: fixed-delay  # Restart strategy 
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
-    restart-strategy.fixed-delay:
-      attempts: 3
-      delay: 50000
-    restart-strategy.failure-rate:
-      max-failures-per-interval:
-      failure-rate-interval:
-      delay:
-  # table
-  table:
-    table.local-time-zone: default # @see 
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
-
-app: # user's parameter
-  # kafka source config....
-  kafka.source:
-    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
-    topic: test_user
-    group.id: flink_02
-    auto.offset.reset: earliest
-      #enable.auto.commit: true
-      #start.from:
-    #timestamp: 1591286400000 #指定timestamp,针对所有的topic生效
-    #offset: # 给每个topic的partition指定offset
-    #topic: kafka01,kafka02
-    #kafka01: 0:182,1:183,2:182 #分区0从182开始消费,分区1从183...
-    #kafka02: 0:182,1:183,2:182
-
-  # kafka sink config....
-  kafka.sink:
-    bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
-    topic: test_user
-    transaction.timeout.ms: 1000
-    semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
-    batch.size: 1
-
-
-  # jdbc config...
-  jdbc:
-    semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
-    driverClassName: com.mysql.cj.jdbc.Driver
-    jdbcUrl: 
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
-    username: root
-    password: 123456
-
-  # influx config...
-  influx:
-    mydb:
-      jdbcUrl: http://test9:8086
-      #username: admin
-      #password: admin
-
-  # hbase
-  hbase:
-    zookeeper.quorum: test1,test2,test6
-    zookeeper.property.clientPort: 2181
-    zookeeper.session.timeout: 1200000
-    rpc.timeout: 5000
-    client.pause: 20
-
-  ##clickhouse  jdbc同步写入配置
-  #clickhouse:
-  #  sink:
-  #    #    写入节点地址
-  #    jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
-  #    socketTimeout: 3000000
-  #    database: test
-  #    user: $user
-  #    password: $password
-  #    #    写结果表及对应的字段,全部可不写字段
-  #    targetTable: orders(userId,siteId)
-  #    batch:
-  #      size: 1
-  #      delaytime: 6000000
-
-  clickhouse:
-    sink:
-      hosts: 127.0.0.1:8123,192.168.1.2:8123
-      socketTimeout: 3000000
-      database: test
-      user: $user
-      password: $password
-      targetTable: test.orders(userId,siteId)
-      batch:
-        size: 1
-        delaytime: 60000
-      threshold:
-        bufferSize: 10
-        #      异步写入的并发数
-        numWriters: 4
-        #      缓存队列大小
-        queueCapacity: 100
-        delayTime: 10
-        requestTimeout: 600
-        retries: 1
-        #      成功响应码
-        successCode: 200
-      failover:
-        table: chfailover
-        #      达到失败最大写入次数后,数据备份的组件
-        storage: kafka #kafka|mysql|hbase|hdfs
-        mysql:
-          driverClassName: com.mysql.cj.jdbc.Driver
-          jdbcUrl: 
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
-          username: user
-          password: pass
-        kafka:
-          bootstrap.servers: localhost:9092
-          topic: test1
-          group.id: user_01
-          auto.offset.reset: latest
-        hbase:
-        hdfs:
-          path: /data/chfailover
-          namenode: hdfs://localhost:8020
-          user: hdfs
-
-  #http sink 配置
-  http.sink:
-    threshold:
-      numWriters: 3
-      queueCapacity: 10000 
#队列最大容量,视单条记录大小而自行估量队列大小,如值太大,上游数据源来的太快,下游写入数据跟不上可能会OOM.
-      timeout: 100 #发送http请求的超时时间
-      retries: 3 #发送失败时的最大重试次数
-      successCode: 200 #发送成功状态码,这里可以有多个值,用","号分隔
-    failover:
-      table: record
-      storage: mysql #kafka,hbase,hdfs
-      jdbc: # 保存类型为MySQL,将失败的数据保存到MySQL
-        jdbcUrl: jdbc:mysql://localhost:3306/test
-        username: root
-        password: 123456
-      kafka:
-        topic: bigdata
-        bootstrap.servers: localhost:9093
-      hbase:
-        zookeeper.quorum: localhost
-        zookeeper.property.clientPort: 2181
-      hdfs:
-        namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g: 
hdfs://hadoop:8020 , hdfs://hadoop:9000
-        user: benjobs # user
-        path: /clickhouse/failover # save path
-        format: yyyy-MM-dd
-
-  #redis sink 配置
-  redis.sink:
-    #  masterName: master 哨兵模式参数
-    #  host: 192.168.0.1:6379, 192.168.0.3:6379 哨兵模式参数
-    host: 127.0.0.1
-    port: 6379
-    database: 2
-    connectType: jedisPool #可选参数:jedisPool(默认)|sentinel
-
-  es.sink:
-    #  必填参数,多个节点使用 host1:port, host2:port,
-    host: localhost:9200
-  #  选填参数
-  #  es:
-  #    disableFlushOnCheckpoint: false #是否
-  #    auth:
-  #    user:
-  #      password:
-  #    rest:
-  #      max.retry.timeout:
-  #      path.prefix:
-  #      content.type:
-  #    connect:
-  #      request.timeout:
-  #      timeout:
-  #    cluster.name: elasticsearch
-  #  client.transport.sniff:
-  #  bulk.flush.:
-
-sql:
-  my_flinksql: |
-    CREATE TABLE datagen (
-      f_sequence INT,
-      f_random INT,
-      f_random_str STRING,
-      ts AS localtimestamp,
-      WATERMARK FOR ts AS ts
-    ) WITH (
-      'connector' = 'datagen',
-      -- optional options --
-      'rows-per-second'='5', --这个是注释--
-      'fields.f_sequence.kind'='sequence',
-      'fields.f_sequence.start'='1',
-      'fields.f_sequence.end'='1000',
-      'fields.f_random.min'='1',
-      'fields.f_random.max'='1000',
-      'fields.f_random_str.length'='10'
-    );
-
-    CREATE TABLE print_table (
-      f_sequence INT,
-      f_random INT,
-      f_random_str STRING
-      ) WITH (
-      'connector' = 'print'
-    );
-
-    INSERT INTO print_table select f_sequence,f_random,f_random_str from 
datagen;
-
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
deleted file mode 100644
index 8370a1999..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Entity;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
-/** @author benjobs */
-public class ClickhouseJavaApp {
-
-  public static void main(String[] args) {
-    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-    StreamingContext context = new StreamingContext(envConfig);
-    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new 
MyDataSource());
-
-    // 2) async高性能写入
-    new ClickHouseSink(context)
-        .asyncSink(
-            source,
-            value ->
-                String.format(
-                    "insert into test.orders(userId, siteId) values (%d,%d)",
-                    value.userId, value.siteId))
-        .setParallelism(1);
-
-    // 3) jdbc方式写入
-    /**
-     * new ClickHouseSink(context).jdbcSink(source, bean -> 
String.format("insert into
-     * test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId) 
).setParallelism(1);
-     */
-    context.start();
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
deleted file mode 100644
index 2d8091199..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.doris.sink.DorisSink;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** @author wudi */
-public class DorisJavaApp {
-
-  public static void main(String[] args) {
-    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-    StreamingContext context = new StreamingContext(envConfig);
-    DataStream<String> source =
-        new KafkaJavaSource<String>(context)
-            .getDataStream()
-            .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
-            .returns(String.class);
-
-    new DorisSink<String>(context).sink(source);
-
-    context.start();
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
deleted file mode 100644
index 643d6840b..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.http.sink.HttpSink;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Entity;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
-/** @author wudi */
-public class HttpJavaApp {
-
-  public static void main(String[] args) {
-    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-    StreamingContext context = new StreamingContext(envConfig);
-    DataStreamSource<Entity> source = context.getJavaEnv().addSource(new 
MyDataSource());
-    new HttpSink(context).get(source.map(x -> 
String.format("http://www.qq.com?id=%d";, x.userId)));
-    context.start();
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
deleted file mode 100644
index 740ca08d8..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.common.util.JsonUtils;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
-import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** @author benjobs */
-public class KafkaJavaApp {
-
-  public static void main(String[] args) {
-
-    StreamEnvConfig javaConfig =
-        new StreamEnvConfig(
-            args,
-            (environment, parameterTool) -> {
-              // environment.getConfig().enableForceAvro();
-              System.out.println("environment argument set...");
-            });
-
-    StreamingContext context = new StreamingContext(javaConfig);
-
-    // 1) 从 kafka 中读取数据
-    DataStream<Behavior> source =
-        new KafkaJavaSource<String>(context)
-            .getDataStream()
-            .map(
-                (MapFunction<KafkaRecord<String>, Behavior>)
-                    value -> JsonUtils.read(value, Behavior.class));
-
-    // 2) 将数据写入其他 kafka 主题
-    new KafkaJavaSink<Behavior>(context)
-        .serializer((SerializationSchema<Behavior>) element -> 
JsonUtils.write(element).getBytes())
-        .sink(source);
-
-    context.start();
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
deleted file mode 100644
index a11772ef9..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.function.QueryFunction;
-import org.apache.streampark.flink.connector.function.ResultFunction;
-import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Order;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MySQLJavaApp {
-
-  public static void main(String[] args) {
-
-    StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-
-    StreamingContext context = new StreamingContext(envConfig);
-
-    // 读取MySQL数据源
-    new JdbcJavaSource<>(context, Order.class)
-        .getDataStream(
-            (QueryFunction<Order>)
-                lastOne -> {
-                  // 5秒抽取一次
-                  Thread.sleep(3000);
-                  Serializable lastOffset =
-                      lastOne == null ? "2020-10-10 23:00:00" : 
lastOne.getTimestamp();
-                  return String.format(
-                      "select * from t_order "
-                          + "where timestamp > '%s' "
-                          + "order by timestamp asc ",
-                      lastOffset);
-                },
-            (ResultFunction<Order>)
-                map -> {
-                  List<Order> result = new ArrayList<>();
-                  map.forEach(
-                      item -> {
-                        Order order = new Order();
-                        order.setOrderId(item.get("order_id").toString());
-                        order.setMarketId(item.get("market_id").toString());
-                        
order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
-                        result.add(order);
-                      });
-                  return result;
-                })
-        .returns(TypeInformation.of(Order.class))
-        .print("jdbc source: >>>>>");
-
-    context.start();
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
deleted file mode 100644
index d01165370..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-/** @author benjobs */
-@Data
-public class Behavior {
-  private String user_id;
-  private Long item_id;
-  private Long category_id;
-  private String behavior;
-  private Long ts;
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
deleted file mode 100644
index 09653fe2a..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-/** @author benjobs */
-public class Entity {
-
-  public Long userId;
-  public Long orderId;
-  public Long siteId;
-  public Long cityId;
-  public Integer orderStatus;
-  public Double price;
-  public Integer quantity;
-  public Long timestamp;
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
deleted file mode 100644
index b143d70e3..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class LogBean implements Serializable {
-  private String platenum;
-  private String cardType;
-  private Long inTime;
-  private Long outTime;
-  private String controlid;
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
 
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
deleted file mode 100644
index 8f98f4d67..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class Order implements Serializable {
-  private String orderId;
-  private String marketId;
-  private Double price;
-  private Long timestamp;
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
 
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
deleted file mode 100755
index 537cb762c..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
-    Licensed to the Apache Software Foundation (ASF) under one or more
-    contributor license agreements.  See the NOTICE file distributed with
-    this work for additional information regarding copyright ownership.
-    The ASF licenses this file to You under the Apache License, Version 2.0
-    (the "License"); you may not use this file except in compliance with
-    the License.  You may obtain a copy of the License at
-
-       https://www.apache.org/licenses/LICENSE-2.0
-
-    Unless required by applicable law or agreed to in writing, software
-    distributed under the License is distributed on an "AS IS" BASIS,
-    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-    See the License for the specific language governing permissions and
-    limitations under the License.
-
--->
-<configuration>
-    <!-- 日志文件存储路径 -->
-    <property name="LOG_HOME" value="logs/"/>
-    <property name="FILE_SIZE" value="50MB"/>
-    <property name="MAX_HISTORY" value="100"/>
-    <timestamp key="DATE_TIME" datePattern="yyyy-MM-dd HH:mm:ss"/>
-
-    <property name="log.colorPattern"
-              value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) | 
%boldYellow(%thread) | %boldGreen(%logger) | %msg%n"/>
-    <property name="log.pattern"
-              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread] 
%-5level %logger{36} - %msg%n"/>
-
-    <!-- 控制台打印 -->
-    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.colorPattern}</pattern>
-        </encoder>
-    </appender>
-    <!-- ERROR 输入到文件,按日期和文件大小 -->
-    <appender name="ERROR" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.pattern}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
-            <level>ERROR</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
-        </filter>
-        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${LOG_HOME}%d/error.%i.log</fileNamePattern>
-            <maxHistory>${MAX_HISTORY}</maxHistory>
-            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>${FILE_SIZE}</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-        </rollingPolicy>
-    </appender>
-
-    <!-- WARN 输入到文件,按日期和文件大小 -->
-    <appender name="WARN" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.pattern}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
-            <level>WARN</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
-        </filter>
-        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${LOG_HOME}%d/warn.%i.log</fileNamePattern>
-            <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
-            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>${FILE_SIZE}</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-        </rollingPolicy>
-    </appender>
-
-    <!-- INFO 输入到文件,按日期和文件大小 -->
-    <appender name="INFO" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.pattern}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
-            <level>INFO</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
-        </filter>
-        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${LOG_HOME}%d/info.%i.log</fileNamePattern>
-            <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
-            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>${FILE_SIZE}</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-        </rollingPolicy>
-    </appender>
-    <!-- DEBUG 输入到文件,按日期和文件大小 -->
-    <appender name="DEBUG" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.pattern}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
-            <level>DEBUG</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
-        </filter>
-        <rollingPolicy 
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${LOG_HOME}%d/debug.%i.log</fileNamePattern>
-            <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
-            <timeBasedFileNamingAndTriggeringPolicy
-                    class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>${FILE_SIZE}</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-        </rollingPolicy>
-    </appender>
-    <!-- TRACE 输入到文件,按日期和文件大小 -->
-    <appender name="TRACE" 
class="ch.qos.logback.core.rolling.RollingFileAppender">
-        <encoder charset="utf-8">
-            <pattern>${log.pattern}</pattern>
-        </encoder>
-        <filter class="ch.qos.logback.classic.filter.LevelFilter">
-            <level>TRACE</level>
-            <onMatch>ACCEPT</onMatch>
-            <onMismatch>DENY</onMismatch>
-        </filter>
-        <rollingPolicy
-                class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
-            <fileNamePattern>${LOG_HOME}%d/trace.%i.log</fileNamePattern>
-            <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
-            <timeBasedFileNamingAndTriggeringPolicy 
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
-                <maxFileSize>${FILE_SIZE}</maxFileSize>
-            </timeBasedFileNamingAndTriggeringPolicy>
-        </rollingPolicy>
-    </appender>
-
-    <!-- Logger 根目录 -->
-    <root level="INFO">
-        <appender-ref ref="STDOUT"/>
-        <appender-ref ref="DEBUG"/>
-        <appender-ref ref="ERROR"/>
-        <appender-ref ref="WARN"/>
-        <appender-ref ref="INFO"/>
-        <appender-ref ref="TRACE"/>
-    </root>
-</configuration>
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
deleted file mode 100644
index db70be422..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object ClickHouseSinkApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Entity] = 
TypeInformation.of(classOf[Entity])
-
-  override def handle(): Unit = {
-    // 假如在clickhouse里已经有以下表.
-    val createTable =
-      """
-        |create TABLE test.orders(
-        |userId UInt16,
-        |siteId UInt8,
-        |timestamp UInt16
-        |)ENGINE = TinyLog;
-        |""".stripMargin
-
-    println(createTable)
-
-    // 1) 接入数据源
-    val source = context.addSource(new MyDataSource)
-
-    // 2)高性能异步写入
-    ClickHouseSink().asyncSink(source)(
-      x => { s"insert into test.orders(userId,siteId) values 
(${x.userId},${x.siteId})" })
-
-    // 3) jdbc方式写入
-    // ClickHouseSink().jdbcSink(source)(x => {s"insert into 
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
deleted file mode 100644
index 190c0eca6..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
-import 
org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
-import org.elasticsearch.action.index.IndexRequest
-import org.json4s.jackson.JsonMethods
-
-object ES7SinkApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Entity] = 
TypeInformation.of(classOf[Entity])
-
-  override def handle(): Unit = {
-    val source: DataStream[Entity] = context.addSource(new MyDataSource)
-
-    implicit def indexedSeq(x: Entity): IndexRequest = 
ElasticsearchUtils.indexRequest(
-      "flink_order",
-      s"${x.orderId}_${x.timestamp}",
-      JsonMethods.mapper.writeValueAsString(x)
-    )
-
-    ES7Sink().sink[Entity](source)
-  }
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
deleted file mode 100644
index c3e49b36f..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
-import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.client.Get
-
-object HBaseRequestApp extends FlinkStreaming {
-
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  implicit val reqType: TypeInformation[(String, Boolean)] =
-    TypeInformation.of(classOf[(String, Boolean)])
-
-  override def handle(): Unit = {
-
-    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-    // one topic
-    val source = context.fromCollection(Seq("123456", "1111", "222"))
-
-    source.print("source:>>>")
-
-    HBaseRequest(source)
-      .requestOrdered[(String, Boolean)](
-        x => {
-          new HBaseQuery("person", new Get(x.getBytes()))
-        },
-        timeout = 5000,
-        resultFunc = (a, r) => {
-          a -> !r.isEmpty
-        })
-      .print(" check.... ")
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
deleted file mode 100644
index 11fa2d484..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat, 
HBaseSink}
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.client.{Mutation, Put}
-import org.apache.hadoop.hbase.util.Bytes
-
-import java.util.{Collections, Random}
-
-import scala.language.implicitConversions
-
-object HBaseSinkApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Entity] = 
TypeInformation.of(classOf[Entity])
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  override def handle(): Unit = {
-    val source = context.addSource(new MyDataSource)
-    val random = new Random()
-
-    // 定义转换规则...
-    implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
-      val put =
-        new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)), 
entity.timestamp)
-      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"), 
Bytes.toBytes(entity.cityId))
-      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"), 
Bytes.toBytes(entity.orderId))
-      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"), 
Bytes.toBytes(entity.orderStatus))
-      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"), 
Bytes.toBytes(entity.quantity))
-      put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"), 
Bytes.toBytes(entity.siteId))
-      Collections.singleton(put)
-    }
-    // source ===> trans ===> sink
-
-    // 1)插入方式1
-    HBaseSink().sink[Entity](source, "order")
-
-    // 2) 插入方式2
-    // 1.指定HBase 配置文件
-    val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-    // 2.插入...
-    source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
deleted file mode 100644
index 67e072d2e..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
-import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
-import org.apache.streampark.flink.connector.hbase.source.HBaseSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.hbase.client.{Get, Scan}
-import org.apache.hadoop.hbase.util.Bytes
-
-import java.util
-
-object HBaseSourceApp extends FlinkStreaming {
-
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  override def handle(): Unit = {
-
-    implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-
-    val id = HBaseSource().getDataStream[String](
-      query => {
-        Thread.sleep(10)
-        if (query == null) {
-          new HBaseQuery("person", new Scan())
-        } else {
-          // TODO 从上一条记录中获取便宜量,决定下次查询的条件...
-          new HBaseQuery("person", new Scan())
-        }
-      },
-      r => new String(r.getRow)
-    )
-
-    HBaseRequest(id)
-      .requestOrdered(
-        x => {
-          new HBaseQuery("person", new Get(x.getBytes()))
-        },
-        (a, r) => {
-          val map = new util.HashMap[String, String]()
-          val cellScanner = r.cellScanner()
-          while (cellScanner.advance()) {
-            val cell = cellScanner.current()
-            val q = Bytes.toString(CellUtil.cloneQualifier(cell))
-            val (name, v) = q.split("_") match {
-              case Array(_type, name) =>
-                _type match {
-                  case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
-                  case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
-                  case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
-                  case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
-                }
-              case _ =>
-            }
-            map.put(name.toString, v.toString)
-          }
-          map.toString
-        }
-      )
-      .print("Async")
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
deleted file mode 100644
index 80b238798..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.http.sink.HttpSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object HttpSinkApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Entity] = 
TypeInformation.of(classOf[Entity])
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  override def handle(): Unit = {
-
-    /** source */
-    val source = context
-      .addSource(new MyDataSource)
-      .map(x => s"http://www.qq.com?id=${x.userId}";)
-
-    // sink
-    new HttpSink(context).get(source)
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
deleted file mode 100644
index 0f18055e0..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
-import org.apache.streampark.flink.connector.influx.sink.InfluxSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-/** 侧输出流 */
-object InfluxDBSinkApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Weather] = 
TypeInformation.of(classOf[Weather])
-
-  override def handle(): Unit = {
-    val source = context.addSource(new WeatherSource())
-
-    // weather,altitude=1000,area=北 temperature=11,humidity=-4
-
-    implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
-      "mydb",
-      "test",
-      "autogen",
-      (x: Weather) => Map("altitude" -> x.altitude.toString, "area" -> x.area),
-      (x: Weather) =>
-        Map(
-          "temperature" -> x.temperature.asInstanceOf[Object],
-          "humidity" -> x.humidity.asInstanceOf[Object])
-    )
-
-    InfluxSink().sink(source, "mydb")
-
-  }
-
-}
-
-/** 温度 temperature 湿度 humidity 地区 area 海拔 altitude */
-case class Weather(temperature: Long, humidity: Long, area: String, altitude: 
Long)
-
-class WeatherSource extends SourceFunction[Weather] {
-
-  private[this] var isRunning = true
-
-  override def cancel(): Unit = this.isRunning = false
-
-  val random = new Random()
-
-  override def run(ctx: SourceFunction.SourceContext[Weather]): Unit = {
-    while (isRunning) {
-      val temperature = random.nextInt(100)
-      val humidity = random.nextInt(30)
-      val area = List("北", "上", "广", "深")(random.nextInt(4))
-      val altitude = random.nextInt(10000)
-      val order = Weather(temperature, humidity, area, altitude)
-      ctx.collect(order)
-    }
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
deleted file mode 100644
index cb4c993ad..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
-import org.apache.streampark.flink.connector.kafka.source.KafkaSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object JdbcSinkApp extends FlinkStreaming {
-
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  override def handle(): Unit = {
-
-    /** 从kafka里读数据.这里的数据是数字或者字母,每次读取1条 */
-    val source = KafkaSource()
-      .getDataStream[String]()
-      .uid("kfkSource1")
-      .name("kfkSource1")
-      .map(
-        x => {
-          x.value
-        })
-
-    /**
-     * 假设这里有一个orders表.有一个字段,id的类型是int 在数据插入的时候制造异常: 1)正确情况:
-     * 当从kafka中读取的内容全部是数字时会插入成功,kafka的消费的offset也会更新. 如: 当前kafka
-     * size为20,手动输入10个数字,则size为30,然后会将这10个数字写入到Mysql,kafka的offset也会更新 2)异常情况:
-     * 当从kafka中读取的内容非数字会导致插入失败,kafka的消费的offset会回滚 如: 当前的kafka 
size为30,offset是30,
-     * 手动输入1个字母,此时size为31,写入mysql会报错,kafka的offset依旧是30,不会发生更新.
-     */
-    JdbcSink(parallelism = 5)
-      .sink[String](source)(
-        x => {
-          s"insert into orders(id,timestamp) 
values('$x',${System.currentTimeMillis()})"
-        })
-      .uid("mysqlSink")
-      .name("mysqlSink")
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
deleted file mode 100644
index af8aa55aa..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-object KafkaSinkApp extends FlinkStreaming {
-
-  implicit val stringType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-  implicit val entityType: TypeInformation[Behavior] = 
TypeInformation.of(classOf[Behavior])
-
-  override def handle(): Unit = {
-    val source = new BehaviorSource()
-    val ds = context.addSource[Behavior](source).map(_.toString)
-    ds.print()
-    KafkaSink().sink(ds)
-  }
-
-}
-
-case class Behavior(user_id: String, item_id: Long, category_id: Long, 
behavior: String, ts: Long) {
-  override def toString: String = {
-    s"""
-       |{
-       |user_id:$user_id,
-       |item_id:$item_id,
-       |category_id:$category_id,
-       |behavior:$behavior,
-       |ts:$ts
-       |}
-       |""".stripMargin
-  }
-}
-
-class BehaviorSource extends SourceFunction[Behavior] {
-  private[this] var isRunning = true
-
-  override def cancel(): Unit = this.isRunning = false
-
-  val random = new Random()
-  var index = 0
-
-  override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
-    val seq = Seq("view", "click", "search", "buy", "share")
-    while (isRunning && index <= 10000) {
-      index += 1
-      val user_id = random.nextInt(1000)
-      val item_id = random.nextInt(100)
-      val category_id = random.nextInt(20)
-      val behavior = seq(random.nextInt(5))
-      val order =
-        Behavior(user_id.toString, item_id, category_id, behavior, 
System.currentTimeMillis())
-      ctx.collect(order)
-    }
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
deleted file mode 100644
index 651605678..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.kafka.source.KafkaSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.streaming.api.scala._
-
-object KafkaSourceApp extends FlinkStreaming {
-  override def handle(): Unit = {
-
-    KafkaSource()
-      .getDataStream[String]()
-      .map(_.value)
-      .print()
-
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
deleted file mode 100644
index 37873a964..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.{DateUtils, JsonUtils}
-import org.apache.streampark.flink.connector.mongo.source.MongoSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import com.mongodb.BasicDBObject
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import java.util.Properties
-
-import scala.collection.JavaConversions._
-
-object MongoSourceApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[String] = 
TypeInformation.of(classOf[String])
-
-  override def handle(): Unit = {
-    implicit val prop: Properties = context.parameter.getProperties
-    val source = MongoSource()
-    source
-      .getDataStream[String](
-        "shop",
-        (a, d) => {
-          Thread.sleep(1000)
-
-          /** 
从上一条记录提前offset数据,作为下一条数据查询的条件,如果offset为Null,则表明是第一次查询,需要指定默认offset */
-          val offset =
-            if (a == null) "2019-09-27 00:00:00"
-            else {
-              JsonUtils.read[Map[String, _]](a).get("updateTime").toString
-            }
-          val cond = new BasicDBObject()
-            .append("updateTime", new BasicDBObject("$gte", 
DateUtils.parse(offset)))
-          d.find(cond)
-        },
-        _.toList.map(_.toJson())
-      )
-      .print()
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
deleted file mode 100644
index 5348d104d..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-class MyDataSource extends SourceFunction[Entity] {
-
-  private[this] var isRunning = true
-
-  override def cancel(): Unit = this.isRunning = false
-
-  val random = new Random()
-
-  var index = 0
-
-  override def run(ctx: SourceFunction.SourceContext[Entity]): Unit = {
-    while (isRunning && index <= 1000001) {
-      index += 1
-
-      val entity = new Entity()
-      entity.userId = System.currentTimeMillis()
-      entity.orderId = random.nextInt(100)
-      entity.orderStatus = random.nextInt(1)
-      entity.price = random.nextDouble()
-      entity.quantity = new Random().nextInt(10)
-      entity.cityId = 1
-      entity.siteId = random.nextInt(20)
-      entity.timestamp = System.currentTimeMillis()
-
-      ctx.collect(entity)
-    }
-  }
-
-}
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
deleted file mode 100644
index 3b7447a67..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object MySQLSourceApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[Order] = 
TypeInformation.of(classOf[Order])
-
-  override def handle(): Unit = {
-
-    JdbcSource()
-      .getDataStream[Order](
-        lastOne => {
-          val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else 
lastOne.timestamp
-          s"select * from t_order where timestamp > '$laseOffset' order by 
timestamp asc "
-        },
-        _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
-      )
-      .print()
-
-  }
-
-}
-
-class Order(val marketId: String, val timestamp: String) extends Serializable
diff --git 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
 
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
deleted file mode 100644
index 24180b834..000000000
--- 
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
-
-import scala.util.Random
-
-/** 侧输出流 */
-object SideOutApp extends FlinkStreaming {
-
-  implicit val entityType: TypeInformation[SideEntry] = 
TypeInformation.of(classOf[SideEntry])
-
-  override def handle(): Unit = {
-    val source = context.addSource(new SideSource())
-
-    /** 侧输出流。。。 官方写法:设置侧输出流 */
-    val side1 = source.process(new ProcessFunction[SideEntry, SideEntry] {
-      val tag = new OutputTag[SideEntry]("flink")
-
-      override def processElement(
-          value: SideEntry,
-          ctx: ProcessFunction[SideEntry, SideEntry]#Context,
-          out: Collector[SideEntry]): Unit = {
-        if (value.userId < 100) {
-          ctx.output(tag, value)
-        } else {
-          out.collect(value)
-        }
-      }
-    })
-
-    // 官方写法,获取侧输出流
-    side1.getSideOutput(new 
OutputTag[SideEntry]("flink")).print("flink:========>")
-
-  }
-
-}
-
-/**
- * @param userId
- *   : 用户Id
- * @param orderId
- *   : 订单ID
- * @param siteId
- *   : 站点ID
- * @param cityId
- *   : 城市Id
- * @param orderStatus
- *   : 订单状态(1:下单,0:退单)
- * @param isNewOrder
- *   : 是否是首单
- * @param price
- *   : 单价
- * @param quantity
- *   : 订单数量
- * @param timestamp
- *   : 下单时间
- */
-case class SideEntry(
-    userId: Long,
-    orderId: Long,
-    siteId: Long,
-    cityId: Long,
-    orderStatus: Int,
-    isNewOrder: Int,
-    price: Double,
-    quantity: Int,
-    timestamp: Long)
-
-class SideSource extends SourceFunction[SideEntry] {
-
-  private[this] var isRunning = true
-
-  override def cancel(): Unit = this.isRunning = false
-
-  val random = new Random()
-
-  override def run(ctx: SourceFunction.SourceContext[SideEntry]): Unit = {
-    while (isRunning) {
-      val userId = random.nextInt(1000)
-      val orderId = random.nextInt(100)
-      val status = random.nextInt(1)
-      val isNew = random.nextInt(1)
-      val price = random.nextDouble()
-      val quantity = new Random(10).nextInt()
-      val order = SideEntry(
-        userId,
-        orderId,
-        siteId = 1,
-        cityId = 1,
-        status,
-        isNew,
-        price,
-        quantity,
-        System.currentTimeMillis)
-      ctx.collect(order)
-    }
-  }
-
-}


Reply via email to