This is an automated email from the ASF dual-hosted git repository.
ic4y pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e77fdbbef [Improve][Connectors-V2][jdbc] Adapts to multiple versions of Flink #3589
e77fdbbef is described below
commit e77fdbbef70c566357436291a51dc3d1df4dc68e
Author: liugddx <[email protected]>
AuthorDate: Tue Nov 29 20:47:37 2022 +0800
[Improve][Connectors-V2][jdbc] Adapts to multiple versions of Flink #3589
---
.../seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java | 11 +++++++++--
1 file changed, 9 insertions(+), 2 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
index 56afdc6a6..11ead0291 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSink.java
@@ -131,7 +131,11 @@ public class JdbcSink
@Override
public Optional<Serializer<JdbcAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ if (jdbcSinkOptions.isExactlyOnce()) {
+ return Optional.of(new DefaultSerializer<>());
+ }
+ return Optional.empty();
+
}
@Override
@@ -141,6 +145,9 @@ public class JdbcSink
@Override
public Optional<Serializer<XidInfo>> getCommitInfoSerializer() {
- return Optional.of(new DefaultSerializer<>());
+ if (jdbcSinkOptions.isExactlyOnce()) {
+ return Optional.of(new DefaultSerializer<>());
+ }
+ return Optional.empty();
}
}