This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new eb7176408 [Improve] add flink jdbc|mongo|hbase datastream-connector
test case
eb7176408 is described below
commit eb717640847ad8c41e659c0c1936364f20738f5e
Author: benjobs <[email protected]>
AuthorDate: Sat Jan 25 09:16:35 2025 +0800
[Improve] add flink jdbc|mongo|hbase datastream-connector test case
---
.../streampark/common/util/ConfigUtils.scala | 2 +-
streampark-flink/pom.xml | 1 +
.../client/tool/FlinkSessionClientHelper.scala | 7 +-
.../streampark-flink-connector-test/pom.xml | 267 +++++++++++++++++++++
.../src/test/application.yml | 267 +++++++++++++++++++++
.../quickstart/connector/ClickhouseJavaApp.java | 51 ++++
.../flink/quickstart/connector/DorisJavaApp.java | 44 ++++
.../flink/quickstart/connector/HttpJavaApp.java | 37 +++
.../flink/quickstart/connector/KafkaJavaApp.java | 58 +++++
.../flink/quickstart/connector/MySQLJavaApp.java | 70 ++++++
.../flink/quickstart/connector/bean/Behavior.java | 31 +++
.../flink/quickstart/connector/bean/Entity.java | 35 +++
.../flink/quickstart/connector/bean/LogBean.java | 30 +++
.../flink/quickstart/connector/bean/Order.java | 27 +++
.../src/test/resources/logback.xml | 142 +++++++++++
.../quickstart/connector/ClickHouseSinkApp.scala | 53 ++++
.../flink/quickstart/connector/ES7SinkApp.scala | 43 ++++
.../quickstart/connector/HBaseRequestApp.scala | 49 ++++
.../flink/quickstart/connector/HBaseSinkApp.scala | 63 +++++
.../quickstart/connector/HBaseSourceApp.scala | 73 ++++++
.../flink/quickstart/connector/HttpSinkApp.scala | 42 ++++
.../quickstart/connector/InfluxDBSinkApp.scala | 85 +++++++
.../flink/quickstart/connector/JdbcSinkApp.scala | 55 +++++
.../flink/quickstart/connector/KafkaSinkApp.scala | 84 +++++++
.../quickstart/connector/KafkaSourceApp.scala | 32 +++
.../quickstart/connector/MongoSourceApp.scala | 52 ++++
.../flink/quickstart/connector/MyDataSource.scala | 53 ++++
.../quickstart/connector/MySQLSourceApp.scala | 40 +++
.../flink/quickstart/connector/SideOutApp.scala | 105 ++++++++
.../connector/hbase/source/HBaseJavaSource.java | 15 +-
.../hbase/internal/HBaseSourceFunction.scala | 3 +-
.../connector/jdbc/source/JdbcJavaSource.java | 15 +-
.../jdbc/internal/JdbcSourceFunction.scala | 30 ++-
.../connector/mongo/source/MongoJavaSource.java | 18 +-
.../mongo/internal/MongoSourceFunction.scala | 27 ++-
.../flink/kubernetes/KubernetesRetriever.scala | 12 +-
.../watcher/FlinkCheckpointWatcher.scala | 13 +-
.../kubernetes/watcher/FlinkMetricsWatcher.scala | 15 +-
.../flink/kubernetes/watcher/FlinkWatcher.scala | 4 +-
.../apache/streampark/flink/util/FlinkUtils.scala | 9 +-
40 files changed, 2005 insertions(+), 54 deletions(-)
diff --git
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
index cc8b30e9d..f93ca5461 100644
---
a/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
+++
b/streampark-common/src/main/scala/org/apache/streampark/common/util/ConfigUtils.scala
@@ -24,7 +24,7 @@ import scala.collection.JavaConversions._
import scala.collection.immutable.{Map => ScalaMap}
import scala.util.Try
-object ConfigUtils {
+private[streampark] object ConfigUtils {
def getConf(parameter: JavaMap[String, String], prefix: String = "", addfix:
String = "")(implicit
alias: String = ""): Properties = {
diff --git a/streampark-flink/pom.xml b/streampark-flink/pom.xml
index fb00fe5db..692adb640 100644
--- a/streampark-flink/pom.xml
+++ b/streampark-flink/pom.xml
@@ -32,6 +32,7 @@
<module>streampark-flink-shims</module>
<module>streampark-flink-core</module>
<module>streampark-flink-connector</module>
+ <module>streampark-flink-connector-test</module>
<module>streampark-flink-sqlclient</module>
<module>streampark-flink-udf</module>
<module>streampark-flink-client</module>
diff --git
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
index 1fc86c789..51da01f88 100644
---
a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
+++
b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/tool/FlinkSessionClientHelper.scala
@@ -17,6 +17,8 @@
package org.apache.streampark.flink.client.tool
+import org.apache.streampark.common.util.Logger
+
import org.apache.flink.client.deployment.application.ApplicationConfiguration
import org.apache.flink.configuration.{Configuration, CoreOptions}
import org.apache.flink.runtime.jobgraph.SavepointConfigOptions
@@ -25,7 +27,6 @@ import org.apache.hc.client5.http.fluent.Request
import org.apache.hc.core5.http.ContentType
import org.apache.hc.core5.http.io.entity.StringEntity
import org.apache.hc.core5.util.Timeout
-import org.apache.streampark.common.util.Logger
import org.json4s.DefaultFormats
import org.json4s.jackson.JsonMethods._
import org.json4s.jackson.Serialization
@@ -33,13 +34,15 @@ import org.json4s.jackson.Serialization
import java.io.File
import java.nio.charset.StandardCharsets
import java.time.Duration
+
import scala.collection.JavaConversions._
import scala.util.{Failure, Success, Try}
object FlinkSessionSubmitHelper extends Logger {
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
- private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+ private lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+ Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
private lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(30000L)
diff --git a/streampark-flink/streampark-flink-connector-test/pom.xml
b/streampark-flink/streampark-flink-connector-test/pom.xml
new file mode 100644
index 000000000..7a53ccc47
--- /dev/null
+++ b/streampark-flink/streampark-flink-connector-test/pom.xml
@@ -0,0 +1,267 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-flink-connector</artifactId>
+ <version>2.1.6</version>
+ </parent>
+ <artifactId>streampark-flink-connector-test</artifactId>
+ <name>StreamPark-quickstart: connector Test</name>
+
+ <properties>
+ <streampark.flink.shims.version>1.14</streampark.flink.shims.version>
+ <flink112.version>1.12.0</flink112.version>
+ <flink113.version>1.13.0</flink113.version>
+ <flink114.version>1.14.0</flink114.version>
+ <flink115.version>1.15.0</flink115.version>
+ <flink116.version>1.16.0</flink116.version>
+ <flink117.version>1.17.0</flink117.version>
+ <flink118.version>1.18.0</flink118.version>
+ <flink119.version>1.19.0</flink119.version>
+ </properties>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+ <artifactId>streampark-common_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-core_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-shims_flink-${streampark.flink.shims.version}_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-clickhouse_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-influx_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-http_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-hbase_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-doris_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <version>8.0.33</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-mongo_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-elasticsearch7_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-jackson_${scala.binary.version}</artifactId>
+ <version>3.6.7</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.json4s</groupId>
+ <artifactId>json4s-native_2.11</artifactId>
+ <version>3.6.7</version>
+ </dependency>
+
+ <!--flink base-->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-core</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-scala_${scala.binary.version}</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-clients_${scala.binary.version}</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-scala_${scala.binary.version}</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-statebackend-rocksdb_${scala.binary.version}</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>chill-thrift</artifactId>
+ <version>0.7.6</version>
+      <!-- exclusions for dependency convergence -->
+ <exclusions>
+ <exclusion>
+ <groupId>com.esotericsoftware.kryo</groupId>
+ <artifactId>kryo</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <!-- libthrift is required by chill-thrift -->
+ <dependency>
+ <groupId>org.apache.thrift</groupId>
+ <artifactId>libthrift</artifactId>
+ <version>0.14.0</version>
+ <exclusions>
+ <exclusion>
+ <groupId>javax.servlet</groupId>
+ <artifactId>servlet-api</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.apache.httpcomponents</groupId>
+ <artifactId>httpclient</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+
+ <dependency>
+ <groupId>org.projectlombok</groupId>
+ <artifactId>lombok</artifactId>
+ <version>1.18.20</version>
+ <scope>provided</scope>
+ </dependency>
+
+ <!-- Kafka里面的消息采用Json格式 -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-json</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-csv</artifactId>
+ <version>${flink114.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.ververica</groupId>
+ <artifactId>flink-connector-mysql-cdc</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>com.alibaba.ververica</groupId>
+ <artifactId>flink-format-changelog-json</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.streampark</groupId>
+
<artifactId>streampark-flink-connector-jdbc_${scala.binary.version}</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <version>3.2.4</version>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-assembly-plugin</artifactId>
+ <version>3.1.1</version>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
new file mode 100644
index 000000000..29e1c7333
--- /dev/null
+++ b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
@@ -0,0 +1,267 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+flink:
+ option:
+ target: yarn-per-job
+ detached:
+ shutdownOnAttachedExit:
+ jobmanager:
+ property: #@see:
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/deployment/config/
+ $internal.application.main:
org.apache.streampark.flink.quickstart.QuickStartApp
+ pipeline.name: streampark-quickstartApp
+ yarn.application.queue:
+ taskmanager.numberOfTaskSlots: 1
+ parallelism.default: 2
+ jobmanager.memory:
+ flink.size:
+ heap.size:
+ jvm-metaspace.size:
+ jvm-overhead.max:
+ off-heap.size:
+ process.size:
+ taskmanager.memory:
+ flink.size:
+ framework.heap.size:
+ framework.off-heap.size:
+ managed.size:
+ process.size:
+ task.heap.size:
+ task.off-heap.size:
+ jvm-metaspace.size:
+ jvm-overhead.max:
+ jvm-overhead.min:
+ managed.fraction: 0.4
+ pipeline:
+ auto-watermark-interval: 200ms
+ # checkpoint
+ execution:
+ checkpointing:
+ mode: EXACTLY_ONCE
+ interval: 30s
+ timeout: 10min
+ unaligned: false
+ externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
+ # state backend
+ state:
+      backend: rocksdb # Special note: flink1.12 and earlier optional configuration
('jobmanager', 'filesystem', 'rocksdb'), flink1.13+ optional configuration
('hashmap', 'rocksdb'),
+ backend.incremental: true
+ checkpoint-storage: filesystem
+ savepoints.dir: file:///tmp/chkdir
+ checkpoints.dir: file:///tmp/chkdir
+ # restart strategy
+ restart-strategy: fixed-delay # Restart strategy
[(fixed-delay|failure-rate|none) a total of 3 configurable strategies]
+ restart-strategy.fixed-delay:
+ attempts: 3
+ delay: 50000
+ restart-strategy.failure-rate:
+ max-failures-per-interval:
+ failure-rate-interval:
+ delay:
+ # table
+ table:
+ table.local-time-zone: default # @see
https://nightlies.apache.org/flink/flink-docs-release-1.15/docs/dev/table/config/
+
+app: # user's parameter
+ # kafka source config....
+ kafka.source:
+ bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
+ topic: test_user
+ group.id: flink_02
+ auto.offset.reset: earliest
+ #enable.auto.commit: true
+ #start.from:
+ #timestamp: 1591286400000 #指定timestamp,针对所有的topic生效
+ #offset: # 给每个topic的partition指定offset
+ #topic: kafka01,kafka02
+ #kafka01: 0:182,1:183,2:182 #分区0从182开始消费,分区1从183...
+ #kafka02: 0:182,1:183,2:182
+
+ # kafka sink config....
+ kafka.sink:
+ bootstrap.servers: kfk01:9092,kfk02:9092,kfk03:9092
+ topic: test_user
+ transaction.timeout.ms: 1000
+ semantic: AT_LEAST_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
+ batch.size: 1
+
+
+ # jdbc config...
+ jdbc:
+ semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
+ driverClassName: com.mysql.cj.jdbc.Driver
+ jdbcUrl:
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
+ username: root
+ password: 123456
+
+ # influx config...
+ influx:
+ mydb:
+ jdbcUrl: http://test9:8086
+ #username: admin
+ #password: admin
+
+ # hbase
+ hbase:
+ zookeeper.quorum: test1,test2,test6
+ zookeeper.property.clientPort: 2181
+ zookeeper.session.timeout: 1200000
+ rpc.timeout: 5000
+ client.pause: 20
+
+ ##clickhouse jdbc同步写入配置
+ #clickhouse:
+ # sink:
+ # # 写入节点地址
+ # jdbcUrl: jdbc:clickhouse://127.0.0.1:8123,192.168.1.2:8123
+ # socketTimeout: 3000000
+ # database: test
+ # user: $user
+ # password: $password
+ # # 写结果表及对应的字段,全部可不写字段
+ # targetTable: orders(userId,siteId)
+ # batch:
+ # size: 1
+ # delaytime: 6000000
+
+ clickhouse:
+ sink:
+ hosts: 127.0.0.1:8123,192.168.1.2:8123
+ socketTimeout: 3000000
+ database: test
+ user: $user
+ password: $password
+ targetTable: test.orders(userId,siteId)
+ batch:
+ size: 1
+ delaytime: 60000
+ threshold:
+ bufferSize: 10
+ # 异步写入的并发数
+ numWriters: 4
+ # 缓存队列大小
+ queueCapacity: 100
+ delayTime: 10
+ requestTimeout: 600
+ retries: 1
+ # 成功响应码
+ successCode: 200
+ failover:
+ table: chfailover
+ # 达到失败最大写入次数后,数据备份的组件
+ storage: kafka #kafka|mysql|hbase|hdfs
+ mysql:
+ driverClassName: com.mysql.cj.jdbc.Driver
+ jdbcUrl:
jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
+ username: user
+ password: pass
+ kafka:
+ bootstrap.servers: localhost:9092
+ topic: test1
+ group.id: user_01
+ auto.offset.reset: latest
+ hbase:
+ hdfs:
+ path: /data/chfailover
+ namenode: hdfs://localhost:8020
+ user: hdfs
+
+ #http sink 配置
+ http.sink:
+ threshold:
+ numWriters: 3
+ queueCapacity: 10000
#队列最大容量,视单条记录大小而自行估量队列大小,如值太大,上游数据源来的太快,下游写入数据跟不上可能会OOM.
+ timeout: 100 #发送http请求的超时时间
+ retries: 3 #发送失败时的最大重试次数
+ successCode: 200 #发送成功状态码,这里可以有多个值,用","号分隔
+ failover:
+ table: record
+ storage: mysql #kafka,hbase,hdfs
+ jdbc: # 保存类型为MySQL,将失败的数据保存到MySQL
+ jdbcUrl: jdbc:mysql://localhost:3306/test
+ username: root
+ password: 123456
+ kafka:
+ topic: bigdata
+ bootstrap.servers: localhost:9093
+ hbase:
+ zookeeper.quorum: localhost
+ zookeeper.property.clientPort: 2181
+ hdfs:
+ namenode: hdfs://localhost:8020 # namenode rpc address and port, e.g:
hdfs://hadoop:8020 , hdfs://hadoop:9000
+ user: benjobs # user
+ path: /clickhouse/failover # save path
+ format: yyyy-MM-dd
+
+ #redis sink 配置
+ redis.sink:
+ # masterName: master 哨兵模式参数
+ # host: 192.168.0.1:6379, 192.168.0.3:6379 哨兵模式参数
+ host: 127.0.0.1
+ port: 6379
+ database: 2
+ connectType: jedisPool #可选参数:jedisPool(默认)|sentinel
+
+ es.sink:
+ # 必填参数,多个节点使用 host1:port, host2:port,
+ host: localhost:9200
+ # 选填参数
+ # es:
+ # disableFlushOnCheckpoint: false #是否
+ # auth:
+ # user:
+ # password:
+ # rest:
+ # max.retry.timeout:
+ # path.prefix:
+ # content.type:
+ # connect:
+ # request.timeout:
+ # timeout:
+ # cluster.name: elasticsearch
+ # client.transport.sniff:
+ # bulk.flush.:
+
+sql:
+ my_flinksql: |
+ CREATE TABLE datagen (
+ f_sequence INT,
+ f_random INT,
+ f_random_str STRING,
+ ts AS localtimestamp,
+ WATERMARK FOR ts AS ts
+ ) WITH (
+ 'connector' = 'datagen',
+ -- optional options --
+ 'rows-per-second'='5', --这个是注释--
+ 'fields.f_sequence.kind'='sequence',
+ 'fields.f_sequence.start'='1',
+ 'fields.f_sequence.end'='1000',
+ 'fields.f_random.min'='1',
+ 'fields.f_random.max'='1000',
+ 'fields.f_random_str.length'='10'
+ );
+
+ CREATE TABLE print_table (
+ f_sequence INT,
+ f_random INT,
+ f_random_str STRING
+ ) WITH (
+ 'connector' = 'print'
+ );
+
+ INSERT INTO print_table select f_sequence,f_random,f_random_str from
datagen;
+
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
new file mode 100644
index 000000000..4ba8fd674
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+/**
+ * @author benjobs
+ */
+public class ClickhouseJavaApp {
+
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
+
+ //2) async高性能写入
+ new ClickHouseSink(context).asyncSink(source, value ->
+ String.format("insert into test.orders(userId, siteId) values
(%d,%d)", value.userId, value.siteId)
+ ).setParallelism(1);
+
+ //3) jdbc方式写入
+ /**
+ *
+ * new ClickHouseSink(context).jdbcSink(source, bean ->
+ * String.format("insert into test.orders(userId, siteId) values
(%d,%d)", bean.userId, bean.siteId)
+ * ).setParallelism(1);
+ *
+ */
+ context.start();
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
new file mode 100644
index 000000000..551a28583
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.doris.sink.DorisSink;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * @author wudi
+ **/
+public class DorisJavaApp {
+
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStream<String> source = new KafkaJavaSource<String>(context)
+ .getDataStream()
+ .map((MapFunction<KafkaRecord<String>, String>)
KafkaRecord::value)
+ .returns(String.class);
+
+ new DorisSink<String>(context).sink(source);
+
+ context.start();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
new file mode 100644
index 000000000..81bfaa338
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.flink.connector.http.sink.HttpSink;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+
+/**
+ * @author wudi
+ **/
+public class HttpJavaApp {
+
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
+ new HttpSink(context).get(source.map(x ->
String.format("http://www.qq.com?id=%d", x.userId)));
+ context.start();
+ }
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
new file mode 100644
index 000000000..e45dadf3c
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
+import org.apache.flink.api.common.functions.MapFunction;
+import org.apache.flink.api.common.serialization.SerializationSchema;
+import org.apache.flink.streaming.api.datastream.DataStream;
+
+/**
+ * @author benjobs
+ */
+public class KafkaJavaApp {
+
+ public static void main(String[] args) {
+
+ StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment,
parameterTool) -> {
+ //environment.getConfig().enableForceAvro();
+ System.out.println("environment argument set...");
+ });
+
+ StreamingContext context = new StreamingContext(javaConfig);
+
+ //1) 从 kafka 中读取数据
+ DataStream<Behavior> source = new KafkaJavaSource<String>(context)
+ .getDataStream()
+ .map((MapFunction<KafkaRecord<String>, Behavior>) value ->
JsonUtils.read(value, Behavior.class));
+
+
+ // 2) 将数据写入其他 kafka 主题
+ new KafkaJavaSink<Behavior>(context)
+ .serializer((SerializationSchema<Behavior>) element ->
JsonUtils.write(element).getBytes())
+ .sink(source);
+
+ context.start();
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
new file mode 100644
index 000000000..97611b0d1
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector;
+
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+import org.apache.streampark.flink.connector.function.SQLQueryFunction;
+import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
+import org.apache.streampark.flink.core.StreamEnvConfig;
+import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.streampark.flink.quickstart.connector.bean.Order;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+
+public class MySQLJavaApp {
+
+ public static void main(String[] args) {
+
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+
+ StreamingContext context = new StreamingContext(envConfig);
+
+ //读取MySQL数据源
+ new JdbcJavaSource<Order>(context)
+ .getDataStream(
+ (SQLQueryFunction<Order>) lastOne -> {
+ //每3秒抽取一次
+ Thread.sleep(3000);
+ Serializable lastOffset = lastOne == null ?
"2020-10-10 23:00:00" : lastOne.getTimestamp();
+ return String.format(
+ "select * from t_order " +
+ "where timestamp > '%s' " +
+ "order by timestamp asc ",
+ lastOffset
+ );
+ },
+ (SQLResultFunction<Order>) map -> {
+ List<Order> result = new ArrayList<>();
+ map.forEach(item -> {
+ Order order = new Order();
+
order.setOrderId(item.get("order_id").toString());
+
order.setMarketId(item.get("market_id").toString());
+
order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
+ result.add(order);
+ });
+ return result;
+ })
+ .returns(TypeInformation.of(Order.class))
+ .print("jdbc source: >>>>>");
+
+ context.start();
+
+ }
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
new file mode 100644
index 000000000..6bfbb3a33
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
+/**
+ * @author benjobs
+ */
+@Data
+public class Behavior {
+ private String user_id;
+ private Long item_id;
+ private Long category_id;
+ private String behavior;
+ private Long ts;
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
new file mode 100644
index 000000000..a5b5b3282
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+
+/**
+ * @author benjobs
+ */
+
+public class Entity {
+
+ public Long userId;
+ public Long orderId;
+ public Long siteId;
+ public Long cityId;
+ public Integer orderStatus;
+ public Double price;
+ public Integer quantity;
+ public Long timestamp;
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
new file mode 100644
index 000000000..c0114d2c0
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
@@ -0,0 +1,30 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class LogBean implements Serializable {
+ private String platenum;
+ private String cardType;
+ private Long inTime;
+ private Long outTime;
+ private String controlid;
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
new file mode 100644
index 000000000..c561a1b52
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector.bean;
+
+import lombok.Data;
+
+@Data
+public class Order {
+ private String orderId;
+ private String marketId;
+ private Double price;
+ private Long timestamp;
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
new file mode 100755
index 000000000..537cb762c
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/resources/logback.xml
@@ -0,0 +1,142 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<configuration>
+ <!-- 日志文件存储路径 -->
+ <property name="LOG_HOME" value="logs/"/>
+ <property name="FILE_SIZE" value="50MB"/>
+ <property name="MAX_HISTORY" value="100"/>
+ <timestamp key="DATE_TIME" datePattern="yyyy-MM-dd HH:mm:ss"/>
+
+ <property name="log.colorPattern"
+ value="%d{yyyy-MM-dd HH:mm:ss} | %highlight(%-5level) |
%boldYellow(%thread) | %boldGreen(%logger) | %msg%n"/>
+ <property name="log.pattern"
+ value="%d{yyyy-MM-dd HH:mm:ss.SSS} %contextName [%thread]
%-5level %logger{36} - %msg%n"/>
+
+ <!-- 控制台打印 -->
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.colorPattern}</pattern>
+ </encoder>
+ </appender>
+ <!-- ERROR 输入到文件,按日期和文件大小 -->
+ <appender name="ERROR"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.pattern}</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>ERROR</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}%d/error.%i.log</fileNamePattern>
+ <maxHistory>${MAX_HISTORY}</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>${FILE_SIZE}</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ </appender>
+
+ <!-- WARN 输入到文件,按日期和文件大小 -->
+ <appender name="WARN"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.pattern}</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>WARN</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}%d/warn.%i.log</fileNamePattern>
+        <maxHistory>${MAX_HISTORY}</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>${FILE_SIZE}</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ </appender>
+
+ <!-- INFO 输入到文件,按日期和文件大小 -->
+ <appender name="INFO"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.pattern}</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>INFO</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}%d/info.%i.log</fileNamePattern>
+        <maxHistory>${MAX_HISTORY}</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>${FILE_SIZE}</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ </appender>
+ <!-- DEBUG 输入到文件,按日期和文件大小 -->
+ <appender name="DEBUG"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.pattern}</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>DEBUG</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <rollingPolicy
class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}%d/debug.%i.log</fileNamePattern>
+        <maxHistory>${MAX_HISTORY}</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy
+ class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>${FILE_SIZE}</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ </appender>
+ <!-- TRACE 输入到文件,按日期和文件大小 -->
+ <appender name="TRACE"
class="ch.qos.logback.core.rolling.RollingFileAppender">
+ <encoder charset="utf-8">
+ <pattern>${log.pattern}</pattern>
+ </encoder>
+ <filter class="ch.qos.logback.classic.filter.LevelFilter">
+ <level>TRACE</level>
+ <onMatch>ACCEPT</onMatch>
+ <onMismatch>DENY</onMismatch>
+ </filter>
+ <rollingPolicy
+ class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
+ <fileNamePattern>${LOG_HOME}%d/trace.%i.log</fileNamePattern>
+        <maxHistory>${MAX_HISTORY}</maxHistory>
+ <timeBasedFileNamingAndTriggeringPolicy
class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
+ <maxFileSize>${FILE_SIZE}</maxFileSize>
+ </timeBasedFileNamingAndTriggeringPolicy>
+ </rollingPolicy>
+ </appender>
+
+ <!-- Logger 根目录 -->
+ <root level="INFO">
+ <appender-ref ref="STDOUT"/>
+ <appender-ref ref="DEBUG"/>
+ <appender-ref ref="ERROR"/>
+ <appender-ref ref="WARN"/>
+ <appender-ref ref="INFO"/>
+ <appender-ref ref="TRACE"/>
+ </root>
+</configuration>
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
new file mode 100644
index 000000000..79c0950af
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object ClickHouseSinkApp extends FlinkStreaming {
+
+ implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
+
+ override def handle(): Unit = {
+ // 假如在clickhouse里已经有以下表.
+ val createTable =
+ """
+ |create TABLE test.orders(
+ |userId UInt16,
+ |siteId UInt8,
+ |timestamp UInt16
+ |)ENGINE = TinyLog;
+ |""".stripMargin
+
+ println(createTable)
+
+ // 1) 接入数据源
+ val source = context.addSource(new MyDataSource)
+
+ // 2)高性能异步写入
+ ClickHouseSink().asyncSink(source)(x => {s"insert into
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
+
+ //3) jdbc方式写入
+ // ClickHouseSink().jdbcSink(source)(x => {s"insert into
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
+
+
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
new file mode 100644
index 000000000..fb9d80038
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
+import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
+import
org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.elasticsearch.action.index.IndexRequest
+import org.json4s.jackson.JsonMethods
+
+object ES7SinkApp extends FlinkStreaming {
+
+ implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
+
+ override def handle(): Unit = {
+ val source: DataStream[Entity] = context.addSource(new MyDataSource)
+
+ implicit def indexedSeq(x: Entity): IndexRequest =
ElasticsearchUtils.indexRequest(
+ "flink_order",
+ s"${x.orderId}_${x.timestamp}",
+ JsonMethods.mapper.writeValueAsString(x)
+ )
+
+ ES7Sink().sink[Entity](source)
+ }
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
new file mode 100644
index 000000000..e41c62ff7
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
+import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.client.Get
+
+object HBaseRequestApp extends FlinkStreaming {
+
+ implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
+
+ implicit val reqType: TypeInformation[(String, Boolean)] =
TypeInformation.of(classOf[(String, Boolean)])
+
+ override def handle(): Unit = {
+
+ implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
+ //one topic
+ val source = context.fromCollection(Seq("123456", "1111", "222"))
+
+ source.print("source:>>>")
+
+ HBaseRequest(source).requestOrdered[(String, Boolean)](x => {
+ new HBaseQuery("person", new Get(x.getBytes()))
+ }, timeout = 5000, resultFunc = (a, r) => {
+ a -> !r.isEmpty
+ }).print(" check.... ")
+
+
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
new file mode 100644
index 000000000..0a4695590
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat,
HBaseSink}
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.client.{Mutation, Put}
+import org.apache.hadoop.hbase.util.Bytes
+
+import java.util.{Collections, Random}
+import scala.language.implicitConversions
+
+object HBaseSinkApp extends FlinkStreaming {
+
+ implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
+ implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
+
+ override def handle(): Unit = {
+ val source = context.addSource(new MyDataSource)
+ val random = new Random()
+
+ //定义转换规则...
+ implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
+ val put = new Put(Bytes.toBytes(System.nanoTime() +
random.nextInt(1000000)), entity.timestamp)
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"),
Bytes.toBytes(entity.cityId))
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"),
Bytes.toBytes(entity.orderId))
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"),
Bytes.toBytes(entity.orderStatus))
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oq"),
Bytes.toBytes(entity.quantity))
+ put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"),
Bytes.toBytes(entity.siteId))
+ Collections.singleton(put)
+ }
+ //source ===> trans ===> sink
+
+ //1)插入方式1
+ HBaseSink().sink[Entity](source, "order")
+
+ //2) 插入方式2
+ //1.指定HBase 配置文件
+ val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
+ //2.插入...
+ source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))
+
+
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
new file mode 100644
index 000000000..5e2a9b5c4
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.common.util.ConfigUtils
+import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
+import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
+import org.apache.streampark.flink.connector.hbase.source.HBaseSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.hadoop.hbase.CellUtil
+import org.apache.hadoop.hbase.client.{Get, Scan}
+import org.apache.hadoop.hbase.util.Bytes
+
+import java.util
+
+object HBaseSourceApp extends FlinkStreaming {
+
+ implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
+
+ override def handle(): Unit = {
+
+ implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
+
+ val id = HBaseSource().getDataStream[String](query => {
+ Thread.sleep(10)
+ if (query == null) {
+ new HBaseQuery("person", new Scan())
+ } else {
+        //TODO 从上一条记录中获取偏移量,决定下次查询的条件...
+ new HBaseQuery("person", new Scan())
+ }
+ }, r => new String(r.getRow), null)
+
+ HBaseRequest(id).requestOrdered(x => {
+ new HBaseQuery("person", new Get(x.getBytes()))
+ }, (a, r) => {
+ val map = new util.HashMap[String, String]()
+ val cellScanner = r.cellScanner()
+ while (cellScanner.advance()) {
+ val cell = cellScanner.current()
+ val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+ val (name, v) = q.split("_") match {
+ case Array(_type, name) =>
+ _type match {
+ case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
+ case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
+ case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
+ case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
+ }
+          case _ => q -> Bytes.toString(CellUtil.cloneValue(cell))
+        }
+ map.put(name.toString, v.toString)
+ }
+ map.toString
+ }).print("Async")
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
new file mode 100644
index 000000000..763daf92d
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.http.sink.HttpSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+object HttpSinkApp extends FlinkStreaming {
+
+ implicit val entityType: TypeInformation[Entity] =
TypeInformation.of(classOf[Entity])
+ implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
+
+ override def handle(): Unit = {
+
+ /**
+ * source
+ */
+ val source = context.addSource(new MyDataSource)
+ .map(x => s"http://www.qq.com?id=${x.userId}")
+
+ // sink
+ new HttpSink(context).get(source)
+
+ }
+
+}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
new file mode 100644
index 000000000..f1c289604
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
+import org.apache.streampark.flink.connector.influx.sink.InfluxSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
/**
 * Demo job: writes randomly generated [[Weather]] points into InfluxDB.
 *
 * Resulting line-protocol shape:
 * weather,altitude=1000,area=北 temperature=11,humidity=-4
 */
object InfluxDBSinkApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Weather] = TypeInformation.of(classOf[Weather])

  override def handle(): Unit = {
    val weatherStream = context.addSource(new WeatherSource())

    // Mapping of a Weather record onto measurement "test" in database "mydb"
    // (retention policy "autogen"): altitude/area become tags, temperature/humidity become fields.
    implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
      "mydb",
      "test",
      "autogen",
      (w: Weather) => Map("altitude" -> w.altitude.toString, "area" -> w.area),
      (w: Weather) =>
        Map("temperature" -> w.temperature.asInstanceOf[Object], "humidity" -> w.humidity.asInstanceOf[Object])
    )

    InfluxSink().sink(weatherStream, "mydb")
  }

}
+
/**
 * A single weather measurement.
 *
 * @param temperature temperature reading
 * @param humidity humidity reading
 * @param area region name (used as an InfluxDB tag by [[InfluxDBSinkApp]])
 * @param altitude station altitude (used as an InfluxDB tag by [[InfluxDBSinkApp]])
 */
case class Weather(temperature: Long,
                   humidity: Long,
                   area: String,
                   altitude: Long)
+
/** Unbounded source that emits randomly generated [[Weather]] records until cancelled. */
class WeatherSource extends SourceFunction[Weather] {

  // Flipped to false from cancel() to stop the generation loop.
  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  override def run(ctx: SourceFunction.SourceContext[Weather]): Unit = {
    val areas = List("北", "上", "广", "深")
    while (isRunning) {
      val record = Weather(
        temperature = random.nextInt(100),
        humidity = random.nextInt(30),
        area = areas(random.nextInt(4)),
        altitude = random.nextInt(10000))
      ctx.collect(record)
    }
  }

}
+
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
new file mode 100644
index 000000000..01a792474
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
+import org.apache.streampark.flink.connector.kafka.source.KafkaSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
/** Demo job: reads single-character records from Kafka and writes them into MySQL via [[JdbcSink]]. */
object JdbcSinkApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])

  override def handle(): Unit = {

    /**
     * Read from Kafka. Each record is expected to be a single digit or letter, consumed one at a time.
     */
    val source = KafkaSource()
      .getDataStream[String]()
      .uid("kfkSource1")
      .name("kfkSource1")
      .map(_.value)

    /**
     * Assume an `orders` table whose `id` column is an int; this is used to provoke insert failures:
     * 1) Success path: when every Kafka record is numeric, the inserts succeed and the consumer
     *    offset advances (e.g. topic size 20, type 10 digits -> size 30, all 10 rows land in MySQL
     *    and the offset moves forward).
     * 2) Failure path: a non-numeric record makes the insert fail and the consumer offset rolls
     *    back (e.g. size 30 / offset 30, type 1 letter -> size 31 but the write errors and the
     *    offset stays at 30).
     */
    JdbcSink(parallelism = 5)
      .sink[String](source)(record =>
        s"insert into orders(id,timestamp) values('$record',${System.currentTimeMillis()})")
      .uid("mysqlSink")
      .name("mysqlSink")
  }

}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
new file mode 100644
index 000000000..b4e82ab69
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
+
/** Demo job: generates [[Behavior]] events and publishes their string form to Kafka. */
object KafkaSinkApp extends FlinkStreaming {

