This is an automated email from the ASF dual-hosted git repository.

benjobs pushed a commit to branch 2.1.6
in repository 
https://gitbox.apache.org/repos/asf/incubator-streampark-quickstart.git

commit 5afb47fb03ff81f5a51bc627901e4de9d8876c04
Author: benjobs <[email protected]>
AuthorDate: Mon Jan 27 20:02:26 2025 +0800

    [Feat] update streampark version to 2.1.6
---
 pom.xml                                                     |  2 +-
 .../streampark/flink/quickstart/connector/MySQLJavaApp.java | 13 +++++++------
 .../flink/quickstart/connector/MySQLSourceApp.scala         |  2 +-
 3 files changed, 9 insertions(+), 8 deletions(-)

diff --git a/pom.xml b/pom.xml
index 6d8cb2e..533ca38 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
 
     <properties>
         <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-        <streampark.version>2.1.5</streampark.version>
+        <streampark.version>2.1.6</streampark.version>
         <scala.binary.version>2.12</scala.binary.version>
         <scala.version>2.12.8</scala.version>
         <java.version>1.8</java.version>
diff --git 
a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
 
b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 6b0c1e7..e0e12a4 100644
--- 
a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ 
b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,9 +16,10 @@
  */
 package org.apache.streampark.flink.quickstart.connector;
 
+import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
-import org.apache.streampark.flink.connector.function.SQLQueryFunction;
-import org.apache.streampark.flink.connector.function.SQLResultFunction;
+import org.apache.streampark.flink.connector.function.QueryFunction;
+import org.apache.streampark.flink.connector.function.ResultFunction;
 import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
 import org.apache.streampark.flink.core.StreamEnvConfig;
 import org.apache.streampark.flink.core.scala.StreamingContext;
@@ -38,9 +39,9 @@ public class MySQLJavaApp {
         StreamingContext context = new StreamingContext(envConfig);
 
         //读取MySQL数据源
-        SingleOutputStreamOperator<Order> source =  new 
JdbcJavaSource<Order>(context)
+        DataStream<Order> source =  new JdbcJavaSource<>(context, Order.class)
                 .getDataStream(
-                        (SQLQueryFunction<Order>) lastOne -> {
+                        (QueryFunction<Order>) lastOne -> {
                             //5秒抽取一次
                             Thread.sleep(1000);
 
@@ -53,7 +54,7 @@ public class MySQLJavaApp {
                                     lastOffset
                             );
                         },
-                        (SQLResultFunction<Order>) map -> {
+                        (ResultFunction<Order>) map -> {
                             List<Order> result = new ArrayList<>();
                             map.forEach(item -> {
                                 Order order = new Order();
@@ -63,7 +64,7 @@ public class MySQLJavaApp {
                                 result.add(order);
                             });
                             return result;
-                        }, null)
+                        })
                 .returns(TypeInformation.of(Order.class));
 
         source.print("jdbc source: >>>>>");
diff --git 
a/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
 
b/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
index 5ab968e..6f3d432 100644
--- 
a/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
+++ 
b/quickstart-flink/quickstart-connector/src/main/scala/org/apache/streampark/flink/quickstart/connector/MySQLSourceApp.scala
@@ -30,7 +30,7 @@ object MySQLSourceApp extends FlinkStreaming {
       val laseOffset = if (lastOne == null) "2020-10-10 23:00:00" else 
lastOne.timestamp
       s"select * from t_order where timestamp > '$laseOffset' order by 
timestamp asc "
     },
-      _.map(x => new Order(x("market_id").toString, x("timestamp").toString)), 
null
+      _.map(x => new Order(x("market_id").toString, x("timestamp").toString))
     ).print()
 
   }

Reply via email to