This is an automated email from the ASF dual-hosted git repository.
zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 439f686d9 [Improve][Connector-v2] Unset AutoCommit default to true (#3451)
439f686d9 is described below
commit 439f686d92426b3df95806c622cbff17ced2a41a
Author: ic4y <[email protected]>
AuthorDate: Wed Nov 16 15:45:44 2022 +0800
[Improve][Connector-v2] Unset AutoCommit default to true (#3451)
---
.../connection/SimpleJdbcConnectionProvider.java | 3 ---
.../seatunnel/jdbc/sink/JdbcSinkWriter.java | 19 ++++++++++++++++++-
2 files changed, 18 insertions(+), 4 deletions(-)
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index d4bd4fb34..f6e4d56f3 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -129,9 +129,6 @@ public class SimpleJdbcConnectionProvider
"No suitable driver found for " + jdbcOptions.getUrl(),
"08001");
}
- //Auto commit is used by default
- connection.setAutoCommit(true);
-
return connection;
}
diff --git a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 86aa5f9ae..f185a2eb7 100644
--- a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -32,6 +32,7 @@ import org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
import org.apache.commons.lang3.SerializationUtils;
import java.io.IOException;
+import java.sql.SQLException;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
@@ -42,12 +43,14 @@ public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
private final SinkWriter.Context context;
private transient boolean isOpen;
+ private JdbcConnectionProvider connectionProvider;
+
public JdbcSinkWriter(
SinkWriter.Context context,
JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
JdbcSinkOptions jdbcSinkOptions) {
- JdbcConnectionProvider connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
+ connectionProvider = new SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
this.context = context;
this.outputFormat = new JdbcOutputFormat<>(
@@ -81,6 +84,13 @@ public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
throws IOException {
tryOpen();
outputFormat.flush();
+ try {
+ if (!connectionProvider.getConnection().getAutoCommit()){
+ connectionProvider.getConnection().commit();
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
return Optional.empty();
}
@@ -94,6 +104,13 @@ public class JdbcSinkWriter implements SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
throws IOException {
tryOpen();
outputFormat.flush();
+ try {
+ if (!connectionProvider.getConnection().getAutoCommit()){
+ connectionProvider.getConnection().commit();
+ }
+ } catch (SQLException e) {
+ throw new IOException(e);
+ }
outputFormat.close();
}
}