  implicit val stringType: TypeInformation[String] = TypeInformation.of(classOf[String])
  implicit val entityType: TypeInformation[Behavior] = TypeInformation.of(classOf[Behavior])

  override def handle(): Unit = {
    val behaviorStream = context.addSource[Behavior](new BehaviorSource())
    val rendered = behaviorStream.map(_.toString)
    // Print for local inspection, then publish to the configured Kafka topic.
    rendered.print()
    KafkaSink().sink(rendered)
  }

}
+
+
/**
 * A user behavior event.
 *
 * @param user_id user identifier
 * @param item_id item identifier
 * @param category_id category identifier
 * @param behavior behavior type (e.g. view/click/search/buy/share)
 * @param ts event timestamp (epoch millis)
 */
case class Behavior(user_id: String,
                    item_id: Long,
                    category_id: Long,
                    behavior: String,
                    ts: Long) {

  // Render as a loose multi-line JSON-like string; a leading and trailing newline
  // is included, matching the original stripMargin layout.
  override def toString: String =
    Seq(
      "",
      "{",
      s"user_id:$user_id,",
      s"item_id:$item_id,",
      s"category_id:$category_id,",
      s"behavior:$behavior,",
      s"ts:$ts",
      "}",
      ""
    ).mkString("\n")
}
+
+
/** Bounded source that emits ~10k random [[Behavior]] events, or stops early when cancelled. */
class BehaviorSource extends SourceFunction[Behavior] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()
  var index = 0

  override def run(ctx: SourceFunction.SourceContext[Behavior]): Unit = {
    val behaviors = Seq("view", "click", "search", "buy", "share")
    while (isRunning && index <= 10000) {
      index += 1
      val userId = random.nextInt(1000)
      val itemId = random.nextInt(100)
      val categoryId = random.nextInt(20)
      val action = behaviors(random.nextInt(5))
      ctx.collect(Behavior(userId.toString, itemId, categoryId, action, System.currentTimeMillis()))
    }
  }

}
+
+
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
new file mode 100644
index 000000000..e9e3eaa08
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.flink.streaming.api.scala._
+import org.apache.streampark.flink.connector.kafka.source.KafkaSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+
/** Demo job: consumes the configured Kafka topic and prints each record's value. */
object KafkaSourceApp extends FlinkStreaming {

