This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 5b714f0c [Feature][e2e] Use seatunnel script to submit job in e2e 
(#1937)
5b714f0c is described below

commit 5b714f0c83dfa49492053acda6c2c1c13939aa98
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 20 19:45:19 2022 +0800

    [Feature][e2e] Use seatunnel script to submit job in e2e (#1937)
---
 .../apache/seatunnel/e2e/flink/FlinkContainer.java | 28 ++++++++++-------
 .../seatunnel/e2e/flink/sql/FlinkContainer.java    | 18 +++++++----
 .../apache/seatunnel/e2e/spark/SparkContainer.java | 35 ++++++++--------------
 3 files changed, 44 insertions(+), 37 deletions(-)

diff --git 
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index ffa1c991..50bd498d 100644
--- 
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -53,11 +53,13 @@ public abstract class FlinkContainer {
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
     private static final Path PROJECT_ROOT_PATH = 
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_FLINK_BIN = 
"start-seatunnel-flink.sh";
     private static final String SEATUNNEL_FLINK_JAR = 
"seatunnel-core-flink.jar";
     private static final String PLUGIN_MAPPING_FILE = 
"plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
-    private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, 
"lib", SEATUNNEL_FLINK_JAR).toString();
-    private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, 
"connectors").toString();
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, 
"bin").toString();
+    private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME, 
"lib").toString();
+    private static final String SEATUNNEL_CONNECTORS = 
Paths.get(SEATUNNEL_HOME, "connectors").toString();
 
     private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
 
@@ -113,12 +115,9 @@ public abstract class FlinkContainer {
         jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), 
targetConfInContainer);
 
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("flink");
-        command.add("run");
-        command.add("-c org.apache.seatunnel.core.flink.SeatunnelFlink " + 
jar);
+        command.add(Paths.get(SEATUNNEL_HOME, 
"bin/start-seatunnel-flink.sh").toString());
         command.add("--config " + conf);
 
         Container.ExecResult execResult = jobManager.execInContainer("bash", 
"-c", String.join(" ", command));
@@ -130,11 +129,20 @@ public abstract class FlinkContainer {
     }
 
     protected void copySeaTunnelFlinkFile() {
+        // copy lib
         String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
             + 
"/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
-        
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
 FLINK_JAR_PATH);
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+            Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
+
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh";
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
 
-        // copy connectors jar
+        // copy connectors
         File jars = new File(PROJECT_ROOT_PATH +
             
"/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
         Arrays.stream(Objects.requireNonNull(jars.listFiles(f -> 
f.getName().startsWith("seatunnel-connector-flink"))))
@@ -146,7 +154,7 @@ public abstract class FlinkContainer {
         // copy plugin-mapping.properties
         jobManager.copyFileToContainer(
             MountableFile.forHostPath(PROJECT_ROOT_PATH + 
"/seatunnel-connectors/plugin-mapping.properties"),
-            Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+            Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
     }
 
     private String getResource(String confFile) {
@@ -154,7 +162,7 @@ public abstract class FlinkContainer {
     }
 
     private String getConnectorPath(String fileName) {
-        return Paths.get(CONNECTORS_PATH.toString(), "flink", 
fileName).toString();
+        return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString();
     }
 
 }
diff --git 
a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
 
b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
index 8453515d..c5397ba5 100644
--- 
a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -53,8 +53,10 @@ public abstract class FlinkContainer {
     protected GenericContainer<?> jobManager;
     protected GenericContainer<?> taskManager;
     private static final Path PROJECT_ROOT_PATH = 
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
     private static final String SEATUNNEL_FLINK_SQL_JAR = 
"seatunnel-core-flink-sql.jar";
     private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, 
"bin", SEATUNNEL_FLINK_BIN).toString();
     private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME, 
"lib", SEATUNNEL_FLINK_SQL_JAR).toString();
 
     private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
@@ -112,12 +114,9 @@ public abstract class FlinkContainer {
         jobManager.copyFileToContainer(MountableFile.forHostPath(confPath), 
targetConfInContainer);
 
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("flink");
-        command.add("run");
-        command.add("-c org.apache.seatunnel.core.sql.SeatunnelSql " + jar);
+        command.add(Paths.get(SEATUNNEL_HOME, 
"bin/start-seatunnel-sql.sh").toString());
         command.add("--config " + conf);
 
         Container.ExecResult execResult = jobManager.execInContainer("bash", 
"-c", String.join(" ", command));
@@ -129,8 +128,17 @@ public abstract class FlinkContainer {
     }
 
     protected void copySeaTunnelFlinkFile() {
+        // copy lib
         String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
-        
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
 FLINK_JAR_PATH);
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+            FLINK_JAR_PATH);
+
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
+        jobManager.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN).toString());
     }
 
 }
