This is an automated email from the ASF dual-hosted git repository.
fanjia pushed a commit to branch st-engine
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/st-engine by this push:
new c7e92f019 [ST-Engine][Example] Add seatunnel example module (#2659)
c7e92f019 is described below
commit c7e92f01928e33a92dd9525200097579dd21db1a
Author: Hisoka <[email protected]>
AuthorDate: Thu Sep 8 21:51:01 2022 +0800
[ST-Engine][Example] Add seatunnel example module (#2659)
* [Engine][Example] Add seatunnel example module
* [Engine] [Core] Add seatunnel engine example
* [Engine] [Core] Add seatunnel engine example
* [Engine] [Example] Add random cluster name when use local mode
* [Engine] [Example] Fix review
---
.../core/starter/config/ConfigChecker.java | 4 +-
.../flink/config/FlinkApiConfigChecker.java | 2 +-
seatunnel-core/seatunnel-seatunnel-starter/pom.xml | 6 ++
.../core/starter/seatunnel/SeaTunnel.java} | 22 ++---
.../core/starter/seatunnel/SeaTunnelStarter.java | 55 ------------
.../seatunnel/args/ExecutionModeConverter.java} | 19 +++--
.../seatunnel/args/SeaTunnelCommandArgs.java | 42 +++++++++-
.../command/SeaTunnelApiConfValidateCommand.java | 52 ++++++++++++
.../command/SeaTunnelApiTaskExecuteCommand.java | 98 ++++++++++++++++++++++
.../command/SeaTunnelCommandBuilder.java} | 23 ++---
.../config/SeaTunnelApiConfigChecker.java} | 6 +-
.../starter/spark/command/SparkCommandBuilder.java | 13 +--
.../spark/config/SparkApiConfigChecker.java | 2 +-
.../engine/client/job/ClientJobProxy.java | 21 ++---
.../seatunnel/engine/server/SeaTunnelServer.java | 1 +
.../seatunnel-engine-examples/pom.xml | 18 ++++
.../example/engine/SeaTunnelEngineExample.java | 52 ++++++++++++
.../main/resources/examples/fake_to_console.conf | 51 +++++++++++
18 files changed, 365 insertions(+), 122 deletions(-)
diff --git
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
index 96925569c..3512eb38a 100644
---
a/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/config/ConfigChecker.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.core.starter.config;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -25,9 +24,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
/**
* Check the config is valid.
*
- * @param <ENVIRONMENT> the environment type.
*/
-public interface ConfigChecker<ENVIRONMENT extends RuntimeEnv> {
+public interface ConfigChecker {
/**
* Check if the config is validated, if check fails, throw exception.
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
index 2ae68515d..4a41e8b11 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
@@ -22,7 +22,7 @@ import
org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class FlinkApiConfigChecker implements
ConfigChecker<FlinkApiEnvironment> {
+public class FlinkApiConfigChecker implements ConfigChecker {
@Override
public void checkConfig(Config config) throws ConfigCheckException {
diff --git a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
index d2dd1ddbe..b4d7c3bf6 100644
--- a/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
+++ b/seatunnel-core/seatunnel-seatunnel-starter/pom.xml
@@ -46,6 +46,12 @@
<artifactId>seatunnel-engine-client</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-engine-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
similarity index 52%
copy from
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
copy to
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
index 2ae68515d..a3a98a7b1 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnel.java
@@ -15,17 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.flink.config;
+package org.apache.seatunnel.core.starter.seatunnel;
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import
org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-public class FlinkApiConfigChecker implements
ConfigChecker<FlinkApiEnvironment> {
-
- @Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // todo: implement
+public class SeaTunnel {
+ public static void main(String[] args) throws CommandException {
+ SeaTunnelCommandArgs seaTunnelCommandArgs =
CommandLineUtils.parseSeaTunnelArgs(args);
+ Command<SeaTunnelCommandArgs> command =
+ new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+ Seatunnel.run(command);
}
}
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
deleted file mode 100644
index 94335cca9..000000000
---
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/SeaTunnelStarter.java
+++ /dev/null
@@ -1,55 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.seatunnel;
-
-import org.apache.seatunnel.common.config.Common;
-import org.apache.seatunnel.common.config.DeployMode;
-import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-import org.apache.seatunnel.core.starter.utils.FileUtils;
-import org.apache.seatunnel.engine.client.SeaTunnelClient;
-import org.apache.seatunnel.engine.client.job.ClientJobProxy;
-import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
-import org.apache.seatunnel.engine.common.config.ConfigProvider;
-import org.apache.seatunnel.engine.common.config.JobConfig;
-
-import com.hazelcast.client.config.ClientConfig;
-
-import java.nio.file.Path;
-import java.util.concurrent.ExecutionException;
-
-public class SeaTunnelStarter {
- public static void main(String[] args) {
- SeaTunnelCommandArgs seaTunnelCommandArgs =
CommandLineUtils.parseSeaTunnelArgs(args);
- Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
- Common.setDeployMode(DeployMode.CLIENT);
- JobConfig jobConfig = new JobConfig();
- jobConfig.setName("fake_to_file");
-
- ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
- SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
- JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);
-
- ClientJobProxy clientJobProxy;
- try {
- clientJobProxy = jobExecutionEnv.execute();
- clientJobProxy.waitForJobComplete();
- } catch (ExecutionException | InterruptedException e) {
- throw new RuntimeException(e);
- }
- }
-}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
similarity index 59%
copy from
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
copy to
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
index 2ae68515d..ee7da538b 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/ExecutionModeConverter.java
@@ -15,17 +15,20 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.flink.config;
+package org.apache.seatunnel.core.starter.seatunnel.args;
-import org.apache.seatunnel.core.starter.config.ConfigChecker;
-import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-public class FlinkApiConfigChecker implements
ConfigChecker<FlinkApiEnvironment> {
+import com.beust.jcommander.IStringConverter;
+import com.beust.jcommander.ParameterException;
+public class ExecutionModeConverter implements IStringConverter<ExecutionMode>
{
@Override
- public void checkConfig(Config config) throws ConfigCheckException {
- // todo: implement
+ public ExecutionMode convert(String value) {
+ try {
+ return ExecutionMode.valueOf(value.toUpperCase());
+ } catch (IllegalArgumentException e) {
+ throw new ParameterException("execution-mode: " + value + " is not
allowed.");
+ }
}
}
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
index 19c5d8587..9e86b2800 100644
---
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/args/SeaTunnelCommandArgs.java
@@ -20,16 +20,56 @@ package org.apache.seatunnel.core.starter.seatunnel.args;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.starter.command.AbstractCommandArgs;
import org.apache.seatunnel.core.starter.config.EngineType;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+
+import com.beust.jcommander.Parameter;
import java.util.List;
public class SeaTunnelCommandArgs extends AbstractCommandArgs {
/**
- * Undefined parameters parsed will be stored here as seatunnel engint
command parameters.
+ * Undefined parameters parsed will be stored here as seatunnel engine
command parameters.
*/
private List<String> seatunnelParams;
+ @Parameter(names = {"-n", "--name"},
+ description = "The name of job")
+ private String name = "seatunnel_job";
+
+ @Parameter(names = {"-cn", "--cluster"},
+ description = "The name of cluster")
+ private String clusterName = "seatunnel_default_cluster";
+
+ @Parameter(names = {"-e", "--deploy-mode"},
+ description = "SeaTunnel deploy mode",
+ converter = ExecutionModeConverter.class)
+ private ExecutionMode executionMode = ExecutionMode.LOCAL;
+
+ public ExecutionMode getExecutionMode() {
+ return executionMode;
+ }
+
+ public void setExecutionMode(ExecutionMode executionMode) {
+ this.executionMode = executionMode;
+ }
+
+ public String getName() {
+ return name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ public String getClusterName() {
+ return clusterName;
+ }
+
+ public void setClusterName(String clusterName) {
+ this.clusterName = clusterName;
+ }
+
@Override
public EngineType getEngineType() {
return EngineType.SEATUNNEL;
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
new file mode 100644
index 000000000..8036120ae
--- /dev/null
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiConfValidateCommand.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.config.ConfigBuilder;
+import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import
org.apache.seatunnel.core.starter.seatunnel.config.SeaTunnelApiConfigChecker;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+
+/**
+ * Use to validate the configuration of the SeaTunnel API.
+ */
+public class SeaTunnelApiConfValidateCommand implements
Command<SeaTunnelCommandArgs> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
+
+ private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+
+ public SeaTunnelApiConfValidateCommand(SeaTunnelCommandArgs
seaTunnelCommandArgs) {
+ this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+ }
+
+ @Override
+ public void execute() throws ConfigCheckException {
+ Path configPath = FileUtils.getConfigPath(seaTunnelCommandArgs);
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+ LOGGER.info("config OK !");
+ }
+}
diff --git
a/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 000000000..d9d8638d9
--- /dev/null
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.starter.seatunnel.command;
+
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import org.apache.seatunnel.core.starter.utils.FileUtils;
+import org.apache.seatunnel.engine.client.SeaTunnelClient;
+import org.apache.seatunnel.engine.client.job.ClientJobProxy;
+import org.apache.seatunnel.engine.client.job.JobExecutionEnvironment;
+import org.apache.seatunnel.engine.common.config.ConfigProvider;
+import org.apache.seatunnel.engine.common.config.JobConfig;
+import org.apache.seatunnel.engine.common.config.SeaTunnelConfig;
+import org.apache.seatunnel.engine.common.runtime.ExecutionMode;
+import org.apache.seatunnel.engine.server.SeaTunnelNodeContext;
+
+import com.hazelcast.client.config.ClientConfig;
+import com.hazelcast.core.HazelcastInstance;
+import com.hazelcast.instance.impl.HazelcastInstanceFactory;
+
+import java.nio.file.Path;
+import java.util.Random;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * This command is used to execute the SeaTunnel engine job by SeaTunnel API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements
Command<SeaTunnelCommandArgs> {
+
+ private final SeaTunnelCommandArgs seaTunnelCommandArgs;
+
+ // TODO custom cluster name on cluster execution mode
+
+ public SeaTunnelApiTaskExecuteCommand(SeaTunnelCommandArgs
seaTunnelCommandArgs) {
+ this.seaTunnelCommandArgs = seaTunnelCommandArgs;
+ }
+
+ @Override
+ public void execute() throws CommandExecuteException {
+ Path configFile = FileUtils.getConfigPath(seaTunnelCommandArgs);
+
+ JobConfig jobConfig = new JobConfig();
+ jobConfig.setName(seaTunnelCommandArgs.getName());
+ HazelcastInstance instance = null;
+ try {
+ String clusterName = seaTunnelCommandArgs.getClusterName();
+ if
(seaTunnelCommandArgs.getExecutionMode().equals(ExecutionMode.LOCAL)) {
+ clusterName = creatRandomClusterName(clusterName);
+ instance = createServerInLocal(clusterName);
+ }
+ ClientConfig clientConfig =
ConfigProvider.locateAndGetClientConfig();
+ clientConfig.setClusterName(clusterName);
+ SeaTunnelClient engineClient = new SeaTunnelClient(clientConfig);
+ JobExecutionEnvironment jobExecutionEnv =
engineClient.createExecutionContext(configFile.toString(), jobConfig);
+
+ ClientJobProxy clientJobProxy;
+ clientJobProxy = jobExecutionEnv.execute();
+ clientJobProxy.waitForJobComplete();
+ } catch (ExecutionException | InterruptedException e) {
+ throw new CommandExecuteException("SeaTunnel job executed failed",
e);
+ } finally {
+ if (instance != null) {
+ instance.shutdown();
+ }
+ }
+ }
+
+ private HazelcastInstance createServerInLocal(String clusterName) {
+ SeaTunnelConfig seaTunnelConfig =
ConfigProvider.locateAndGetSeaTunnelConfig();
+ seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
+ return
HazelcastInstanceFactory.newHazelcastInstance(seaTunnelConfig.getHazelcastConfig(),
+ Thread.currentThread().getName(),
+ new SeaTunnelNodeContext(seaTunnelConfig));
+ }
+
+ @SuppressWarnings("checkstyle:MagicNumber")
+ private String creatRandomClusterName(String namePrefix) {
+ Random random = new Random();
+ return namePrefix + "-" + random.nextInt(1000000);
+ }
+
+}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
similarity index 58%
copy from
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
copy to
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
index 9f6a9ec5b..722c84e7b 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/SeaTunnelCommandBuilder.java
@@ -15,30 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.spark.command;
+package org.apache.seatunnel.core.starter.seatunnel.command;
import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.command.CommandBuilder;
-import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
-public class SparkCommandBuilder implements CommandBuilder<SparkCommandArgs> {
+public class SeaTunnelCommandBuilder implements
CommandBuilder<SeaTunnelCommandArgs> {
@Override
- public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
+ public Command<SeaTunnelCommandArgs> buildCommand(SeaTunnelCommandArgs
commandArgs) {
Common.setDeployMode(commandArgs.getDeployMode());
- return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
- }
-
- /**
- * Used to generate command for seaTunnel API.
- */
- private static class SeaTunnelApiCommandBuilder extends
SparkCommandBuilder {
- @Override
- public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
- return commandArgs.isCheckConfig() ? new
SparkApiConfValidateCommand(commandArgs)
- : new SparkApiTaskExecuteCommand(commandArgs);
- }
+ return commandArgs.isCheckConfig() ? new
SeaTunnelApiConfValidateCommand(commandArgs)
+ : new SeaTunnelApiTaskExecuteCommand(commandArgs);
}
}
-
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
similarity index 87%
copy from
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
copy to
seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
index b08a4a752..bfaff2a9c 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/config/SeaTunnelApiConfigChecker.java
@@ -15,17 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.spark.config;
+package org.apache.seatunnel.core.starter.seatunnel.config;
import org.apache.seatunnel.core.starter.config.ConfigChecker;
import org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
+public class SeaTunnelApiConfigChecker implements ConfigChecker {
@Override
public void checkConfig(Config config) throws ConfigCheckException {
- // todo: implement
+ // TODO implement
}
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
index 9f6a9ec5b..9bf8e376e 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkCommandBuilder.java
@@ -27,18 +27,9 @@ public class SparkCommandBuilder implements
CommandBuilder<SparkCommandArgs> {
@Override
public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
Common.setDeployMode(commandArgs.getDeployMode());
- return new SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+ return commandArgs.isCheckConfig() ? new
SparkApiConfValidateCommand(commandArgs)
+ : new SparkApiTaskExecuteCommand(commandArgs);
}
- /**
- * Used to generate command for seaTunnel API.
- */
- private static class SeaTunnelApiCommandBuilder extends
SparkCommandBuilder {
- @Override
- public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
- return commandArgs.isCheckConfig() ? new
SparkApiConfValidateCommand(commandArgs)
- : new SparkApiTaskExecuteCommand(commandArgs);
- }
- }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
index b08a4a752..afdf77579 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/config/SparkApiConfigChecker.java
@@ -22,7 +22,7 @@ import
org.apache.seatunnel.core.starter.exception.ConfigCheckException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-public class SparkApiConfigChecker implements ConfigChecker<SparkEnvironment> {
+public class SparkApiConfigChecker implements ConfigChecker {
@Override
public void checkConfig(Config config) throws ConfigCheckException {
diff --git
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
index f25ee6998..8c372f2d5 100644
---
a/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
+++
b/seatunnel-engine/seatunnel-engine-client/src/main/java/org/apache/seatunnel/engine/client/job/ClientJobProxy.java
@@ -37,8 +37,8 @@ import lombok.NonNull;
public class ClientJobProxy implements Job {
private static final ILogger LOGGER =
Logger.getLogger(ClientJobProxy.class);
- private SeaTunnelHazelcastClient seaTunnelHazelcastClient;
- private JobImmutableInformation jobImmutableInformation;
+ private final SeaTunnelHazelcastClient seaTunnelHazelcastClient;
+ private final JobImmutableInformation jobImmutableInformation;
public ClientJobProxy(@NonNull SeaTunnelHazelcastClient
seaTunnelHazelcastClient,
@NonNull JobImmutableInformation
jobImmutableInformation) {
@@ -63,10 +63,10 @@ public class ClientJobProxy implements Job {
/**
* This method will block even the Job turn to a EndState
*
- * @return
+ * @return The job final status
*/
public JobStatus waitForJobComplete() {
- JobStatus jobStatus = null;
+ JobStatus jobStatus;
try {
jobStatus = RetryUtils.retryWithException(() -> {
PassiveCompletableFuture<JobStatus> jobFuture =
doWaitForJobComplete();
@@ -84,18 +84,15 @@ public class ClientJobProxy implements Job {
jobImmutableInformation.getJobConfig().getName(),
jobImmutableInformation.getJobId(),
jobStatus));
+ this.seaTunnelHazelcastClient.getHazelcastInstance().shutdown();
return jobStatus;
}
@Override
public PassiveCompletableFuture<JobStatus> doWaitForJobComplete() {
- PassiveCompletableFuture<JobStatus> jobFuture =
- seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
-
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
- response -> {
- return
JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)];
- });
- return jobFuture;
+ return seaTunnelHazelcastClient.requestOnMasterAndGetCompletableFuture(
+
SeaTunnelWaitForJobCompleteCodec.encodeRequest(jobImmutableInformation.getJobId()),
+ response ->
JobStatus.values()[SeaTunnelWaitForJobCompleteCodec.decodeResponse(response)]);
}
@Override
@@ -110,7 +107,7 @@ public class ClientJobProxy implements Job {
public JobStatus getJobStatus() {
int jobStatusOrdinal =
seaTunnelHazelcastClient.requestOnMasterAndDecodeResponse(
SeaTunnelGetJobStatusCodec.encodeRequest(jobImmutableInformation.getJobId()),
- response -> SeaTunnelGetJobStatusCodec.decodeResponse(response));
+ SeaTunnelGetJobStatusCodec::decodeResponse);
return JobStatus.values()[jobStatusOrdinal];
}
diff --git
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
index 56c006206..eecd2f2f1 100644
---
a/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
+++
b/seatunnel-engine/seatunnel-engine-server/src/main/java/org/apache/seatunnel/engine/server/SeaTunnelServer.java
@@ -128,6 +128,7 @@ public class SeaTunnelServer implements ManagedService,
MembershipAwareService,
if (resourceManager != null) {
resourceManager.close();
}
+ executorService.shutdown();
taskExecutionService.shutdown();
}
diff --git a/seatunnel-examples/seatunnel-engine-examples/pom.xml
b/seatunnel-examples/seatunnel-engine-examples/pom.xml
index 9c3a5ca5d..b75a68b85 100644
--- a/seatunnel-examples/seatunnel-engine-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-engine-examples/pom.xml
@@ -27,4 +27,22 @@
<modelVersion>4.0.0</modelVersion>
<artifactId>seatunnel-engine-examples</artifactId>
+ <dependencies>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-seatunnel-starter</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ </dependencies>
</project>
\ No newline at end of file
diff --git
a/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
new file mode 100644
index 000000000..a50f31271
--- /dev/null
+++
b/seatunnel-examples/seatunnel-engine-examples/src/main/java/org/apache/seatunnel/example/engine/SeaTunnelEngineExample.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.example.engine;
+
+import org.apache.seatunnel.core.starter.Seatunnel;
+import org.apache.seatunnel.core.starter.command.Command;
+import org.apache.seatunnel.core.starter.exception.CommandException;
+import org.apache.seatunnel.core.starter.seatunnel.args.SeaTunnelCommandArgs;
+import
org.apache.seatunnel.core.starter.seatunnel.command.SeaTunnelCommandBuilder;
+
+import java.io.FileNotFoundException;
+import java.net.URISyntaxException;
+import java.net.URL;
+import java.nio.file.Paths;
+
+public class SeaTunnelEngineExample {
+
+ public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
+ String configFile =
getTestConfigFile("/examples/fake_to_console.conf");
+ SeaTunnelCommandArgs seaTunnelCommandArgs = new SeaTunnelCommandArgs();
+ seaTunnelCommandArgs.setConfigFile(configFile);
+ seaTunnelCommandArgs.setCheckConfig(false);
+ seaTunnelCommandArgs.setName("fake_to_console");
+ Command<SeaTunnelCommandArgs> command =
+ new SeaTunnelCommandBuilder().buildCommand(seaTunnelCommandArgs);
+ Seatunnel.run(command);
+ }
+
+ public static String getTestConfigFile(String configFile) throws
FileNotFoundException, URISyntaxException {
+ URL resource = SeaTunnelEngineExample.class.getResource(configFile);
+ if (resource == null) {
+ throw new FileNotFoundException("Can't find config file: " +
configFile);
+ }
+ return Paths.get(resource.toURI()).toString();
+ }
+
+}
diff --git
a/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
new file mode 100644
index 000000000..404c9ec69
--- /dev/null
+++
b/seatunnel-examples/seatunnel-engine-examples/src/main/resources/examples/fake_to_console.conf
@@ -0,0 +1,51 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+######
+###### This config file is a demonstration of streaming processing in
seatunnel config
+######
+
+env {
+ # You can set engine configuration here
+ execution.parallelism = 3
+ job.mode = "BATCH"
+ execution.checkpoint.interval = 5000
+ #execution.checkpoint.data-uri = "hdfs://localhost:9000/checkpoint"
+}
+
+source {
+ # This is a example source plugin **only for test and demonstrate the
feature source plugin**
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 2
+ }
+
+ FakeSource {
+ result_table_name = "fake"
+ field_name = "name,age",
+ parallelism = 3
+ }
+}
+
+transform {
+}
+
+sink {
+ console {
+ source_table_name="fake"
+ }
+}
\ No newline at end of file