This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch dev-2.1.6
in repository https://gitbox.apache.org/repos/asf/incubator-streampark.git
The following commit(s) were added to refs/heads/dev-2.1.6 by this push:
new 1143f6458 [Bug] Scala TypeInformation type conversion bug fixed.
1143f6458 is described below
commit 1143f6458af407cef672736dcfff1570d869b1f4
Author: benjobs <[email protected]>
AuthorDate: Sun Jan 26 18:11:46 2025 +0800
[Bug] Scala TypeInformation type conversion bug fixed.
---
.../console/core/controller/AlertController.java | 2 +-
.../src/test/application.yml | 4 +-
.../quickstart/connector/ClickhouseJavaApp.java | 44 ++++++-------
.../flink/quickstart/connector/DorisJavaApp.java | 28 ++++----
.../flink/quickstart/connector/HttpJavaApp.java | 19 +++---
.../flink/quickstart/connector/KafkaJavaApp.java | 57 ++++++++--------
.../flink/quickstart/connector/MySQLJavaApp.java | 71 ++++++++++----------
.../flink/quickstart/connector/bean/Behavior.java | 14 ++--
.../flink/quickstart/connector/bean/Entity.java | 23 +++----
.../flink/quickstart/connector/bean/LogBean.java | 10 +--
.../flink/quickstart/connector/bean/Order.java | 12 ++--
.../quickstart/connector/ClickHouseSinkApp.scala | 7 +-
.../flink/quickstart/connector/ES7SinkApp.scala | 11 ++--
.../quickstart/connector/HBaseRequestApp.scala | 22 ++++---
.../flink/quickstart/connector/HBaseSinkApp.scala | 18 ++---
.../quickstart/connector/HBaseSourceApp.scala | 68 ++++++++++---------
.../flink/quickstart/connector/HttpSinkApp.scala | 8 +--
.../quickstart/connector/InfluxDBSinkApp.scala | 26 +++-----
.../flink/quickstart/connector/JdbcSinkApp.scala | 37 ++++++-----
.../flink/quickstart/connector/KafkaSinkApp.scala | 17 ++---
.../quickstart/connector/KafkaSourceApp.scala | 6 +-
.../quickstart/connector/MongoSourceApp.scala | 39 ++++++-----
.../flink/quickstart/connector/MyDataSource.scala | 2 +-
.../quickstart/connector/MySQLSourceApp.scala | 17 +++--
.../flink/quickstart/connector/SideOutApp.scala | 76 +++++++++++++---------
.../connector/hbase/source/HBaseJavaSource.java | 57 +++++++++++-----
.../flink/connector/hbase/source/HBaseSource.scala | 25 +++++--
.../connector/jdbc/source/JdbcJavaSource.java | 43 +++++++-----
.../connector/mongo/source/MongoJavaSource.java | 56 ++++++++++------
29 files changed, 459 insertions(+), 360 deletions(-)
diff --git
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
index 9c4965ff1..1de322c69 100644
---
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
+++
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/controller/AlertController.java
@@ -93,7 +93,7 @@ public class AlertController {
@DeleteMapping("/delete")
public RestResponse deleteAlertConfig(
- @RequestParam("id") @NotNull(message = "config id must be not null")
Long id) {
+ @RequestParam("id") @NotNull(message = "config id must not be null")
Long id) {
boolean result = alertConfigService.deleteById(id);
return RestResponse.success(result);
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
index 29e1c7333..2462834b8 100644
--- a/streampark-flink/streampark-flink-connector-test/src/test/application.yml
+++ b/streampark-flink/streampark-flink-connector-test/src/test/application.yml
@@ -25,7 +25,7 @@ flink:
pipeline.name: streampark-quickstartApp
yarn.application.queue:
taskmanager.numberOfTaskSlots: 1
- parallelism.default: 2
+ parallelism.default: 1
jobmanager.memory:
flink.size:
heap.size:
@@ -51,7 +51,7 @@ flink:
execution:
checkpointing:
mode: EXACTLY_ONCE
- interval: 30s
+ interval: 10s
timeout: 10min
unaligned: false
externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
index 4ba8fd674..8370a1999 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/ClickhouseJavaApp.java
@@ -20,32 +20,32 @@ import
org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-/**
- * @author benjobs
- */
+/** @author benjobs */
public class ClickhouseJavaApp {
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
-
- //2) async高性能写入
- new ClickHouseSink(context).asyncSink(source, value ->
- String.format("insert into test.orders(userId, siteId) values
(%d,%d)", value.userId, value.siteId)
- ).setParallelism(1);
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
- //3) jdbc方式写入
- /**
- *
- * new ClickHouseSink(context).jdbcSink(source, bean ->
- * String.format("insert into test.orders(userId, siteId) values
(%d,%d)", bean.userId, bean.siteId)
- * ).setParallelism(1);
- *
- */
- context.start();
- }
+ // 2) async高性能写入
+ new ClickHouseSink(context)
+ .asyncSink(
+ source,
+ value ->
+ String.format(
+ "insert into test.orders(userId, siteId) values (%d,%d)",
+ value.userId, value.siteId))
+ .setParallelism(1);
+ // 3) jdbc方式写入
+ /**
+ * new ClickHouseSink(context).jdbcSink(source, bean ->
String.format("insert into
+ * test.orders(userId, siteId) values (%d,%d)", bean.userId, bean.siteId)
).setParallelism(1);
+ */
+ context.start();
+ }
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
index 551a28583..2d8091199 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/DorisJavaApp.java
@@ -17,28 +17,28 @@
package org.apache.streampark.flink.quickstart.connector;
import org.apache.streampark.flink.connector.doris.sink.DorisSink;
-import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
+import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
-/**
- * @author wudi
- **/
+/** @author wudi */
public class DorisJavaApp {
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStream<String> source = new KafkaJavaSource<String>(context)
- .getDataStream()
- .map((MapFunction<KafkaRecord<String>, String>)
KafkaRecord::value)
- .returns(String.class);
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStream<String> source =
+ new KafkaJavaSource<String>(context)
+ .getDataStream()
+ .map((MapFunction<KafkaRecord<String>, String>) KafkaRecord::value)
+ .returns(String.class);
- new DorisSink<String>(context).sink(source);
+ new DorisSink<String>(context).sink(source);
- context.start();
- }
+ context.start();
+ }
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
index 81bfaa338..643d6840b 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/HttpJavaApp.java
@@ -20,18 +20,17 @@ import
org.apache.streampark.flink.connector.http.sink.HttpSink;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Entity;
+
import org.apache.flink.streaming.api.datastream.DataStreamSource;
-/**
- * @author wudi
- **/
+/** @author wudi */
public class HttpJavaApp {
- public static void main(String[] args) {
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- StreamingContext context = new StreamingContext(envConfig);
- DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
- new HttpSink(context).get(source.map(x ->
String.format("http://www.qq.com?id=%d", x.userId)));
- context.start();
- }
+ public static void main(String[] args) {
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ StreamingContext context = new StreamingContext(envConfig);
+ DataStreamSource<Entity> source = context.getJavaEnv().addSource(new
MyDataSource());
+ new HttpSink(context).get(source.map(x ->
String.format("http://www.qq.com?id=%d", x.userId)));
+ context.start();
+ }
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
index e45dadf3c..740ca08d8 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/KafkaJavaApp.java
@@ -17,42 +17,45 @@
package org.apache.streampark.flink.quickstart.connector;
import org.apache.streampark.common.util.JsonUtils;
+import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
import org.apache.streampark.flink.connector.kafka.sink.KafkaJavaSink;
import org.apache.streampark.flink.connector.kafka.source.KafkaJavaSource;
-import org.apache.streampark.flink.connector.kafka.bean.KafkaRecord;
import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Behavior;
+
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SerializationSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
-/**
- * @author benjobs
- */
+/** @author benjobs */
public class KafkaJavaApp {
- public static void main(String[] args) {
-
- StreamEnvConfig javaConfig = new StreamEnvConfig(args, (environment,
parameterTool) -> {
- //environment.getConfig().enableForceAvro();
- System.out.println("environment argument set...");
- });
-
- StreamingContext context = new StreamingContext(javaConfig);
-
- //1) 从 kafka 中读取数据
- DataStream<Behavior> source = new KafkaJavaSource<String>(context)
- .getDataStream()
- .map((MapFunction<KafkaRecord<String>, Behavior>) value ->
JsonUtils.read(value, Behavior.class));
-
-
- // 2) 将数据写入其他 kafka 主题
- new KafkaJavaSink<Behavior>(context)
- .serializer((SerializationSchema<Behavior>) element ->
JsonUtils.write(element).getBytes())
- .sink(source);
-
- context.start();
- }
-
+ public static void main(String[] args) {
+
+ StreamEnvConfig javaConfig =
+ new StreamEnvConfig(
+ args,
+ (environment, parameterTool) -> {
+ // environment.getConfig().enableForceAvro();
+ System.out.println("environment argument set...");
+ });
+
+ StreamingContext context = new StreamingContext(javaConfig);
+
+ // 1) 从 kafka 中读取数据
+ DataStream<Behavior> source =
+ new KafkaJavaSource<String>(context)
+ .getDataStream()
+ .map(
+ (MapFunction<KafkaRecord<String>, Behavior>)
+ value -> JsonUtils.read(value, Behavior.class));
+
+ // 2) 将数据写入其他 kafka 主题
+ new KafkaJavaSink<Behavior>(context)
+ .serializer((SerializationSchema<Behavior>) element ->
JsonUtils.write(element).getBytes())
+ .sink(source);
+
+ context.start();
+ }
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 97611b0d1..9c86f1b0e 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,7 +16,6 @@
*/
package org.apache.streampark.flink.quickstart.connector;
-import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
import org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
@@ -24,47 +23,51 @@ import org.apache.streampark.flink.core.StreamEnvConfig;
import org.apache.streampark.flink.core.scala.StreamingContext;
import org.apache.streampark.flink.quickstart.connector.bean.Order;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
+
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;
public class MySQLJavaApp {
- public static void main(String[] args) {
-
- StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
+ public static void main(String[] args) {
- StreamingContext context = new StreamingContext(envConfig);
+ StreamEnvConfig envConfig = new StreamEnvConfig(args, null);
- //读取MySQL数据源
- new JdbcJavaSource<Order>(context)
- .getDataStream(
- (SQLQueryFunction<Order>) lastOne -> {
- //5秒抽取一次
- Thread.sleep(3000);
- Serializable lastOffset = lastOne == null ?
"2020-10-10 23:00:00" : lastOne.getTimestamp();
- return String.format(
- "select * from t_order " +
- "where timestamp > '%s' " +
- "order by timestamp asc ",
- lastOffset
- );
- },
- (SQLResultFunction<Order>) map -> {
- List<Order> result = new ArrayList<>();
- map.forEach(item -> {
- Order order = new Order();
-
order.setOrderId(item.get("order_id").toString());
-
order.setMarketId(item.get("market_id").toString());
-
order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
- result.add(order);
- });
- return result;
- })
- .returns(TypeInformation.of(Order.class))
- .print("jdbc source: >>>>>");
+ StreamingContext context = new StreamingContext(envConfig);
- context.start();
+ // 读取MySQL数据源
+ new JdbcJavaSource<Order>(context, Order.class)
+ .getDataStream(
+ (SQLQueryFunction<Order>)
+ lastOne -> {
+ // 5秒抽取一次
+ Thread.sleep(3000);
+ Serializable lastOffset =
+ lastOne == null ? "2020-10-10 23:00:00" :
lastOne.getTimestamp();
+ return String.format(
+ "select * from t_order "
+ + "where timestamp > '%s' "
+ + "order by timestamp asc ",
+ lastOffset);
+ },
+ (SQLResultFunction<Order>)
+ map -> {
+ List<Order> result = new ArrayList<>();
+ map.forEach(
+ item -> {
+ Order order = new Order();
+ order.setOrderId(item.get("order_id").toString());
+ order.setMarketId(item.get("market_id").toString());
+
order.setTimestamp(Long.parseLong(item.get("timestamp").toString()));
+ result.add(order);
+ });
+ return result;
+ })
+ .returns(TypeInformation.of(Order.class))
+ .print("jdbc source: >>>>>");
- }
+ context.start();
+ }
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
index 6bfbb3a33..d01165370 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Behavior.java
@@ -18,14 +18,12 @@ package
org.apache.streampark.flink.quickstart.connector.bean;
import lombok.Data;
-/**
- * @author benjobs
- */
+/** @author benjobs */
@Data
public class Behavior {
- private String user_id;
- private Long item_id;
- private Long category_id;
- private String behavior;
- private Long ts;
+ private String user_id;
+ private Long item_id;
+ private Long category_id;
+ private String behavior;
+ private Long ts;
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
index a5b5b3282..09653fe2a 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Entity.java
@@ -16,20 +16,15 @@
*/
package org.apache.streampark.flink.quickstart.connector.bean;
-
-/**
- * @author benjobs
- */
-
+/** @author benjobs */
public class Entity {
- public Long userId;
- public Long orderId;
- public Long siteId;
- public Long cityId;
- public Integer orderStatus;
- public Double price;
- public Integer quantity;
- public Long timestamp;
-
+ public Long userId;
+ public Long orderId;
+ public Long siteId;
+ public Long cityId;
+ public Integer orderStatus;
+ public Double price;
+ public Integer quantity;
+ public Long timestamp;
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
index c0114d2c0..b143d70e3 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/LogBean.java
@@ -22,9 +22,9 @@ import java.io.Serializable;
@Data
public class LogBean implements Serializable {
- private String platenum;
- private String cardType;
- private Long inTime;
- private Long outTime;
- private String controlid;
+ private String platenum;
+ private String cardType;
+ private Long inTime;
+ private Long outTime;
+ private String controlid;
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
index c561a1b52..8f98f4d67 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
+++
b/streampark-flink/streampark-flink-connector-test/src/test/java/org/apache/streampark/flink/quickstart/connector/bean/Order.java
@@ -18,10 +18,12 @@ package
org.apache.streampark.flink.quickstart.connector.bean;
import lombok.Data;
+import java.io.Serializable;
+
@Data
-public class Order {
- private String orderId;
- private String marketId;
- private Double price;
- private Long timestamp;
+public class Order implements Serializable {
+ private String orderId;
+ private String marketId;
+ private Double price;
+ private Long timestamp;
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
index 79c0950af..db70be422 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ClickHouseSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.clickhouse.sink.ClickHouseSink
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
import org.apache.flink.api.common.typeinfo.TypeInformation
object ClickHouseSinkApp extends FlinkStreaming {
@@ -42,12 +43,12 @@ object ClickHouseSinkApp extends FlinkStreaming {
val source = context.addSource(new MyDataSource)
// 2)高性能异步写入
- ClickHouseSink().asyncSink(source)(x => {s"insert into
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
+ ClickHouseSink().asyncSink(source)(
+ x => { s"insert into test.orders(userId,siteId) values
(${x.userId},${x.siteId})" })
- //3) jdbc方式写入
+ // 3) jdbc方式写入
// ClickHouseSink().jdbcSink(source)(x => {s"insert into
test.orders(userId,siteId) values (${x.userId},${x.siteId})"})
-
}
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
index fb9d80038..190c0eca6 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/ES7SinkApp.scala
@@ -16,12 +16,13 @@
*/
package org.apache.streampark.flink.quickstart.connector
-import org.apache.flink.api.common.typeinfo.TypeInformation
-import org.apache.flink.streaming.api.scala.DataStream
import org.apache.streampark.flink.connector.elasticsearch7.sink.ES7Sink
import
org.apache.streampark.flink.connector.elasticsearch7.util.ElasticsearchUtils
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
+import org.apache.flink.api.common.typeinfo.TypeInformation
+import org.apache.flink.streaming.api.scala.DataStream
import org.elasticsearch.action.index.IndexRequest
import org.json4s.jackson.JsonMethods
@@ -33,9 +34,9 @@ object ES7SinkApp extends FlinkStreaming {
val source: DataStream[Entity] = context.addSource(new MyDataSource)
implicit def indexedSeq(x: Entity): IndexRequest =
ElasticsearchUtils.indexRequest(
- "flink_order",
- s"${x.orderId}_${x.timestamp}",
- JsonMethods.mapper.writeValueAsString(x)
+ "flink_order",
+ s"${x.orderId}_${x.timestamp}",
+ JsonMethods.mapper.writeValueAsString(x)
)
ES7Sink().sink[Entity](source)
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
index e41c62ff7..c3e49b36f 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseRequestApp.scala
@@ -20,6 +20,7 @@ import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.hadoop.hbase.client.Get
@@ -27,22 +28,27 @@ object HBaseRequestApp extends FlinkStreaming {
implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
- implicit val reqType: TypeInformation[(String, Boolean)] =
TypeInformation.of(classOf[(String, Boolean)])
+ implicit val reqType: TypeInformation[(String, Boolean)] =
+ TypeInformation.of(classOf[(String, Boolean)])
override def handle(): Unit = {
implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
- //one topic
+ // one topic
val source = context.fromCollection(Seq("123456", "1111", "222"))
source.print("source:>>>")
- HBaseRequest(source).requestOrdered[(String, Boolean)](x => {
- new HBaseQuery("person", new Get(x.getBytes()))
- }, timeout = 5000, resultFunc = (a, r) => {
- a -> !r.isEmpty
- }).print(" check.... ")
-
+ HBaseRequest(source)
+ .requestOrdered[(String, Boolean)](
+ x => {
+ new HBaseQuery("person", new Get(x.getBytes()))
+ },
+ timeout = 5000,
+ resultFunc = (a, r) => {
+ a -> !r.isEmpty
+ })
+ .print(" check.... ")
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
index 0a4695590..11fa2d484 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSinkApp.scala
@@ -20,11 +20,13 @@ import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.connector.hbase.sink.{HBaseOutputFormat,
HBaseSink}
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.hadoop.hbase.client.{Mutation, Put}
import org.apache.hadoop.hbase.util.Bytes
import java.util.{Collections, Random}
+
import scala.language.implicitConversions
object HBaseSinkApp extends FlinkStreaming {
@@ -36,9 +38,10 @@ object HBaseSinkApp extends FlinkStreaming {
val source = context.addSource(new MyDataSource)
val random = new Random()
- //定义转换规则...
+ // 定义转换规则...
implicit def entry2Put(entity: Entity): java.lang.Iterable[Mutation] = {
- val put = new Put(Bytes.toBytes(System.nanoTime() +
random.nextInt(1000000)), entity.timestamp)
+ val put =
+ new Put(Bytes.toBytes(System.nanoTime() + random.nextInt(1000000)),
entity.timestamp)
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("cid"),
Bytes.toBytes(entity.cityId))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("oid"),
Bytes.toBytes(entity.orderId))
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("os"),
Bytes.toBytes(entity.orderStatus))
@@ -46,18 +49,17 @@ object HBaseSinkApp extends FlinkStreaming {
put.addColumn(Bytes.toBytes("cf"), Bytes.toBytes("sid"),
Bytes.toBytes(entity.siteId))
Collections.singleton(put)
}
- //source ===> trans ===> sink
+ // source ===> trans ===> sink
- //1)插入方式1
+ // 1)插入方式1
HBaseSink().sink[Entity](source, "order")
- //2) 插入方式2
- //1.指定HBase 配置文件
+ // 2) 插入方式2
+ // 1.指定HBase 配置文件
val prop = ConfigUtils.getHBaseConfig(context.parameter.toMap)
- //2.插入...
+ // 2.插入...
source.writeUsingOutputFormat(new HBaseOutputFormat[Entity]("order", prop))
-
}
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
index 5e2a9b5c4..3d56679d7 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HBaseSourceApp.scala
@@ -21,6 +21,7 @@ import
org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
import org.apache.streampark.flink.connector.hbase.request.HBaseRequest
import org.apache.streampark.flink.connector.hbase.source.HBaseSource
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.hadoop.hbase.CellUtil
import org.apache.hadoop.hbase.client.{Get, Scan}
@@ -36,38 +37,47 @@ object HBaseSourceApp extends FlinkStreaming {
implicit val conf = ConfigUtils.getHBaseConfig(context.parameter.toMap)
- val id = HBaseSource().getDataStream[String](query => {
- Thread.sleep(10)
- if (query == null) {
- new HBaseQuery("person", new Scan())
- } else {
- //TODO 从上一条记录中获取便宜量,决定下次查询的条件...
- new HBaseQuery("person", new Scan())
- }
- }, r => new String(r.getRow), null)
+ val id = HBaseSource().getDataStream[String](
+ query => {
+ Thread.sleep(10)
+ if (query == null) {
+ new HBaseQuery("person", new Scan())
+ } else {
+ // TODO 从上一条记录中获取便宜量,决定下次查询的条件...
+ new HBaseQuery("person", new Scan())
+ }
+ },
+ r => new String(r.getRow),
+ null
+ )
- HBaseRequest(id).requestOrdered(x => {
- new HBaseQuery("person", new Get(x.getBytes()))
- }, (a, r) => {
- val map = new util.HashMap[String, String]()
- val cellScanner = r.cellScanner()
- while (cellScanner.advance()) {
- val cell = cellScanner.current()
- val q = Bytes.toString(CellUtil.cloneQualifier(cell))
- val (name, v) = q.split("_") match {
- case Array(_type, name) =>
- _type match {
- case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
- case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
- case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
- case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
+ HBaseRequest(id)
+ .requestOrdered(
+ x => {
+ new HBaseQuery("person", new Get(x.getBytes()))
+ },
+ (a, r) => {
+ val map = new util.HashMap[String, String]()
+ val cellScanner = r.cellScanner()
+ while (cellScanner.advance()) {
+ val cell = cellScanner.current()
+ val q = Bytes.toString(CellUtil.cloneQualifier(cell))
+ val (name, v) = q.split("_") match {
+ case Array(_type, name) =>
+ _type match {
+ case "i" => name -> Bytes.toInt(CellUtil.cloneValue(cell))
+ case "s" => name -> Bytes.toString(CellUtil.cloneValue(cell))
+ case "d" => name -> Bytes.toDouble(CellUtil.cloneValue(cell))
+ case "f" => name -> Bytes.toFloat(CellUtil.cloneValue(cell))
+ }
+ case _ =>
}
- case _ =>
+ map.put(name.toString, v.toString)
+ }
+ map.toString
}
- map.put(name.toString, v.toString)
- }
- map.toString
- }).print("Async")
+ )
+ .print("Async")
}
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
index 763daf92d..80b238798 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/HttpSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.http.sink.HttpSink
import org.apache.streampark.flink.core.scala.FlinkStreaming
import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
import org.apache.flink.api.common.typeinfo.TypeInformation
object HttpSinkApp extends FlinkStreaming {
@@ -28,10 +29,9 @@ object HttpSinkApp extends FlinkStreaming {
override def handle(): Unit = {
- /**
- * source
- */
- val source = context.addSource(new MyDataSource)
+ /** source */
+ val source = context
+ .addSource(new MyDataSource)
.map(x => s"http://www.qq.com?id=${x.userId}")
// sink
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
index f1c289604..0f18055e0 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/InfluxDBSinkApp.scala
@@ -19,14 +19,13 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.influx.bean.InfluxEntity
import org.apache.streampark.flink.connector.influx.sink.InfluxSink
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
-/**
- * 侧输出流
- */
+/** 侧输出流 */
object InfluxDBSinkApp extends FlinkStreaming {
implicit val entityType: TypeInformation[Weather] =
TypeInformation.of(classOf[Weather])
@@ -34,14 +33,17 @@ object InfluxDBSinkApp extends FlinkStreaming {
override def handle(): Unit = {
val source = context.addSource(new WeatherSource())
- //weather,altitude=1000,area=北 temperature=11,humidity=-4
+ // weather,altitude=1000,area=北 temperature=11,humidity=-4
implicit val entity: InfluxEntity[Weather] = new InfluxEntity[Weather](
"mydb",
"test",
"autogen",
(x: Weather) => Map("altitude" -> x.altitude.toString, "area" -> x.area),
- (x: Weather) => Map("temperature" -> x.temperature.asInstanceOf[Object],
"humidity" -> x.humidity.asInstanceOf[Object])
+ (x: Weather) =>
+ Map(
+ "temperature" -> x.temperature.asInstanceOf[Object],
+ "humidity" -> x.humidity.asInstanceOf[Object])
)
InfluxSink().sink(source, "mydb")
@@ -50,17 +52,8 @@ object InfluxDBSinkApp extends FlinkStreaming {
}
-/**
- *
- * 温度 temperature
- * 湿度 humidity
- * 地区 area
- * 海拔 altitude
- */
-case class Weather(temperature: Long,
- humidity: Long,
- area: String,
- altitude: Long)
+/** 温度 temperature 湿度 humidity 地区 area 海拔 altitude */
+case class Weather(temperature: Long, humidity: Long, area: String, altitude: Long)
class WeatherSource extends SourceFunction[Weather] {
@@ -82,4 +75,3 @@ class WeatherSource extends SourceFunction[Weather] {
}
}
-
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
index 01a792474..cb4c993ad 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/JdbcSinkApp.scala
@@ -19,6 +19,7 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.jdbc.sink.JdbcSink
import org.apache.streampark.flink.connector.kafka.source.KafkaSource
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
object JdbcSinkApp extends FlinkStreaming {
@@ -27,28 +28,30 @@ object JdbcSinkApp extends FlinkStreaming {
override def handle(): Unit = {
- /**
- * 从kafka里读数据.这里的数据是数字或者字母,每次读取1条
- */
- val source = KafkaSource().getDataStream[String]()
+ /** 从kafka里读数据.这里的数据是数字或者字母,每次读取1条 */
+ val source = KafkaSource()
+ .getDataStream[String]()
.uid("kfkSource1")
.name("kfkSource1")
- .map(x => {
- x.value
- })
-
+ .map(
+ x => {
+ x.value
+ })
/**
- * 假设这里有一个orders表.有一个字段,id的类型是int
- * 在数据插入的时候制造异常:
- * 1)正确情况: 当从kafka中读取的内容全部是数字时会插入成功,kafka的消费的offset也会更新.
- * 如: 当前kafka
size为20,手动输入10个数字,则size为30,然后会将这10个数字写入到Mysql,kafka的offset也会更新
- * 2)异常情况: 当从kafka中读取的内容非数字会导致插入失败,kafka的消费的offset会回滚
- * 如: 当前的kafka size为30,offset是30,
手动输入1个字母,此时size为31,写入mysql会报错,kafka的offset依旧是30,不会发生更新.
+ * 假设这里有一个orders表.有一个字段,id的类型是int 在数据插入的时候制造异常: 1)正确情况:
+ * 当从kafka中读取的内容全部是数字时会插入成功,kafka的消费的offset也会更新. 如: 当前kafka
+ * size为20,手动输入10个数字,则size为30,然后会将这10个数字写入到Mysql,kafka的offset也会更新 2)异常情况:
+ * 当从kafka中读取的内容非数字会导致插入失败,kafka的消费的offset会回滚 如: 当前的kafka
size为30,offset是30,
+ * 手动输入1个字母,此时size为31,写入mysql会报错,kafka的offset依旧是30,不会发生更新.
*/
- JdbcSink(parallelism = 5).sink[String](source)(x => {
-      s"insert into orders(id,timestamp) values('$x',${System.currentTimeMillis()})"
- }).uid("mysqlSink").name("mysqlSink")
+ JdbcSink(parallelism = 5)
+ .sink[String](source)(
+ x => {
+ s"insert into orders(id,timestamp)
values('$x',${System.currentTimeMillis()})"
+ })
+ .uid("mysqlSink")
+ .name("mysqlSink")
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
index b4e82ab69..af8aa55aa 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSinkApp.scala
@@ -1,5 +1,5 @@
/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
+ * Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
@@ -18,12 +18,12 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.kafka.sink.KafkaSink
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
-
object KafkaSinkApp extends FlinkStreaming {
implicit val stringType: TypeInformation[String] =
TypeInformation.of(classOf[String])
@@ -38,12 +38,7 @@ object KafkaSinkApp extends FlinkStreaming {
}
-
-case class Behavior(user_id: String,
- item_id: Long,
- category_id: Long,
- behavior: String,
- ts: Long) {
+case class Behavior(user_id: String, item_id: Long, category_id: Long, behavior: String, ts: Long) {
override def toString: String = {
s"""
|{
@@ -57,7 +52,6 @@ case class Behavior(user_id: String,
}
}
-
class BehaviorSource extends SourceFunction[Behavior] {
private[this] var isRunning = true
@@ -74,11 +68,10 @@ class BehaviorSource extends SourceFunction[Behavior] {
val item_id = random.nextInt(100)
val category_id = random.nextInt(20)
val behavior = seq(random.nextInt(5))
- val order = Behavior(user_id.toString, item_id, category_id, behavior,
System.currentTimeMillis())
+ val order =
+      Behavior(user_id.toString, item_id, category_id, behavior, System.currentTimeMillis())
ctx.collect(order)
}
}
}
-
-
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
index e9e3eaa08..651605678 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/KafkaSourceApp.scala
@@ -16,14 +16,16 @@
*/
package org.apache.streampark.flink.quickstart.connector
-import org.apache.flink.streaming.api.scala._
import org.apache.streampark.flink.connector.kafka.source.KafkaSource
import org.apache.streampark.flink.core.scala.FlinkStreaming
+import org.apache.flink.streaming.api.scala._
+
object KafkaSourceApp extends FlinkStreaming {
override def handle(): Unit = {
- KafkaSource().getDataStream[String]()
+ KafkaSource()
+ .getDataStream[String]()
.map(_.value)
.print()
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
index c0db74056..4329758ff 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MongoSourceApp.scala
@@ -16,13 +16,15 @@
*/
package org.apache.streampark.flink.quickstart.connector
-import com.mongodb.BasicDBObject
import org.apache.streampark.common.util.{DateUtils, JsonUtils}
import org.apache.streampark.flink.connector.mongo.source.MongoSource
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
+import com.mongodb.BasicDBObject
import org.apache.flink.api.common.typeinfo.TypeInformation
import java.util.Properties
+
import scala.collection.JavaConversions._
object MongoSourceApp extends FlinkStreaming {
@@ -32,21 +34,26 @@ object MongoSourceApp extends FlinkStreaming {
override def handle(): Unit = {
implicit val prop: Properties = context.parameter.getProperties
val source = MongoSource()
- source.getDataStream[String](
- "shop",
- (a, d) => {
- Thread.sleep(1000)
- /**
- * 从上一条记录提前offset数据,作为下一条数据查询的条件,如果offset为Null,则表明是第一次查询,需要指定默认offset
- */
- val offset = if (a == null) "2019-09-27 00:00:00" else {
- JsonUtils.read[Map[String, _]](a).get("updateTime").toString
- }
- val cond = new BasicDBObject().append("updateTime", new
BasicDBObject("$gte", DateUtils.parse(offset)))
- d.find(cond)
- },
- _.toList.map(_.toJson()), null
- ).print()
+ source
+ .getDataStream[String](
+ "shop",
+ (a, d) => {
+ Thread.sleep(1000)
+
+ /**
从上一条记录提前offset数据,作为下一条数据查询的条件,如果offset为Null,则表明是第一次查询,需要指定默认offset */
+ val offset =
+ if (a == null) "2019-09-27 00:00:00"
+ else {
+ JsonUtils.read[Map[String, _]](a).get("updateTime").toString
+ }
+ val cond = new BasicDBObject()
+ .append("updateTime", new BasicDBObject("$gte",
DateUtils.parse(offset)))
+ d.find(cond)
+ },
+ _.toList.map(_.toJson()),
+ null
+ )
+ .print()
}
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
index af84ea674..5348d104d 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MyDataSource.scala
@@ -17,11 +17,11 @@
package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.quickstart.connector.bean.Entity
+
import org.apache.flink.streaming.api.functions.source.SourceFunction
import scala.util.Random
-
class MyDataSource extends SourceFunction[Entity] {
private[this] var isRunning = true
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
index 5ab968edb..dcc28ffd1 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -18,6 +18,7 @@ package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.connector.jdbc.source.JdbcSource
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
object MySQLSourceApp extends FlinkStreaming {
@@ -26,12 +27,16 @@ object MySQLSourceApp extends FlinkStreaming {
override def handle(): Unit = {
- JdbcSource().getDataStream[Order](lastOne => {
- val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else
lastOne.timestamp
- s"select * from t_order where timestamp > '$laseOffset' order by
timestamp asc "
- },
- _.map(x => new Order(x("market_id").toString, x("timestamp").toString)),
null
- ).print()
+ JdbcSource()
+ .getDataStream[Order](
+ lastOne => {
+ val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else
lastOne.timestamp
+ s"select * from t_order where timestamp > '$laseOffset' order by
timestamp asc "
+ },
+ _.map(x => new Order(x("market_id").toString,
x("timestamp").toString)),
+ null
+ )
+ .print()
}
diff --git
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
index 179c57f7d..24180b834 100644
---
a/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
+++
b/streampark-flink/streampark-flink-connector-test/src/test/scala/org/apache/streampark/flink/quickstart/connector/SideOutApp.scala
@@ -17,6 +17,7 @@
package org.apache.streampark.flink.quickstart.connector
import org.apache.streampark.flink.core.scala.FlinkStreaming
+
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.streaming.api.functions.ProcessFunction
import org.apache.flink.streaming.api.functions.source.SourceFunction
@@ -25,9 +26,7 @@ import org.apache.flink.util.Collector
import scala.util.Random
-/**
- * 侧输出流
- */
+/** 侧输出流 */
object SideOutApp extends FlinkStreaming {
implicit val entityType: TypeInformation[SideEntry] =
TypeInformation.of(classOf[SideEntry])
@@ -35,14 +34,14 @@ object SideOutApp extends FlinkStreaming {
override def handle(): Unit = {
val source = context.addSource(new SideSource())
- /**
- * 侧输出流。。。
- * 官方写法:设置侧输出流
- */
+ /** 侧输出流。。。 官方写法:设置侧输出流 */
val side1 = source.process(new ProcessFunction[SideEntry, SideEntry] {
val tag = new OutputTag[SideEntry]("flink")
- override def processElement(value: SideEntry, ctx:
ProcessFunction[SideEntry, SideEntry]#Context, out: Collector[SideEntry]): Unit
= {
+ override def processElement(
+ value: SideEntry,
+ ctx: ProcessFunction[SideEntry, SideEntry]#Context,
+ out: Collector[SideEntry]): Unit = {
if (value.userId < 100) {
ctx.output(tag, value)
} else {
@@ -51,7 +50,7 @@ object SideOutApp extends FlinkStreaming {
}
})
- //官方写法,获取侧输出流
+ // 官方写法,获取侧输出流
    side1.getSideOutput(new OutputTag[SideEntry]("flink")).print("flink:========>")
}
@@ -59,26 +58,35 @@ object SideOutApp extends FlinkStreaming {
}
/**
- *
- * @param userId : 用户Id
- * @param orderId : 订单ID
- * @param siteId : 站点ID
- * @param cityId : 城市Id
- * @param orderStatus : 订单状态(1:下单,0:退单)
- * @param isNewOrder : 是否是首单
- * @param price : 单价
- * @param quantity : 订单数量
- * @param timestamp : 下单时间
+ * @param userId
+ * : 用户Id
+ * @param orderId
+ * : 订单ID
+ * @param siteId
+ * : 站点ID
+ * @param cityId
+ * : 城市Id
+ * @param orderStatus
+ * : 订单状态(1:下单,0:退单)
+ * @param isNewOrder
+ * : 是否是首单
+ * @param price
+ * : 单价
+ * @param quantity
+ * : 订单数量
+ * @param timestamp
+ * : 下单时间
*/
-case class SideEntry(userId: Long,
- orderId: Long,
- siteId: Long,
- cityId: Long,
- orderStatus: Int,
- isNewOrder: Int,
- price: Double,
- quantity: Int,
- timestamp: Long)
+case class SideEntry(
+ userId: Long,
+ orderId: Long,
+ siteId: Long,
+ cityId: Long,
+ orderStatus: Int,
+ isNewOrder: Int,
+ price: Double,
+ quantity: Int,
+ timestamp: Long)
class SideSource extends SourceFunction[SideEntry] {
@@ -96,10 +104,18 @@ class SideSource extends SourceFunction[SideEntry] {
val isNew = random.nextInt(1)
val price = random.nextDouble()
val quantity = new Random(10).nextInt()
- val order = SideEntry(userId, orderId, siteId = 1, cityId = 1, status,
isNew, price, quantity, System.currentTimeMillis)
+ val order = SideEntry(
+ userId,
+ orderId,
+ siteId = 1,
+ cityId = 1,
+ status,
+ isNew,
+ price,
+ quantity,
+ System.currentTimeMillis)
ctx.collect(order)
}
}
}
-
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
index ca19fd9e5..4eccc4490 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/java/org/apache/streampark/flink/connector/hbase/source/HBaseJavaSource.java
@@ -17,46 +17,69 @@
package org.apache.streampark.flink.connector.hbase.source;
-import org.apache.streampark.common.util.AssertUtils;
+import org.apache.streampark.common.util.ConfigUtils;
import org.apache.streampark.flink.connector.function.RunningFunction;
import org.apache.streampark.flink.connector.hbase.function.HBaseQueryFunction;
import
org.apache.streampark.flink.connector.hbase.function.HBaseResultFunction;
import
org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction;
import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.Properties;
public class HBaseJavaSource<T> {
private final StreamingContext context;
- private final Properties property;
+ private Properties property;
+ private String alias;
+ private final TypeInformation<T> typeInformation;
- public HBaseJavaSource(StreamingContext context, Properties property) {
+ public HBaseJavaSource(StreamingContext context, Class<T> typeInfo) {
+ if (context == null) {
+ throw new NullPointerException("context must not be null");
+ }
+ if (typeInfo == null) {
+ throw new NullPointerException("typeInfo must not be null");
+ }
this.context = context;
+ this.typeInformation = TypeInformation.of(typeInfo);
+ }
+
+ public HBaseJavaSource<T> property(Properties property) {
this.property = property;
+ return this;
}
- public DataStreamSource<T> getDataStream(
- HBaseQueryFunction<T> queryFunction,
- HBaseResultFunction<T> resultFunction) {
- return getDataStream(queryFunction, resultFunction, null);
- }
+ public HBaseJavaSource<T> alias(String alias) {
+ this.alias = alias;
+ return this;
+ }
+
+ public DataStreamSource<T> getDataStream(
+ HBaseQueryFunction<T> queryFunction, HBaseResultFunction<T>
resultFunction) {
+ return getDataStream(queryFunction, resultFunction, null);
+ }
public DataStreamSource<T> getDataStream(
HBaseQueryFunction<T> queryFunction,
HBaseResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- AssertUtils.notNull(queryFunction, "queryFunction must not be null");
- AssertUtils.notNull(resultFunction, "resultFunction must not be null");
- HBaseSourceFunction<T> sourceFunction = new HBaseSourceFunction<>(
- property,
- queryFunction,
- resultFunction,
- runningFunc,
- null
- );
+ if (queryFunction == null) {
+ throw new NullPointerException("HBaseJavaSource error: query function
cannot be null");
+ }
+ if (resultFunction == null) {
+ throw new NullPointerException("HBaseJavaSource error: result function
cannot be null");
+ }
+
+ if (this.property == null) {
+ this.property = ConfigUtils.getHBaseConfig(context.parameter().toMap(),
alias);
+ }
+
+ HBaseSourceFunction<T> sourceFunction =
+ new HBaseSourceFunction<>(
+ property, queryFunction, resultFunction, runningFunc,
typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
index 726ff74ee..50bfc4ea6 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-hbase/src/main/scala/org/apache/streampark/flink/connector/hbase/source/HBaseSource.scala
@@ -17,7 +17,7 @@
package org.apache.streampark.flink.connector.hbase.source
-import org.apache.streampark.common.util.Utils
+import org.apache.streampark.common.util.ConfigUtils
import org.apache.streampark.flink.connector.hbase.bean.HBaseQuery
import org.apache.streampark.flink.connector.hbase.internal.HBaseSourceFunction
import org.apache.streampark.flink.core.scala.StreamingContext
@@ -32,8 +32,8 @@ import scala.annotation.meta.param
object HBaseSource {
- def apply(@(transient @param) property: Properties = new
Properties())(implicit
- ctx: StreamingContext): HBaseSource = new HBaseSource(ctx, property)
+ def apply(@(transient @param) alias: String = "", properties: Properties =
new Properties())(
+ implicit ctx: StreamingContext): HBaseSource = new HBaseSource(ctx,
alias, properties)
}
@@ -45,14 +45,25 @@ object HBaseSource {
*/
class HBaseSource(
@(transient @param) val ctx: StreamingContext,
- property: Properties = new Properties()) {
+ alias: String,
+ property: Properties) {
def getDataStream[R: TypeInformation](
query: R => HBaseQuery,
func: Result => R,
- running: Unit => Boolean)(implicit prop: Properties = new Properties()):
DataStream[R] = {
- Utils.copyProperties(property, prop)
- val hBaseFunc = new HBaseSourceFunction[R](prop, query, func, running)
+ running: Unit => Boolean): DataStream[R] = {
+
+ if (query == null) {
+ throw new NullPointerException("getDataStream error, SQLQueryFunction
must not be null")
+ }
+ if (func == null) {
+ throw new NullPointerException("getDataStream error, SQLResultFunction
must not be null")
+ }
+ val jdbc = ConfigUtils.getHBaseConfig(ctx.parameter.toMap)
+ if (property != null) {
+ jdbc.putAll(property)
+ }
+ val hBaseFunc = new HBaseSourceFunction[R](jdbc, query, func, running)
ctx.addSource(hBaseFunc)
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
index d611de227..fdb6b8c50 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-jdbc/src/main/java/org/apache/streampark/flink/connector/jdbc/source/JdbcJavaSource.java
@@ -17,7 +17,6 @@
package org.apache.streampark.flink.connector.jdbc.source;
-import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.common.util.ConfigUtils;
import org.apache.streampark.flink.connector.function.RunningFunction;
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
@@ -25,6 +24,7 @@ import
org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.jdbc.internal.JdbcSourceFunction;
import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.Properties;
@@ -32,11 +32,20 @@ import java.util.Properties;
public class JdbcJavaSource<T> {
private final StreamingContext context;
+ private final TypeInformation<T> typeInformation;
+
private Properties jdbc;
private String alias = null;
- public JdbcJavaSource(StreamingContext context) {
+ public JdbcJavaSource(StreamingContext context, Class<T> typeInfo) {
+ if (typeInfo == null) {
+ throw new NullPointerException("typeInfo must not be null");
+ }
+ if (context == null) {
+ throw new NullPointerException("context must not be null");
+ }
this.context = context;
+ this.typeInformation = TypeInformation.of(typeInfo);
}
public JdbcJavaSource<T> jdbc(Properties jdbc) {
@@ -49,29 +58,31 @@ public class JdbcJavaSource<T> {
return this;
}
- public DataStreamSource<T> getDataStream(
- SQLQueryFunction<T> queryFunction,
- SQLResultFunction<T> resultFunction) {
- return getDataStream(queryFunction, resultFunction, null);
- }
+ public DataStreamSource<T> getDataStream(
+ SQLQueryFunction<T> queryFunction, SQLResultFunction<T> resultFunction) {
+ return getDataStream(queryFunction, resultFunction, null);
+ }
public DataStreamSource<T> getDataStream(
SQLQueryFunction<T> queryFunction,
SQLResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- AssertUtils.notNull(queryFunction, "queryFunction must not be null");
- AssertUtils.notNull(resultFunction, "resultFunction must not be null");
+ if (queryFunction == null) {
+ throw new NullPointerException(
+ "JdbcJavaSource getDataStream error: SQLQueryFunction must not be
null");
+ }
+ if (resultFunction == null) {
+ throw new NullPointerException(
+ "JdbcJavaSource getDataStream error: SQLResultFunction must not be
null");
+ }
+
if (this.jdbc == null) {
this.jdbc = ConfigUtils.getJdbcProperties(context.parameter().toMap(),
alias);
}
- JdbcSourceFunction<T> sourceFunction = new JdbcSourceFunction<>(
- jdbc,
- queryFunction,
- resultFunction,
- runningFunc,
- null
- );
+
+ JdbcSourceFunction<T> sourceFunction =
+ new JdbcSourceFunction<>(jdbc, queryFunction, resultFunction,
runningFunc, typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}
diff --git
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
index 9d8e3691f..41b230cc9 100644
---
a/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
+++
b/streampark-flink/streampark-flink-connector/streampark-flink-connector-mongo/src/main/java/org/apache/streampark/flink/connector/mongo/source/MongoJavaSource.java
@@ -17,32 +17,44 @@
package org.apache.streampark.flink.connector.mongo.source;
-import org.apache.streampark.common.util.AssertUtils;
import org.apache.streampark.flink.connector.function.RunningFunction;
import org.apache.streampark.flink.connector.mongo.function.MongoQueryFunction;
import
org.apache.streampark.flink.connector.mongo.function.MongoResultFunction;
import
org.apache.streampark.flink.connector.mongo.internal.MongoSourceFunction;
import org.apache.streampark.flink.core.scala.StreamingContext;
+import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import java.util.Properties;
public class MongoJavaSource<T> {
private final StreamingContext context;
- private final Properties property;
+ private Properties property;
+ private String alias;
+ private final TypeInformation<T> typeInformation;
- public MongoJavaSource(StreamingContext context, Properties property) {
+ public MongoJavaSource(StreamingContext context, Class<T> typeInfo) {
this.context = context;
+ this.typeInformation = TypeInformation.of(typeInfo);
+ }
+
+ public MongoJavaSource<T> property(Properties property) {
this.property = property;
+ return this;
}
- public DataStreamSource<T> getDataStream(
- String collectionName,
- MongoQueryFunction<T> queryFunction,
- MongoResultFunction<T> resultFunction) {
- return getDataStream(collectionName, queryFunction, resultFunction,
null);
- }
+ public MongoJavaSource<T> alias(String alias) {
+ this.alias = alias;
+ return this;
+ }
+
+ public DataStreamSource<T> getDataStream(
+ String collectionName,
+ MongoQueryFunction<T> queryFunction,
+ MongoResultFunction<T> resultFunction) {
+ return getDataStream(collectionName, queryFunction, resultFunction, null);
+ }
public DataStreamSource<T> getDataStream(
String collectionName,
@@ -50,17 +62,21 @@ public class MongoJavaSource<T> {
MongoResultFunction<T> resultFunction,
RunningFunction runningFunc) {
- AssertUtils.notNull(collectionName, "collectionName must not be null");
- AssertUtils.notNull(queryFunction, "queryFunction must not be null");
- AssertUtils.notNull(resultFunction, "resultFunction must not be null");
- MongoSourceFunction<T> sourceFunction = new MongoSourceFunction<>(
- collectionName,
- property,
- queryFunction,
- resultFunction,
- runningFunc,
- null
- );
+ if (collectionName == null) {
+ throw new NullPointerException("MongoJavaSource error: collectionName
must not be null");
+ }
+
+ if (queryFunction == null) {
+ throw new NullPointerException("MongoJavaSource error:
mongoQueryFunction must not be null");
+ }
+
+ if (resultFunction == null) {
+ throw new NullPointerException("MongoJavaSource error:
mongoResultFunction must not be null");
+ }
+
+ MongoSourceFunction<T> sourceFunction =
+ new MongoSourceFunction<>(
+ collectionName, property, queryFunction, resultFunction,
runningFunc, typeInformation);
return context.getJavaEnv().addSource(sourceFunction);
}
}