diff --git 
a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
 
b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index b3dd5d16..552522d4 100644
--- 
a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++ 
b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,16 +52,18 @@ public abstract class SparkContainer {
 
     protected GenericContainer<?> master;
     private static final Path PROJECT_ROOT_PATH = 
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+    private static final String SEATUNNEL_SPARK_BIN = 
"start-seatunnel-spark.sh";
     private static final String SEATUNNEL_SPARK_JAR = 
"seatunnel-core-spark.jar";
     private static final String PLUGIN_MAPPING_FILE = 
"plugin-mapping.properties";
     private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+    private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME, 
"bin").toString();
     private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME, 
"lib", SEATUNNEL_SPARK_JAR).toString();
     private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME, 
"connectors").toString();
 
     private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
 
     @Before
-    public void before() throws InterruptedException {
+    public void before() {
         master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
             .withNetwork(NETWORK)
             .withNetworkAliases("spark-master")
@@ -93,31 +95,14 @@ public abstract class SparkContainer {
 
         // TODO: use start-seatunnel-spark.sh to run the spark job. Need to 
modified the SparkStarter can find the seatunnel-core-spark.jar.
         // Running IT use cases under Windows requires replacing \ with /
-        String jar = SPARK_JAR_PATH.replaceAll("\\\\", "/");
         String conf = targetConfInContainer.replaceAll("\\\\", "/");
         final List<String> command = new ArrayList<>();
-        command.add("spark-submit");
-        command.add("--class");
-        command.add("org.apache.seatunnel.core.spark.SeatunnelSpark");
-        command.add("--name");
-        command.add("SeaTunnel");
-        command.add("--master");
-        command.add("local");
-        command.add("--jars");
-        command.add(
-            getConnectorJarFiles()
-                .stream()
-                .map(j -> getConnectorPath(j.getName()))
-                .collect(Collectors.joining(",")));
-        command.add("--deploy-mode");
-        command.add("client");
-        command.add(jar);
-        command.add("-c");
-        command.add(conf);
+        command.add(Paths.get(SEATUNNEL_HOME, 
"bin/start-seatunnel-spark.sh").toString());
         command.add("--master");
         command.add("local");
         command.add("--deploy-mode");
         command.add("client");
+        command.add("--config " + conf);
 
         Container.ExecResult execResult = master.execInContainer("bash", "-c", 
String.join(" ", command));
         LOG.info(execResult.getStdout());
@@ -128,12 +113,18 @@ public abstract class SparkContainer {
     }
 
     protected void copySeaTunnelSparkFile() {
-        // copy jar to container
+        // copy lib
         String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
             + 
"/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
         
master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath),
 SPARK_JAR_PATH);
 
-        // copy connectors jar
+        // copy bin
+        String seatunnelFlinkBinPath = PROJECT_ROOT_PATH + 
"/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh";
+        master.copyFileToContainer(
+            MountableFile.forHostPath(seatunnelFlinkBinPath),
+            Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
+
+        // copy connectors
         getConnectorJarFiles()
             .forEach(jar ->
                 master.copyFileToContainer(

Reply via email to