  override def handle(): Unit = {
    val records = KafkaSource().getDataStream[String]()
    records.map(_.value).print()
  }

}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
new file mode 100644
index 000000000..c0db74056
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import com.mongodb.BasicDBObject
+import org.apache.streampark.common.util.{DateUtils, JsonUtils}
+import org.apache.streampark.flink.connector.mongo.source.MongoSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
+import java.util.Properties
+import scala.collection.JavaConversions._
+
/**
 * Demo job: incrementally reads documents from the MongoDB "shop" collection, using the
 * `updateTime` field of the previously emitted record as the lower bound of the next query.
 */
object MongoSourceApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[String] = TypeInformation.of(classOf[String])

  override def handle(): Unit = {
    implicit val prop: Properties = context.parameter.getProperties
    val source = MongoSource()
    source.getDataStream[String](
      "shop",
      (last, collection) => {
        // Throttle the polling loop.
        Thread.sleep(1000)

        /**
         * Extract the offset from the previous record to use as the next query's condition.
         * A null previous record means this is the first query, so fall back to the default offset.
         */
        val offset = if (last == null) "2019-09-27 00:00:00" else {
          // BUGFIX: Map.get returns an Option whose toString is "Some(...)", which would have
          // produced an unparseable date string; apply() yields the raw value instead.
          JsonUtils.read[Map[String, _]](last)("updateTime").toString
        }
        val cond = new BasicDBObject().append("updateTime", new BasicDBObject("$gte", DateUtils.parse(offset)))
        collection.find(cond)
      },
      _.toList.map(_.toJson()), null
    ).print()
  }

}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
new file mode 100644
index 000000000..af84ea674
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.quickstart.connector.bean.Entity
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+
+import scala.util.Random
+
+
/**
 * Bounded test source that emits ~1M randomly populated [[Entity]] records,
 * or stops early when cancelled.
 */
