This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new df0d11d632 [Fix][API] Fixed not invoke the `SinkAggregatedCommitter`'s
init method (#9070)
df0d11d632 is described below
commit df0d11d63273d93877659e0d188fd4db7803efb3
Author: Jia Fan <[email protected]>
AuthorDate: Wed Apr 16 09:35:09 2025 +0800
[Fix][API] Fixed not invoke the `SinkAggregatedCommitter`'s init method
(#9070)
---
.../api/sink/SinkAggregatedCommitter.java | 5 +-
.../MultiTableSinkAggregatedCommitter.java | 9 ++-
.../MultiTableSinkAggregatedCommitterTest.java | 73 ++++++++++++++++++++++
.../sink/file/ClickhouseFileSinkAggCommitter.java | 14 +++--
.../seatunnel/sink/SinkFlowTestUtils.java | 26 ++++++--
.../sink/commit/FileSinkAggregatedCommitter.java | 6 ++
.../sink/commit/IcebergAggregatedCommitter.java | 12 +++-
.../jdbc/sink/JdbcSinkAggregatedCommitter.java | 10 ++-
8 files changed, 135 insertions(+), 20 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
index c5bdc6926d..54842cef0d 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/SinkAggregatedCommitter.java
@@ -32,7 +32,10 @@ import java.util.List;
*/
public interface SinkAggregatedCommitter<CommitInfoT, AggregatedCommitInfoT>
extends Serializable {
- /** init sink aggregated committer */
+ /**
+ * Initialize the sink aggregated committer. This method may be called more than once,
+ * because it is invoked again on every retry.
+ */
default void init() {};
/** Re-commit message to third party data receiver, The method need to
achieve idempotency. */
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
index 6ed04d871b..863237f78e 100644
---
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitter.java
@@ -53,17 +53,16 @@ public class MultiTableSinkAggregatedCommitter
for (String tableIdentifier : aggCommitters.keySet()) {
SinkAggregatedCommitter<?, ?> aggCommitter =
aggCommitters.get(tableIdentifier);
if (!(aggCommitter instanceof
SupportMultiTableSinkAggregatedCommitter)) {
- return;
+ break;
}
resourceManager =
((SupportMultiTableSinkAggregatedCommitter<?>)
aggCommitter)
.initMultiTableResourceManager(aggCommitters.size(), 1);
break;
}
- if (resourceManager != null) {
- for (String tableIdentifier : aggCommitters.keySet()) {
- SinkAggregatedCommitter<?, ?> aggCommitter =
aggCommitters.get(tableIdentifier);
- aggCommitter.init();
+ for (SinkAggregatedCommitter<?, ?> aggCommitter :
aggCommitters.values()) {
+ aggCommitter.init();
+ if (resourceManager != null) {
((SupportMultiTableSinkAggregatedCommitter<?>) aggCommitter)
.setMultiTableResourceManager(resourceManager, 0);
}
diff --git
a/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
new file mode 100644
index 0000000000..b01c04699f
--- /dev/null
+++
b/seatunnel-api/src/test/java/org/apache/seatunnel/api/sink/multitablesink/MultiTableSinkAggregatedCommitterTest.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.sink.multitablesink;
+
+import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class MultiTableSinkAggregatedCommitterTest {
+
+ @Test
+ void testInitBeInvoked() throws IOException {
+ Map<String, SinkAggregatedCommitter<?, ?>> aggCommitters = new
HashMap<>();
+ List<String> methodInvoked = new ArrayList<>();
+ aggCommitters.put(
+ "table1",
+ new SinkAggregatedCommitter<Object, Object>() {
+
+ @Override
+ public void init() {
+ methodInvoked.add("init");
+ }
+
+ @Override
+ public List<Object> commit(List<Object>
aggregatedCommitInfo)
+ throws IOException {
+ return Collections.emptyList();
+ }
+
+ @Override
+ public Object combine(List<Object> commitInfos) {
+ return null;
+ }
+
+ @Override
+ public void abort(List<Object> aggregatedCommitInfo)
throws Exception {}
+
+ @Override
+ public void close() throws IOException {
+ methodInvoked.add("close");
+ }
+ });
+ MultiTableSinkAggregatedCommitter committer =
+ new MultiTableSinkAggregatedCommitter(aggCommitters);
+ committer.init();
+ committer.close();
+ Assertions.assertIterableEquals(Arrays.asList("init", "close"),
methodInvoked);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
index 28c6d250d7..eb66b9ecb5 100644
---
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
+++
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkAggCommitter.java
@@ -41,18 +41,24 @@ public class ClickhouseFileSinkAggCommitter
implements SinkAggregatedCommitter<CKFileCommitInfo,
CKFileAggCommitInfo> {
private transient ClickhouseProxy proxy;
- private final ClickhouseTable clickhouseTable;
+ private ClickhouseTable clickhouseTable;
private final FileReaderOption fileReaderOption;
public ClickhouseFileSinkAggCommitter(FileReaderOption readerOption) {
fileReaderOption = readerOption;
- proxy = new
ClickhouseProxy(readerOption.getShardMetadata().getDefaultShard().getNode());
+ }
+
+ @Override
+ public void init() {
+ proxy =
+ new ClickhouseProxy(
+
fileReaderOption.getShardMetadata().getDefaultShard().getNode());
clickhouseTable =
proxy.getClickhouseTable(
proxy.getClickhouseConnection(),
- readerOption.getShardMetadata().getDatabase(),
- readerOption.getShardMetadata().getTable());
+ fileReaderOption.getShardMetadata().getDatabase(),
+ fileReaderOption.getShardMetadata().getTable());
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
index cd9b9cca67..40c440f924 100644
---
a/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
+++
b/seatunnel-connectors-v2/connector-common/src/test/java/org/apache/seatunnel/connectors/seatunnel/sink/SinkFlowTestUtils.java
@@ -20,10 +20,12 @@ package org.apache.seatunnel.connectors.seatunnel.sink;
import org.apache.seatunnel.api.common.JobContext;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.sink.DefaultSinkWriterContext;
+import org.apache.seatunnel.api.sink.MultiTableResourceManager;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.sink.SupportMultiTableSinkAggregatedCommitter;
import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.factory.TableSinkFactory;
import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
@@ -108,16 +110,30 @@ public class SinkFlowTestUtils {
}
Optional<? extends SinkCommitter<?>> sinkCommitter =
sink.createCommitter();
- Optional<? extends SinkAggregatedCommitter<?, ?>> aggregatedCommitter =
+ Optional<? extends SinkAggregatedCommitter<?, ?>>
aggregatedCommitterOptional =
sink.createAggregatedCommitter();
if (!commitInfos.isEmpty()) {
- if (aggregatedCommitter.isPresent()) {
+ if (aggregatedCommitterOptional.isPresent()) {
+ SinkAggregatedCommitter<?, ?> aggregatedCommitter =
+ aggregatedCommitterOptional.get();
+ MultiTableResourceManager resourceManager = null;
+ if (aggregatedCommitter instanceof
SupportMultiTableSinkAggregatedCommitter) {
+ resourceManager =
+ ((SupportMultiTableSinkAggregatedCommitter<?>)
aggregatedCommitter)
+ .initMultiTableResourceManager(1, 1);
+ }
+ aggregatedCommitter.init();
+ if (resourceManager != null) {
+ ((SupportMultiTableSinkAggregatedCommitter<?>)
aggregatedCommitter)
+ .setMultiTableResourceManager(resourceManager, 0);
+ }
+
Object aggregatedCommitInfoT =
- ((SinkAggregatedCommitter)
aggregatedCommitter.get()).combine(commitInfos);
- ((SinkAggregatedCommitter) aggregatedCommitter.get())
+ ((SinkAggregatedCommitter)
aggregatedCommitter).combine(commitInfos);
+ ((SinkAggregatedCommitter) aggregatedCommitter)
.commit(Collections.singletonList(aggregatedCommitInfoT));
- aggregatedCommitter.get().close();
+ aggregatedCommitter.close();
} else if (sinkCommitter.isPresent()) {
((SinkCommitter) sinkCommitter.get()).commit(commitInfos);
} else {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
index 26d2e4f5f1..934374cab9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/commit/FileSinkAggregatedCommitter.java
@@ -33,8 +33,14 @@ import java.util.Map;
public class FileSinkAggregatedCommitter
implements SinkAggregatedCommitter<FileCommitInfo,
FileAggregatedCommitInfo> {
protected HadoopFileSystemProxy hadoopFileSystemProxy;
+ private final HadoopConf hadoopConf;
public FileSinkAggregatedCommitter(HadoopConf hadoopConf) {
+ this.hadoopConf = hadoopConf;
+ }
+
+ @Override
+ public void init() {
this.hadoopFileSystemProxy = new HadoopFileSystemProxy(hadoopConf);
}
diff --git
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
index 118f7a50cd..77d34a17f3 100644
---
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergAggregatedCommitter.java
@@ -33,10 +33,18 @@ import java.util.List;
public class IcebergAggregatedCommitter
implements SinkAggregatedCommitter<IcebergCommitInfo,
IcebergAggregatedCommitInfo> {
- private final IcebergTableLoader tableLoader;
- private final IcebergFilesCommitter filesCommitter;
+ private IcebergTableLoader tableLoader;
+ private IcebergFilesCommitter filesCommitter;
+ private final IcebergSinkConfig config;
+ private final CatalogTable catalogTable;
public IcebergAggregatedCommitter(IcebergSinkConfig config, CatalogTable
catalogTable) {
+ this.config = config;
+ this.catalogTable = catalogTable;
+ }
+
+ @Override
+ public void init() {
this.tableLoader = IcebergTableLoader.create(config, catalogTable);
this.filesCommitter = IcebergFilesCommitter.of(config, tableLoader);
}
diff --git
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
index b09303f253..c4abb10cb8 100644
---
a/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
+++
b/seatunnel-connectors-v2/connector-jdbc/src/main/java/org/apache/seatunnel/connectors/seatunnel/jdbc/sink/JdbcSinkAggregatedCommitter.java
@@ -39,15 +39,19 @@ import java.util.stream.Collectors;
public class JdbcSinkAggregatedCommitter
implements SinkAggregatedCommitter<XidInfo, JdbcAggregatedCommitInfo> {
- private final XaFacade xaFacade;
- private final XaGroupOps xaGroupOps;
+ private XaFacade xaFacade;
+ private XaGroupOps xaGroupOps;
private final JdbcSinkConfig jdbcSinkConfig;
public JdbcSinkAggregatedCommitter(JdbcSinkConfig jdbcSinkConfig) {
+ this.jdbcSinkConfig = jdbcSinkConfig;
+ }
+
+ @Override
+ public void init() {
this.xaFacade =
XaFacade.fromJdbcConnectionOptions(jdbcSinkConfig.getJdbcConnectionConfig());
this.xaGroupOps = new XaGroupOpsImpl(xaFacade);
- this.jdbcSinkConfig = jdbcSinkConfig;
}
private void tryOpen() throws IOException {