This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 39a09d9  [Feature][e2e] Add Flink E2E (#1536)
39a09d9 is described below

commit 39a09d91824a517aa0c366d6369375565772ea2a
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 24 12:41:08 2022 +0800

    [Feature][e2e] Add Flink E2E (#1536)
    
    * init module
    
    * Exclude jackson
    
    * Add skipIT to skip Integration test in defaule
---
 .github/workflows/backend.yml                      |   3 +
 pom.xml                                            |  46 +++++++
 seatunnel-e2e/pom.xml                              |  33 +++++
 seatunnel-e2e/seatunnel-flink-e2e/pom.xml          |  42 +++++++
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 134 +++++++++++++++++++++
 .../e2e/flink/fake/FakeSourceToConsoleIT.java      |  35 ++++++
 .../e2e/flink/file/FakeSourceToFileIT.java         |  37 ++++++
 .../test/resources/fake/fakesource_to_console.conf |   8 +-
 .../test/resources/file/fakesource_to_file.conf    |  10 +-
 .../main/resources/examples/fake_to_console.conf   |   3 +-
 10 files changed, 341 insertions(+), 10 deletions(-)

diff --git a/.github/workflows/backend.yml b/.github/workflows/backend.yml
index 2770f11..7eebf1a 100644
--- a/.github/workflows/backend.yml
+++ b/.github/workflows/backend.yml
@@ -77,3 +77,6 @@ jobs:
       - name: Run Unit tests
         run: |
           ./mvnw -T 2C -B clean verify -Dmaven.test.skip=false -Dgpg.skip=true 
--no-snapshot-updates
+      - name: Run Integration tests
+        run: |
+          ./mvnw -T 2C -B clean verify -DskipUT=true -DskipIT=false 
-Dgpg.skip=true --no-snapshot-updates
diff --git a/pom.xml b/pom.xml
index 2eb978c..1ade5e3 100644
--- a/pom.xml
+++ b/pom.xml
@@ -80,6 +80,7 @@
         <module>seatunnel-connectors</module>
         <module>seatunnel-dist</module>
         <module>seatunnel-examples</module>
+        <module>seatunnel-e2e</module>
     </modules>
 
     <properties>
@@ -105,6 +106,7 @@
         <maven.deploy.skip>false</maven.deploy.skip>
         <maven.javadoc.skip>false</maven.javadoc.skip>
         <maven-surefire-plugin.version>2.22.2</maven-surefire-plugin.version>
+        <maven-failsafe-plugin.version>2.22.2</maven-failsafe-plugin.version>
         
<maven-checkstyle-plugin.version>3.1.2</maven-checkstyle-plugin.version>
         <checkstyle.fails.on.error>true</checkstyle.fails.on.error>
         
<nexus-staging-maven-plugin.version>1.6.8</nexus-staging-maven-plugin.version>
@@ -152,6 +154,9 @@
         <docker-maven-plugin.version>0.38.0</docker-maven-plugin.version>
         <p3c-pmd.version>1.3.0</p3c-pmd.version>
         
<maven-scm-provider-jgit.version>1.9.5</maven-scm-provider-jgit.version>
+        <testcontainer.version>1.16.3</testcontainer.version>
+        <skipUT>false</skipUT>
+        <skipIT>true</skipIT>
     </properties>
 
     <dependencyManagement>
@@ -468,6 +473,22 @@
                 <version>${junit.version}</version>
                 <scope>test</scope>
             </dependency>
+            <dependency>
+                <groupId>org.testcontainers</groupId>
+                <artifactId>testcontainers</artifactId>
+                <version>${testcontainer.version}</version>
+                <scope>test</scope>
+                <exclusions>
+                    <exclusion>
+                        <groupId>com.fasterxml.jackson.core</groupId>
+                        <artifactId>jackson-annotations</artifactId>
+                    </exclusion>
+                    <exclusion>
+                        <groupId>org.slf4j</groupId>
+                        <artifactId>slf4j-api</artifactId>
+                    </exclusion>
+                </exclusions>
+            </dependency>
 
             <dependency>
                 <groupId>com.pingcap.tispark</groupId>
@@ -582,10 +603,30 @@
                     <artifactId>maven-surefire-plugin</artifactId>
                     <version>${maven-surefire-plugin.version}</version>
                     <configuration>
+                        <skip>${skipUT}</skip>
                         <systemPropertyVariables>
                             
<jacoco-agent.destfile>${project.build.directory}/jacoco.exec</jacoco-agent.destfile>
                         </systemPropertyVariables>
+                        <excludes>
+                            <exclude>**/*IT.java</exclude>
+                        </excludes>
+                    </configuration>
+                </plugin>
+                <plugin>
+                    <groupId>org.apache.maven.plugins</groupId>
+                    <artifactId>maven-failsafe-plugin</artifactId>
+                    <version>${maven-failsafe-plugin.version}</version>
+                    <configuration>
+                        <skip>${skipIT}</skip>
                     </configuration>
+                    <executions>
+                        <execution>
+                            <goals>
+                                <goal>integration-test</goal>
+                                <goal>verify</goal>
+                            </goals>
+                        </execution>
+                    </executions>
                 </plugin>
 
                 <plugin>
@@ -845,6 +886,11 @@
 
             <plugin>
                 <groupId>org.apache.maven.plugins</groupId>
+                <artifactId>maven-failsafe-plugin</artifactId>
+            </plugin>
+
+            <plugin>
+                <groupId>org.apache.maven.plugins</groupId>
                 <artifactId>maven-checkstyle-plugin</artifactId>
             </plugin>
 
diff --git a/seatunnel-e2e/pom.xml b/seatunnel-e2e/pom.xml
new file mode 100644
index 0000000..1b6bd0d
--- /dev/null
+++ b/seatunnel-e2e/pom.xml
@@ -0,0 +1,33 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-e2e</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>seatunnel-flink-e2e</module>
+    </modules>
+
+</project>
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-flink-e2e/pom.xml 
b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
new file mode 100644
index 0000000..d255618
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-flink-e2e/pom.xml
@@ -0,0 +1,42 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+       http://www.apache.org/licenses/LICENSE-2.0
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0";
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 
http://maven.apache.org/xsd/maven-4.0.0.xsd";>
+    <parent>
+        <artifactId>seatunnel-e2e</artifactId>
+        <groupId>org.apache.seatunnel</groupId>
+        <version>2.0.5-SNAPSHOT</version>
+    </parent>
+    <modelVersion>4.0.0</modelVersion>
+
+    <artifactId>seatunnel-flink-e2e</artifactId>
+    <packaging>jar</packaging>
+
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.seatunnel</groupId>
+            <artifactId>seatunnel-core-flink</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+
+        <dependency>
+            <groupId>org.testcontainers</groupId>
+            <artifactId>testcontainers</artifactId>
+        </dependency>
+    </dependencies>
+
+</project>
\ No newline at end of file
diff --git 
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
new file mode 100644
index 0000000..d6c2099
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink;
+
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+import org.testcontainers.containers.GenericContainer;
+import org.testcontainers.containers.Network;
+import org.testcontainers.containers.output.Slf4jLogConsumer;
+import org.testcontainers.lifecycle.Startables;
+import org.testcontainers.utility.MountableFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.stream.Stream;
+
+/**
+ * This class is the base class of FlinkEnvironment test.
+ * The before method will create a Flink cluster, and after method will close 
the Flink cluster.
+ * You can use {@link FlinkContainer#executeSeaTunnelFLinkJob} to submit a 
seatunnel config and run a seatunnel job.
+ */
+public abstract class FlinkContainer {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(FlinkContainer.class);
+
+    private static final String FLINK_DOCKER_IMAGE = "flink:1.13.6-scala_2.11";
+    public static final Network NETWORK = Network.newNetwork();
+
+    protected GenericContainer<?> jobManager;
+    protected GenericContainer<?> taskManager;
+    private static final String SEATUNNEL_FLINK_JAR = 
"seatunnel-core-flink.jar";
+    private static final String FLINK_JAR_PATH = Paths.get("/tmp", 
SEATUNNEL_FLINK_JAR).toString();
+
+    private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
+
+    private static final String FLINK_PROPERTIES = String.join(
+        "\n",
+        Arrays.asList(
+            "jobmanager.rpc.address: jobmanager",
+            "taskmanager.numberOfTaskSlots: 10",
+            "parallelism.default: 4",
+            "env.java.opts: -Doracle.jdbc.timezoneAsRegion=false"));
+
+    @Before
+    public void before() {
+        jobManager = new GenericContainer<>(FLINK_DOCKER_IMAGE)
+            .withCommand("jobmanager")
+            .withNetwork(NETWORK)
+            .withNetworkAliases("jobmanager")
+            .withExposedPorts()
+            .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+            .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        taskManager =
+            new GenericContainer<>(FLINK_DOCKER_IMAGE)
+                .withCommand("taskmanager")
+                .withNetwork(NETWORK)
+                .withNetworkAliases("taskmanager")
+                .withEnv("FLINK_PROPERTIES", FLINK_PROPERTIES)
+                .dependsOn(jobManager)
+                .withLogConsumer(new Slf4jLogConsumer(LOG));
+
+        Startables.deepStart(Stream.of(jobManager)).join();
+        Startables.deepStart(Stream.of(taskManager)).join();
+        copySeatunnelFlinkCoreJar();
+        LOG.info("Containers are started.");
+    }
+
+    @After
+    public void close() {
+        if (taskManager != null) {
+            taskManager.stop();
+        }
+        if (jobManager != null) {
+            jobManager.stop();
+        }
+    }
+
+    public Container.ExecResult executeSeaTunnelFLinkJob(String confFile) 
throws IOException, InterruptedException {
+        final String confPath = getResource(confFile);
+        if (!new File(confPath).exists()) {
+            throw new IllegalArgumentException(confFile + " doesn't exist");
+        }
+        final String targetConfInContainer = Paths.get("/tmp", 
confFile).toString();
+        jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), 
targetConfInContainer);
+
+        final List<String> command = new ArrayList<>();
+        command.add("flink");
+        command.add("run");
+        command.add("-c org.apache.seatunnel.SeatunnelFlink " + 
FLINK_JAR_PATH);
+        command.add("--config " + targetConfInContainer);
+
+        Container.ExecResult execResult = jobManager.execInContainer("bash", 
"-c", String.join(" ", command));
+        LOG.info(execResult.getStdout());
+        LOG.error(execResult.getStderr());
+        // wait job start
+        Thread.sleep(WAIT_FLINK_JOB_SUBMIT);
+        return execResult;
+    }
+
+    protected void copySeatunnelFlinkCoreJar() {
+        String currentModuleHome = System.getProperty("user.dir");
+        String seatunnelCoreFlinkJarPath = 
currentModuleHome.replace("/seatunnel-e2e/seatunnel-flink-e2e", "")
+            + 
"/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
+        
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
 FLINK_JAR_PATH);
+    }
+
+    private String getResource(String confFile) {
+        return System.getProperty("user.dir") + "/src/test/resources" + 
confFile;
+    }
+
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
new file mode 100644
index 0000000..0dc56bc
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/fake/FakeSourceToConsoleIT.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.fake;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.testcontainers.containers.Container;
+
+import java.io.IOException;
+
+public class FakeSourceToConsoleIT extends FlinkContainer {
+
+    @Test
+    public void testFakeSourceToConsoleSine() throws IOException, 
InterruptedException {
+        Container.ExecResult execResult = 
executeSeaTunnelFLinkJob("/fake/fakesource_to_console.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
new file mode 100644
index 0000000..58d0d37
--- /dev/null
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/file/FakeSourceToFileIT.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.e2e.flink.file;
+
+import org.apache.seatunnel.e2e.flink.FlinkContainer;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testcontainers.containers.Container;
+
+public class FakeSourceToFileIT extends FlinkContainer {
+
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FakeSourceToFileIT.class);
+
+    @Test
+    public void testFakeSource2FileSink() throws Exception {
+        Container.ExecResult execResult = 
executeSeaTunnelFLinkJob("/file/fakesource_to_file.conf");
+        Assert.assertEquals(0, execResult.getExitCode());
+    }
+}
diff --git 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
similarity index 97%
copy from 
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
copy to 
seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
index b29c159..ba0b573 100644
--- 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/fake/fakesource_to_console.conf
@@ -27,7 +27,7 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-    FakeSourceStream {
+    FakeSource {
       result_table_name = "fake"
       field_name = "name,age"
     }
@@ -46,9 +46,9 @@ transform {
 }
 
 sink {
-  ConsoleSink {}
+  ConsoleSink {
+  }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
   # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file
diff --git 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
similarity index 95%
copy from 
seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
copy to 
seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
index b29c159..6d27441 100644
--- 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/resources/file/fakesource_to_file.conf
@@ -27,7 +27,7 @@ env {
 
 source {
   # This is a example source plugin **only for test and demonstrate the 
feature source plugin**
-    FakeSourceStream {
+    FakeSource {
       result_table_name = "fake"
       field_name = "name,age"
     }
@@ -46,9 +46,11 @@ transform {
 }
 
 sink {
-  ConsoleSink {}
+  FileSink {
+    format = "json"
+    path = "file:///tmp/fakesource_to_file.txt"
+  }
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
   # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file
diff --git 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
 
b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
index b29c159..9c6ad18 100644
--- 
a/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
+++ 
b/seatunnel-examples/seatunnel-flink-examples/src/main/resources/examples/fake_to_console.conf
@@ -50,5 +50,4 @@ sink {
 
   # If you would like to get more information about how to configure seatunnel 
and see full list of sink plugins,
   # please go to 
https://seatunnel.apache.org/docs/flink/configuration/sink-plugins/Console
-}
-
+}
\ No newline at end of file

Reply via email to