class MyDataSource extends SourceFunction[Entity] {

  // Flipped to false from cancel() to stop the generation loop.
  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  var index = 0

  override def run(ctx: SourceFunction.SourceContext[Entity]): Unit = {
    while (isRunning && index <= 1000001) {
      index += 1

      val entity = new Entity()
      entity.userId = System.currentTimeMillis()
      entity.orderId = random.nextInt(100)
      // BUGFIX: nextInt(1) always returns 0 — the "random" status was constant.
      // nextInt(2) yields 0 or 1 (presumably refund/order; confirm against consumers).
      entity.orderStatus = random.nextInt(2)
      entity.price = random.nextDouble()
      // BUGFIX: reuse the shared generator instead of allocating a new Random per record.
      entity.quantity = random.nextInt(10)
      entity.cityId = 1
      entity.siteId = random.nextInt(20)
      entity.timestamp = System.currentTimeMillis()

      ctx.collect(entity)
    }
  }

}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
new file mode 100644
index 000000000..5ab968edb
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+
/** Demo job: incrementally polls `t_order` from MySQL, offsetting by the last seen timestamp. */
object MySQLSourceApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[Order] = TypeInformation.of(classOf[Order])

  override def handle(): Unit = {
    JdbcSource().getDataStream[Order](
      lastOrder => {
        // The first query (no previous record) starts from a fixed default offset.
        val lastOffset = if (lastOrder == null) "2020-10-10 23:00:00" else lastOrder.timestamp
        s"select * from t_order where timestamp > '$lastOffset' order by timestamp asc "
      },
      rows => rows.map(row => new Order(row("market_id").toString, row("timestamp").toString)),
      null
    ).print()
  }

}
+
/** Minimal order projection read back from the `t_order` table by [[MySQLSourceApp]]. */
class Order(val marketId: String, val timestamp: String) extends Serializable
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
new file mode 100644
index 000000000..179c57f7d
--- /dev/null
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.streampark.flink.quickstart.connector
+
+import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.functions.ProcessFunction
+import org.apache.flink.streaming.api.functions.source.SourceFunction
+import org.apache.flink.streaming.api.scala._
+import org.apache.flink.util.Collector
+
+import scala.util.Random
+
/**
 * Side-output demo: records with userId < 100 are routed to the "flink" side output,
 * all others stay on the main stream.
 */
