This is an automated email from the ASF dual-hosted git repository.
penghui pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/master by this push:
new 277d38ee8fb [feature][pulsar-io] Support transactions for JDBC
connector (#16468)
277d38ee8fb is described below
commit 277d38ee8fbef826e822969225015bcc998fae07
Author: Cong Zhao <[email protected]>
AuthorDate: Fri Jul 22 15:21:17 2022 +0800
[feature][pulsar-io] Support transactions for JDBC connector (#16468)
---
.../java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java | 15 ++++++++++++---
.../java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 6 ++++++
site2/docs/io-jdbc-sink.md | 3 +++
3 files changed, 21 insertions(+), 3 deletions(-)
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 8c8febb6d31..dbf27407ca1 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -92,7 +92,7 @@ public abstract class JdbcAbstractSink<T> implements Sink<T> {
Class.forName(JdbcUtils.getDriverClassName(jdbcSinkConfig.getJdbcUrl()));
connection = DriverManager.getConnection(jdbcSinkConfig.getJdbcUrl(),
properties);
- connection.setAutoCommit(false);
+ connection.setAutoCommit(!jdbcSinkConfig.isUseTransactions());
log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl,
connection.getAutoCommit());
tableName = jdbcSinkConfig.getTableName();
@@ -137,7 +137,7 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
@Override
public void close() throws Exception {
- if (connection != null && !connection.getAutoCommit()) {
+ if (connection != null && jdbcSinkConfig.isUseTransactions()) {
connection.commit();
}
if (insertStatement != null) {
@@ -262,11 +262,20 @@ public abstract class JdbcAbstractSink<T> implements
Sink<T> {
throw new IllegalArgumentException(msg);
}
}
- connection.commit();
+ if (jdbcSinkConfig.isUseTransactions()) {
+ connection.commit();
+ }
swapList.forEach(Record::ack);
} catch (Exception e) {
log.error("Got exception ", e.getMessage(), e);
swapList.forEach(Record::fail);
+ try {
+ if (jdbcSinkConfig.isUseTransactions()) {
+ connection.rollback();
+ }
+ } catch (Exception ex) {
+ throw new RuntimeException(ex);
+ }
}
if (swapList.size() != count) {
diff --git
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 89e48980858..609fbacc904 100644
---
a/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++
b/pulsar-io/jdbc/core/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -85,6 +85,12 @@ public class JdbcSinkConfig implements Serializable {
help = "The batch size of updates made to the database"
)
private int batchSize = 200;
+ @FieldDoc(
+ required = false,
+ defaultValue = "true",
+ help = "Enable transactions of the database."
+ )
+ private boolean useTransactions = true;
@FieldDoc(
required = false,
diff --git a/site2/docs/io-jdbc-sink.md b/site2/docs/io-jdbc-sink.md
index abc028132c3..ed3700b9027 100644
--- a/site2/docs/io-jdbc-sink.md
+++ b/site2/docs/io-jdbc-sink.md
@@ -28,6 +28,7 @@ The configuration of all JDBC sink connectors has the
following properties.
| `batchSize` | int | false | 200 | The batch size of
updates made to the database.
|
| `insertMode` | enum( INSERT,UPSERT,UPDATE) | false | INSERT | If it is
configured as UPSERT, the sink uses upsert semantics rather than plain
INSERT/UPDATE statements. Upsert semantics refer to atomically adding a new row
or updating the existing row if there is a primary key constraint violation,
which provides idempotence. |
| `nullValueAction` | enum(FAIL, DELETE) | false | FAIL | How to handle
records with NULL values. Possible options are `DELETE` or `FAIL`. |
+| `useTransactions` | boolean | false | true | Enable
transactions of the database.
### Example for ClickHouse
@@ -41,6 +42,7 @@ The configuration of all JDBC sink connectors has the
following properties.
"password": "password",
"jdbcUrl":
"jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink",
"tableName": "pulsar_clickhouse_jdbc_sink"
+ "useTransactions": "false"
}
}
@@ -60,6 +62,7 @@ The configuration of all JDBC sink connectors has the
following properties.
password: "password"
jdbcUrl: "jdbc:clickhouse://localhost:8123/pulsar_clickhouse_jdbc_sink"
tableName: "pulsar_clickhouse_jdbc_sink"
+ useTransactions: "false"
```