This is an automated email from the ASF dual-hosted git repository.
benjobs pushed a commit to branch TLP
in repository
https://gitbox.apache.org/repos/asf/incubator-streampark-quickstart.git
The following commit(s) were added to refs/heads/TLP by this push:
new 8c1dae0 [TLP] streampark version update to 2.1.5
8c1dae0 is described below
commit 8c1dae0d3e44573e92b7c309d98ef5d3d220f1da
Author: benjobs <[email protected]>
AuthorDate: Thu Jan 23 11:33:20 2025 +0800
[TLP] streampark version update to 2.1.5
---
pom.xml | 2 +-
.../quickstart-connector/assembly/conf/application.yml | 4 ++--
quickstart-flink/quickstart-connector/pom.xml | 7 +++++++
.../streampark/flink/quickstart/connector/MySQLJavaApp.java | 9 ++++++---
4 files changed, 16 insertions(+), 6 deletions(-)
diff --git a/pom.xml b/pom.xml
index d375305..6d8cb2e 100644
--- a/pom.xml
+++ b/pom.xml
@@ -29,7 +29,7 @@
<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
- <streampark.version>2.0.0</streampark.version>
+ <streampark.version>2.1.5</streampark.version>
<scala.binary.version>2.12</scala.binary.version>
<scala.version>2.12.8</scala.version>
<java.version>1.8</java.version>
diff --git a/quickstart-flink/quickstart-connector/assembly/conf/application.yml b/quickstart-flink/quickstart-connector/assembly/conf/application.yml
index dee9980..fa21ef2 100644
--- a/quickstart-flink/quickstart-connector/assembly/conf/application.yml
+++ b/quickstart-flink/quickstart-connector/assembly/conf/application.yml
@@ -57,7 +57,7 @@ flink:
externalized-checkpoint-retention: RETAIN_ON_CANCELLATION
# state backend
state:
- backend: hashmap # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
+ backend: filesystem # Special note: flink1.12 optional configuration ('jobmanager', 'filesystem', 'rocksdb'), flink1.12+ optional configuration ('hashmap', 'rocksdb'),
backend.incremental: true
checkpoint-storage: filesystem
savepoints.dir: file:///tmp/chkdir
@@ -102,7 +102,7 @@ app: # user's parameter
# jdbc config...
jdbc:
semantic: EXACTLY_ONCE # EXACTLY_ONCE|AT_LEAST_ONCE|NONE
- driverClassName: com.mysql.jdbc.Driver
+ driverClassName: com.mysql.cj.jdbc.Driver
jdbcUrl: jdbc:mysql://localhost:3306/test?useSSL=false&allowPublicKeyRetrieval=true
username: root
password: 123456
diff --git a/quickstart-flink/quickstart-connector/pom.xml b/quickstart-flink/quickstart-connector/pom.xml
index ea94727..0b396ea 100644
--- a/quickstart-flink/quickstart-connector/pom.xml
+++ b/quickstart-flink/quickstart-connector/pom.xml
@@ -102,6 +102,12 @@
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>com.mysql</groupId>
+ <artifactId>mysql-connector-j</artifactId>
+ <version>8.0.33</version>
+ </dependency>
+
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-connector-mongo_${scala.binary.version}</artifactId>
@@ -220,6 +226,7 @@
<artifactId>flink-format-changelog-json</artifactId>
<version>1.0.0</version>
</dependency>
+
<dependency>
<groupId>org.apache.streampark</groupId>
<artifactId>streampark-flink-connector-kafka_${scala.binary.version}</artifactId>
diff --git a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
index 0d2bd44..6b0c1e7 100644
--- a/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
+++ b/quickstart-flink/quickstart-connector/src/main/java/org/apache/streampark/flink/quickstart/connector/MySQLJavaApp.java
@@ -16,6 +16,7 @@
*/
package org.apache.streampark.flink.quickstart.connector;
+import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.streampark.flink.connector.function.SQLQueryFunction;
import org.apache.streampark.flink.connector.function.SQLResultFunction;
import org.apache.streampark.flink.connector.jdbc.source.JdbcJavaSource;
@@ -37,11 +38,11 @@ public class MySQLJavaApp {
StreamingContext context = new StreamingContext(envConfig);
//读取MySQL数据源
- new JdbcJavaSource<Order>(context)
+ SingleOutputStreamOperator<Order> source = new JdbcJavaSource<Order>(context)
.getDataStream(
(SQLQueryFunction<Order>) lastOne -> {
//5秒抽取一次
- Thread.sleep(5000);
+ Thread.sleep(1000);
Serializable lastOffset = lastOne == null ?
"2020-10-10 23:00:00" : lastOne.getTimestamp();
@@ -65,7 +66,9 @@ public class MySQLJavaApp {
}, null)
.returns(TypeInformation.of(Order.class));
- context.start();
+ source.print("jdbc source: >>>>>");
+
+ context.execute();
}
}