This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 5b714f0c [Feature][e2e] Use seatunnel script to submit job in e2e
(#1937)
5b714f0c is described below
commit 5b714f0c83dfa49492053acda6c2c1c13939aa98
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 20 19:45:19 2022 +0800
[Feature][e2e] Use seatunnel script to submit job in e2e (#1937)
---
.../apache/seatunnel/e2e/flink/FlinkContainer.java | 28 ++++++++++-------
.../seatunnel/e2e/flink/sql/FlinkContainer.java | 18 +++++++----
.../apache/seatunnel/e2e/spark/SparkContainer.java | 35 ++++++++--------------
3 files changed, 44 insertions(+), 37 deletions(-)
diff --git
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
index ffa1c991..50bd498d 100644
---
a/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-flink-e2e/src/test/java/org/apache/seatunnel/e2e/flink/FlinkContainer.java
@@ -53,11 +53,13 @@ public abstract class FlinkContainer {
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
private static final Path PROJECT_ROOT_PATH =
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+ private static final String SEATUNNEL_FLINK_BIN =
"start-seatunnel-flink.sh";
private static final String SEATUNNEL_FLINK_JAR =
"seatunnel-core-flink.jar";
private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
- private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME,
"lib", SEATUNNEL_FLINK_JAR).toString();
- private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME,
"connectors").toString();
+ private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME,
"bin").toString();
+ private static final String SEATUNNEL_LIB = Paths.get(SEATUNNEL_HOME,
"lib").toString();
+ private static final String SEATUNNEL_CONNECTORS =
Paths.get(SEATUNNEL_HOME, "connectors").toString();
private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
@@ -113,12 +115,9 @@ public abstract class FlinkContainer {
jobManager.copyFileToContainer(MountableFile.forHostPath(confPath),
targetConfInContainer);
// Running IT use cases under Windows requires replacing \ with /
- String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
String conf = targetConfInContainer.replaceAll("\\\\", "/");
final List<String> command = new ArrayList<>();
- command.add("flink");
- command.add("run");
- command.add("-c org.apache.seatunnel.core.flink.SeatunnelFlink " +
jar);
+ command.add(Paths.get(SEATUNNEL_HOME,
"bin/start-seatunnel-flink.sh").toString());
command.add("--config " + conf);
Container.ExecResult execResult = jobManager.execInContainer("bash",
"-c", String.join(" ", command));
@@ -130,11 +129,20 @@ public abstract class FlinkContainer {
}
protected void copySeaTunnelFlinkFile() {
+ // copy lib
String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH
+
"/seatunnel-core/seatunnel-core-flink/target/seatunnel-core-flink.jar";
-
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
FLINK_JAR_PATH);
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+ Paths.get(SEATUNNEL_LIB, SEATUNNEL_FLINK_JAR).toString());
+
+ // copy bin
+ String seatunnelFlinkBinPath = PROJECT_ROOT_PATH +
"/seatunnel-core/seatunnel-core-flink/src/main/bin/start-seatunnel-flink.sh";
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(seatunnelFlinkBinPath),
+ Paths.get(SEATUNNEL_BIN, SEATUNNEL_FLINK_BIN).toString());
- // copy connectors jar
+ // copy connectors
File jars = new File(PROJECT_ROOT_PATH +
"/seatunnel-connectors/seatunnel-connectors-flink-dist/target/lib");
Arrays.stream(Objects.requireNonNull(jars.listFiles(f ->
f.getName().startsWith("seatunnel-connector-flink"))))
@@ -146,7 +154,7 @@ public abstract class FlinkContainer {
// copy plugin-mapping.properties
jobManager.copyFileToContainer(
MountableFile.forHostPath(PROJECT_ROOT_PATH +
"/seatunnel-connectors/plugin-mapping.properties"),
- Paths.get(CONNECTORS_PATH, PLUGIN_MAPPING_FILE).toString());
+ Paths.get(SEATUNNEL_CONNECTORS, PLUGIN_MAPPING_FILE).toString());
}
private String getResource(String confFile) {
@@ -154,7 +162,7 @@ public abstract class FlinkContainer {
}
private String getConnectorPath(String fileName) {
- return Paths.get(CONNECTORS_PATH.toString(), "flink",
fileName).toString();
+ return Paths.get(SEATUNNEL_CONNECTORS, "flink", fileName).toString();
}
}
diff --git
a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
index 8453515d..c5397ba5 100644
---
a/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
+++
b/seatunnel-e2e/seatunnel-flink-sql-e2e/src/test/java/org/apache/seatunnel/e2e/flink/sql/FlinkContainer.java
@@ -53,8 +53,10 @@ public abstract class FlinkContainer {
protected GenericContainer<?> jobManager;
protected GenericContainer<?> taskManager;
private static final Path PROJECT_ROOT_PATH =
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+ private static final String SEATUNNEL_FLINK_BIN = "start-seatunnel-sql.sh";
private static final String SEATUNNEL_FLINK_SQL_JAR =
"seatunnel-core-flink-sql.jar";
private static final String SEATUNNEL_HOME = "/tmp/flink/seatunnel";
+ private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME,
"bin", SEATUNNEL_FLINK_BIN).toString();
private static final String FLINK_JAR_PATH = Paths.get(SEATUNNEL_HOME,
"lib", SEATUNNEL_FLINK_SQL_JAR).toString();
private static final int WAIT_FLINK_JOB_SUBMIT = 5000;
@@ -112,12 +114,9 @@ public abstract class FlinkContainer {
jobManager.copyFileToContainer(MountableFile.forHostPath(confPath),
targetConfInContainer);
// Running IT use cases under Windows requires replacing \ with /
- String jar = FLINK_JAR_PATH.replaceAll("\\\\", "/");
String conf = targetConfInContainer.replaceAll("\\\\", "/");
final List<String> command = new ArrayList<>();
- command.add("flink");
- command.add("run");
- command.add("-c org.apache.seatunnel.core.sql.SeatunnelSql " + jar);
+ command.add(Paths.get(SEATUNNEL_HOME,
"bin/start-seatunnel-sql.sh").toString());
command.add("--config " + conf);
Container.ExecResult execResult = jobManager.execInContainer("bash",
"-c", String.join(" ", command));
@@ -129,8 +128,17 @@ public abstract class FlinkContainer {
}
protected void copySeaTunnelFlinkFile() {
+ // copy lib
String seatunnelCoreFlinkJarPath = PROJECT_ROOT_PATH +
"/seatunnel-core/seatunnel-core-flink-sql/target/" + SEATUNNEL_FLINK_SQL_JAR;
-
jobManager.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
FLINK_JAR_PATH);
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(seatunnelCoreFlinkJarPath),
+ FLINK_JAR_PATH);
+
+ // copy bin
+ String seatunnelFlinkBinPath = PROJECT_ROOT_PATH +
"/seatunnel-core/seatunnel-core-flink-sql/src/main/bin/start-seatunnel-sql.sh";
+ jobManager.copyFileToContainer(
+ MountableFile.forHostPath(seatunnelFlinkBinPath),
+ Paths.get(SEATUNNEL_BIN).toString());
}
}
diff --git
a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
index b3dd5d16..552522d4 100644
---
a/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
+++
b/seatunnel-e2e/seatunnel-spark-e2e/src/test/java/org/apache/seatunnel/e2e/spark/SparkContainer.java
@@ -52,16 +52,18 @@ public abstract class SparkContainer {
protected GenericContainer<?> master;
private static final Path PROJECT_ROOT_PATH =
Paths.get(System.getProperty("user.dir")).getParent().getParent();
+ private static final String SEATUNNEL_SPARK_BIN =
"start-seatunnel-spark.sh";
private static final String SEATUNNEL_SPARK_JAR =
"seatunnel-core-spark.jar";
private static final String PLUGIN_MAPPING_FILE =
"plugin-mapping.properties";
private static final String SEATUNNEL_HOME = "/tmp/spark/seatunnel";
+ private static final String SEATUNNEL_BIN = Paths.get(SEATUNNEL_HOME,
"bin").toString();
private static final String SPARK_JAR_PATH = Paths.get(SEATUNNEL_HOME,
"lib", SEATUNNEL_SPARK_JAR).toString();
private static final String CONNECTORS_PATH = Paths.get(SEATUNNEL_HOME,
"connectors").toString();
private static final int WAIT_SPARK_JOB_SUBMIT = 5000;
@Before
- public void before() throws InterruptedException {
+ public void before() {
master = new GenericContainer<>(SPARK_DOCKER_IMAGE)
.withNetwork(NETWORK)
.withNetworkAliases("spark-master")
@@ -93,31 +95,14 @@ public abstract class SparkContainer {
// TODO: use start-seatunnel-spark.sh to run the spark job. Need to
modified the SparkStarter can find the seatunnel-core-spark.jar.
// Running IT use cases under Windows requires replacing \ with /
- String jar = SPARK_JAR_PATH.replaceAll("\\\\", "/");
String conf = targetConfInContainer.replaceAll("\\\\", "/");
final List<String> command = new ArrayList<>();
- command.add("spark-submit");
- command.add("--class");
- command.add("org.apache.seatunnel.core.spark.SeatunnelSpark");
- command.add("--name");
- command.add("SeaTunnel");
- command.add("--master");
- command.add("local");
- command.add("--jars");
- command.add(
- getConnectorJarFiles()
- .stream()
- .map(j -> getConnectorPath(j.getName()))
- .collect(Collectors.joining(",")));
- command.add("--deploy-mode");
- command.add("client");
- command.add(jar);
- command.add("-c");
- command.add(conf);
+ command.add(Paths.get(SEATUNNEL_HOME,
"bin/start-seatunnel-spark.sh").toString());
command.add("--master");
command.add("local");
command.add("--deploy-mode");
command.add("client");
+ command.add("--config " + conf);
Container.ExecResult execResult = master.execInContainer("bash", "-c",
String.join(" ", command));
LOG.info(execResult.getStdout());
@@ -128,12 +113,18 @@ public abstract class SparkContainer {
}
protected void copySeaTunnelSparkFile() {
- // copy jar to container
+ // copy lib
String seatunnelCoreSparkJarPath = PROJECT_ROOT_PATH
+
"/seatunnel-core/seatunnel-core-spark/target/seatunnel-core-spark.jar";
master.copyFileToContainer(MountableFile.forHostPath(seatunnelCoreSparkJarPath),
SPARK_JAR_PATH);
- // copy connectors jar
+ // copy bin
+ String seatunnelFlinkBinPath = PROJECT_ROOT_PATH +
"/seatunnel-core/seatunnel-core-spark/src/main/bin/start-seatunnel-spark.sh";
+ master.copyFileToContainer(
+ MountableFile.forHostPath(seatunnelFlinkBinPath),
+ Paths.get(SEATUNNEL_BIN, SEATUNNEL_SPARK_BIN).toString());
+
+ // copy connectors
getConnectorJarFiles()
.forEach(jar ->
master.copyFileToContainer(