object SideOutApp extends FlinkStreaming {

  implicit val entityType: TypeInformation[SideEntry] = TypeInformation.of(classOf[SideEntry])

  override def handle(): Unit = {
    val source = context.addSource(new SideSource())

    /**
     * Split the stream with a ProcessFunction (the standard approach):
     * small userIds go to the tagged side output, everything else is collected normally.
     */
    val mainStream = source.process(new ProcessFunction[SideEntry, SideEntry] {
      val tag = new OutputTag[SideEntry]("flink")

      override def processElement(
          value: SideEntry,
          ctx: ProcessFunction[SideEntry, SideEntry]#Context,
          out: Collector[SideEntry]): Unit = {
        if (value.userId < 100) ctx.output(tag, value) else out.collect(value)
      }
    })

    // Standard retrieval: an OutputTag with the same id and type addresses the same side output.
    mainStream.getSideOutput(new OutputTag[SideEntry]("flink")).print("flink:========>")
  }

}
+
/**
 * A synthetic order record used by the side-output demo.
 *
 * @param userId      user id
 * @param orderId     order id
 * @param siteId      site id
 * @param cityId      city id
 * @param orderStatus order status (1: placed, 0: refunded)
 * @param isNewOrder  whether this is the user's first order
 * @param price       unit price
 * @param quantity    order quantity
 * @param timestamp   order time (epoch millis)
 */
