This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e3103535cc [Feature][Connector-V2] Iceberg-sink supports writing data 
to branches (#6697)
e3103535cc is described below

commit e3103535cccad6928ab09cca76278abbbb57d490
Author: Xiaojian Sun <[email protected]>
AuthorDate: Mon Apr 22 12:12:24 2024 +0800

    [Feature][Connector-V2] Iceberg-sink supports writing data to branches 
(#6697)
---
 docs/en/connector-v2/sink/Iceberg.md               |   1 +
 .../seatunnel/iceberg/config/SinkConfig.java       |   8 +
 .../seatunnel/iceberg/sink/IcebergSinkFactory.java |   3 +-
 .../iceberg/sink/commit/IcebergFilesCommitter.java |   8 +
 .../iceberg/sink/writer/IcebergWriterFactory.java  |   4 +
 .../connector/iceberg/IcebergSinkWithBranchIT.java | 204 +++++++++++++++++++++
 .../iceberg/fake_to_iceberg_with_branch.conf       |  75 ++++++++
 7 files changed, 302 insertions(+), 1 deletion(-)

diff --git a/docs/en/connector-v2/sink/Iceberg.md 
b/docs/en/connector-v2/sink/Iceberg.md
index dc73f491bc..3aa24a0a63 100644
--- a/docs/en/connector-v2/sink/Iceberg.md
+++ b/docs/en/connector-v2/sink/Iceberg.md
@@ -72,6 +72,7 @@ libfb303-xxx.jar
 | iceberg.table.upsert-mode-enabled      | boolean | no       | false          
              | Set to `true` to enable upsert mode, default is `false`         
                                                                                
                                                                                
                                                                                
          |
 | schema_save_mode                       | Enum    | no       | 
CREATE_SCHEMA_WHEN_NOT_EXIST | the schema save mode, please refer to 
`schema_save_mode` below                                                        
                                                                                
                                                                                
                                    |
 | data_save_mode                         | Enum    | no       | APPEND_DATA    
              | the data save mode, please refer to `data_save_mode` below      
                                                                                
                                                                                
                                                                                
          |
+| iceberg.table.commit-branch            | string  | no       | -              
              | Default branch for commits                                      
                                                                                
                                                                                
                                                                                
          |
 
 ## Task Example
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
index de9c74344f..0c0dc1b138 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/config/SinkConfig.java
@@ -109,6 +109,12 @@ public class SinkConfig extends CommonConfig {
                     .defaultValue(DataSaveMode.APPEND_DATA)
                     .withDescription("data save mode");
 
+    public static final Option<String> TABLES_DEFAULT_COMMIT_BRANCH =
+            Options.key("iceberg.table.commit-branch")
+                    .stringType()
+                    .noDefaultValue()
+                    .withDescription("Default branch for commits");
+
     @VisibleForTesting private static final String COMMA_NO_PARENS_REGEX = 
",(?![^()]*+\\))";
 
     private final ReadonlyConfig readonlyConfig;
@@ -116,6 +122,7 @@ public class SinkConfig extends CommonConfig {
     private Map<String, String> writeProps;
     private List<String> primaryKeys;
     private List<String> partitionKeys;
+    private String commitBranch;
 
     private boolean upsertModeEnabled;
     private boolean tableSchemaEvolutionEnabled;
@@ -133,6 +140,7 @@ public class SinkConfig extends CommonConfig {
         this.tableSchemaEvolutionEnabled = 
readonlyConfig.get(TABLE_SCHEMA_EVOLUTION_ENABLED_PROP);
         this.schemaSaveMode = readonlyConfig.get(SCHEMA_SAVE_MODE);
         this.dataSaveMode = readonlyConfig.get(DATA_SAVE_MODE);
+        this.commitBranch = readonlyConfig.get(TABLES_DEFAULT_COMMIT_BRANCH);
     }
 
     @VisibleForTesting
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
index 3c30c38e0d..3441420226 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/IcebergSinkFactory.java
@@ -62,7 +62,8 @@ public class IcebergSinkFactory implements TableSinkFactory {
                         SinkConfig.TABLE_PRIMARY_KEYS,
                         SinkConfig.TABLE_DEFAULT_PARTITION_KEYS,
                         SinkConfig.TABLE_UPSERT_MODE_ENABLED_PROP,
-                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP)
+                        SinkConfig.TABLE_SCHEMA_EVOLUTION_ENABLED_PROP,
+                        SinkConfig.TABLES_DEFAULT_COMMIT_BRANCH)
                 .build();
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
index 07363d69e1..0b5e473440 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/commit/IcebergFilesCommitter.java
@@ -39,10 +39,12 @@ import static java.util.stream.Collectors.toList;
 public class IcebergFilesCommitter implements Serializable {
     private IcebergTableLoader icebergTableLoader;
     private boolean caseSensitive;
+    private String branch;
 
     private IcebergFilesCommitter(SinkConfig config, IcebergTableLoader 
icebergTableLoader) {
         this.icebergTableLoader = icebergTableLoader;
         this.caseSensitive = config.isCaseSensitive();
+        this.branch = config.getCommitBranch();
     }
 
     public static IcebergFilesCommitter of(
@@ -77,10 +79,16 @@ public class IcebergFilesCommitter implements Serializable {
         } else {
             if (deleteFiles.isEmpty()) {
                 AppendFiles append = table.newAppend();
+                if (branch != null) {
+                    append.toBranch(branch);
+                }
                 dataFiles.forEach(append::appendFile);
                 append.commit();
             } else {
                 RowDelta delta = table.newRowDelta();
+                if (branch != null) {
+                    delta.toBranch(branch);
+                }
                 delta.caseSensitive(caseSensitive);
                 dataFiles.forEach(delta::addRows);
                 deleteFiles.forEach(delta::addDeletes);
diff --git 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
index 6aa729b115..67809088ef 100644
--- 
a/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
+++ 
b/seatunnel-connectors-v2/connector-iceberg/src/main/java/org/apache/seatunnel/connectors/seatunnel/iceberg/sink/writer/IcebergWriterFactory.java
@@ -81,6 +81,10 @@ public class IcebergWriterFactory {
                                     tableLoader.getTableIdentifier(),
                                     config,
                                     rowType);
+                    // Create an empty snapshot for the branch
+                    if (config.getCommitBranch() != null) {
+                        
table.manageSnapshots().createBranch(config.getCommitBranch()).commit();
+                    }
                     break;
                 default:
                     throw exception;
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
new file mode 100644
index 0000000000..8e0877a895
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSinkWithBranchIT.java
@@ -0,0 +1,204 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.connector.iceberg;
+
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
+import org.apache.seatunnel.common.utils.FileUtils;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergTableLoader;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.CommonConfig;
+import org.apache.seatunnel.connectors.seatunnel.iceberg.config.SourceConfig;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.container.TestContainerId;
+import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.FileScanTask;
+import org.apache.iceberg.SnapshotRef;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.TableScan;
+import org.apache.iceberg.io.CloseableIterable;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.TestTemplate;
+import org.junit.jupiter.api.condition.DisabledOnOs;
+import org.junit.jupiter.api.condition.OS;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+
+import lombok.extern.slf4j.Slf4j;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+import static org.awaitility.Awaitility.given;
+
+@Slf4j
+@DisabledOnContainer(
+        value = {TestContainerId.SPARK_2_4},
+        type = {},
+        disabledReason = "")
+@DisabledOnOs(OS.WINDOWS)
+public class IcebergSinkWithBranchIT extends TestSuiteBase {
+
+    private static final String CATALOG_DIR = 
"/tmp/seatunnel/iceberg/hadoop-sink/";
+
+    private static final String NAMESPACE = "seatunnel_namespace";
+
+    private static final String commitBranch = "commit-branch";
+
+    private String zstdUrl() {
+        return "https://repo1.maven.org/maven2/com/github/luben/zstd-jni/1.5.5-5/zstd-jni-1.5.5-5.jar";
+    }
+
+    @TestContainerExtension
+    protected final ContainerExtendedFactory extendedFactory =
+            container -> {
+                container.execInContainer("sh", "-c", "mkdir -p " + 
CATALOG_DIR);
+                container.execInContainer("sh", "-c", "chmod -R 777  " + 
CATALOG_DIR);
+                container.execInContainer(
+                        "sh",
+                        "-c",
+                        "mkdir -p /tmp/seatunnel/plugins/Iceberg/lib && cd 
/tmp/seatunnel/plugins/Iceberg/lib && wget "
+                                + zstdUrl());
+            };
+
+    private final String NAMESPACE_TAR = NAMESPACE + ".tar.gz";
+    protected final ContainerExtendedFactory containerExtendedFactory =
+            new ContainerExtendedFactory() {
+                @Override
+                public void extend(GenericContainer<?> container)
+                        throws IOException, InterruptedException {
+                    FileUtils.createNewDir(CATALOG_DIR);
+                    container.execInContainer(
+                            "sh",
+                            "-c",
+                            "cd "
+                                    + CATALOG_DIR
+                                    + " && tar -czvf "
+                                    + NAMESPACE_TAR
+                                    + " "
+                                    + NAMESPACE);
+                    container.copyFileFromContainer(
+                            CATALOG_DIR + NAMESPACE_TAR, CATALOG_DIR + 
NAMESPACE_TAR);
+                    extractFiles();
+                }
+
+                private void extractFiles() {
+                    ProcessBuilder processBuilder = new ProcessBuilder();
+                    processBuilder.command(
+                            "sh", "-c", "cd " + CATALOG_DIR + " && tar -zxvf " 
+ NAMESPACE_TAR);
+                    try {
+                        Process process = processBuilder.start();
+                        int exitCode = process.waitFor();
+                        if (exitCode == 0) {
+                            log.info("Extract files successful.");
+                        } else {
+                            log.error("Extract files failed with exit code " + 
exitCode);
+                        }
+                    } catch (IOException | InterruptedException e) {
+                        log.error("Extract data files from container error :", 
e);
+                    }
+                }
+            };
+
+    @TestTemplate
+    public void testInsertAndCheckDataE2e(TestContainer container)
+            throws IOException, InterruptedException {
+        Container.ExecResult textWriteResult =
+                
container.executeJob("/iceberg/fake_to_iceberg_with_branch.conf");
+        Assertions.assertEquals(0, textWriteResult.getExitCode());
+        // stream stage
+        given().ignoreExceptions()
+                .await()
+                .atMost(60000, TimeUnit.MILLISECONDS)
+                .untilAsserted(
+                        () -> {
+                            // copy iceberg to local
+                            
container.executeExtraCommands(containerExtendedFactory);
+                            // check branch exists
+                            Assertions.assertEquals(true, checkBranchExists());
+                            // load from branch
+                            Assertions.assertEquals(100, 
loadDataFromIcebergTableBranch().size());
+                        });
+    }
+
+    private boolean checkBranchExists() {
+        Table table = getTable();
+        Map<String, SnapshotRef> refs = table.refs();
+        if (refs.containsKey(commitBranch)) {
+            return true;
+        }
+        return false;
+    }
+
+    private List<Object> loadDataFromIcebergTableBranch() {
+        List<Object> results = new ArrayList<>();
+        Table table = getTable();
+        TableScan branchRead = table.newScan().useRef(commitBranch);
+        CloseableIterable<FileScanTask> fileScanTasks = branchRead.planFiles();
+        fileScanTasks.forEach(
+                fileScanTask -> {
+                    try {
+                        DataFile file = fileScanTask.file();
+                        HadoopInputFile inputFile =
+                                HadoopInputFile.fromPath(
+                                        new Path(file.path().toString()), new 
Configuration());
+                        try (ParquetReader<Object> reader =
+                                AvroParquetReader.builder(inputFile).build()) {
+                            Object record;
+                            while ((record = reader.read()) != null) {
+                                results.add(record);
+                            }
+                        }
+                    } catch (IOException e) {
+                        log.error("Table scan branch error :", e);
+                    }
+                });
+        return results;
+    }
+
+    public Table getTable() {
+
+        Map<String, Object> configs = new HashMap<>();
+        Map<String, Object> catalogProps = new HashMap<>();
+        catalogProps.put("type", HADOOP.getType());
+        catalogProps.put("warehouse", "file://" + CATALOG_DIR);
+        configs.put(CommonConfig.KEY_CATALOG_NAME.key(), "seatunnel_test");
+        configs.put(CommonConfig.KEY_NAMESPACE.key(), "seatunnel_namespace");
+        configs.put(CommonConfig.KEY_TABLE.key(), "iceberg_sink_table");
+        configs.put(CommonConfig.CATALOG_PROPS.key(), catalogProps);
+        IcebergTableLoader tableLoader =
+                IcebergTableLoader.create(new 
SourceConfig(ReadonlyConfig.fromMap(configs)));
+        tableLoader.open();
+        // from branch
+        return tableLoader.loadTable();
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf
 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf
new file mode 100644
index 0000000000..91a48193d7
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/fake_to_iceberg_with_branch.conf
@@ -0,0 +1,75 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  FakeSource {
+    row.num = 100
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        c_smallint = smallint
+        c_int = int
+        c_bigint = bigint
+        c_float = float
+        c_double = double
+        c_decimal = "decimal(30, 8)"
+        c_bytes = bytes
+        c_date = date
+        c_timestamp = timestamp
+      }
+    }
+    result_table_name = "fake"
+  }
+}
+
+transform {
+}
+
+sink {
+  Iceberg {
+    catalog_name="seatunnel_test"
+    iceberg.catalog.config={
+      "type"="hadoop"
+      "warehouse"="file:///tmp/seatunnel/iceberg/hadoop-sink/"
+    }
+    namespace="seatunnel_namespace"
+    table="iceberg_sink_table"
+    iceberg.table.write-props={
+      write.format.default="parquet"
+      write.target-file-size-bytes=10
+    }
+    iceberg.table.commit-branch="commit-branch"
+    iceberg.table.partition-keys="c_timestamp"
+    case_sensitive=true
+  }
+}
\ No newline at end of file

Reply via email to