This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new df0d11d632 [Fix][API] Fixed not invoke the `SinkAggregatedCommitter`'s 
init method (#9070)
df0d11d632 is described below

commit df0d11d63273d93877659e0d188fd4db7803efb3
Author: Jia Fan <[email protected]>
AuthorDate: Wed Apr 16 09:35:09 2025 +0800

    [Fix][API] Fixed not invoke the `SinkAggregatedCommitter`'s init method 
(#9070)
---
 .../api/sink/SinkAggregatedCommitter.java          |  5 +-
 .../MultiTableSinkAggregatedCommitter.java         |  9 ++-
 .../MultiTableSinkAggregatedCommitterTest.java     | 73 ++++++++++++++++++++++
 .../sink/file/ClickhouseFileSinkAggCommitter.java  | 14 +++--
 .../seatunnel/sink/SinkFlowTestUtils.java          | 26 ++++++--
 .../sink/commit/FileSinkAggregatedCommitter.java   |  6 ++
 .../sink/commit/IcebergAggregatedCommitter.java    | 12 +++-
 .../jdbc/sink/JdbcSinkAggregatedCommitter.java     | 10 ++-
 8 files changed, 135 insertions(+), 20 deletions(-)

diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index c5bdc6926d..54842cef0d 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -32,7 +32,10 @@ import java.util.List;
  */
 public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT> 
extends Serializable {
 
-    /** init sink aggregated committer */
+    /**
+     * Init sink aggregated committer. This method may be called more than once: 
each retry will call
+     * it again.
+     */
     default void init() {};
 
     /** Re-commit message to third party data receiver, The method need to 
achieve idempotency. */
diff --git 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
index 6ed04d871b..863237f78e 100644
--- 
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
+++ 
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -53,17 +53,16 @@ public class MultiTableSinkAggregatedCommitter
         for (String tableIdentifier : aggCommitters.keySet()) {
             SinkAggregatedCommitter<?, ?> aggCommitter = 
aggCommitters.get(tableIdentifier);
             if (!(aggCommitter instanceof 
SupportMultiTableSinkAggregatedCommitter)) {
-                return;
+                break;
             }
             resourceManager =
                     ((SupportMultiTableSinkAggregatedCommitter<?>) 
aggCommitter)
                             
.initMultiTableResourceManager(aggCommitters.size(), 1);
             break;
         }
-        if (resourceManager != null) {
-            for (String tableIdentifier : aggCommitters.keySet()) {
-                SinkAggregatedCommitter<?, ?> aggCommitter = 
aggCommitters.get(tableIdentifier);
-                aggCommitter.init();
+        for (SinkAggregatedCommitter<?, ?> aggCommitter : 
aggCommitters.values()) {
+            aggCommitter.init();
+            if (resourceManager != null) {
                 ((SupportMultiTableSinkAggregatedCommitter<?>) aggCommitter)
                         .setMultiTableResourceManager(resourceManager, 0);
             }
diff --git 
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
new file mode 100644
index 0000000000..b01c04699f
--- /dev/null
+++ 
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink.multitablesink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MultiTableSinkAggregatedCommitterTest {
+
+    @Test
+    void testInitBeInvoked() throws IOException {
+        Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new 
HashMap<>();
+        List<String> methodInvoked = new ArrayList<>();
+        aggCommitters.put(
+                "table1",
+                new SinkAggregatedCommitter<Object, Object>() {
+
+                    @Override
+                    public void init() {
+                        methodInvoked.add("init");
+                    }
+
+                    @Override
+                    public List<Object> commit(List<Object> 
aggregatedCommitInfo)
+                            throws IOException {
+                        return Collections.emptyList();
+                    }
+
+                    @Override
+                    public Object combine(List<Object> commitInfos) {
+                        return null;
+                    }
+
+                    @Override
+                    public void abort(List<Object> aggregatedCommitInfo) 
throws Exception {}
+
+                    @Override
+                    public void close() throws IOException {
+                        methodInvoked.add("close");
+                    }
+                });
+        MultiTableSinkAggregatedCommitter committer =
+                new MultiTableSinkAggregatedCommitter(aggCommitters);
+        committer.init();
+        committer.close();
+        Assertions.assertIterableEquals(Arrays.asList("init", "close"), 
methodInvoked);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 28c6d250d7..eb66b9ecb5 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -41,18 +41,24 @@ public class ClickhouseFileSinkAggCommitter
         implements SinkAggregatedCommitter<CKFileCommitInfo, 
CKFileAggCommitInfo> {
 
     private transient ClickhouseProxy proxy;
-    private final ClickhouseTable clickhouseTable;
+    private ClickhouseTable clickhouseTable;
 
     private final FileReaderOption fileReaderOption;
 
     public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
         fileReaderOption = readerOption;
-        proxy = new 
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
+    }
+
+    @Override
+    public void init() {
+        proxy =
+                new ClickhouseProxy(
+                        
fileReaderOption.getShardMetadata().getDefaultShard().getNode());
         clickhouseTable =
                 proxy.getClickhouseTable(
                         proxy.getClickhouseConnection(),
-                        readerOption.getShardMetadata().getDatabase(),
-                        readerOption.getShardMetadata().getTable());
+                        fileReaderOption.getShardMetadata().getDatabase(),
+                        fileReaderOption.getShardMetadata().getTable());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index cd9b9cca67..40c440f924 100644
--- 
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++ 
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.connectors.seatunnel.sink;
 import org.apache.seatunnel.api.common.JobContext;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.MultiTableResourceManager;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
@@ -108,16 +110,30 @@ public class SinkFlowTestUtils {
         }
 
         Optional<? extends SinkCommitter<?>> sinkCommitter = 
sink.createCommitter();
-        Optional<? extends SinkAggregatedCommitter<?, ?>> aggregatedCommitter =
+        Optional<? extends SinkAggregatedCommitter<?, ?>> 
aggregatedCommitterOptional =
                 sink.createAggregatedCommitter();
 
         if (!commitInfos.isEmpty()) {
-            if (aggregatedCommitter.isPresent()) {
+            if (aggregatedCommitterOptional.isPresent()) {
+                SinkAggregatedCommitter<?, ?> aggregatedCommitter =
+                        aggregatedCommitterOptional.get();
+                MultiTableResourceManager resourceManager = null;
+                if (aggregatedCommitter instanceof 
SupportMultiTableSinkAggregatedCommitter) {
+                    resourceManager =
+                            ((SupportMultiTableSinkAggregatedCommitter<?>) 
aggregatedCommitter)
+                                    .initMultiTableResourceManager(1, 1);
+                }
+                aggregatedCommitter.init();
+                if (resourceManager != null) {
+                    ((SupportMultiTableSinkAggregatedCommitter<?>) 
aggregatedCommitter)
+                            .setMultiTableResourceManager(resourceManager, 0);
+                }
+
                 Object aggregatedCommitInfoT =
-                        ((SinkAggregatedCommitter) 
aggregatedCommitter.get()).combine(commitInfos);
-                ((SinkAggregatedCommitter) aggregatedCommitter.get())
+                        ((SinkAggregatedCommitter) 
aggregatedCommitter).combine(commitInfos);
+                ((SinkAggregatedCommitter) aggregatedCommitter)
                         
.commit(Collections.singletonList(aggregatedCommitInfoT));
-                aggregatedCommitter.get().close();
+                aggregatedCommitter.close();
             } else if (sinkCommitter.isPresent()) {
                 ((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
             } else {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 26d2e4f5f1..934374cab9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -33,8 +33,14 @@ import java.util.Map;
 public class FileSinkAggregatedCommitter
         implements SinkAggregatedCommitter<FileCommitInfo, 
FileAggregatedCommitInfo> {
     protected HadoopFileSystemProxy hadoopFileSystemProxy;
+    private final HadoopConf hadoopConf;
 
     public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
+        this.hadoopConf = hadoopConf;
+    }
+
+    @Override
+    public void init() {
         this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
index 118f7a50cd..77d34a17f3 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
@@ -33,10 +33,18 @@ import java.util.List;
 public class IcebergAggregatedCommitter
         implements SinkAggregatedCommitter<IcebergCommitInfo, 
IcebergAggregatedCommitInfo> {
 
-    private final IcebergTableLoader tableLoader;
-    private final IcebergFilesCommitter filesCommitter;
+    private IcebergTableLoader tableLoader;
+    private IcebergFilesCommitter filesCommitter;
+    private final IcebergSinkConfig config;
+    private final CatalogTable catalogTable;
 
     public IcebergAggregatedCommitter(IcebergSinkConfig config, CatalogTable 
catalogTable) {
+        this.config = config;
+        this.catalogTable = catalogTable;
+    }
+
+    @Override
+    public void init() {
         this.tableLoader = IcebergTableLoader.create(config, catalogTable);
         this.filesCommitter = IcebergFilesCommitter.of(config, tableLoader);
     }
diff --git 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
index b09303f253..c4abb10cb8 100644
--- 
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -39,15 +39,19 @@ import java.util.stream.Collectors;
 public class JdbcSinkAggregatedCommitter
         implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
 
-    private final XaFacade xaFacade;
-    private final XaGroupOps xaGroupOps;
+    private XaFacade xaFacade;
+    private XaGroupOps xaGroupOps;
     private final JdbcSinkConfig jdbcSinkConfig;
 
     public JdbcSinkAggregatedCommitter(JdbcSinkConfig jdbcSinkConfig) {
+        this.jdbcSinkConfig = jdbcSinkConfig;
+    }
+
+    @Override
+    public void init() {
         this.xaFacade =
                 
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
         this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
-        this.jdbcSinkConfig = jdbcSinkConfig;
     }
 
     private void tryOpen() throws IOException {

Reply via email to