case class SideEntry(userId: Long,
                     orderId: Long,
                     siteId: Long,
                     cityId: Long,
                     orderStatus: Int,
                     isNewOrder: Int,
                     price: Double,
                     quantity: Int,
                     timestamp: Long)
+
/** Unbounded source that emits randomly generated [[SideEntry]] records until cancelled. */
class SideSource extends SourceFunction[SideEntry] {

  private[this] var isRunning = true

  override def cancel(): Unit = this.isRunning = false

  val random = new Random()

  override def run(ctx: SourceFunction.SourceContext[SideEntry]): Unit = {
    while (isRunning) {
      val userId = random.nextInt(1000)
      val orderId = random.nextInt(100)
      // BUGFIX: nextInt(1) always returns 0, so status and isNewOrder were constant;
      // nextInt(2) yields 0 or 1 as the SideEntry doc (1: placed, 0: refunded) requires.
      val status = random.nextInt(2)
      val isNew = random.nextInt(2)
      val price = random.nextDouble()
      // BUGFIX: `new Random(10).nextInt()` built a freshly seeded generator every iteration,
      // producing the exact same (possibly negative) quantity for every record.
      val quantity = random.nextInt(10)
      val order = SideEntry(userId, orderId, siteId = 1, cityId = 1, status, isNew, price, quantity, System.currentTimeMillis)
      ctx.collect(order)
    }
  }

}
+
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index b146d281b..ca19fd9e5 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -37,6 +37,12 @@ public class HBaseJavaSource<T> {
this.property = property;
}
+ public DataStreamSource<T> getDataStream(
+ HBaseQueryFunction<T> queryFunction,
+ HBaseResultFunction<T> resultFunction) {
+ return getDataStream(queryFunction, resultFunction, null);
+ }
+
public DataStreamSource<T> getDataStream(
HBaseQueryFunction<T> queryFunction,
HBaseResultFunction<T> resultFunction,
@@ -44,8 +50,13 @@ public class HBaseJavaSource<T> {
AssertUtils.notNull(queryFunction, "queryFunction must not be null");
AssertUtils.notNull(resultFunction, "resultFunction must not be null");
- HBaseSourceFunction<T> sourceFunction =
- new HBaseSourceFunction<>(property, queryFunction, resultFunction,
runningFunc, null);
+ HBaseSourceFunction<T> sourceFunction = new HBaseSourceFunction<>(
+ property,
+ queryFunction,
+ resultFunction,
+ runningFunc,
+ null
+ );
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
index 8d22ec817..04e82941e 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/internal/HBaseSourceFunction.scala
@@ -168,7 +168,8 @@ class HBaseSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala,
override def initializeState(context: FunctionInitializationContext): Unit =
{
// Recover from checkpoint...
logInfo("HBaseSource snapshotState initialize")
- state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
+ state = FlinkUtils
+ .getUnionListState[R](context, getRuntimeContext.getExecutionConfig,
OFFSETS_STATE_NAME)
Try(state.get.head) match {
case Success(q) => last = q
case _ =>
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index 083f6d44d..d611de227 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -49,6 +49,12 @@ public class JdbcJavaSource<T> {
return this;
}
+ public DataStreamSource<T> getDataStream(
+ SQLQueryFunction<T> queryFunction,
+ SQLResultFunction<T> resultFunction) {
+ return getDataStream(queryFunction, resultFunction, null);
+ }
+
public DataStreamSource<T> getDataStream(
SQLQueryFunction<T> queryFunction,
SQLResultFunction<T> resultFunction,
@@ -59,8 +65,13 @@ public class JdbcJavaSource<T> {
if (this.jdbc == null) {
this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(),
alias);
}
- JdbcSourceFunction<T> sourceFunction =
- new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction,
runningFunc, null);
+ JdbcSourceFunction<T> sourceFunction = new JdbcSourceFunction<>(
+ jdbc,
+ queryFunction,
+ resultFunction,
+ runningFunc,
+ null
+ );
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
index b3cc4f710..8429a1135 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/scala/org/apache/streampark/flink/connector/jdbc/internal/JdbcSourceFunction.scala
@@ -46,13 +46,16 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
@volatile private[this] var running = true
private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
- private[this] var javaRunningFunc: RunningFunction = _
+ private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
+ override def running(): lang.Boolean = true
+ }
private[this] var scalaSqlFunc: R => String = _
private[this] var scalaResultFunc: Function[Iterable[Map[String, _]],
Iterable[R]] = _
private[this] var javaSqlFunc: SQLQueryFunction[R] = _
private[this] var javaResultFunc: SQLResultFunction[R] = _
- @transient private var state: ListState[R] = _
+ @transient private var unionOffsetStates: ListState[R] = null
+
private val OFFSETS_STATE_NAME: String = "jdbc-source-query-states"
private[this] var last: R = _
@@ -66,7 +69,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType: ApiType
= ApiType.scala, j
this(ApiType.scala, jdbc)
this.scalaSqlFunc = sqlFunc
this.scalaResultFunc = resultFunc
- this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
+ if (runningFunc != null) {
+ this.scalaRunningFunc = runningFunc
+ }
}
// for JAVA
@@ -79,12 +84,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
this(ApiType.java, jdbc)
this.javaSqlFunc = javaSqlFunc
this.javaResultFunc = javaResultFunc
- this.javaRunningFunc =
- if (runningFunc != null) runningFunc
- else
- new RunningFunction {
- override def running(): lang.Boolean = true
- }
+ if (runningFunc != null) {
+ this.javaRunningFunc = runningFunc
+ }
}
@throws[Exception]
@@ -125,9 +127,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
override def snapshotState(context: FunctionSnapshotContext): Unit = {
if (running) {
- state.clear()
+ unionOffsetStates.clear()
if (last != null) {
- state.add(last)
+ unionOffsetStates.add(last)
}
} else {
logError("JdbcSource snapshotState called on closed source")
@@ -136,8 +138,9 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
override def initializeState(context: FunctionInitializationContext): Unit =
{
logInfo("JdbcSource snapshotState initialize")
- state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
- Try(state.get.head) match {
+ unionOffsetStates = FlinkUtils
+ .getUnionListState[R](context, getRuntimeContext.getExecutionConfig,
OFFSETS_STATE_NAME)
+ Try(unionOffsetStates.get.head) match {
case Success(q) => last = q
case _ =>
}
@@ -146,4 +149,5 @@ class JdbcSourceFunction[R: TypeInformation](apiType:
ApiType = ApiType.scala, j
override def notifyCheckpointComplete(checkpointId: Long): Unit = {
logInfo(s"JdbcSource checkpointComplete: $checkpointId")
}
+
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 560de9eb0..9d8e3691f 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -37,6 +37,13 @@ public class MongoJavaSource<T> {
this.property = property;
}
+ public DataStreamSource<T> getDataStream(
+ String collectionName,
+ MongoQueryFunction<T> queryFunction,
+ MongoResultFunction<T> resultFunction) {
+ return getDataStream(collectionName, queryFunction, resultFunction,
null);
+ }
+
public DataStreamSource<T> getDataStream(
String collectionName,
MongoQueryFunction<T> queryFunction,
@@ -46,9 +53,14 @@ public class MongoJavaSource<T> {
AssertUtils.notNull(collectionName, "collectionName must not be null");
AssertUtils.notNull(queryFunction, "queryFunction must not be null");
AssertUtils.notNull(resultFunction, "resultFunction must not be null");
- MongoSourceFunction<T> sourceFunction =
- new MongoSourceFunction<>(
- collectionName, property, queryFunction, resultFunction,
runningFunc, null);
+ MongoSourceFunction<T> sourceFunction = new MongoSourceFunction<>(
+ collectionName,
+ property,
+ queryFunction,
+ resultFunction,
+ runningFunc,
+ null
+ );
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
index 6e700992c..eea0c93c0 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/scala/org/apache/streampark/flink/connector/mongo/internal/MongoSourceFunction.scala
@@ -51,11 +51,13 @@ class MongoSourceFunction[R: TypeInformation](
with Logger {
@volatile private[this] var running = true
- private[this] var scalaRunningFunc: Unit => Boolean = _
- private[this] var javaRunningFunc: RunningFunction = _
+ private[this] var scalaRunningFunc: Unit => Boolean = (_) => true
+ private[this] var javaRunningFunc: RunningFunction = new RunningFunction {
+ override def running(): lang.Boolean = true
+ }
var client: MongoClient = _
- var mongoCollection: MongoCollection[Document] = _
+ private var mongoCollection: MongoCollection[Document] = _
private[this] var scalaQueryFunc: (R, MongoCollection[Document]) =>
FindIterable[Document] = _
private[this] var scalaResultFunc: MongoCursor[Document] => List[R] = _
@@ -78,7 +80,9 @@ class MongoSourceFunction[R: TypeInformation](
this(ApiType.scala, prop, collectionName)
this.scalaQueryFunc = scalaQueryFunc
this.scalaResultFunc = scalaResultFunc
- this.scalaRunningFunc = if (runningFunc == null) _ => true else runningFunc
+ if (runningFunc != null) {
+ this.scalaRunningFunc = runningFunc
+ }
}
// for JAVA
@@ -92,12 +96,9 @@ class MongoSourceFunction[R: TypeInformation](
this(ApiType.java, prop, collectionName)
this.javaQueryFunc = queryFunc
this.javaResultFunc = resultFunc
- this.javaRunningFunc =
- if (runningFunc != null) runningFunc
- else
- new RunningFunction {
- override def running(): lang.Boolean = true
- }
+ if (runningFunc != null) {
+ this.javaRunningFunc = runningFunc
+ }
}
override def cancel(): Unit = this.running = false
@@ -146,6 +147,7 @@ class MongoSourceFunction[R: TypeInformation](
}
override def close(): Unit = {
+ this.running = false
client.close()
}
@@ -163,8 +165,9 @@ class MongoSourceFunction[R: TypeInformation](
override def initializeState(context: FunctionInitializationContext): Unit =
{
// restore from checkpoint
logInfo("MongoSource snapshotState initialize")
- state = FlinkUtils.getUnionListState[R](context, OFFSETS_STATE_NAME)
- Try(state.get.head) match {
+ state = FlinkUtils
+ .getUnionListState[R](context, getRuntimeContext.getExecutionConfig,
OFFSETS_STATE_NAME)
+ Try(state.get().head) match {
case Success(q) => last = q
case _ =>
}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
index 8b0cd6908..343de6919 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/KubernetesRetriever.scala
@@ -17,6 +17,12 @@
package org.apache.streampark.flink.kubernetes
+import org.apache.streampark.common.util.{Logger, Utils}
+import org.apache.streampark.common.util.Utils.using
+import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
+import org.apache.streampark.flink.kubernetes.ingress.IngressController
+import org.apache.streampark.flink.kubernetes.model.ClusterKey
+
import io.fabric8.kubernetes.client.{DefaultKubernetesClient,
KubernetesClient, KubernetesClientException}
import org.apache.flink.client.cli.ClientOptions
import org.apache.flink.client.deployment.{ClusterClientFactory,
DefaultClusterClientServiceLoader}
@@ -24,13 +30,9 @@ import org.apache.flink.client.program.ClusterClient
import org.apache.flink.configuration.{Configuration, DeploymentOptions,
RestOptions}
import org.apache.flink.kubernetes.KubernetesClusterDescriptor
import org.apache.flink.kubernetes.configuration.KubernetesConfigOptions
-import org.apache.streampark.common.util.Utils.using
-import org.apache.streampark.common.util.{Logger, Utils}
-import org.apache.streampark.flink.kubernetes.enums.FlinkK8sExecuteMode
-import org.apache.streampark.flink.kubernetes.ingress.IngressController
-import org.apache.streampark.flink.kubernetes.model.ClusterKey
import javax.annotation.Nullable
+
import scala.collection.JavaConverters._
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
index bd5279441..f2ad03597 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkCheckpointWatcher.scala
@@ -17,20 +17,23 @@
package org.apache.streampark.flink.kubernetes.watcher
-import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
import
org.apache.streampark.flink.kubernetes.event.FlinkJobCheckpointChangeEvent
import org.apache.streampark.flink.kubernetes.model.{CheckpointCV, ClusterKey,
TrackId}
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
+
+import org.apache.hc.client5.http.fluent.Request
+import org.json4s.{DefaultFormats, JNull}
import org.json4s.JsonAST.JNothing
import org.json4s.jackson.JsonMethods.parse
-import org.json4s.{DefaultFormats, JNull}
+
+import javax.annotation.concurrent.ThreadSafe
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import javax.annotation.concurrent.ThreadSafe
-import scala.concurrent.duration.DurationLong
+
import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
+import scala.concurrent.duration.DurationLong
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
index 889714a2d..e092557b0 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkMetricsWatcher.scala
@@ -17,20 +17,23 @@
package org.apache.streampark.flink.kubernetes.watcher
-import org.apache.flink.configuration.{JobManagerOptions, MemorySize,
TaskManagerOptions}
-import org.apache.hc.client5.http.fluent.Request
import org.apache.streampark.common.util.Logger
+import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
import
org.apache.streampark.flink.kubernetes.event.FlinkClusterMetricChangeEvent
import org.apache.streampark.flink.kubernetes.model.{ClusterKey,
FlinkMetricCV, TrackId}
-import org.apache.streampark.flink.kubernetes.{ChangeEventBus,
FlinkK8sWatchController, MetricWatcherConfig}
-import org.json4s.jackson.JsonMethods.parse
+
+import org.apache.flink.configuration.{JobManagerOptions, MemorySize,
TaskManagerOptions}
+import org.apache.hc.client5.http.fluent.Request
import org.json4s.{DefaultFormats, JArray}
+import org.json4s.jackson.JsonMethods.parse
+
+import javax.annotation.concurrent.ThreadSafe
import java.nio.charset.StandardCharsets
import java.util.concurrent.{ScheduledFuture, TimeUnit}
-import javax.annotation.concurrent.ThreadSafe
-import scala.concurrent.duration.DurationLong
+
import scala.concurrent.{Await, ExecutionContext,
ExecutionContextExecutorService, Future}
+import scala.concurrent.duration.DurationLong
import scala.language.postfixOps
import scala.util.{Failure, Success, Try}
diff --git
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
index 6ecb0bbe1..e6f836af5 100644
---
a/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
+++
b/streampark-flink/streampark-flink-kubernetes/src/main/scala/org/apache/streampark/flink/kubernetes/watcher/FlinkWatcher.scala
@@ -22,12 +22,14 @@ import org.apache.hc.core5.util.Timeout
import java.time.Duration
import java.util.concurrent.ScheduledThreadPoolExecutor
import java.util.concurrent.atomic.AtomicBoolean
+
import scala.language.implicitConversions
trait FlinkWatcher extends AutoCloseable {
// see org.apache.flink.client.cli.ClientOptions.CLIENT_TIMEOUT}
- lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
+ lazy val FLINK_CLIENT_TIMEOUT_SEC: Timeout =
+ Timeout.ofMilliseconds(Duration.ofSeconds(60).toMillis).toTimeout
// see org.apache.flink.configuration.RestOptions.AWAIT_LEADER_TIMEOUT
lazy val FLINK_REST_AWAIT_TIMEOUT_SEC: Timeout =
Timeout.ofMilliseconds(30000L)
diff --git
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
index 58d38d311..537e1f70a 100644
---
a/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
+++
b/streampark-flink/streampark-flink-shims/streampark-flink-shims-base/src/main/scala/org/apache/streampark/flink/util/FlinkUtils.scala
@@ -17,8 +17,10 @@
package org.apache.streampark.flink.util
+import org.apache.flink.api.common.ExecutionConfig
import org.apache.flink.api.common.state.{ListState, ListStateDescriptor}
import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions
import org.apache.flink.util.TimeUtils
@@ -31,9 +33,14 @@ object FlinkUtils {
def getUnionListState[R: TypeInformation](
context: FunctionInitializationContext,
+ execConfig: ExecutionConfig,
descriptorName: String): ListState[R] = {
+
context.getOperatorStateStore.getUnionListState(
- new ListStateDescriptor(descriptorName,
implicitly[TypeInformation[R]].getTypeClass))
+ new ListStateDescriptor(
+ descriptorName,
+ new KryoSerializer[R](implicitly[TypeInformation[R]].getTypeClass,
execConfig))
+ )
}
def getFlinkDistJar(flinkHome: String): String = {