This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new f8fefa1e57 [Feature][Connector-V2][Clickhouse] clickhouse writes with
checkpoints (#4999)
f8fefa1e57 is described below
commit f8fefa1e574f08e2e7092b1ebcf64b0e1b9d4582
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Wed Jul 5 10:15:28 2023 +0800
[Feature][Connector-V2][Clickhouse] clickhouse writes with checkpoints
(#4999)
---
docs/en/connector-v2/sink/Clickhouse.md | 2 +-
.../sink/client/ClickhouseSinkWriter.java | 39 ++++++++++++----------
2 files changed, 23 insertions(+), 18 deletions(-)
diff --git a/docs/en/connector-v2/sink/Clickhouse.md
b/docs/en/connector-v2/sink/Clickhouse.md
index 05d03330c7..7c4bab991b 100644
--- a/docs/en/connector-v2/sink/Clickhouse.md
+++ b/docs/en/connector-v2/sink/Clickhouse.md
@@ -58,7 +58,7 @@ In addition to the above mandatory parameters that must be
specified by `clickho
### bulk_size [number]
-The number of rows written through
[Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the
`default is 20000` .
+The number of rows written through
[Clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc) each time, the
`default is 20000`, if checkpoints are enabled, writing will also occur at the
times when the checkpoints are satisfied .
### split_mode [boolean]
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 443eec921a..235279b4d5 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -90,6 +90,7 @@ public class ClickhouseSinkWriter
@Override
public Optional<CKCommitInfo> prepareCommit() throws IOException {
+ flush();
return Optional.empty();
}
@@ -99,23 +100,7 @@ public class ClickhouseSinkWriter
@Override
public void close() throws IOException {
this.proxy.close();
- for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
- try (ClickHouseConnectionImpl needClosedConnection =
- batchStatement.getClickHouseConnection();
- JdbcBatchStatementExecutor needClosedStatement =
- batchStatement.getJdbcBatchStatementExecutor()) {
- IntHolder intHolder = batchStatement.getIntHolder();
- if (intHolder.getValue() > 0) {
- flush(needClosedStatement);
- intHolder.setValue(0);
- }
- } catch (SQLException e) {
- throw new ClickhouseConnectorException(
- CommonErrorCode.SQL_OPERATION_FAILED,
- "Failed to close prepared statement.",
- e);
- }
- }
+ flush();
}
private void addIntoBatch(SeaTunnelRow row, JdbcBatchStatementExecutor
clickHouseStatement) {
@@ -138,6 +123,26 @@ public class ClickhouseSinkWriter
}
}
+ private void flush() {
+ for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+ try (ClickHouseConnectionImpl needClosedConnection =
+ batchStatement.getClickHouseConnection();
+ JdbcBatchStatementExecutor needClosedStatement =
+ batchStatement.getJdbcBatchStatementExecutor()) {
+ IntHolder intHolder = batchStatement.getIntHolder();
+ if (intHolder.getValue() > 0) {
+ flush(needClosedStatement);
+ intHolder.setValue(0);
+ }
+ } catch (SQLException e) {
+ throw new ClickhouseConnectorException(
+ CommonErrorCode.SQL_OPERATION_FAILED,
+ "Failed to close prepared statement.",
+ e);
+ }
+ }
+ }
+
private Map<Shard, ClickhouseBatchStatement> initStatementMap() {
Map<Shard, ClickhouseBatchStatement> result = new
HashMap<>(Common.COLLECTION_SIZE);
shardRouter