This is an automated email from the ASF dual-hosted git repository.

zongwen pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 439f686d9 [Improve][Connector-v2] Unset AutoCommit default to true 
(#3451)
439f686d9 is described below

commit 439f686d92426b3df95806c622cbff17ced2a41a
Author: ic4y <[email protected]>
AuthorDate: Wed Nov 16 15:45:44 2022 +0800

    [Improve][Connector-v2] Unset AutoCommit default to true (#3451)
---
 .../connection/SimpleJdbcConnectionProvider.java      |  3 ---
 .../seatunnel/jdbc/sink/JdbcSinkWriter.java           | 19 ++++++++++++++++++-
 2 files changed, 18 insertions(+), 4 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
index d4bd4fb34..f6e4d56f3 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/internal/connection/SimpleJdbcConnectionProvider.java
@@ -129,9 +129,6 @@ public class SimpleJdbcConnectionProvider
                 "No suitable driver found for " + jdbcOptions.getUrl(), 
"08001");
         }
 
-        //Auto commit is used by default
-        connection.setAutoCommit(true);
-
         return connection;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
index 86aa5f9ae..f185a2eb7 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkWriter.java
@@ -32,6 +32,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.jdbc.state.XidInfo;
 import org.apache.commons.lang3.SerializationUtils;
 
 import java.io.IOException;
+import java.sql.SQLException;
 import java.util.Collections;
 import java.util.List;
 import java.util.Optional;
@@ -42,12 +43,14 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
     private final SinkWriter.Context context;
     private transient boolean isOpen;
 
+    private JdbcConnectionProvider connectionProvider;
+
     public JdbcSinkWriter(
         SinkWriter.Context context,
         JdbcStatementBuilder<SeaTunnelRow> statementBuilder,
         JdbcSinkOptions jdbcSinkOptions) {
 
-        JdbcConnectionProvider connectionProvider = new 
SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
+        connectionProvider = new 
SimpleJdbcConnectionProvider(jdbcSinkOptions.getJdbcConnectionOptions());
 
         this.context = context;
         this.outputFormat = new JdbcOutputFormat<>(
@@ -81,6 +84,13 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
         throws IOException {
         tryOpen();
         outputFormat.flush();
+        try {
+            if (!connectionProvider.getConnection().getAutoCommit()){
+                connectionProvider.getConnection().commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
         return Optional.empty();
     }
 
@@ -94,6 +104,13 @@ public class JdbcSinkWriter implements 
SinkWriter<SeaTunnelRow, XidInfo, JdbcSin
         throws IOException {
         tryOpen();
         outputFormat.flush();
+        try {
+            if (!connectionProvider.getConnection().getAutoCommit()){
+                connectionProvider.getConnection().commit();
+            }
+        } catch (SQLException e) {
+            throw new IOException(e);
+        }
         outputFormat.close();
     }
 }

Reply via email to