This is an automated email from the ASF dual-hosted git repository.
liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new cef03f6673 [Bugfix][Clickhouse] Fix clickhouse sink flush bug (#5448)
cef03f6673 is described below
commit cef03f66737b4a9eecde3c450c4ec7b4dedf835b
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Sep 12 15:20:26 2023 +0800
[Bugfix][Clickhouse] Fix clickhouse sink flush bug (#5448)
* [Bug][connector-cdc-mysql] MySQL connections and JVM memory increased
abnormally (#5008)
* [bugfix][connector-cdc-mysql] reset the listener of binaryLogClient
before the fetch task starts (#5008)
* [Bugfix][Clickhouse] fix the connection being closed when a checkpoint
triggers a flush, which caused subsequent data writes to fail
---------
Co-authored-by: dengjunjie <[email protected]>
---
.../seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java | 9 ++++++++-
1 file changed, 8 insertions(+), 1 deletion(-)
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index de29c6cf8b..6220e4b807 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -90,7 +90,14 @@ public class ClickhouseSinkWriter
@Override
public Optional<CKCommitInfo> prepareCommit() throws IOException {
- flush();
+ for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+ JdbcBatchStatementExecutor statement =
batchStatement.getJdbcBatchStatementExecutor();
+ IntHolder intHolder = batchStatement.getIntHolder();
+ if (intHolder.getValue() > 0) {
+ flush(statement);
+ intHolder.setValue(0);
+ }
+ }
return Optional.empty();
}