This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new d260bb940 [Improve] rat-plugin check failed improvement
d260bb940 is described below
commit d260bb94012fbb3f3fca0d6fcf18048d71e2dc63
Author: benjobs <[email protected]>
AuthorDate: Mon Jan 27 20:01:38 2025 +0800
[Improve] rat-plugin check failed improvement
---
streampark-flink/pom.xml | 1 -
.../streampark-flink-connector-test/pom.xml | 267 ---------------------
.../src/test/application.yml | 267 ---------------------
.../quickstart/connector/ClickhouseJavaApp.java | 51 ----
.../flink/quickstart/connector/DorisJavaApp.java | 44 ----
.../flink/quickstart/connector/HttpJavaApp.java | 36 ---
.../flink/quickstart/connector/KafkaJavaApp.java | 61 -----
.../flink/quickstart/connector/MySQLJavaApp.java | 73 ------
.../flink/quickstart/connector/bean/Behavior.java | 29 ---
.../flink/quickstart/connector/bean/Entity.java | 30 ---
.../flink/quickstart/connector/bean/LogBean.java | 30 ---
.../flink/quickstart/connector/bean/Order.java | 29 ---
.../src/test/resources/logback.xml | 142 -----------
.../quickstart/connector/ClickHouseSinkApp.scala | 54 -----
.../flink/quickstart/connector/ES7SinkApp.scala | 44 ----
.../quickstart/connector/HBaseRequestApp.scala | 55 -----
.../flink/quickstart/connector/HBaseSinkApp.scala | 65 -----
.../quickstart/connector/HBaseSourceApp.scala | 82 -------
.../flink/quickstart/connector/HttpSinkApp.scala | 42 ----
.../quickstart/connector/InfluxDBSinkApp.scala | 77 ------
.../flink/quickstart/connector/JdbcSinkApp.scala | 58 -----
.../flink/quickstart/connector/KafkaSinkApp.scala | 77 ------
.../quickstart/connector/KafkaSourceApp.scala | 34 ---
.../quickstart/connector/MongoSourceApp.scala | 58 -----
.../flink/quickstart/connector/MyDataSource.scala | 53 ----
.../quickstart/connector/MySQLSourceApp.scala | 44 ----
.../flink/quickstart/connector/SideOutApp.scala | 121 ----------
27 files changed, 1924 deletions(-)
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index 692adb640..fb00fe5db 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -32,7 +32,6 @@
<module>streampark-flink-shims</module>
<module>streampark-flink-core</module>
<module>streampark-flink-connector</module>
- <module>streampark-flink-connector-test</module>
<module>streampark-flink-sqlclient</module>
<module>streampark-flink-udf</module>
<module>streampark-flink-client</module>
diff --git a/streampark-flink/streampark-flink-connector-test/pom.xml
b/streampark-flink/streampark-flink-connector-test/pom.xml
deleted file mode 100644
index 7302c7849..000000000
--- a/streampark-flink/streampark-flink-connector-test/pom.xml
+++ /dev/null
@@ -1,267 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- https://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
-
- <modelVersion>4.0.0</modelVersion>
- <parent>
- <groupId>org.apache.streampark</groupId>
- <artifactId>streampark-flink</artifactId>
- <version>2.1.6</version>
- </parent>
- <artifactId>streampark-flink-connector-test</artifactId>
- <name>StreamPark-quickstart: connector Test</name>
-
- <properties>
- <streampark.flink.shims.version>1.14</streampark.flink.shims.version>
- <flink112.version>1.12.0</flink112.version>
- <flink113.version>1.13.0</flink113.version>
- <flink114.version>1.14.0</flink114.version>
- <flink115.version>1.15.0</flink115.version>
- <flink116.version>1.16.0</flink116.version>
- <flink117.version>1.17.0</flink117.version>
- <flink118.version>1.18.0</flink118.version>
- <flink119.version>1.19.0</flink119.version>
- </properties>
-
- <dependencies>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
- <artifactId>streampark-common_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-shims_flink-${streampark.flink.shims.version}_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-clickhouse_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-influx_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-http_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-hbase_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-doris_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>com.mysql</groupId>
- <artifactId>mysql-connector-j</artifactId>
- <version>8.0.33</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-mongo_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- <scope>provided</scope>
- </dependency>
-
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
- <version>3.6.7</version>
- </dependency>
-
- <dependency>
- <groupId>org.json4s</groupId>
- <artifactId>json4s-native_2.11</artifactId>
- <version>3.6.7</version>
- </dependency>
-
- <!--flink base-->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-core</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-scala_${scala.binary.version}</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-clients_${scala.binary.version}</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
-
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.twitter</groupId>
- <artifactId>chill-thrift</artifactId>
- <version>0.7.6</version>
- <!-- exclusions for dependency conversion -->
- <exclusions>
- <exclusion>
- <groupId>com.esotericsoftware.kryo</groupId>
- <artifactId>kryo</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <!-- libthrift is required by chill-thrift -->
- <dependency>
- <groupId>org.apache.thrift</groupId>
- <artifactId>libthrift</artifactId>
- <version>0.14.0</version>
- <exclusions>
- <exclusion>
- <groupId>javax.servlet</groupId>
- <artifactId>servlet-api</artifactId>
- </exclusion>
- <exclusion>
- <groupId>org.apache.httpcomponents</groupId>
- <artifactId>httpclient</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
-
- <dependency>
- <groupId>org.projectlombok</groupId>
- <artifactId>lombok</artifactId>
- <version>1.18.20</version>
- <scope>provided</scope>
- </dependency>
-
- <!-- Kafka里面的消息采用Json格式 -->
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-json</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.flink</groupId>
- <artifactId>flink-csv</artifactId>
- <version>${flink114.version}</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-connector-mysql-cdc</artifactId>
- <version>1.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>com.alibaba.ververica</groupId>
- <artifactId>flink-format-changelog-json</artifactId>
- <version>1.0.0</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- <dependency>
- <groupId>org.apache.streampark</groupId>
-
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
- <version>${project.version}</version>
- </dependency>
-
- </dependencies>
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-shade-plugin</artifactId>
- <version>3.2.4</version>
- </plugin>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-assembly-plugin</artifactId>
- <version>3.1.1</version>
- </plugin>
- </plugins>
- </build>
-
-</project>
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
deleted file mode 100644
index 2462834b8..000000000
--- a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
+++ /dev/null
@@ -1,267 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# https://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-flink:
- option:
- target: yarn-per-job
- detached:
- shutdownOnAttachedExit:
- jobmanager:
- property: #@see:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
- $internal.application.main:
org.apache.streampark.flink.quickstart.QuickStartApp
- pipeline.name: streampark-quickstartApp
- yarn.application.queue:
- taskmanager.numberOfTaskSlots: 1
- parallelism.default: 1
- jobmanager.memory:
- flink.size:
- heap.size:
- jvm-metaspace.size:
- jvm-overhead.max:
- off-heap.size:
- process.size:
- taskmanager.memory:
- flink.size:
- framework.heap.size:
- framework.off-heap.size:
- managed.size:
- process.size:
- task.heap.size:
- task.off-heap.size:
- jvm-metaspace.size:
- jvm-overhead.max:
- jvm-overhead.min:
- managed.fraction: 0.4
- pipeline:
- auto-watermark-interval: 200ms
- # checkpoint
- execution:
- checkpointing:
- mode: EXACTLY_ONCE
- interval: 10s
- timeout: 10min
- unaligned: false
- externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
- # state backend
- state:
- backend: rocksdb # Special note: flink1.12 optional configuration
('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration
('hashmap', 'rocksdb'),
- backend.incremental: true
- checkpoint-storage: filesystem
- savepoints.dir: file:///tmp/chkdir
- checkpoints.dir: file:///tmp/chkdir
- # restart strategy
- restart-strategy: fixed-delay # Restart strategy
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
- restart-strategy.fixed-delay:
- attempts: 3
- delay: 50000
- restart-strategy.failure-rate:
- max-failures-per-interval:
- failure-rate-interval:
- delay:
- # table
- table:
- table.local-time-zone: default # @see
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
-
-app: # user's parameter
- # kafka source config....
- kafka.source:
- bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
- topic: test_user
- group.id: flink_02
- auto.offset.reset: earliest
- #enable.auto.commit: true
- #start.from:
- #timestamp: 1591286400000 #指定timestamp,针对所有的topic生效
- #offset: # 给每个topic的partition指定offset
- #topic: kafka01,kafka02
- #kafka01: 0:182,1:183,2:182 #分区0从182开始消费,分区1从183...
- #kafka02: 0:182,1:183,2:182
-
- # kafka sink config....
- kafka.sink:
- bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
- topic: test_user
- transaction.timeout.ms: 1000
- semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
- batch.size: 1
-
-
- # jdbc config...
- jdbc:
- semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
- driverClassName: com.mysql.cj.jdbc.Driver
- jdbcUrl:
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
- username: root
- password: 123456
-
- # influx config...
- influx:
- mydb:
- jdbcUrl: http://test9:8086
- #username: admin
- #password: admin
-
- # hbase
- hbase:
- zookeeper.quorum: test1,test2,test6
- zookeeper.property.clientPort: 2181
- zookeeper.session.timeout: 1200000
- rpc.timeout: 5000
- client.pause: 20
-
- ##clickhouse jdbc同步写入配置
- #clickhouse:
- # sink:
- # # 写入节点地址
- # jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
- # socketTimeout: 3000000
- # database: test
- # user: $user
- # password: $password
- # # 写结果表及对应的字段,全部可不写字段
- # targetTable: orders(userId,siteId)
- # batch:
- # size: 1
- # delaytime: 6000000
-
- clickhouse:
- sink:
- hosts: 127.0.0.1:8123,192.168.1.2:8123
- socketTimeout: 3000000
- database: test
- user: $user
- password: $password
- targetTable: test.orders(userId,siteId)
- batch:
- size: 1
- delaytime: 60000
- threshold:
- bufferSize: 10
- # 异步写入的并发数
- numWriters: 4
- # 缓存队列大小
- queueCapacity: 100
- delayTime: 10
- requestTimeout: 600
- retries: 1
- # 成功响应码
- successCode: 200
- failover:
- table: chfailover
- # 达到失败最大写入次数后,数据备份的组件
- storage: kafka #kafka|mysql|hbase|hdfs
- mysql:
- driverClassName: com.mysql.cj.jdbc.Driver
- jdbcUrl:
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
- username: user
- password: pass
- kafka:
- bootstrap.servers: localhost:9092
- topic: test1
- group.id: user_01
- auto.offset.reset: latest
- hbase:
- hdfs:
- path: /data/chfailover
- namenode: hdfs://localhost:8020
- user: hdfs
-
- #http sink 配置
- http.sink:
- threshold:
- numWriters: 3
- queueCapacity: 10000
#队列最大容量,视单条记录大小而自行估量队列大小,如值太大,上游数据源来的太快,下游写入数据跟不上可能会OOM.
- timeout: 100 #发送http请求的超时时间
- retries: 3 #发送失败时的最大重试次数
- successCode: 200 #发送成功状态码,这里可以有多个值,用","号分隔
- failover:
- table: record
- storage: mysql #kafka,hbase,hdfs
- jdbc: # 保存类型为MySQL,将失败的数据保存到MySQL
- jdbcUrl: jdbc:mysql://localhost:3306/test
- username: root
- password: 123456
- kafka:
- topic: bigdata
- bootstrap.servers: localhost:9093
- hbase:
- zookeeper.quorum: localhost
- zookeeper.property.clientPort: 2181
- hdfs:
- namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g:
hdfs://hadoop:8020 , hdfs://hadoop:9000
- user: benjobs # user
- path: /clickhouse/failover # save path
- format: yyyy-MM-dd
-
- #redis sink 配置
- redis.sink:
- # masterName: master 哨兵模式参数
- # host: 192.168.0.1:6379, 192.168.0.3:6379 哨兵模式参数
- host: 127.0.0.1
- port: 6379
- database: 2
- connectType: jedisPool #可选参数:jedisPool(默认)|sentinel
-
- es.sink:
- # 必填参数,多个节点使用 host1:port, host2:port,
- host: localhost:9200
- # 选填参数
- # es:
- # disableFlushOnCheckpoint: false #是否
- # auth:
- # user:
- # password:
- # rest:
- # max.retry.timeout:
- # path.prefix:
- # content.type:
- # connect:
- # request.timeout:
- # timeout:
- # cluster.name: elasticsearch
- # client.transport.sniff:
- # bulk.flush.:
-
-sql:
- my_flinksql: |
- CREATE TABLE datagen (
- f_sequence INT,
- f_random INT,
- f_random_str STRING,
- ts AS localtimestamp,
- WATERMARK FOR ts AS ts
- ) WITH (
- 'connector' = 'datagen',
- -- optional options --
- 'rows-per-second'='5', --这个是注释--
- 'fields.f_sequence.kind'='sequence',
- 'fields.f_sequence.start'='1',
- 'fields.f_sequence.end'='1000',
- 'fields.f_random.min'='1',
- 'fields.f_random.max'='1000',
- 'fields.f_random_str.length'='10'
- );
-
- CREATE TABLE print_table (
- f_sequence INT,
- f_random INT,
- f_random_str STRING
- ) WITH (
- 'connector' = 'print'
- );
-
- INSERT INTO print_table select f_sequence,f_random,f_random_str from
datagen;
-
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
deleted file mode 100644
index 8370a1999..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
+++ /dev/null
@@ -1,51 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Entity;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
-/** @author benjobs */
-public class ClickhouseJavaApp {
-
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
-
- // 2) async高性能写入
- new ClickHouseSink(context)
- .asyncSink(
- source,
- value ->
- String.format(
- "insert into test.orders(userId, siteId) values (%d,%d)",
- value.userId, value.siteId))
- .setParallelism(1);
-
- // 3) jdbc方式写入
- /**
- * new ClickHouseSink(context).jdbcSink(source, bean ->
String.format("insert into
- * test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId)
).setParallelism(1);
- */
- context.start();
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
deleted file mode 100644
index 2d8091199..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.doris.sink.DorisSink;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** @author wudi */
-public class DorisJavaApp {
-
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStream<String> source =
- new KafkaJavaSource<String>(context)
- .getDataStream()
- .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
- .returns(String.class);
-
- new DorisSink<String>(context).sink(source);
-
- context.start();
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
deleted file mode 100644
index 643d6840b..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
+++ /dev/null
@@ -1,36 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.http.sink.HttpSink;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Entity;
-
-import org.apache.flink.streaming.api.datastream.DataStreamSource;
-
-/** @author wudi */
-public class HttpJavaApp {
-
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
- new HttpSink(context).get(source.map(x ->
String.format("http://www.qq.com?id=%d", x.userId)));
- context.start();
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
deleted file mode 100644
index 740ca08d8..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.common.util.JsonUtils;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
-import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
-
-import org.apache.flink.api.common.functions.MapFunction;
-import org.apache.flink.api.common.serialization.SerializationSchema;
-import org.apache.flink.streaming.api.datastream.DataStream;
-
-/** @author benjobs */
-public class KafkaJavaApp {
-
- public static void main(String[] args) {
-
- StreamEnvConfig javaConfig =
- new StreamEnvConfig(
- args,
- (environment, parameterTool) -> {
- // environment.getConfig().enableForceAvro();
- System.out.println("environment argument set...");
- });
-
- StreamingContext context = new StreamingContext(javaConfig);
-
- // 1) 从 kafka 中读取数据
- DataStream<Behavior> source =
- new KafkaJavaSource<String>(context)
- .getDataStream()
- .map(
- (MapFunction<KafkaRecord<String>, Behavior>)
- value -> JsonUtils.read(value, Behavior.class));
-
- // 2) 将数据写入其他 kafka 主题
- new KafkaJavaSink<Behavior>(context)
- .serializer((SerializationSchema<Behavior>) element ->
JsonUtils.write(element).getBytes())
- .sink(source);
-
- context.start();
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
deleted file mode 100644
index a11772ef9..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector;
-
-import org.apache.streampark.flink.connector.function.QueryFunction;
-import org.apache.streampark.flink.connector.function.ResultFunction;
-import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
-import org.apache.streampark.flink.core.StreamEnvConfig;
-import org.apache.streampark.flink.core.scala.StreamingContext;
-import org.apache.streampark.flink.quickstart.connector.bean.Order;
-
-import org.apache.flink.api.common.typeinfo.TypeInformation;
-
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.List;
-
-public class MySQLJavaApp {
-
- public static void main(String[] args) {
-
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
-
- StreamingContext context = new StreamingContext(envConfig);
-
- // 读取MySQL数据源
- new JdbcJavaSource<>(context, Order.class)
- .getDataStream(
- (QueryFunction<Order>)
- lastOne -> {
- // 5秒抽取一次
- Thread.sleep(3000);
- Serializable lastOffset =
- lastOne == null ? "2020-10-10 23:00:00" :
lastOne.getTimestamp();
- return String.format(
- "select * from t_order "
- + "where timestamp > '%s' "
- + "order by timestamp asc ",
- lastOffset);
- },
- (ResultFunction<Order>)
- map -> {
- List<Order> result = new ArrayList<>();
- map.forEach(
- item -> {
- Order order = new Order();
- order.setOrderId(item.get("order_id").toString());
- order.setMarketId(item.get("market_id").toString());
-
order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
- result.add(order);
- });
- return result;
- })
- .returns(TypeInformation.of(Order.class))
- .print("jdbc source: >>>>>");
-
- context.start();
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
deleted file mode 100644
index d01165370..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-/** @author benjobs */
-@Data
-public class Behavior {
- private String user_id;
- private Long item_id;
- private Long category_id;
- private String behavior;
- private Long ts;
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
deleted file mode 100644
index 09653fe2a..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-/** @author benjobs */
-public class Entity {
-
- public Long userId;
- public Long orderId;
- public Long siteId;
- public Long cityId;
- public Integer orderStatus;
- public Double price;
- public Integer quantity;
- public Long timestamp;
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
deleted file mode 100644
index b143d70e3..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class LogBean implements Serializable {
- private String platenum;
- private String cardType;
- private Long inTime;
- private Long outTime;
- private String controlid;
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
deleted file mode 100644
index 8f98f4d67..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector.bean;
-
-import lombok.Data;
-
-import java.io.Serializable;
-
-@Data
-public class Order implements Serializable {
- private String orderId;
- private String marketId;
- private Double price;
- private Long timestamp;
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
deleted file mode 100755
index 537cb762c..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
+++ /dev/null
@@ -1,142 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
-
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- https://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-
--->
-<configuration>
- <!-- 日志文件存储路径 -->
- <property name="LOG_HOME" value="logs/"/>
- <property name="FILE_SIZE" value="50MB"/>
- <property name="MAX_HISTORY" value="100"/>
- <timestamp key="DATE_TIME" datePattern="yyyy-MM-dd HH:mm:ss"/>
-
- <property name="log.colorPattern"
- value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger) | %msg%n"/>
- <property name="log.pattern"
- value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread]
%-5level %logger{36} - %msg%n"/>
-
- <!-- 控制台打印 -->
- <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
- <encoder charset="utf-8">
- <pattern>${log.colorPattern}</pattern>
- </encoder>
- </appender>
- <!-- ERROR 输入到文件,按日期和文件大小 -->
- <appender name="ERROR"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder charset="utf-8">
- <pattern>${log.pattern}</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>ERROR</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}%d/error.%i.log</fileNamePattern>
- <maxHistory>${MAX_HISTORY}</maxHistory>
- <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>${FILE_SIZE}</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- </appender>
-
- <!-- WARN 输入到文件,按日期和文件大小 -->
- <appender name="WARN"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder charset="utf-8">
- <pattern>${log.pattern}</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>WARN</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}%d/warn.%i.log</fileNamePattern>
- <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
- <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>${FILE_SIZE}</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- </appender>
-
- <!-- INFO 输入到文件,按日期和文件大小 -->
- <appender name="INFO"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder charset="utf-8">
- <pattern>${log.pattern}</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>INFO</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}%d/info.%i.log</fileNamePattern>
- <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
- <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>${FILE_SIZE}</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- </appender>
- <!-- DEBUG 输入到文件,按日期和文件大小 -->
- <appender name="DEBUG"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder charset="utf-8">
- <pattern>${log.pattern}</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>DEBUG</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}%d/debug.%i.log</fileNamePattern>
- <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
- <timeBasedFileNamingAndTriggeringPolicy
- class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>${FILE_SIZE}</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- </appender>
- <!-- TRACE 输入到文件,按日期和文件大小 -->
- <appender name="TRACE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
- <encoder charset="utf-8">
- <pattern>${log.pattern}</pattern>
- </encoder>
- <filter class="ch.qos.logback.classic.filter.LevelFilter">
- <level>TRACE</level>
- <onMatch>ACCEPT</onMatch>
- <onMismatch>DENY</onMismatch>
- </filter>
- <rollingPolicy
- class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
- <fileNamePattern>${LOG_HOME}%d/trace.%i.log</fileNamePattern>
- <MAX_HISTORY>${MAX_HISTORY}</MAX_HISTORY>
- <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
- <maxFileSize>${FILE_SIZE}</maxFileSize>
- </timeBasedFileNamingAndTriggeringPolicy>
- </rollingPolicy>
- </appender>
-
- <!-- Logger 根目录 -->
- <root level="INFO">
- <appender-ref ref="STDOUT"/>
- <appender-ref ref="DEBUG"/>
- <appender-ref ref="ERROR"/>
- <appender-ref ref="WARN"/>
- <appender-ref ref="INFO"/>
- <appender-ref ref="TRACE"/>
- </root>
-</configuration>
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
deleted file mode 100644
index db70be422..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object ClickHouseSinkApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
-
- override def handle(): Unit = {
- // 假如在clickhouse里已经有以下表.
- val createTable =
- """
- |create TABLE test.orders(
- |userId UInt16,
- |siteId UInt8,
- |timestamp UInt16
- |)ENGINE = TinyLog;
- |""".stripMargin
-
- println(createTable)
-
- // 1) 接入数据源
- val source = context.addSource(new MyDataSource)
-
- // 2)高性能异步写入
- ClickHouseSink().asyncSink(source)(
- x => { s"insert into test.orders(userId,siteId) values
(${x.userId},${x.siteId})" })
-
- // 3) jdbc方式写入
- // ClickHouseSink().jdbcSink(source)(x => {s"insert into
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
deleted file mode 100644
index 190c0eca6..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
-import
org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
-import org.elasticsearch.action.index.IndexRequest
-import org.json4s.jackson.JsonMethods
-
-object ES7SinkApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
-
- override def handle(): Unit = {
- val source: DataStream[Entity] = context.addSource(new MyDataSource)
-
- implicit def indexedSeq(x: Entity): IndexRequest =
ElasticsearchUtils.indexRequest(
- "flink_order",
- s"${x.orderId}_${x.timestamp}",
- JsonMethods.mapper.writeValueAsString(x)
- )
-
- ES7Sink().sink[Entity](source)
- }
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
deleted file mode 100644
index c3e49b36f..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
-import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.client.Get
-
-object HBaseRequestApp extends FlinkStreaming {
-
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- implicit val reqType: TypeInformation[(String, Boolean)] =
- TypeInformation.of(classOf[(String, Boolean)])
-
- override def handle(): Unit = {
-
- implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
- // one topic
- val source = context.fromCollection(Seq("123456", "1111", "222"))
-
- source.print("source:>>>")
-
- HBaseRequest(source)
- .requestOrdered[(String, Boolean)](
- x => {
- new HBaseQuery("person", new Get(x.getBytes()))
- },
- timeout = 5000,
- resultFunc = (a, r) => {
- a -> !r.isEmpty
- })
- .print(" check.... ")
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
deleted file mode 100644
index 11fa2d484..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
+++ /dev/null
@@ -1,65 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat,
HBaseSink}
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.client.{Mutation, Put}
-import org.apache.hadoop.hbase.util.Bytes
-
-import java.util.{Collections, Random}
-
-import scala.language.implicitConversions
-
-object HBaseSinkApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- override def handle(): Unit = {
- val source = context.addSource(new MyDataSource)
- val random = new Random()
-
- // 定义转换规则...
- implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
- val put =
- new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)),
entity.timestamp)
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"),
Bytes.toBytes(entity.cityId))
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"),
Bytes.toBytes(entity.orderId))
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"),
Bytes.toBytes(entity.orderStatus))
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"),
Bytes.toBytes(entity.quantity))
- put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"),
Bytes.toBytes(entity.siteId))
- Collections.singleton(put)
- }
- // source ===> trans ===> sink
-
- // 1)插入方式1
- HBaseSink().sink[Entity](source, "order")
-
- // 2) 插入方式2
- // 1.指定HBase 配置文件
- val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
- // 2.插入...
- source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
deleted file mode 100644
index 67e072d2e..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++ /dev/null
@@ -1,82 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.ConfigUtils
-import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
-import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
-import org.apache.streampark.flink.connector.hbase.source.HBaseSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.hadoop.hbase.CellUtil
-import org.apache.hadoop.hbase.client.{Get, Scan}
-import org.apache.hadoop.hbase.util.Bytes
-
-import java.util
-
-object HBaseSourceApp extends FlinkStreaming {
-
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- override def handle(): Unit = {
-
- implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
-
- val id = HBaseSource().getDataStream[String](
- query => {
- Thread.sleep(10)
- if (query == null) {
- new HBaseQuery("person", new Scan())
- } else {
- // TODO 从上一条记录中获取便宜量,决定下次查询的条件...
- new HBaseQuery("person", new Scan())
- }
- },
- r => new String(r.getRow)
- )
-
- HBaseRequest(id)
- .requestOrdered(
- x => {
- new HBaseQuery("person", new Get(x.getBytes()))
- },
- (a, r) => {
- val map = new util.HashMap[String, String]()
- val cellScanner = r.cellScanner()
- while (cellScanner.advance()) {
- val cell = cellScanner.current()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- val (name, v) = q.split("_") match {
- case Array(_type, name) =>
- _type match {
- case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
- case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
- case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
- case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
- }
- case _ =>
- }
- map.put(name.toString, v.toString)
- }
- map.toString
- }
- )
- .print("Async")
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
deleted file mode 100644
index 80b238798..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.http.sink.HttpSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object HttpSinkApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- override def handle(): Unit = {
-
- /** source */
- val source = context
- .addSource(new MyDataSource)
- .map(x => s"http://www.qq.com?id=${x.userId}")
-
- // sink
- new HttpSink(context).get(source)
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
deleted file mode 100644
index 0f18055e0..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
-import org.apache.streampark.flink.connector.influx.sink.InfluxSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-/** 侧输出流 */
-object InfluxDBSinkApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Weather] =
TypeInformation.of(classOf[Weather])
-
- override def handle(): Unit = {
- val source = context.addSource(new WeatherSource())
-
- // weather,altitude=1000,area=北 temperature=11,humidity=-4
-
- implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
- "mydb",
- "test",
- "autogen",
- (x: Weather) => Map("altitude" -> x.altitude.toString, "area" -> x.area),
- (x: Weather) =>
- Map(
- "temperature" -> x.temperature.asInstanceOf[Object],
- "humidity" -> x.humidity.asInstanceOf[Object])
- )
-
- InfluxSink().sink(source, "mydb")
-
- }
-
-}
-
-/** 温度 temperature 湿度 humidity 地区 area 海拔 altitude */
-case class Weather(temperature: Long, humidity: Long, area: String, altitude:
Long)
-
-class WeatherSource extends SourceFunction[Weather] {
-
- private[this] var isRunning = true
-
- override def cancel(): Unit = this.isRunning = false
-
- val random = new Random()
-
- override def run(ctx: SourceFunction.SourceContext[Weather]): Unit = {
- while (isRunning) {
- val temperature = random.nextInt(100)
- val humidity = random.nextInt(30)
- val area = List("北", "上", "广", "深")(random.nextInt(4))
- val altitude = random.nextInt(10000)
- val order = Weather(temperature, humidity, area, altitude)
- ctx.collect(order)
- }
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
deleted file mode 100644
index cb4c993ad..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
-import org.apache.streampark.flink.connector.kafka.source.KafkaSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object JdbcSinkApp extends FlinkStreaming {
-
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- override def handle(): Unit = {
-
- /** 从kafka里读数据.这里的数据是数字或者字母,每次读取1条 */
- val source = KafkaSource()
- .getDataStream[String]()
- .uid("kfkSource1")
- .name("kfkSource1")
- .map(
- x => {
- x.value
- })
-
- /**
- * 假设这里有一个orders表.有一个字段,id的类型是int 在数据插入的时候制造异常: 1)正确情况:
- * 当从kafka中读取的内容全部是数字时会插入成功,kafka的消费的offset也会更新. 如: 当前kafka
- * size为20,手动输入10个数字,则size为30,然后会将这10个数字写入到Mysql,kafka的offset也会更新 2)异常情况:
- * 当从kafka中读取的内容非数字会导致插入失败,kafka的消费的offset会回滚 如: 当前的kafka
size为30,offset是30,
- * 手动输入1个字母,此时size为31,写入mysql会报错,kafka的offset依旧是30,不会发生更新.
- */
- JdbcSink(parallelism = 5)
- .sink[String](source)(
- x => {
- s"insert into orders(id,timestamp)
values('$x',${System.currentTimeMillis()})"
- })
- .uid("mysqlSink")
- .name("mysqlSink")
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
deleted file mode 100644
index af8aa55aa..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-object KafkaSinkApp extends FlinkStreaming {
-
- implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
- implicit val entityType: TypeInformation[Behavior] =
TypeInformation.of(classOf[Behavior])
-
- override def handle(): Unit = {
- val source = new BehaviorSource()
- val ds = context.addSource[Behavior](source).map(_.toString)
- ds.print()
- KafkaSink().sink(ds)
- }
-
-}
-
-case class Behavior(user_id: String, item_id: Long, category_id: Long,
behavior: String, ts: Long) {
- override def toString: String = {
- s"""
- |{
- |user_id:$user_id,
- |item_id:$item_id,
- |category_id:$category_id,
- |behavior:$behavior,
- |ts:$ts
- |}
- |""".stripMargin
- }
-}
-
-class BehaviorSource extends SourceFunction[Behavior] {
- private[this] var isRunning = true
-
- override def cancel(): Unit = this.isRunning = false
-
- val random = new Random()
- var index = 0
-
- override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
- val seq = Seq("view", "click", "search", "buy", "share")
- while (isRunning && index <= 10000) {
- index += 1
- val user_id = random.nextInt(1000)
- val item_id = random.nextInt(100)
- val category_id = random.nextInt(20)
- val behavior = seq(random.nextInt(5))
- val order =
- Behavior(user_id.toString, item_id, category_id, behavior,
System.currentTimeMillis())
- ctx.collect(order)
- }
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
deleted file mode 100644
index 651605678..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.kafka.source.KafkaSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.streaming.api.scala._
-
-object KafkaSourceApp extends FlinkStreaming {
- override def handle(): Unit = {
-
- KafkaSource()
- .getDataStream[String]()
- .map(_.value)
- .print()
-
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
deleted file mode 100644
index 37873a964..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++ /dev/null
@@ -1,58 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.common.util.{DateUtils, JsonUtils}
-import org.apache.streampark.flink.connector.mongo.source.MongoSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import com.mongodb.BasicDBObject
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-import java.util.Properties
-
-import scala.collection.JavaConversions._
-
-object MongoSourceApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[String] =
TypeInformation.of(classOf[String])
-
- override def handle(): Unit = {
- implicit val prop: Properties = context.parameter.getProperties
- val source = MongoSource()
- source
- .getDataStream[String](
- "shop",
- (a, d) => {
- Thread.sleep(1000)
-
- /**
从上一条记录提前offset数据,作为下一条数据查询的条件,如果offset为Null,则表明是第一次查询,需要指定默认offset */
- val offset =
- if (a == null) "2019-09-27 00:00:00"
- else {
- JsonUtils.read[Map[String, _]](a).get("updateTime").toString
- }
- val cond = new BasicDBObject()
- .append("updateTime", new BasicDBObject("$gte",
DateUtils.parse(offset)))
- d.find(cond)
- },
- _.toList.map(_.toJson())
- )
- .print()
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
deleted file mode 100644
index 5348d104d..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
+++ /dev/null
@@ -1,53 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.quickstart.connector.bean.Entity
-
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-
-import scala.util.Random
-
-class MyDataSource extends SourceFunction[Entity] {
-
- private[this] var isRunning = true
-
- override def cancel(): Unit = this.isRunning = false
-
- val random = new Random()
-
- var index = 0
-
- override def run(ctx: SourceFunction.SourceContext[Entity]): Unit = {
- while (isRunning && index <= 1000001) {
- index += 1
-
- val entity = new Entity()
- entity.userId = System.currentTimeMillis()
- entity.orderId = random.nextInt(100)
- entity.orderStatus = random.nextInt(1)
- entity.price = random.nextDouble()
- entity.quantity = new Random().nextInt(10)
- entity.cityId = 1
- entity.siteId = random.nextInt(20)
- entity.timestamp = System.currentTimeMillis()
-
- ctx.collect(entity)
- }
- }
-
-}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
deleted file mode 100644
index 3b7447a67..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ /dev/null
@@ -1,44 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-
-object MySQLSourceApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[Order] =
TypeInformation.of(classOf[Order])
-
- override def handle(): Unit = {
-
- JdbcSource()
- .getDataStream[Order](
- lastOne => {
- val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else
lastOne.timestamp
- s"select * from t_order where timestamp > '$laseOffset' order by
timestamp asc "
- },
- _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
- )
- .print()
-
- }
-
-}
-
-class Order(val marketId: String, val timestamp: String) extends Serializable
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
deleted file mode 100644
index 24180b834..000000000
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
+++ /dev/null
@@ -1,121 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.streampark.flink.quickstart.connector
-
-import org.apache.streampark.flink.core.scala.FlinkStreaming
-
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.functions.ProcessFunction
-import org.apache.flink.streaming.api.functions.source.SourceFunction
-import org.apache.flink.streaming.api.scala._
-import org.apache.flink.util.Collector
-
-import scala.util.Random
-
-/** 侧输出流 */
-object SideOutApp extends FlinkStreaming {
-
- implicit val entityType: TypeInformation[SideEntry] =
TypeInformation.of(classOf[SideEntry])
-
- override def handle(): Unit = {
- val source = context.addSource(new SideSource())
-
- /** 侧输出流。。。 官方写法:设置侧输出流 */
- val side1 = source.process(new ProcessFunction[SideEntry, SideEntry] {
- val tag = new OutputTag[SideEntry]("flink")
-
- override def processElement(
- value: SideEntry,
- ctx: ProcessFunction[SideEntry, SideEntry]#Context,
- out: Collector[SideEntry]): Unit = {
- if (value.userId < 100) {
- ctx.output(tag, value)
- } else {
- out.collect(value)
- }
- }
- })
-
- // 官方写法,获取侧输出流
- side1.getSideOutput(new
OutputTag[SideEntry]("flink")).print("flink:========>")
-
- }
-
-}
-
-/**
- * @param userId
- * : 用户Id
- * @param orderId
- * : 订单ID
- * @param siteId
- * : 站点ID
- * @param cityId
- * : 城市Id
- * @param orderStatus
- * : 订单状态(1:下单,0:退单)
- * @param isNewOrder
- * : 是否是首单
- * @param price
- * : 单价
- * @param quantity
- * : 订单数量
- * @param timestamp
- * : 下单时间
- */
-case class SideEntry(
- userId: Long,
- orderId: Long,
- siteId: Long,
- cityId: Long,
- orderStatus: Int,
- isNewOrder: Int,
- price: Double,
- quantity: Int,
- timestamp: Long)
-
-class SideSource extends SourceFunction[SideEntry] {
-
- private[this] var isRunning = true
-
- override def cancel(): Unit = this.isRunning = false
-
- val random = new Random()
-
- override def run(ctx: SourceFunction.SourceContext[SideEntry]): Unit = {
- while (isRunning) {
- val userId = random.nextInt(1000)
- val orderId = random.nextInt(100)
- val status = random.nextInt(1)
- val isNew = random.nextInt(1)
- val price = random.nextDouble()
- val quantity = new Random(10).nextInt()
- val order = SideEntry(
- userId,
- orderId,
- siteId = 1,
- cityId = 1,
- status,
- isNew,
- price,
- quantity,
- System.currentTimeMillis)
- ctx.collect(order)
- }
- }
-
-}