This is an automated email from the ASF dual-hosted git repository. benjobs pushed a commit to branch 2.1.6 in repository https://gitbox.apache.org/repos/asf/incubator-streampark-quickstart.git
commit 5afb47fb03ff81f5a51bc627901e4de9d8876c04 Author: benjobs <[email protected]> AuthorDate: Mon Jan 27 20:02:26 2025 +0800 [Feat] update streampark version to 2.1.6 --- pom.xml | 2 +- .../streampark/flink/quickstart/connector/MySQLJavaApp.java | 13 +++++++------ .../flink/quickstart/connector/MySQLSourceApp.scala | 2 +- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/pom.xml b/pom.xml index 6d8cb2e..533ca38 100644 --- a/pom.xml +++ b/pom.xml @@ -29,7 +29,7 @@ <properties> <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding> - <streampark.version>2.1.5</streampark.version> + <streampark.version>2.1.6</streampark.version> <scala.binary.version>2.12</scala.binary.version> <scala.version>2.12.8</scala.version> <java.version>1.8</java.version> diff --git a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java index 6b0c1e7..e0e12a4 100644 --- a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java +++ b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java @@ -16,9 +16,10 @@ */ package org.apache.streampark.flink.quickstart.connector; +import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator; -import org.apache.streampark.flink.connector.function.SQLQueryFunction; -import org.apache.streampark.flink.connector.function.SQLResultFunction; +import org.apache.streampark.flink.connector.function.QueryFunction; +import org.apache.streampark.flink.connector.function.ResultFunction; import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource; import org.apache.streampark.flink.core.StreamEnvConfig; import org.apache.streampark.flink.core.scala.StreamingContext; @@ -38,9 +39,9 @@ public class MySQLJavaApp { StreamingContext context = new StreamingContext(envConfig); //读取MySQL数据源 - SingleOutputStreamOperator<Order> source = new JdbcJavaSource<Order>(context) + DataStream<Order> source = new JdbcJavaSource<>(context, Order.class) .getDataStream( - (SQLQueryFunction<Order>) lastOne -> { + (QueryFunction<Order>) lastOne -> { //5秒抽取一次 Thread.sleep(1000); @@ -53,7 +54,7 @@ public class MySQLJavaApp { lastOffset ); }, - (SQLResultFunction<Order>) map -> { + (ResultFunction<Order>) map -> { List<Order> result = new ArrayList<>(); map.forEach(item -> { Order order = new Order(); @@ -63,7 +64,7 @@ public class MySQLJavaApp { result.add(order); }); return result; - }, null) + }) .returns(TypeInformation.of(Order.class)); source.print("jdbc source: >>>>>"); diff --git a/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala b/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala index 5ab968e..6f3d432 100644 --- a/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala +++ b/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala @@ -30,7 +30,7 @@ object MySQLSourceApp extends FlinkStreaming { val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else lastOne.timestamp s"select * from t_order where timestamp > '$laseOffset' order by timestamp asc " }, - _.map(x => new Order(x("market_id").toString, x("timestamp").toString)), null + _.map(x => new Order(x("market_id").toString, x("timestamp").toString)) ).print() }
