This is an automated email from the ASF dual-hosted git repository.

liugddx pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new cef03f6673 [Bugfix][Clickhouse] Fix clickhouse sink flush bug (#5448)
cef03f6673 is described below

commit cef03f66737b4a9eecde3c450c4ec7b4dedf835b
Author: happyboy1024 <[email protected]>
AuthorDate: Tue Sep 12 15:20:26 2023 +0800

    [Bugfix][Clickhouse] Fix clickhouse sink flush bug (#5448)
    
    * [Bug][connector-cdc-mysql] mysql connections and memory of jvm increased abnormally (#5008)
    
    * [bugfix][connector-cdc-mysql] reset the listener of binaryLogClient before fetch task start (#5008)
    
    * [Bugfix][Clickhouse] fix when the checkpoint triggers flush, the connection is closed, causing subsequent data writing to fail
    
    ---------
    
    Co-authored-by: dengjunjie <[email protected]>
---
 .../seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java   | 9 ++++++++-
 1 file changed, 8 insertions(+), 1 deletion(-)

diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index de29c6cf8b..6220e4b807 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -90,7 +90,14 @@ public class ClickhouseSinkWriter
 
     @Override
     public Optional<CKCommitInfo> prepareCommit() throws IOException {
-        flush();
+        for (ClickhouseBatchStatement batchStatement : statementMap.values()) {
+            JdbcBatchStatementExecutor statement = batchStatement.getJdbcBatchStatementExecutor();
+            IntHolder intHolder = batchStatement.getIntHolder();
+            if (intHolder.getValue() > 0) {
+                flush(statement);
+                intHolder.setValue(0);
+            }
+        }
         return Optional.empty();
     }
 

Reply via email to