This is an automated email from the ASF dual-hosted git repository.

zhouyao2023 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 48968fb4e9 [Fix][Connector-V2][Databend] Ensure CDC final merge on 
committer close (#10349)
48968fb4e9 is described below

commit 48968fb4e93246c2ddc5b53617373faa0cf676b8
Author: yzeng1618 <[email protected]>
AuthorDate: Fri Jan 23 16:11:16 2026 +0800

    [Fix][Connector-V2][Databend] Ensure CDC final merge on committer close 
(#10349)
    
    Co-authored-by: zengyi <[email protected]>
---
 .../sink/DatabendSinkAggregatedCommitter.java      | 53 ++++++++++++++++++----
 1 file changed, 45 insertions(+), 8 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
index 0f55132fa9..8a22e78653 100644
--- 
a/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-databend/src/main/java/org/apache/seatunnel/connectors/seatunnel/databend/sink/DatabendSinkAggregatedCommitter.java
@@ -59,6 +59,7 @@ public class DatabendSinkAggregatedCommitter
 
     private Connection connection;
     private boolean isCdcMode;
+    private volatile boolean aborted;
     // Store catalog table to access schema information
     private CatalogTable catalogTable;
 
@@ -130,14 +131,21 @@ public class DatabendSinkAggregatedCommitter
             List<DatabendSinkAggregatedCommitInfo> aggregatedCommitInfos) 
throws IOException {
         // Perform final merge operation in CDC mode only when necessary
         if (isCdcMode) {
-            performMerge(aggregatedCommitInfos);
+            if (log.isDebugEnabled()) {
+                log.debug(
+                        "[Instance {}] Committing aggregatedCommitInfos size: 
{}",
+                        instanceId,
+                        aggregatedCommitInfos == null ? 0 : 
aggregatedCommitInfos.size());
+            }
+            performMerge();
         }
 
         // Return empty list as there's no need to retry
         return new ArrayList<>();
     }
 
-    private void performMerge(List<DatabendSinkAggregatedCommitInfo> 
aggregatedCommitInfos) {
+    /** Perform merge from CDC stream to target table. */
+    private void performMerge() {
         // Merge all the data from raw table to target table
         String mergeSql = generateMergeSql();
         log.info("[Instance {}] Executing MERGE INTO statement: {}", 
instanceId, mergeSql);
@@ -214,6 +222,7 @@ public class DatabendSinkAggregatedCommitter
     @Override
     public void abort(List<DatabendSinkAggregatedCommitInfo> 
aggregatedCommitInfos)
             throws IOException {
+        aborted = true;
         // In case of abort, we might want to clean up the raw table and stream
         log.info("[Instance {}] Aborting Databend sink operations", 
instanceId);
         try {
@@ -235,16 +244,44 @@ public class DatabendSinkAggregatedCommitter
 
     @Override
     public void close() throws IOException {
+        Exception closeException = null;
         try {
-            if (connection != null && !connection.isClosed()) {
-                connection.close();
+            if (!aborted && isCdcMode && connection != null && 
!connection.isClosed()) {
+                try {
+                    log.info("[Instance {}] Performing final merge before 
closing", instanceId);
+                    performMerge();
+                } catch (Exception mergeEx) {
+                    log.error(
+                            "[Instance {}] Final merge failed, will still 
close connection: {}",
+                            instanceId,
+                            mergeEx.getMessage(),
+                            mergeEx);
+                }
             }
-        } catch (SQLException e) {
+        } catch (Exception e) {
+            closeException = e;
+        } finally {
+            if (connection != null) {
+                try {
+                    connection.close();
+                } catch (SQLException e) {
+                    if (closeException != null) {
+                        closeException.addSuppressed(e);
+                    } else {
+                        closeException = e;
+                    }
+                }
+            }
+        }
+
+        if (closeException != null) {
             throw new DatabendConnectorException(
                     DatabendConnectorErrorCode.CONNECT_FAILED,
-                    "[Instance {}] Failed to close connection in 
DatabendSinkAggregatedCommitter: "
-                            + e.getMessage(),
-                    e);
+                    "[Instance "
+                            + instanceId
+                            + "] Failed to close connection in 
DatabendSinkAggregatedCommitter: "
+                            + closeException.getMessage(),
+                    closeException);
         }
     }
 }

Reply via email to