This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new b38c50789 [Feature][Connector-V2][Iceberg] Modify the scope of 
flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046)
b38c50789 is described below

commit b38c50789f45bfcddd4eaef04ad4f6e425dfb0a5
Author: s7monk <[email protected]>
AuthorDate: Sun Nov 27 12:05:21 2022 +0800

    [Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2 
to provided to be compatible with hadoop3.x (#3046)
    
    * [Connector-V2-Iceberg]Modify the scope of flink-shaded-hadoop-2 to 
provided
    
    * [Doc][Connector-V2][Iceberg] Modify Iceberg doc
    [Feature][Connector-V2][Iceberg]Modify the scope of hive-exec to provided 
to be compatible with hadoop3.x
    
    * add iceberg connector hadoop3.x e2e
    
    * fix pom error
    
    * fix iceberg doc error
    
    * fix pom error
    
    * add iceberg haoop3 spark e2e
    
    * add iceberg haoop3 spark e2e
    
    * add iceberg haoop3 spark e2e
    
    * add iceberg haoop3 spark e2e
    
    * modify iceberg doc
    
    * solve some erroe
    
    * add changed log
    
    * add changed log
    
    * solve iceberg test error
    
    * solve iceberg-spark test error
    
    Co-authored-by: Eric <[email protected]>
    Co-authored-by: s7monk <“[email protected]”>
---
 docs/en/connector-v2/source/Iceberg.md             |  21 ++-
 seatunnel-connectors-v2/connector-iceberg/pom.xml  |   2 +
 .../main/bin/start-seatunnel-flink-connector-v2.sh |   2 +-
 .../connector-iceberg-flink-e2e/pom.xml            |   5 +
 .../pom.xml                                        |  21 ++-
 .../flink/v2/icegerg/hadoop3/IcebergSourceIT.java  | 168 +++++++++++++++++++++
 .../src/test/resources/iceberg/iceberg_source.conf |  85 +++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../seatunnel-flink-connector-v2-e2e/pom.xml       |   1 +
 .../pom.xml                                        |  29 ++--
 .../spark/v2/iceberg/hadoop3/IcebergSourceIT.java  | 165 ++++++++++++++++++++
 .../src/test/resources/iceberg/iceberg_source.conf |  86 +++++++++++
 .../src/test/resources/log4j.properties            |  22 +++
 .../connector-iceberg-spark-e2e/pom.xml            |  17 +++
 .../seatunnel-spark-connector-v2-e2e/pom.xml       |   1 +
 15 files changed, 627 insertions(+), 20 deletions(-)

diff --git a/docs/en/connector-v2/source/Iceberg.md 
b/docs/en/connector-v2/source/Iceberg.md
index bc345e98a..a4400a530 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -20,8 +20,8 @@ Source connector for Apache Iceberg. It can support batch and 
stream mode.
     - [x] orc
     - [x] avro
 - [x] iceberg catalog
-    - [x] hadoop(2.7.5)
-    - [x] hive(2.3.9)
+    - [x] hadoop(2.7.1 , 2.7.5 , 3.1.3)
+    - [x] hive(2.3.9 , 3.1.2)
 
 ##  Options
 
@@ -161,8 +161,25 @@ source {
 }
 ```
 
+:::tip
+
+To stay compatible with different versions of Hadoop and Hive, hive-exec and 
flink-shaded-hadoop-2 are declared with the provided scope in the project pom 
file. If you use the Flink engine, you may first need to add the following 
Jar packages to the <FLINK_HOME>/lib directory. If you use the Spark engine 
and it is already integrated with Hadoop, you do not need to add them.
+
+:::
+
+```
+flink-shaded-hadoop-x-xxx.jar
+hive-exec-xxx.jar
+libfb303-xxx.jar
+```
+Some versions of the hive-exec package do not include libfb303-xxx.jar, so you 
may also need to add that Jar package manually.
+
 ## Changelog
 
 ### 2.2.0-beta 2022-09-26
 
 - Add Iceberg Source Connector
+
+### next version
+
+- [Feature] Support Hadoop3.x 
([3046](https://github.com/apache/incubator-seatunnel/pull/3046))
diff --git a/seatunnel-connectors-v2/connector-iceberg/pom.xml 
b/seatunnel-connectors-v2/connector-iceberg/pom.xml
index f474989d5..b08c84776 100644
--- a/seatunnel-connectors-v2/connector-iceberg/pom.xml
+++ b/seatunnel-connectors-v2/connector-iceberg/pom.xml
@@ -94,6 +94,7 @@
             <artifactId>hive-exec</artifactId>
             <version>${hive.version}</version>
             <classifier>core</classifier>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.logging.log4j</groupId>
@@ -120,6 +121,7 @@
         <dependency>
             <groupId>org.apache.flink</groupId>
             <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>provided</scope>
             <exclusions>
                 <exclusion>
                     <groupId>org.apache.avro</groupId>
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
 
b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
index 706776cd4..c4a4e0283 100755
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
@@ -71,4 +71,4 @@ elif [ ${EXIT_CODE} -eq 0 ]; then
 else
     echo "${CMD}"
     exit ${EXIT_CODE}
-fi
\ No newline at end of file
+fi
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
index fc016ed71..a2a292ab4 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
@@ -40,6 +40,11 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-shaded-hadoop-2</artifactId>
+            <scope>test</scope>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
similarity index 75%
copy from 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
copy to 
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
index fc016ed71..e9838128e 100644
--- 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
@@ -17,13 +17,17 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-iceberg-flink-e2e</artifactId>
+    <artifactId>connector-iceberg-hadoop3-flink-e2e</artifactId>
+
+    <properties>
+        <hadoop-client.version>3.3.4</hadoop-client.version>
+    </properties>
 
     <dependencies>
         <dependency>
@@ -40,6 +44,17 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop-client.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
new file mode 100644
index 000000000..6354b209a
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.icegerg.hadoop3;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class IcebergSourceIT extends FlinkContainer {
+
+    private static final TableIdentifier TABLE = TableIdentifier.of(
+        Namespace.of("database1"), "source");
+    private static final Schema SCHEMA = new Schema(
+        Types.NestedField.optional(1, "f1", Types.LongType.get()),
+        Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
+        Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
+        Types.NestedField.optional(4, "f4", Types.LongType.get()),
+        Types.NestedField.optional(5, "f5", Types.FloatType.get()),
+        Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
+        Types.NestedField.optional(7, "f7", Types.DateType.get()),
+        Types.NestedField.optional(8, "f8", Types.TimeType.get()),
+        Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
+        Types.NestedField.optional(10, "f10", 
Types.TimestampType.withoutZone()),
+        Types.NestedField.optional(11, "f11", Types.StringType.get()),
+        Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
+        Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
+        Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
+        Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
+            100, Types.IntegerType.get())),
+        Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
+            200, 300, Types.StringType.get(), Types.IntegerType.get())),
+        Types.NestedField.optional(17, "f17", Types.StructType.of(
+            Types.NestedField.required(400, "f17_a", Types.StringType.get())))
+    );
+
+    private static final String CATALOG_NAME = "seatunnel";
+    private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
+    private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
+    private static final String WAREHOUSE = "file://" + CATALOG_DIR;
+    private static Catalog CATALOG;
+
+    @BeforeEach
+    public void start() {
+        initializeIcebergTable();
+        batchInsertData();
+        MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
+        jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
+        taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
+    }
+
+    @Test
+    public void testIcebergSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode(), 
execResult.getStderr());
+    }
+
+    private void initializeIcebergTable() {
+        CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
+            CATALOG_TYPE,
+            WAREHOUSE,
+            null)
+            .create();
+        if (!CATALOG.tableExists(TABLE)) {
+            CATALOG.createTable(TABLE, SCHEMA);
+        }
+    }
+
+    private void batchInsertData() {
+        GenericRecord record = GenericRecord.create(SCHEMA);
+        record.setField("f1", Long.valueOf(0));
+        record.setField("f2", true);
+        record.setField("f3", Integer.MAX_VALUE);
+        record.setField("f4", Long.MAX_VALUE);
+        record.setField("f5", Float.MAX_VALUE);
+        record.setField("f6", Double.MAX_VALUE);
+        record.setField("f7", LocalDate.now());
+        record.setField("f8", LocalTime.now());
+        record.setField("f9", OffsetDateTime.now());
+        record.setField("f10", LocalDateTime.now());
+        record.setField("f11", "test");
+        record.setField("f12", "abcdefghij".getBytes());
+        record.setField("f13", ByteBuffer.wrap("test".getBytes()));
+        record.setField("f14", new BigDecimal("1000000000.000000001"));
+        record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
+        record.setField("f16", Collections.singletonMap("key", 
Integer.MAX_VALUE));
+        Record structRecord = 
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
+        structRecord.setField("f17_a", "test");
+        record.setField("f17", structRecord);
+
+        Table table = CATALOG.loadTable(TABLE);
+        FileAppenderFactory appenderFactory = new 
GenericAppenderFactory(SCHEMA);
+        List<Record> records = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            records.add(record.copy("f1", Long.valueOf(i)));
+            if (i % 10 == 0) {
+                String externalFilePath = String.format(CATALOG_DIR + 
"external_file/datafile_%s.avro", i);
+                FileAppender<Record> fileAppender = 
appenderFactory.newAppender(
+                    Files.localOutput(externalFilePath), 
FileFormat.fromFileName(externalFilePath));
+                try (FileAppender<Record> fileAppenderCloseable = 
fileAppender) {
+                    fileAppenderCloseable.addAll(records);
+                    records.clear();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                DataFile datafile = 
DataFiles.builder(PartitionSpec.unpartitioned())
+                    
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new 
Configuration()))
+                    .withMetrics(fileAppender.metrics())
+                    .build();
+                table.newAppend().appendFile(datafile).commit();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
new file mode 100644
index 000000000..dd54abe34
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+  execution.parallelism = 2
+  job.mode = "BATCH"
+  execution.checkpoint.interval = 10
+}
+
+source {
+  Iceberg {
+    fields {
+      f2 = "boolean"
+      f1 = "bigint"
+      f3 = "int"
+      f4 = "bigint"
+      f5 = "float"
+      f6 = "double"
+      f7 = "date"
+      f8 = "time"
+      f9 = "timestamp"
+      f10 = "timestamp"
+      f11 = "string"
+      f12 = "bytes"
+      f13 = "bytes"
+      f14 = "decimal(19,9)"
+      f15 = "array<int>"
+      f16 = "map<string, int>"
+    }
+    catalog_name = "seatunnel"
+    catalog_type = "hadoop"
+    warehouse = "file:///tmp/seatunnel/iceberg/flink/"
+    namespace = "database1"
+    table = "source"
+  }
+}
+
+transform {
+}
+
+sink {
+  Console {
+  }
+  Assert {
+    rules =
+      {
+        field_rules = [
+          {
+            field_name = f1
+            field_type = long
+            field_value = [
+              {
+                rule_type = NOT_NULL
+              },
+              {
+                rule_type = MIN
+                rule_value = 0
+              },
+              {
+                rule_type = MAX
+                rule_value = 99
+              }
+            ]
+          }
+        ]
+      }
+  }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 24273a267..776b82fe9 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
         <module>connector-fake-flink-e2e</module>
         <module>connector-mongodb-flink-e2e</module>
         <module>connector-iceberg-flink-e2e</module>
+        <module>connector-iceberg-hadoop3-flink-e2e</module>
     </modules>
 
     <dependencies>
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
similarity index 78%
copy from 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
copy to 
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
index 399a52f54..d1a06e6be 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
@@ -17,13 +17,17 @@
          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
          xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd">
     <parent>
-        <groupId>org.apache.seatunnel</groupId>
         <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
         <version>${revision}</version>
     </parent>
     <modelVersion>4.0.0</modelVersion>
 
-    <artifactId>connector-iceberg-spark-e2e</artifactId>
+    <artifactId>connector-iceberg-hadoop3-spark-e2e</artifactId>
+
+    <properties>
+        <hadoop-client.version>3.3.4</hadoop-client.version>
+    </properties>
 
     <dependencies>
         <dependency>
@@ -40,20 +44,17 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
-
         <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-console</artifactId>
-            <version>${project.version}</version>
-            <scope>test</scope>
-        </dependency>
-
-        <dependency>
-            <groupId>org.apache.seatunnel</groupId>
-            <artifactId>connector-assert</artifactId>
-            <version>${project.version}</version>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop-client.version}</version>
             <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
         </dependency>
     </dependencies>
-
 </project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
new file mode 100644
index 000000000..2717e1fd3
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.iceberg.hadoop3;
+
+import static 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
+import 
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class IcebergSourceIT extends SparkContainer {
+
+    private static final TableIdentifier TABLE = TableIdentifier.of(
+        Namespace.of("database1"), "source");
+    private static final Schema SCHEMA = new Schema(
+        Types.NestedField.optional(1, "f1", Types.LongType.get()),
+        Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
+        Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
+        Types.NestedField.optional(4, "f4", Types.LongType.get()),
+        Types.NestedField.optional(5, "f5", Types.FloatType.get()),
+        Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
+        Types.NestedField.optional(7, "f7", Types.DateType.get()),
+        Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
+        Types.NestedField.optional(10, "f10", 
Types.TimestampType.withoutZone()),
+        Types.NestedField.optional(11, "f11", Types.StringType.get()),
+        Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
+        Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
+        Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
+        Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
+            100, Types.IntegerType.get())),
+        Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
+            200, 300, Types.StringType.get(), Types.IntegerType.get())),
+        Types.NestedField.optional(17, "f17", Types.StructType.of(
+            Types.NestedField.required(400, "f17_a", Types.StringType.get())))
+    );
+
+    private static final String CATALOG_NAME = "seatunnel";
+    private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
+    private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/spark/";
+    private static final String WAREHOUSE = "file://" + CATALOG_DIR;
+    private static Catalog CATALOG;
+
+    @BeforeEach
+    public void start() {
+        initializeIcebergTable();
+        batchInsertData();
+        MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
+        master.copyFileToContainer(catalogPath, CATALOG_DIR);
+    }
+
+    @Test
+    public void testIcebergSource() throws IOException, InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelSparkJob("/iceberg/iceberg_source.conf");
+        Assertions.assertEquals(0, execResult.getExitCode());
+    }
+
+    private void initializeIcebergTable() {
+        CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
+            CATALOG_TYPE,
+            WAREHOUSE,
+            null)
+            .create();
+        if (!CATALOG.tableExists(TABLE)) {
+            CATALOG.createTable(TABLE, SCHEMA);
+        }
+    }
+
+    private void batchInsertData() {
+        GenericRecord record = GenericRecord.create(SCHEMA);
+        record.setField("f1", Long.valueOf(0));
+        record.setField("f2", true);
+        record.setField("f3", Integer.MAX_VALUE);
+        record.setField("f4", Long.MAX_VALUE);
+        record.setField("f5", Float.MAX_VALUE);
+        record.setField("f6", Double.MAX_VALUE);
+        record.setField("f7", LocalDate.now());
+        /*record.setField("f8", LocalTime.now());*/
+        record.setField("f9", OffsetDateTime.now());
+        record.setField("f10", LocalDateTime.now());
+        record.setField("f11", "test");
+        record.setField("f12", "abcdefghij".getBytes());
+        record.setField("f13", ByteBuffer.wrap("test".getBytes()));
+        record.setField("f14", new BigDecimal("1000000000.000000001"));
+        record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
+        record.setField("f16", Collections.singletonMap("key", 
Integer.MAX_VALUE));
+        Record structRecord = 
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
+        structRecord.setField("f17_a", "test");
+        record.setField("f17", structRecord);
+
+        Table table = CATALOG.loadTable(TABLE);
+        FileAppenderFactory appenderFactory = new 
GenericAppenderFactory(SCHEMA);
+        List<Record> records = new ArrayList<>();
+        for (int i = 0; i < 100; i++) {
+            records.add(record.copy("f1", Long.valueOf(i)));
+            if (i % 10 == 0) {
+                String externalFilePath = String.format(CATALOG_DIR + 
"external_file/datafile_%s.avro", i);
+                FileAppender<Record> fileAppender = 
appenderFactory.newAppender(
+                    Files.localOutput(externalFilePath), 
FileFormat.fromFileName(externalFilePath));
+                try (FileAppender<Record> fileAppenderCloseable = 
fileAppender) {
+                    fileAppenderCloseable.addAll(records);
+                    records.clear();
+                } catch (IOException e) {
+                    throw new RuntimeException(e);
+                }
+                DataFile datafile = 
DataFiles.builder(PartitionSpec.unpartitioned())
+                    
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new 
Configuration()))
+                    .withMetrics(fileAppender.metrics())
+                    .build();
+                table.newAppend().appendFile(datafile).commit();
+            }
+        }
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
new file mode 100644
index 000000000..edaaff916
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of batch processing in 
seatunnel config
+######
+
+env {
+    spark.app.name = "SeaTunnel"
+    spark.executor.instances = 2
+    spark.executor.cores = 1
+    spark.executor.memory = "1g"
+    spark.master = local
+    job.mode = "BATCH"
+}
+
+source {
+    Iceberg {
+        fields {
+            f2 = "boolean"
+            f1 = "bigint"
+            f3 = "int"
+            f4 = "bigint"
+            f5 = "float"
+            f6 = "double"
+            f7 = "date"
+            f9 = "timestamp"
+            f10 = "timestamp"
+            f11 = "string"
+            f12 = "bytes"
+            f13 = "bytes"
+            f14 = "decimal(19,9)"
+            f15 = "array<int>"
+            f16 = "map<string, int>"
+        }
+        catalog_name = "seatunnel"
+        catalog_type = "hadoop"
+        warehouse = "file:///tmp/seatunnel/iceberg/spark/"
+        namespace = "database1"
+        table = "source"
+    }
+}
+
+transform {
+}
+
+sink {
+    Console {
+    }
+    Assert {
+        rules = {
+            field_rules = [
+                {
+                    field_name = f1
+                    field_type = long
+                    field_value = [
+                        {
+                            rule_type = NOT_NULL
+                        },
+                        {
+                            rule_type = MIN
+                            rule_value = 0
+                        },
+                        {
+                            rule_type = MAX
+                            rule_value = 99
+                        }
+                    ]
+                }
+            ]
+        }
+   }
+}
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p 
%c{1}: %m%n
diff --git 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
index 399a52f54..9be91c751 100644
--- 
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
+++ 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
@@ -25,6 +25,10 @@
 
     <artifactId>connector-iceberg-spark-e2e</artifactId>
 
+    <properties>
+        <hadoop-client.version>3.3.4</hadoop-client.version>
+    </properties>
+
     <dependencies>
         <dependency>
             <groupId>org.apache.seatunnel</groupId>
@@ -54,6 +58,19 @@
             <version>${project.version}</version>
             <scope>test</scope>
         </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-client</artifactId>
+            <version>${hadoop-client.version}</version>
+            <scope>test</scope>
+            <exclusions>
+                <exclusion>
+                    <groupId>org.slf4j</groupId>
+                    <artifactId>slf4j-reload4j</artifactId>
+                </exclusion>
+            </exclusions>
+        </dependency>
     </dependencies>
 
 </project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 260b8ef6c..bbdd1f6a4 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
         <module>connector-iotdb-spark-e2e</module>
         <module>connector-jdbc-spark-e2e</module>
         <module>connector-mongodb-spark-e2e</module>
+        <module>connector-iceberg-hadoop3-spark-e2e</module>
         <module>connector-iceberg-spark-e2e</module>
     </modules>
 


Reply via email to