This is an automated email from the ASF dual-hosted git repository.
corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5b700a8d97 [Fix][paimon-e2e] e2e test error (#9467)
5b700a8d97 is described below
commit 5b700a8d978884af3a9963e5928a7c8228d957bc
Author: WenDing-Y <[email protected]>
AuthorDate: Sat Jun 28 22:57:09 2025 +0800
[Fix][paimon-e2e] e2e test error (#9467)
---
.../seatunnel/e2e/connector/paimon/PaimonIT.java | 22 ++++++++++++++++++++--
.../paimon/PaimonSinkDynamicBucketIT.java | 12 ++++++++++++
.../fake_to_dynamic_bucket_paimon_case1.conf | 6 +-----
.../src/test/resources/fake_to_paimon.conf | 4 ++--
.../resources/paimon_projection_to_assert.conf | 14 ++++++--------
.../src/test/resources/paimon_to_assert.conf | 14 ++++++--------
.../flink/AbstractTestFlinkContainer.java | 9 +++++++--
7 files changed, 54 insertions(+), 27 deletions(-)
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
index a24a375c31..952ff106a1 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonIT.java
@@ -25,6 +25,12 @@ import org.apache.seatunnel.e2e.common.junit.DisabledOnContainer;
import org.apache.seatunnel.e2e.common.junit.TestContainerExtension;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.apache.paimon.catalog.Catalog;
+import org.apache.paimon.catalog.CatalogContext;
+import org.apache.paimon.catalog.CatalogFactory;
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.options.Options;
+
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.TestTemplate;
import org.testcontainers.containers.Container;
@@ -44,8 +50,8 @@ public class PaimonIT extends TestSuiteBase {
Path schemaPath =
ContainerUtil.getResourcesFile("/schema-0.json").toPath();
container.copyFileToContainer(
MountableFile.forHostPath(schemaPath),
- "/tmp/paimon/default.db/st_test/schema/schema-0");
- container.execInContainer("chmod", "777", "-R", "/tmp/paimon");
+ "/opt/seatunnel_mounts/paimon/default.db/st_test/schema/schema-0");
+ container.execInContainer("chmod", "777", "-R", "/opt/seatunnel_mounts/paimon");
};
@TestTemplate
@@ -58,5 +64,17 @@ public class PaimonIT extends TestSuiteBase {
Container.ExecResult readProjectionResult =
container.executeJob("/paimon_projection_to_assert.conf");
Assertions.assertEquals(0, readProjectionResult.getExitCode());
+ deleteTable();
+ }
+
+ private void deleteTable() {
+ Options options = new Options();
+ options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
+ try {
+ CatalogFactory.createCatalog(CatalogContext.create(options))
+ .dropTable(Identifier.create("default", "st_test"), true);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
}
}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
index 1cb31b4f7f..b44da835ee 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/java/org/apache/seatunnel/e2e/connector/paimon/PaimonSinkDynamicBucketIT.java
@@ -130,6 +130,18 @@ public class PaimonSinkDynamicBucketIT extends TestSuiteBase implements TestReso
Container.ExecResult readProjectionResult =
container.executeJob("/paimon_projection_to_assert.conf");
Assertions.assertEquals(0, readProjectionResult.getExitCode());
+ deleteTable();
+ }
+
+ private void deleteTable() {
+ Options options = new Options();
+ options.set("warehouse", "file://" + "/opt/seatunnel_mounts/paimon");
+ try {
+ CatalogFactory.createCatalog(CatalogContext.create(options))
+ .dropTable(Identifier.create("default", "st_test"), true);
+ } catch (Catalog.TableNotExistException e) {
+ throw new RuntimeException(e);
+ }
}
@TestTemplate
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
index 6005606782..de4e21e3ed 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_dynamic_bucket_paimon_case1.conf
@@ -42,10 +42,6 @@ source {
c_timestamp = timestamp
c_time = time
}
- primaryKey {
- name = "pk_id"
- columnNames = [pk_id]
- }
}
plugin_output = "fake"
}
@@ -53,7 +49,7 @@ source {
sink {
Paimon {
- warehouse = "file:///tmp/paimon"
+ warehouse = "file:///opt/seatunnel_mounts/paimon"
database = "default"
table = "st_test"
paimon.table.write-props = {
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
index 05ce7b3b93..e93a7a8653 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/fake_to_paimon.conf
@@ -55,8 +55,8 @@ source {
sink {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/opt/seatunnel_mounts/paimon"
database = "default"
table = "st_test"
}
-}
\ No newline at end of file
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
index 7b553c4740..a45792848e 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_projection_to_assert.conf
@@ -29,7 +29,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/opt/seatunnel_mounts/paimon"
database = "default"
table = "st_test"
plugin_output = paimon_source
@@ -45,13 +45,11 @@ sink {
{
rule_type = MIN_ROW
rule_value = 100000
- }
- ],
- row_rules = [
+ },
{
- rule_type = MAX_ROW
- rule_value = 100000
- }
+ rule_type = MAX_ROW
+ rule_value = 100000
+ }
],
field_rules = [
{
@@ -75,4 +73,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
index a2f6dc3954..d8d5c9f0d2 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-paimon-e2e/src/test/resources/paimon_to_assert.conf
@@ -29,7 +29,7 @@ env {
source {
Paimon {
- warehouse = "/tmp/paimon"
+ warehouse = "/opt/seatunnel_mounts/paimon"
database = "default"
table = "st_test"
plugin_output = paimon_source
@@ -44,13 +44,11 @@ sink {
{
rule_type = MIN_ROW
rule_value = 100000
- }
- ],
- row_rules = [
+ },
{
- rule_type = MAX_ROW
- rule_value = 100000
- }
+ rule_type = MAX_ROW
+ rule_value = 100000
+ }
],
field_rules = [
{
@@ -83,4 +81,4 @@ sink {
]
}
}
-}
\ No newline at end of file
+}
diff --git a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
index 1bb60902bd..1bfe8e8c3c 100644
--- a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
+++ b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/common/container/flink/AbstractTestFlinkContainer.java
@@ -24,6 +24,7 @@ import org.apache.seatunnel.e2e.common.container.ContainerExtendedFactory;
import org.apache.seatunnel.e2e.common.container.TestContainer;
import org.apache.seatunnel.e2e.common.util.ContainerUtil;
+import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.Container;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.output.Slf4jLogConsumer;
@@ -60,6 +61,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
protected static final String DEFAULT_DOCKER_IMAGE =
"flink:1.13.6-scala_2.11";
+ protected static final String MOUNTS_PATH = "/opt/seatunnel_mounts";
+
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
@@ -85,7 +88,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
.waitingFor(
new LogMessageWaitStrategy()
.withRegEx(".*Starting the resource manager.*")
- .withStartupTimeout(Duration.ofMinutes(2)));
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH, BindMode.READ_WRITE);
copySeaTunnelStarterToContainer(jobManager);
copySeaTunnelStarterLoggingToContainer(jobManager);
@@ -106,7 +110,8 @@ public abstract class AbstractTestFlinkContainer extends AbstractTestContainer {
new LogMessageWaitStrategy()
.withRegEx(
".*Successful registration at resource manager.*")
- .withStartupTimeout(Duration.ofMinutes(2)));
+ .withStartupTimeout(Duration.ofMinutes(2)))
+ .withFileSystemBind(MOUNTS_PATH, MOUNTS_PATH, BindMode.READ_WRITE);
Startables.deepStart(Stream.of(jobManager)).join();
Startables.deepStart(Stream.of(taskManager)).join();