This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7490628c2 [Improve][Connector-V2][iceberg] Refactor iceberg connector
e2e test cases (#3820)
7490628c2 is described below
commit 7490628c290c3a81c8af58e5a914500f1033cacd
Author: Carl-Zhou-CN <[email protected]>
AuthorDate: Tue Jan 3 11:35:12 2023 +0800
[Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test cases
(#3820)
* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases
* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases
* [Improve][Connector-V2][ioTdb] Refactor iceberg connector e2e test cases
* [Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test cases
* [Improve][Connector-V2][iceberg] Refactor iceberg connector e2e test cases
Co-authored-by: zhouyao <[email protected]>
---
.../connector-iceberg-e2e}/pom.xml | 16 +-
.../e2e/connector/iceberg}/IcebergSourceIT.java | 38 +++--
.../src/test/resources/iceberg/iceberg_source.conf | 3 +-
.../connector-iceberg-hadoop3-e2e}/pom.xml | 20 +--
.../iceberg/hadoop3}/IcebergSourceIT.java | 38 +++--
.../src/test/resources/iceberg/iceberg_source.conf | 2 +-
.../src/test/resources/log4j.properties | 0
seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml | 2 +
.../src/test/resources/iceberg/iceberg_source.conf | 85 -----------
.../connector-iceberg-hadoop3-flink-e2e/pom.xml | 60 --------
.../seatunnel-flink-connector-v2-e2e/pom.xml | 2 -
.../connector-iceberg-hadoop3-spark-e2e/pom.xml | 60 --------
.../spark/v2/iceberg/hadoop3/IcebergSourceIT.java | 165 --------------------
.../src/test/resources/log4j.properties | 22 ---
.../e2e/spark/v2/iceberg/IcebergSourceIT.java | 166 ---------------------
.../src/test/resources/iceberg/iceberg_source.conf | 88 -----------
.../seatunnel-spark-connector-v2-e2e/pom.xml | 2 -
17 files changed, 70 insertions(+), 699 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
similarity index 82%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
rename to seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
index 35bd4733c..87c61bb29 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/pom.xml
@@ -18,25 +18,29 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-iceberg-flink-e2e</artifactId>
+ <artifactId>connector-iceberg-e2e</artifactId>
<dependencies>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-flink-e2e-base</artifactId>
+ <artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-iceberg</artifactId>
+ <artifactId>connector-assert</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
similarity index 86%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
index 6354b209a..1aa116e53 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/icegerg/hadoop3/IcebergSourceIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/IcebergSourceIT.java
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.flink.v2.icegerg.hadoop3;
+package org.apache.seatunnel.e2e.connector.iceberg;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +46,10 @@ import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;
@@ -61,7 +66,7 @@ import java.util.Collections;
import java.util.List;
@Slf4j
-public class IcebergSourceIT extends FlinkContainer {
+public class IcebergSourceIT extends TestSuiteBase implements TestResource {
private static final TableIdentifier TABLE = TableIdentifier.of(
Namespace.of("database1"), "source");
@@ -90,22 +95,31 @@ public class IcebergSourceIT extends FlinkContainer {
private static final String CATALOG_NAME = "seatunnel";
private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
- private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
+ private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/hadoop/";
private static final String WAREHOUSE = "file://" + CATALOG_DIR;
private static Catalog CATALOG;
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ container.copyFileToContainer(MountableFile.forHostPath(CATALOG_DIR),
CATALOG_DIR);
+ };
+
@BeforeEach
- public void start() {
+ @Override
+ public void startUp() throws Exception {
initializeIcebergTable();
batchInsertData();
- MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
- jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
- taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
}
- @Test
- public void testIcebergSource() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+
+ }
+
+ @TestTemplate
+ public void testIcebergSource(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/iceberg/iceberg_source.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
similarity index 96%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
index dd54abe34..1e2372205 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -34,7 +34,6 @@ source {
f5 = "float"
f6 = "double"
f7 = "date"
- f8 = "time"
f9 = "timestamp"
f10 = "timestamp"
f11 = "string"
@@ -46,7 +45,7 @@ source {
}
catalog_name = "seatunnel"
catalog_type = "hadoop"
- warehouse = "file:///tmp/seatunnel/iceberg/flink/"
+ warehouse = "file:///tmp/seatunnel/iceberg/hadoop/"
namespace = "database1"
table = "source"
}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/pom.xml
similarity index 84%
rename from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/pom.xml
index 9be91c751..ed59f5f83 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/pom.xml
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/pom.xml
@@ -18,47 +18,36 @@
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
+ <artifactId>seatunnel-connector-v2-e2e</artifactId>
<version>${revision}</version>
</parent>
<modelVersion>4.0.0</modelVersion>
- <artifactId>connector-iceberg-spark-e2e</artifactId>
+ <artifactId>connector-iceberg-hadoop3-e2e</artifactId>
<properties>
<hadoop-client.version>3.3.4</hadoop-client.version>
</properties>
<dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-spark-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-iceberg</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-console</artifactId>
+ <artifactId>connector-assert</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-assert</artifactId>
+ <artifactId>connector-console</artifactId>
<version>${project.version}</version>
<scope>test</scope>
</dependency>
-
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
@@ -72,5 +61,4 @@
</exclusions>
</dependency>
</dependencies>
-
</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iceberg/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java
similarity index 86%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iceberg/IcebergSourceIT.java
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java
index ccdb123a1..c4ee74c42 100644
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/v2/iceberg/IcebergSourceIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/java/org/apache/seatunnel/e2e/connector/iceberg/hadoop3/IcebergSourceIT.java
@@ -15,13 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.e2e.flink.v2.iceberg;
+package org.apache.seatunnel.e2e.connector.iceberg.hadoop3;
import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
-import org.apache.seatunnel.e2e.flink.FlinkContainer;
+import org.apache.seatunnel.e2e.common.TestResource;
+import org.apache.seatunnel.e2e.common.TestSuiteBase;
+import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
+import org.apache.seatunnel.e2e.common.container.TestContainer;
+import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import lombok.extern.slf4j.Slf4j;
import org.apache.hadoop.conf.Configuration;
@@ -42,9 +46,10 @@ import org.apache.iceberg.hadoop.HadoopInputFile;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.io.FileAppenderFactory;
import org.apache.iceberg.types.Types;
+import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
import org.testcontainers.utility.MountableFile;
@@ -61,7 +66,7 @@ import java.util.Collections;
import java.util.List;
@Slf4j
-public class IcebergSourceIT extends FlinkContainer {
+public class IcebergSourceIT extends TestSuiteBase implements TestResource {
private static final TableIdentifier TABLE = TableIdentifier.of(
Namespace.of("database1"), "source");
@@ -90,22 +95,31 @@ public class IcebergSourceIT extends FlinkContainer {
private static final String CATALOG_NAME = "seatunnel";
private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
- private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/flink/";
+ private static final String CATALOG_DIR =
"/tmp/seatunnel/iceberg/hadoop3/";
private static final String WAREHOUSE = "file://" + CATALOG_DIR;
private static Catalog CATALOG;
+ @TestContainerExtension
+ private final ContainerExtendedFactory extendedFactory = container -> {
+ container.copyFileToContainer(MountableFile.forHostPath(CATALOG_DIR),
CATALOG_DIR);
+ };
+
@BeforeEach
- public void start() {
+ @Override
+ public void startUp() throws Exception {
initializeIcebergTable();
batchInsertData();
- MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
- jobManager.copyFileToContainer(catalogPath, CATALOG_DIR);
- taskManager.copyFileToContainer(catalogPath, CATALOG_DIR);
}
- @Test
- public void testIcebergSource() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelFlinkJob("/iceberg/iceberg_source.conf");
+ @AfterAll
+ @Override
+ public void tearDown() throws Exception {
+
+ }
+
+ @TestTemplate
+ public void testIcebergSource(TestContainer container) throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
container.executeJob("/iceberg/iceberg_source.conf");
Assertions.assertEquals(0, execResult.getExitCode(),
execResult.getStderr());
}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
similarity index 97%
rename from
seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
index edaaff916..cd541d8a2 100644
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/iceberg/iceberg_source.conf
@@ -48,7 +48,7 @@ source {
}
catalog_name = "seatunnel"
catalog_type = "hadoop"
- warehouse = "file:///tmp/seatunnel/iceberg/spark/"
+ warehouse = "file:///tmp/seatunnel/iceberg/hadoop3/"
namespace = "database1"
table = "source"
}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/log4j.properties
similarity index 100%
rename from
seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/src/test/resources/log4j.properties
rename to
seatunnel-e2e/seatunnel-connector-v2-e2e/connector-iceberg-hadoop3-e2e/src/test/resources/log4j.properties
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
index 07f21f36d..2f11b69ce 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/pom.xml
@@ -43,6 +43,8 @@
<module>connector-elasticsearch-e2e</module>
<module>connector-iotdb-e2e</module>
<module>connector-cdc-mysql-e2e</module>
+ <module>connector-iceberg-e2e</module>
+ <module>connector-iceberg-hadoop3-e2e</module>
</modules>
<artifactId>seatunnel-connector-v2-e2e</artifactId>
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
deleted file mode 100644
index dd54abe34..000000000
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-flink-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ /dev/null
@@ -1,85 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
-
-env {
- execution.parallelism = 2
- job.mode = "BATCH"
- execution.checkpoint.interval = 10
-}
-
-source {
- Iceberg {
- fields {
- f2 = "boolean"
- f1 = "bigint"
- f3 = "int"
- f4 = "bigint"
- f5 = "float"
- f6 = "double"
- f7 = "date"
- f8 = "time"
- f9 = "timestamp"
- f10 = "timestamp"
- f11 = "string"
- f12 = "bytes"
- f13 = "bytes"
- f14 = "decimal(19,9)"
- f15 = "array<int>"
- f16 = "map<string, int>"
- }
- catalog_name = "seatunnel"
- catalog_type = "hadoop"
- warehouse = "file:///tmp/seatunnel/iceberg/flink/"
- namespace = "database1"
- table = "source"
- }
-}
-
-transform {
-}
-
-sink {
- Console {
- }
- Assert {
- rules =
- {
- field_rules = [
- {
- field_name = f1
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
deleted file mode 100644
index e9838128e..000000000
---
a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/connector-iceberg-hadoop3-flink-e2e/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>seatunnel-flink-connector-v2-e2e</artifactId>
- <groupId>org.apache.seatunnel</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>connector-iceberg-hadoop3-flink-e2e</artifactId>
-
- <properties>
- <hadoop-client.version>3.3.4</hadoop-client.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-flink-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-iceberg</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop-client.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-reload4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
index 589930ee2..42ae59143 100644
--- a/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-flink-connector-v2-e2e/pom.xml
@@ -31,8 +31,6 @@
<module>connector-jdbc-flink-e2e</module>
<module>connector-datahub-flink-e2e</module>
<module>connector-mongodb-flink-e2e</module>
- <module>connector-iceberg-flink-e2e</module>
- <module>connector-iceberg-hadoop3-flink-e2e</module>
</modules>
<dependencies>
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
deleted file mode 100644
index d1a06e6be..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/pom.xml
+++ /dev/null
@@ -1,60 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
- http://www.apache.org/licenses/LICENSE-2.0
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-<project xmlns="http://maven.apache.org/POM/4.0.0"
- xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
- xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
- <parent>
- <artifactId>seatunnel-spark-connector-v2-e2e</artifactId>
- <groupId>org.apache.seatunnel</groupId>
- <version>${revision}</version>
- </parent>
- <modelVersion>4.0.0</modelVersion>
-
- <artifactId>connector-iceberg-hadoop3-spark-e2e</artifactId>
-
- <properties>
- <hadoop-client.version>3.3.4</hadoop-client.version>
- </properties>
-
- <dependencies>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-spark-e2e-base</artifactId>
- <version>${project.version}</version>
- <classifier>tests</classifier>
- <type>test-jar</type>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.seatunnel</groupId>
- <artifactId>connector-iceberg</artifactId>
- <version>${project.version}</version>
- <scope>test</scope>
- </dependency>
- <dependency>
- <groupId>org.apache.hadoop</groupId>
- <artifactId>hadoop-client</artifactId>
- <version>${hadoop-client.version}</version>
- <scope>test</scope>
- <exclusions>
- <exclusion>
- <groupId>org.slf4j</groupId>
- <artifactId>slf4j-reload4j</artifactId>
- </exclusion>
- </exclusions>
- </dependency>
- </dependencies>
-</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
deleted file mode 100644
index 2717e1fd3..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/hadoop3/IcebergSourceIT.java
+++ /dev/null
@@ -1,165 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.iceberg.hadoop3;
-
-import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
-
-import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
-import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericAppenderFactory;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-@Slf4j
-public class IcebergSourceIT extends SparkContainer {
-
- private static final TableIdentifier TABLE = TableIdentifier.of(
- Namespace.of("database1"), "source");
- private static final Schema SCHEMA = new Schema(
- Types.NestedField.optional(1, "f1", Types.LongType.get()),
- Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
- Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
- Types.NestedField.optional(4, "f4", Types.LongType.get()),
- Types.NestedField.optional(5, "f5", Types.FloatType.get()),
- Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
- Types.NestedField.optional(7, "f7", Types.DateType.get()),
- Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
- Types.NestedField.optional(10, "f10",
Types.TimestampType.withoutZone()),
- Types.NestedField.optional(11, "f11", Types.StringType.get()),
- Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
- Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
- Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
- Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
- 100, Types.IntegerType.get())),
- Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
- 200, 300, Types.StringType.get(), Types.IntegerType.get())),
- Types.NestedField.optional(17, "f17", Types.StructType.of(
- Types.NestedField.required(400, "f17_a", Types.StringType.get())))
- );
-
- private static final String CATALOG_NAME = "seatunnel";
- private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
- private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/spark/";
- private static final String WAREHOUSE = "file://" + CATALOG_DIR;
- private static Catalog CATALOG;
-
- @BeforeEach
- public void start() {
- initializeIcebergTable();
- batchInsertData();
- MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
- master.copyFileToContainer(catalogPath, CATALOG_DIR);
- }
-
- @Test
- public void testIcebergSource() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelSparkJob("/iceberg/iceberg_source.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- private void initializeIcebergTable() {
- CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
- CATALOG_TYPE,
- WAREHOUSE,
- null)
- .create();
- if (!CATALOG.tableExists(TABLE)) {
- CATALOG.createTable(TABLE, SCHEMA);
- }
- }
-
- private void batchInsertData() {
- GenericRecord record = GenericRecord.create(SCHEMA);
- record.setField("f1", Long.valueOf(0));
- record.setField("f2", true);
- record.setField("f3", Integer.MAX_VALUE);
- record.setField("f4", Long.MAX_VALUE);
- record.setField("f5", Float.MAX_VALUE);
- record.setField("f6", Double.MAX_VALUE);
- record.setField("f7", LocalDate.now());
- /*record.setField("f8", LocalTime.now());*/
- record.setField("f9", OffsetDateTime.now());
- record.setField("f10", LocalDateTime.now());
- record.setField("f11", "test");
- record.setField("f12", "abcdefghij".getBytes());
- record.setField("f13", ByteBuffer.wrap("test".getBytes()));
- record.setField("f14", new BigDecimal("1000000000.000000001"));
- record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
- record.setField("f16", Collections.singletonMap("key",
Integer.MAX_VALUE));
- Record structRecord =
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
- structRecord.setField("f17_a", "test");
- record.setField("f17", structRecord);
-
- Table table = CATALOG.loadTable(TABLE);
- FileAppenderFactory appenderFactory = new
GenericAppenderFactory(SCHEMA);
- List<Record> records = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- records.add(record.copy("f1", Long.valueOf(i)));
- if (i % 10 == 0) {
- String externalFilePath = String.format(CATALOG_DIR +
"external_file/datafile_%s.avro", i);
- FileAppender<Record> fileAppender =
appenderFactory.newAppender(
- Files.localOutput(externalFilePath),
FileFormat.fromFileName(externalFilePath));
- try (FileAppender<Record> fileAppenderCloseable =
fileAppender) {
- fileAppenderCloseable.addAll(records);
- records.clear();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- DataFile datafile =
DataFiles.builder(PartitionSpec.unpartitioned())
-
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new
Configuration()))
- .withMetrics(fileAppender.metrics())
- .build();
- table.newAppend().appendFile(datafile).commit();
- }
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
deleted file mode 100644
index db5d9e512..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-hadoop3-spark-e2e/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,22 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-# Set everything to be logged to the console
-log4j.rootCategory=INFO, console
-log4j.appender.console=org.apache.log4j.ConsoleAppender
-log4j.appender.console.target=System.err
-log4j.appender.console.layout=org.apache.log4j.PatternLayout
-log4j.appender.console.layout.ConversionPattern=%d{yy/MM/dd HH:mm:ss} %p
%c{1}: %m%n
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/IcebergSourceIT.java
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/IcebergSourceIT.java
deleted file mode 100644
index 8f813507f..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/v2/iceberg/IcebergSourceIT.java
+++ /dev/null
@@ -1,166 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.e2e.spark.v2.iceberg;
-
-import static
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType.HADOOP;
-
-import org.apache.seatunnel.connectors.seatunnel.iceberg.IcebergCatalogFactory;
-import
org.apache.seatunnel.connectors.seatunnel.iceberg.config.IcebergCatalogType;
-import org.apache.seatunnel.e2e.spark.SparkContainer;
-
-import lombok.extern.slf4j.Slf4j;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.iceberg.DataFile;
-import org.apache.iceberg.DataFiles;
-import org.apache.iceberg.FileFormat;
-import org.apache.iceberg.Files;
-import org.apache.iceberg.PartitionSpec;
-import org.apache.iceberg.Schema;
-import org.apache.iceberg.Table;
-import org.apache.iceberg.catalog.Catalog;
-import org.apache.iceberg.catalog.Namespace;
-import org.apache.iceberg.catalog.TableIdentifier;
-import org.apache.iceberg.data.GenericAppenderFactory;
-import org.apache.iceberg.data.GenericRecord;
-import org.apache.iceberg.data.Record;
-import org.apache.iceberg.hadoop.HadoopInputFile;
-import org.apache.iceberg.io.FileAppender;
-import org.apache.iceberg.io.FileAppenderFactory;
-import org.apache.iceberg.types.Types;
-import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.Container;
-import org.testcontainers.utility.MountableFile;
-
-import java.io.IOException;
-import java.math.BigDecimal;
-import java.nio.ByteBuffer;
-import java.time.LocalDate;
-import java.time.LocalDateTime;
-import java.time.OffsetDateTime;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.List;
-
-@Slf4j
-public class IcebergSourceIT extends SparkContainer {
-
- private static final TableIdentifier TABLE = TableIdentifier.of(
- Namespace.of("database1"), "source");
- private static final Schema SCHEMA = new Schema(
- Types.NestedField.optional(1, "f1", Types.LongType.get()),
- Types.NestedField.optional(2, "f2", Types.BooleanType.get()),
- Types.NestedField.optional(3, "f3", Types.IntegerType.get()),
- Types.NestedField.optional(4, "f4", Types.LongType.get()),
- Types.NestedField.optional(5, "f5", Types.FloatType.get()),
- Types.NestedField.optional(6, "f6", Types.DoubleType.get()),
- Types.NestedField.optional(7, "f7", Types.DateType.get()),
- // Types.NestedField.optional(8, "f8", Types.TimeType.get()),
- Types.NestedField.optional(9, "f9", Types.TimestampType.withZone()),
- Types.NestedField.optional(10, "f10",
Types.TimestampType.withoutZone()),
- Types.NestedField.optional(11, "f11", Types.StringType.get()),
- Types.NestedField.optional(12, "f12", Types.FixedType.ofLength(10)),
- Types.NestedField.optional(13, "f13", Types.BinaryType.get()),
- Types.NestedField.optional(14, "f14", Types.DecimalType.of(19, 9)),
- Types.NestedField.optional(15, "f15", Types.ListType.ofOptional(
- 100, Types.IntegerType.get())),
- Types.NestedField.optional(16, "f16", Types.MapType.ofOptional(
- 200, 300, Types.StringType.get(), Types.IntegerType.get())),
- Types.NestedField.optional(17, "f17", Types.StructType.of(
- Types.NestedField.required(400, "f17_a", Types.StringType.get())))
- );
-
- private static final String CATALOG_NAME = "seatunnel";
- private static final IcebergCatalogType CATALOG_TYPE = HADOOP;
- private static final String CATALOG_DIR = "/tmp/seatunnel/iceberg/spark/";
- private static final String WAREHOUSE = "file://" + CATALOG_DIR;
- private static Catalog CATALOG;
-
- @BeforeEach
- public void start() {
- initializeIcebergTable();
- batchInsertData();
- MountableFile catalogPath = MountableFile.forHostPath(CATALOG_DIR);
- master.copyFileToContainer(catalogPath, CATALOG_DIR);
- }
-
- @Test
- public void testIcebergSource() throws IOException, InterruptedException {
- Container.ExecResult execResult =
executeSeaTunnelSparkJob("/iceberg/iceberg_source.conf");
- Assertions.assertEquals(0, execResult.getExitCode());
- }
-
- private void initializeIcebergTable() {
- CATALOG = new IcebergCatalogFactory(CATALOG_NAME,
- CATALOG_TYPE,
- WAREHOUSE,
- null)
- .create();
- if (!CATALOG.tableExists(TABLE)) {
- CATALOG.createTable(TABLE, SCHEMA);
- }
- }
-
- private void batchInsertData() {
- GenericRecord record = GenericRecord.create(SCHEMA);
- record.setField("f1", Long.valueOf(0));
- record.setField("f2", true);
- record.setField("f3", Integer.MAX_VALUE);
- record.setField("f4", Long.MAX_VALUE);
- record.setField("f5", Float.MAX_VALUE);
- record.setField("f6", Double.MAX_VALUE);
- record.setField("f7", LocalDate.now());
- // record.setField("f8", LocalTime.now());
- record.setField("f9", OffsetDateTime.now());
- record.setField("f10", LocalDateTime.now());
- record.setField("f11", "test");
- record.setField("f12", "abcdefghij".getBytes());
- record.setField("f13", ByteBuffer.wrap("test".getBytes()));
- record.setField("f14", new BigDecimal("1000000000.000000001"));
- record.setField("f15", Arrays.asList(Integer.MAX_VALUE));
- record.setField("f16", Collections.singletonMap("key",
Integer.MAX_VALUE));
- Record structRecord =
GenericRecord.create(SCHEMA.findField("f17").type().asStructType());
- structRecord.setField("f17_a", "test");
- record.setField("f17", structRecord);
-
- Table table = CATALOG.loadTable(TABLE);
- FileAppenderFactory appenderFactory = new
GenericAppenderFactory(SCHEMA);
- List<Record> records = new ArrayList<>();
- for (int i = 0; i < 100; i++) {
- records.add(record.copy("f1", Long.valueOf(i)));
- if (i % 10 == 0) {
- String externalFilePath = String.format(CATALOG_DIR +
"external_file/datafile_%s.avro", i);
- FileAppender<Record> fileAppender =
appenderFactory.newAppender(
- Files.localOutput(externalFilePath),
FileFormat.fromFileName(externalFilePath));
- try (FileAppender<Record> fileAppenderCloseable =
fileAppender) {
- fileAppenderCloseable.addAll(records);
- records.clear();
- } catch (IOException e) {
- throw new RuntimeException(e);
- }
- DataFile datafile =
DataFiles.builder(PartitionSpec.unpartitioned())
-
.withInputFile(HadoopInputFile.fromLocation(externalFilePath, new
Configuration()))
- .withMetrics(fileAppender.metrics())
- .build();
- table.newAppend().appendFile(datafile).commit();
- }
- }
- }
-}
diff --git
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
deleted file mode 100644
index 0582fd784..000000000
---
a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/connector-iceberg-spark-e2e/src/test/resources/iceberg/iceberg_source.conf
+++ /dev/null
@@ -1,88 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-######
-###### This config file is a demonstration of streaming processing in
seatunnel config
-######
-
-env {
- job.name = "SeaTunnel"
- spark.executor.instances = 2
- spark.executor.cores = 1
- spark.executor.memory = "1g"
- spark.master = local
- job.mode = "BATCH"
-}
-
-source {
- Iceberg {
- fields {
- f2 = "boolean"
- f1 = "bigint"
- f3 = "int"
- f4 = "bigint"
- f5 = "float"
- f6 = "double"
- f7 = "date"
- // Spark not support TIME
-// f8 = "time"
- f9 = "timestamp"
- f10 = "timestamp"
- f11 = "string"
- f12 = "bytes"
- f13 = "bytes"
- f14 = "decimal(19,9)"
- f15 = "array<int>"
- f16 = "map<string, int>"
- }
- catalog_name = "seatunnel"
- catalog_type = "hadoop"
- warehouse = "file:///tmp/seatunnel/iceberg/spark/"
- namespace = "database1"
- table = "source"
- }
-}
-
-transform {
-}
-
-sink {
- Console {
- }
- Assert {
- rules = {
- field_rules = [
- {
- field_name = f1
- field_type = long
- field_value = [
- {
- rule_type = NOT_NULL
- },
- {
- rule_type = MIN
- rule_value = 0
- },
- {
- rule_type = MAX
- rule_value = 99
- }
- ]
- }
- ]
- }
- }
-}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
index 1e4d9ef16..32e46c8dc 100644
--- a/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
+++ b/seatunnel-e2e/seatunnel-spark-connector-v2-e2e/pom.xml
@@ -31,8 +31,6 @@
<module>connector-datahub-spark-e2e</module>
<module>connector-jdbc-spark-e2e</module>
<module>connector-mongodb-spark-e2e</module>
- <module>connector-iceberg-hadoop3-spark-e2e</module>
- <module>connector-iceberg-spark-e2e</module>
</modules>
<dependencies>