This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 39a09d9 [Feature][e2e] Add Flink E2E (#1536)
39a09d9 is described below
commit 39a09d91824a517aa0c366d6369375565772ea2a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 24 12:41:08 2022 +0800
[Feature][e2e] Add Flink E2E (#1536)
* init module
* Exclude jackson
* Add skipIT to skip Integration test in defaule
---
.github/workflows/backend.yml | 3 +
pom.xml | 46 +++++++
seatunnel-e2e/pom.xml | 33 +++++
seatunnel-e2e/seatunnel-flink-e2e/pom.xml | 42 +++++++
.../apache/seatunnel/e2e/flink/FlinkContainer.java | 134 +++++++++++++++++++++
.../e2e/flink/fake/FakeSourceToConsoleIT.java | 35 ++++++
.../e2e/flink/file/FakeSourceToFileIT.java | 37 ++++++
.../test/resources/fake/fakesource_to_console.conf | 8 +-
.../test/resources/file/fakesource_to_file.conf | 10 +-
.../main/resources/examples/fake_to_console.conf | 3 +-
10 files changed, 341 insertions(+), 10 deletions(-)
diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 2770f11..7eebf1a 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -77,3 +77,6 @@ jobs:
- name: Run Unit tests
run: |
./mvnw -T 2C -B clean verify -Dmaven.test.skip=false -Dgpg.skip=true
--no-snapshot-updates
+ - name: Run Integration tests
+ run: |
+ ./mvnw -T 2C -B clean verify -DskipUT=true -DskipIT=false
-Dgpg.skip=true --no-snapshot-updates
diff --git a/pom.xml b/pom.xml
index 2eb978c..1ade5e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
<module>seatunnel-connectors</module>
<module>seatunnel-dist</module>
<module>seatunnel-examples</module>
+ <module>seatunnel-e2e</module>
</modules>
<properties>
@@ -105,6 +106,7 @@
<maven.deploy.skip>false</maven.deploy.skip>
<maven.javadoc.skip>false</maven.javadoc.skip>
<maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
+ <maven-failsafe-plugin.version>2.22.2</maven-failsafe-plugin.version>
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
<checkstyle.fails.on.error>true</checkstyle.fails.on.error>
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
@@ -152,6 +154,9 @@
<docker-maven-plugin.version>0.38.0</docker-maven-plugin.version>
<p3c-pmd.version>1.3.0</p3c-pmd.version>
<maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
+ <testcontainer.version>1.16.3</testcontainer.version>
+ <skipUT>false</skipUT>
+ <skipIT>true</skipIT>
</properties>
<dependencyManagement>
@@ -468,6 +473,22 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ <version>${testcontainer.version}</version>
+ <scope>test</scope>
+ <exclusions>
+ <exclusion>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-annotations</artifactId>
+ </exclusion>
+ <exclusion>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
<dependency>
<groupId>com.pingcap.tispark</groupId>
@@ -582,10 +603,30 @@
<artifactId>maven-surefire-plugin</artifactId>
<version>${maven-surefire-plugin.version}</version>
<configuration>
+ <skip>${skipUT}</skip>
<systemPropertyVariables>
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
</systemPropertyVariables>
+ <excludes>
+ <exclude>**/*IT.java</exclude>
+ </excludes>
+ </configuration>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ <version>${maven-failsafe-plugin.version}</version>
+ <configuration>
+ <skip>${skipIT}</skip>
</configuration>
+ <executions>
+ <execution>
+ <goals>
+ <goal>integration-test</goal>
+ <goal>verify</goal>
+ </goals>
+ </execution>
+ </executions>
</plugin>
<plugin>
@@ -845,6 +886,11 @@
<plugin>
<groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-failsafe-plugin</artifactId>
+ </plugin>
+
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
</plugin>
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
new file mode 100644
index 0000000..1b6bd0d
--- /dev/null
+++ b/seatunnel-e2e/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-e2e</artifactId>
+ <packaging>pom</packaging>
+
+ <modules>
+ <module>seatunnel-flink-e2e</module>
+ </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
new file mode 100644
index 0000000..d255618
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-e2e</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>2.0.5-SNAPSHOT</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>seatunnel-flink-e2e</artifactId>
+ <packaging>jar</packaging>
+
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-core-flink</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.testcontainers</groupId>
+ <artifactId>testcontainers</artifactId>
+ </dependency>
+ </dependencies>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
new file mode 100644
index 0000000..d6c2099
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close
the Flink cluster.
+ * You can use {@link FlinkContainer#executeSeaTunnelFLinkJob} to submit a
seatunnel config and run a seatunnel job.
+ */
+public abstract class FlinkContainer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(FlinkContainer.class);
+
+ private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+ public static final Network NETWORK = Network.newNetwork();
+
+ protected GenericContainer<?> jobManager;
+ protected GenericContainer<?> taskManager;
+ private static final String SEATUNNEL_FLINK_JAR =
"seatunnel-core-flink.jar";
+ private static final String FLINK_JAR_PATH = Paths.get("/tmp",
SEATUNNEL_FLINK_JAR).toString();
+
+ private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
+
+ private static final String FLINK_PROPERTIES = String.join(
+ "\n",
+ Arrays.asList(
+ "jobmanager.rpc.address: jobmanager",
+ "taskmanager.numberOfTaskSlots: 10",
+ "parallelism.default: 4",
+ "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+ @Before
+ public void before() {
+ jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
+ .withCommand("jobmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("jobmanager")
+ .withExposedPorts()
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ taskManager =
+ new GenericContainer<>(FLINK_DOCKER_IMAGE)
+ .withCommand("taskmanager")
+ .withNetwork(NETWORK)
+ .withNetworkAliases("taskmanager")
+ .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+ .dependsOn(jobManager)
+ .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+ Startables.deepStart(Stream.of(jobManager)).join();
+ Startables.deepStart(Stream.of(taskManager)).join();
+ copySeatunnelFlinkCoreJar();
+ LOG.info("Containers are started.");
+ }
+
+ @After
+ public void close() {
+ if (taskManager != null) {
+ taskManager.stop();
+ }
+ if (jobManager != null) {
+ jobManager.stop();
+ }
+ }
+
+ public Container.ExecResult executeSeaTunnelFLinkJob(String confFile)
throws IOException, InterruptedException {
+ final String confPath = getResource(confFile);
+ if (!new File(confPath).exists()) {
+ throw new IllegalArgumentException(confFile + " doesn't exist");
+ }
+ final String targetConfInContainer = Paths.get("/tmp",
confFile).toString();
+ jobManager.copyFileToContainer(MountableFile.forHostPath(confPath),
targetConfInContainer);
+
+ final List<String> command = new ArrayList<>();
+ command.add("flink");
+ command.add("run");
+ command.add("-c org.apache.seatunnel.SeatunnelFlink " +
FLINK_JAR_PATH);
+ command.add("--config " + targetConfInContainer);
+
+ Container.ExecResult execResult = jobManager.execInContainer("bash",
"-c", String.join(" ", command));
+ LOG.info(execResult.getStdout());
+ LOG.error(execResult.getStderr());
+ // wait job start
+ Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
+ return execResult;
+ }
+
+ protected void copySeatunnelFlinkCoreJar() {
+ String currentModuleHome = System.getProperty("user.dir");
+ String seatunnelCoreFlinkJarPath =
currentModuleHome.replace("/seatunnel-e2e/seatunnel-flink-e2e", "")
+ +
"/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
+
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
FLINK_JAR_PATH);
+ }
+
+ private String getResource(String confFile) {
+ return System.getProperty("user.dir") + "/src/test/resources" +
confFile;
+ }
+
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
new file mode 100644
index 0000000..0dc56bc
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.fake;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class FakeSourceToConsoleIT extends FlinkContainer {
+
+ @Test
+ public void testFakeSourceToConsoleSine() throws IOException,
InterruptedException {
+ Container.ExecResult execResult =
executeSeaTunnelFLinkJob("/fake/fakesource_to_console.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
new file mode 100644
index 0000000..58d0d37
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.file;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+public class FakeSourceToFileIT extends FlinkContainer {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceToFileIT.class);
+
+ @Test
+ public void testFakeSource2FileSink() throws Exception {
+ Container.ExecResult execResult =
executeSeaTunnelFLinkJob("/file/fakesource_to_file.conf");
+ Assert.assertEquals(0, execResult.getExitCode());
+ }
+}
diff --git
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
similarity index 97%
copy from
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
index b29c159..ba0b573 100644
---
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -27,7 +27,7 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
- FakeSourceStream {
+ FakeSource {
result_table_name = "fake"
field_name = "name,age"
}
@@ -46,9 +46,9 @@ transform {
}
sink {
- ConsoleSink {}
+ ConsoleSink {
+ }
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
# please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
similarity index 95%
copy from
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
copy to
seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
index b29c159..6d27441 100644
---
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
@@ -27,7 +27,7 @@ env {
source {
# This is a example source plugin **only for test and demonstrate the
feature source plugin**
- FakeSourceStream {
+ FakeSource {
result_table_name = "fake"
field_name = "name,age"
}
@@ -46,9 +46,11 @@ transform {
}
sink {
- ConsoleSink {}
+ FileSink {
+ format = "json"
+ path = "file:///tmp/fakesource_to_file.txt"
+ }
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
# please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
index b29c159..9c6ad18 100644
---
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++
b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
@@ -50,5 +50,4 @@ sink {
# If you would like to get more information about how to configure seatunnel
and see full list of sink plugins,
# please go to
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file