This is an automated email from the ASF dual-hosted git repository.
tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new b38c50789 [Feature][Connector-V2][Iceberg] Modify the scope of
flink-shaded-hadoop-2 to provided to be compatible with hadoop3.x (#3046)
b38c50789 is described below
commit b38c50789f45bfcddd4eaef04ad4f6e425dfb0a5
Author: s7monk <[email protected]>
AuthorDate: Sun Nov 27 12:05:21 2022 +0800
[Feature][Connector-V2][Iceberg] Modify the scope of flink-shaded-hadoop-2
to provided to be compatible with hadoop3.x (#3046)
* [Connector-V2-Iceberg]Modify the scope of flink-shaded-hadoop-2 to
provided
* [Doc][Connector-V2][Iceberg] Modify Iceberg doc
[Feature][Connector-V2][Iceberg]Modify the scope of hive-exec to provided
to be compatible with hadoop3.x
* add iceberg connector hadoop3.x e2e
* fix pom error
* fix iceberg doc error
* fix pom error
* add iceberg hadoop3 spark e2e
* add iceberg hadoop3 spark e2e
* add iceberg hadoop3 spark e2e
* add iceberg hadoop3 spark e2e
* modify iceberg doc
* solve some errors
* add changed log
* add changed log
* solve iceberg test error
* solve iceberg-spark test error
Co-authored-by: Eric <[email protected]>
Co-authored-by: s7monk <“[email protected]”>
---
docs/en/connector-v2/source/Iceberg.md | 21 ++-
seatunnel-connectors-v2/connector-iceberg/pom.xml | 2 +
.../main/bin/start-seatunnel-flink-connector-v2.sh | 2 +-
.../connector-iceberg-flink-e2e/pom.xml | 5 +
.../pom.xml | 21 ++-
.../flink/v2/icegerg/hadoop3/IcebergSourceIT.java | 168 +++++++++++++++++++++
.../src/test/resources/iceberg/iceberg_source.conf | 85 +++++++++++
.../src/test/resources/log4j.properties | 22 +++
.../seatunnel-flink-connector-v2-e2e/pom.xml | 1 +
.../pom.xml | 29 ++--
.../spark/v2/iceberg/hadoop3/IcebergSourceIT.java | 165 ++++++++++++++++++++
.../src/test/resources/iceberg/iceberg_source.conf | 86 +++++++++++
.../src/test/resources/log4j.properties | 22 +++
.../connector-iceberg-spark-e2e/pom.xml | 17 +++
.../seatunnel-spark-connector-v2-e2e/pom.xml | 1 +
15 files changed, 627 insertions(+), 20 deletions(-)
diff --git a/docs/en/connector-v2/source/Iceberg.md
b/docs/en/connector-v2/source/Iceberg.md
index bc345e98a..a4400a530 100644
--- a/docs/en/connector-v2/source/Iceberg.md
+++ b/docs/en/connector-v2/source/Iceberg.md
@@ -20,8 +20,8 @@ Source connector for Apache Iceberg. It can support batch and
stream mode.
- [x] orc
- [x] avro
- [x] iceberg catalog
- - [x] hadoop(2.7.5)
- - [x] hive(2.3.9)
+ - [x] hadoop(2.7.1 , 2.7.5 , 3.1.3)
+ - [x] hive(2.3.9 , 3.1.2)
## Options
@@ -161,8 +161,25 @@ source {
}
```
+:::tip
+
+In order to be compatible with different versions of Hadoop and Hive, the
scope of hive-exec and flink-shaded-hadoop-2 in the project pom file are
provided, so if you use the Flink engine, first you may need to add the
following Jar packages to <FLINK_HOME>/lib directory, if you are using the
Spark engine and integrated with Hadoop, then you do not need to add the
following Jar packages.
+
+:::
+
+```
+flink-shaded-hadoop-x-xxx.jar
+hive-exec-xxx.jar
+libfb303-xxx.jar
+```
+Some versions of the hive-exec package do not have libfb303-xxx.jar, so you
also need to manually import the Jar package.
+
## Changelog
### 2.2.0-beta 2022-09-26
- Add Iceberg Source Connector
+
+### next version
+
+- [Feature] Support Hadoop3.x
([3046](https://github.com/apache/incubator-seatunnel/pull/3046))
diff --git a/seatunnel-connectors-v2/connector-iceberg/pom.xml
b/seatunnel-connectors-v2/connector-iceberg/pom.xml
index f474989d5..b08c84776 100644
--- a/seatunnel-connectors-v2/connector-iceberg/pom.xml
+++ b/seatunnel-connectors-v2/connector-iceberg/pom.xml
@@ -94,6 +94,7 @@
<artifactId>hive-exec</artifactId>
<version>${hive.version}</version>
<classifier>core</classifier>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.logging.log4j</groupId>
@@ -120,6 +121,7 @@
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>provided</scope>
<exclusions>
<exclusion>
<groupId>org.apache.avro</groupId>
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
index 706776cd4..c4a4e0283 100755
---
a/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/bin/start-seatunnel-flink-connector-v2.sh
@@ -71,4 +71,4 @@ elif [ ${EXIT_CODE} -eq 0 ]; then
else
echo "${CMD}"
exit ${EXIT_CODE}
-fi
\ No newline at end of file
+fi
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
index fc016ed71..a2a292ab4 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
@@ -40,6 +40,11 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-shaded-hadoop-2</artifactId>
+ <scope>test</scope>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
similarity index 75%
copy from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
copy to
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
index fc016ed71..e9838128e 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
@@ -17,13 +17,17 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-iceberg-flink-e2e</artifactId>
+ <artifactId>connector-iceberg-hadoop3-flink-e2e</artifactId>
+
+ <properties>
+ <hadoop-client.version>3.3.4</hadoop-client.version>
+ </properties>
<dependencies>
<dependency>
@@ -40,6 +44,17 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop-client.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
-
</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
new file mode 100644
index 000000000..6354b209a
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
@@ -0,0 +1,168 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.v2.icegerg.hadoop3;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class IcebergSourceIT extends FlinkContainer {
+
+ private static final TableIdentifier TABLE = TableIdentifier.of(
+ Namespace.of("database1"), "source");
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.optional(1, "f1", Types.LongType.get()),
+ Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
+ Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
+ Types.NestedField.optional(4, "f4", Types.LongType.get()),
+ Types.NestedField.optional(5, "f5", Types.FloatType.get()),
+ Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
+ Types.NestedField.optional(7, "f7", Types.DateType.get()),
+ Types.NestedField.optional(8, "f8", Types.TimeType.get()),
+ Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
+ Types.NestedField.optional(10, "f10",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(11, "f11", Types.StringType.get()),
+ Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
+ Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
+ Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
+ Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
+ 100, Types.IntegerType.get())),
+ Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
+ 200, 300, Types.StringType.get(), Types.IntegerType.get())),
+ Types.NestedField.optional(17, "f17", Types.StructType.of(
+ Types.NestedField.required(400, "f17_a", Types.StringType.get())))
+ );
+
+ private static final String CATALOG_NAME = "seatunnel";
+ private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
+ private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
+ private static final String WAREHOUSE = "file://" + CATALOG_DIR;
+ private static Catalog CATALOG;
+
+ @BeforeEach
+ public void start() {
+ initializeIcebergTable();
+ batchInsertData();
+ MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
+ jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
+ taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
+ }
+
+ @Test
+ public void testIcebergSource() throws IOException, InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
+ Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
+ }
+
+ private void initializeIcebergTable() {
+ CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
+ CATALOG_TYPE,
+ WAREHOUSE,
+ null)
+ .create();
+ if (!CATALOG.tableExists(TABLE)) {
+ CATALOG.createTable(TABLE, SCHEMA);
+ }
+ }
+
+ private void batchInsertData() {
+ GenericRecord record = GenericRecord.create(SCHEMA);
+ record.setField("f1", Long.valueOf(0));
+ record.setField("f2", true);
+ record.setField("f3", Integer.MAX_VALUE);
+ record.setField("f4", Long.MAX_VALUE);
+ record.setField("f5", Float.MAX_VALUE);
+ record.setField("f6", Double.MAX_VALUE);
+ record.setField("f7", LocalDate.now());
+ record.setField("f8", LocalTime.now());
+ record.setField("f9", OffsetDateTime.now());
+ record.setField("f10", LocalDateTime.now());
+ record.setField("f11", "test");
+ record.setField("f12", "abcdefghij".getBytes());
+ record.setField("f13", ByteBuffer.wrap("test".getBytes()));
+ record.setField("f14", new BigDecimal("1000000000.000000001"));
+ record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
+ record.setField("f16", Collections.singletonMap("key",
Integer.MAX_VALUE));
+ Record structRecord =
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
+ structRecord.setField("f17_a", "test");
+ record.setField("f17", structRecord);
+
+ Table table = CATALOG.loadTable(TABLE);
+ FileAppenderFactory appenderFactory = new
GenericAppenderFactory(SCHEMA);
+ List<Record> records = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ records.add(record.copy("f1", Long.valueOf(i)));
+ if (i % 10 == 0) {
+ String externalFilePath = String.format(CATALOG_DIR +
"external_file/datafile_%s.avro", i);
+ FileAppender<Record> fileAppender =
appenderFactory.newAppender(
+ Files.localOutput(externalFilePath),
FileFormat.fromFileName(externalFilePath));
+ try (FileAppender<Record> fileAppenderCloseable =
fileAppender) {
+ fileAppenderCloseable.addAll(records);
+ records.clear();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ DataFile datafile =
DataFiles.builder(PartitionSpec.unpartitioned())
+
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new
Configuration()))
+ .withMetrics(fileAppender.metrics())
+ .build();
+ table.newAppend().appendFile(datafile).commit();
+ }
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
new file mode 100644
index 000000000..dd54abe34
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -0,0 +1,85 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ execution.parallelism = 2
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 10
+}
+
+source {
+ Iceberg {
+ fields {
+ f2 = "boolean"
+ f1 = "bigint"
+ f3 = "int"
+ f4 = "bigint"
+ f5 = "float"
+ f6 = "double"
+ f7 = "date"
+ f8 = "time"
+ f9 = "timestamp"
+ f10 = "timestamp"
+ f11 = "string"
+ f12 = "bytes"
+ f13 = "bytes"
+ f14 = "decimal(19,9)"
+ f15 = "array<int>"
+ f16 = "map<string, int>"
+ }
+ catalog_name = "seatunnel"
+ catalog_type = "hadoop"
+ warehouse = "file:///tmp/seatunnel/iceberg/flink/"
+ namespace = "database1"
+ table = "source"
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {
+ }
+ Assert {
+ rules =
+ {
+ field_rules = [
+ {
+ field_name = f1
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 24273a267..776b82fe9 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -35,6 +35,7 @@
<module>connector-fake-flink-e2e</module>
<module>connector-mongodb-flink-e2e</module>
<module>connector-iceberg-flink-e2e</module>
+ <module>connector-iceberg-hadoop3-flink-e2e</module>
</modules>
<dependencies>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
similarity index 78%
copy from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
copy to
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
index 399a52f54..d1a06e6be 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
@@ -17,13 +17,17 @@
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
- <groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-iceberg-spark-e2e</artifactId>
+ <artifactId>connector-iceberg-hadoop3-spark-e2e</artifactId>
+
+ <properties>
+ <hadoop-client.version>3.3.4</hadoop-client.version>
+ </properties>
<dependencies>
<dependency>
@@ -40,20 +44,17 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
-
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-assert</artifactId>
- <version>${project.version}</version>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop-client.version}</version>
<scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
</dependency>
</dependencies>
-
</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
new file mode 100644
index 000000000..2717e1fd3
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.spark.v2.iceberg.hadoop3;
+
+import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
+
+import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
+import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
+import org.apache.seatunnel.e2e.spark.SparkContainer;
+
+import lombok.extern.slf4j.Slf4j;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.iceberg.DataFile;
+import org.apache.iceberg.DataFiles;
+import org.apache.iceberg.FileFormat;
+import org.apache.iceberg.Files;
+import org.apache.iceberg.PartitionSpec;
+import org.apache.iceberg.Schema;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.catalog.Catalog;
+import org.apache.iceberg.catalog.Namespace;
+import org.apache.iceberg.catalog.TableIdentifier;
+import org.apache.iceberg.data.GenericAppenderFactory;
+import org.apache.iceberg.data.GenericRecord;
+import org.apache.iceberg.data.Record;
+import org.apache.iceberg.hadoop.HadoopInputFile;
+import org.apache.iceberg.io.FileAppender;
+import org.apache.iceberg.io.FileAppenderFactory;
+import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.testcontainers.containers.Container;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
+import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.OffsetDateTime;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+@Slf4j
+public class IcebergSourceIT extends SparkContainer {
+
+ private static final TableIdentifier TABLE = TableIdentifier.of(
+ Namespace.of("database1"), "source");
+ private static final Schema SCHEMA = new Schema(
+ Types.NestedField.optional(1, "f1", Types.LongType.get()),
+ Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
+ Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
+ Types.NestedField.optional(4, "f4", Types.LongType.get()),
+ Types.NestedField.optional(5, "f5", Types.FloatType.get()),
+ Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
+ Types.NestedField.optional(7, "f7", Types.DateType.get()),
+ Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
+ Types.NestedField.optional(10, "f10",
Types.TimestampType.withoutZone()),
+ Types.NestedField.optional(11, "f11", Types.StringType.get()),
+ Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
+ Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
+ Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
+ Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
+ 100, Types.IntegerType.get())),
+ Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
+ 200, 300, Types.StringType.get(), Types.IntegerType.get())),
+ Types.NestedField.optional(17, "f17", Types.StructType.of(
+ Types.NestedField.required(400, "f17_a", Types.StringType.get())))
+ );
+
+ private static final String CATALOG_NAME = "seatunnel";
+ private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
+ private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/spark/";
+ private static final String WAREHOUSE = "file://" + CATALOG_DIR;
+ private static Catalog CATALOG;
+
+ @BeforeEach
+ public void start() {
+ initializeIcebergTable();
+ batchInsertData();
+ MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
+ master.copyFileToContainer(catalogPath, CATALOG_DIR);
+ }
+
+ @Test
+ public void testIcebergSource() throws IOException, InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelSparkJob("/iceberg/iceberg_source.conf");
+ Assertions.assertEquals(0, execResult.getExitCode());
+ }
+
+ private void initializeIcebergTable() {
+ CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
+ CATALOG_TYPE,
+ WAREHOUSE,
+ null)
+ .create();
+ if (!CATALOG.tableExists(TABLE)) {
+ CATALOG.createTable(TABLE, SCHEMA);
+ }
+ }
+
+ private void batchInsertData() {
+ GenericRecord record = GenericRecord.create(SCHEMA);
+ record.setField("f1", Long.valueOf(0));
+ record.setField("f2", true);
+ record.setField("f3", Integer.MAX_VALUE);
+ record.setField("f4", Long.MAX_VALUE);
+ record.setField("f5", Float.MAX_VALUE);
+ record.setField("f6", Double.MAX_VALUE);
+ record.setField("f7", LocalDate.now());
+ /*record.setField("f8", LocalTime.now());*/
+ record.setField("f9", OffsetDateTime.now());
+ record.setField("f10", LocalDateTime.now());
+ record.setField("f11", "test");
+ record.setField("f12", "abcdefghij".getBytes());
+ record.setField("f13", ByteBuffer.wrap("test".getBytes()));
+ record.setField("f14", new BigDecimal("1000000000.000000001"));
+ record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
+ record.setField("f16", Collections.singletonMap("key",
Integer.MAX_VALUE));
+ Record structRecord =
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
+ structRecord.setField("f17_a", "test");
+ record.setField("f17", structRecord);
+
+ Table table = CATALOG.loadTable(TABLE);
+ FileAppenderFactory appenderFactory = new
GenericAppenderFactory(SCHEMA);
+ List<Record> records = new ArrayList<>();
+ for (int i = 0; i < 100; i++) {
+ records.add(record.copy("f1", Long.valueOf(i)));
+ if (i % 10 == 0) {
+ String externalFilePath = String.format(CATALOG_DIR +
"external_file/datafile_%s.avro", i);
+ FileAppender<Record> fileAppender =
appenderFactory.newAppender(
+ Files.localOutput(externalFilePath),
FileFormat.fromFileName(externalFilePath));
+ try (FileAppender<Record> fileAppenderCloseable =
fileAppender) {
+ fileAppenderCloseable.addAll(records);
+ records.clear();
+ } catch (IOException e) {
+ throw new RuntimeException(e);
+ }
+ DataFile datafile =
DataFiles.builder(PartitionSpec.unpartitioned())
+
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new
Configuration()))
+ .withMetrics(fileAppender.metrics())
+ .build();
+ table.newAppend().appendFile(datafile).commit();
+ }
+ }
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
new file mode 100644
index 000000000..edaaff916
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -0,0 +1,86 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+ job.mode = "BATCH"
+}
+
+source {
+ Iceberg {
+ fields {
+ f2 = "boolean"
+ f1 = "bigint"
+ f3 = "int"
+ f4 = "bigint"
+ f5 = "float"
+ f6 = "double"
+ f7 = "date"
+ f9 = "timestamp"
+ f10 = "timestamp"
+ f11 = "string"
+ f12 = "bytes"
+ f13 = "bytes"
+ f14 = "decimal(19,9)"
+ f15 = "array<int>"
+ f16 = "map<string, int>"
+ }
+ catalog_name = "seatunnel"
+ catalog_type = "hadoop"
+ warehouse = "file:///tmp/seatunnel/iceberg/spark/"
+ namespace = "database1"
+ table = "source"
+ }
+}
+
+transform {
+}
+
+sink {
+ Console {
+ }
+ Assert {
+ rules = {
+ field_rules = [
+ {
+ field_name = f1
+ field_type = long
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ },
+ {
+ rule_type = MIN
+ rule_value = 0
+ },
+ {
+ rule_type = MAX
+ rule_value = 99
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
new file mode 100644
index 000000000..db5d9e512
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
@@ -0,0 +1,22 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+# Set everything to be logged to the console
+log4j.rootCategory=INFO, console
+log4j.appender.console=org.apache.log4j.ConsoleAppender
+log4j.appender.console.target=System.err
+log4j.appender.console.layout=org.apache.log4j.PatternLayout
+log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
index 399a52f54..9be91c751 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
@@ -25,6 +25,10 @@
<artifactId>connector-iceberg-spark-e2e</artifactId>
+ <properties>
+ <hadoop-client.version>3.3.4</hadoop-client.version>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
@@ -54,6 +58,19 @@
<version>${project.version}</version>
<scope>test</scope>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-client</artifactId>
+ <version>${hadoop-client.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-reload4j</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 260b8ef6c..bbdd1f6a4 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -34,6 +34,7 @@
<module>connector-iotdb-spark-e2e</module>
<module>connector-jdbc-spark-e2e</module>
<module>connector-mongodb-spark-e2e</module>
+ <module>connector-iceberg-hadoop3-spark-e2e</module>
<module>connector-iceberg-spark-e2e</module>
</modules>