This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/master by this push:
     new 277d38ee8fb [feature][pulsar-io] Support transactions for JDBC 
connector (#16468)
277d38ee8fb is described below

commit 277d38ee8fbef826e822969225015bcc998fae07
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Jul 22 15:21:17 2022 +0800

    [feature][pulsar-io] Support transactions for JDBC connector (#16468)
---
 .../java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java  | 15 ++++++++++++---
 .../java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java    |  6 ++++++
 site2/docs/io-jdbc-sink.md                                |  3 +++
 3 files changed, 21 insertions(+), 3 deletions(-)

diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 8c8febb6d31..dbf27407ca1 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -92,7 +92,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
 
         
Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl()));
         connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(), 
properties);
-        connection.setAutoCommit(false);
+        connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, 
connection.getAutoCommit());
 
         tableName = jdbcSinkConfig.getTableName();
@@ -137,7 +137,7 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
 
     @Override
     public void close() throws Exception {
-        if (connection != null && !connection.getAutoCommit()) {
+        if (connection != null && jdbcSinkConfig.isUseTransactions()) {
             connection.commit();
         }
         if (insertStatement != null) {
@@ -262,11 +262,20 @@ public abstract class JdbcAbstractSink<T> implements 
Sink<T> {
                             throw new IllegalArgumentException(msg);
                     }
                 }
-                connection.commit();
+                if (jdbcSinkConfig.isUseTransactions()) {
+                    connection.commit();
+                }
                 swapList.forEach(Record::ack);
             } catch (Exception e) {
                 log.error("Got exception ", e.getMessage(), e);
                 swapList.forEach(Record::fail);
+                try {
+                    if (jdbcSinkConfig.isUseTransactions()) {
+                        connection.rollback();
+                    }
+                } catch (Exception ex) {
+                    throw new RuntimeException(ex);
+                }
             }
 
             if (swapList.size() != count) {
diff --git 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 89e48980858..609fbacc904 100644
--- 
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ 
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -85,6 +85,12 @@ public class JdbcSinkConfig implements Serializable {
         help = "The batch size of updates made to the database"
     )
     private int batchSize = 200;
+    @FieldDoc(
+            required = false,
+            defaultValue = "true",
+            help = "Enable transactions of the database."
+    )
+    private boolean useTransactions = true;
 
     @FieldDoc(
             required = false,
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index abc028132c3..ed3700b9027 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -28,6 +28,7 @@ The configuration of all JDBC sink connectors has the 
following properties.
 | `batchSize` | int    | false    | 200                | The batch size of 
updates made to the database.                                                   
                       |
 | `insertMode` | enum( INSERT,UPSERT,UPDATE) | false    | INSERT | If it is 
configured as UPSERT, the sink uses upsert semantics rather than plain 
INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row 
or updating the existing row if there is a primary key constraint violation, 
which provides idempotence. |
 | `nullValueAction` | enum(FAIL, DELETE) | false    | FAIL | How to handle 
records with NULL values. Possible options are `DELETE` or `FAIL`. |
+| `useTransactions` | boolean | false    | true               | Enable 
transactions of the database.
 
 ### Example for ClickHouse
 
@@ -41,6 +42,7 @@ The configuration of all JDBC sink connectors has the 
following properties.
         "password": "password",
         "jdbcUrl": 
"jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink",
         "tableName": "pulsar_clickhouse_jdbc_sink"
+        "useTransactions": "false"
      }
   }
   
@@ -60,6 +62,7 @@ The configuration of all JDBC sink connectors has the 
following properties.
       password: "password"
       jdbcUrl: "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink"
       tableName: "pulsar_clickhouse_jdbc_sink"
+      useTransactions: "false"
   
   ```
 

Reply via email to