This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 3601f429 Clean code for new API starter (#1962)
3601f429 is described below
commit 3601f4294d79ca8b397005211bfc430d9f8b4f78
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 27 13:26:09 2022 +0800
Clean code for new API starter (#1962)
* Add TaskExecuteException for new API
* Remove unused class
---
.../seatunnel/core/base/config/ConfigBuilder.java | 1 -
.../core/flink/command/FlinkCommandBuilder.java | 16 +---
.../core/spark/command/SparkCommandBuilder.java | 21 +----
seatunnel-core/seatunnel-core-starter/README.md | 12 +++
.../starter/exception/TaskExecuteException.java} | 21 ++---
.../core/starter/execution/TaskExecution.java} | 17 +---
.../flink/command/FlinkApiTaskExecuteCommand.java | 6 +-
.../starter/flink/command/FlinkCommandBuilder.java | 15 +--
.../flink/config/FlinkExecutionContext.java | 104 ---------------------
.../core/starter/flink/env/FlinkEnvironment.java | 49 +---------
...FlinkTaskExecution.java => FlinkExecution.java} | 16 +++-
.../flink/execution/PluginExecuteProcessor.java | 4 +-
.../flink/execution/SinkExecuteProcessor.java | 3 +-
.../flink/execution/TransformExecuteProcessor.java | 22 +++--
.../spark/command/SparkApiTaskExecuteCommand.java | 4 +-
.../spark/execution/PluginExecuteProcessor.java | 4 +-
.../spark/execution/SinkExecuteProcessor.java | 3 +-
...SparkTaskExecution.java => SparkExecution.java} | 9 +-
.../spark/execution/TransformExecuteProcessor.java | 3 +-
19 files changed, 85 insertions(+), 245 deletions(-)
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index b3f7eaff..5f80ae40 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -37,7 +37,6 @@ public class ConfigBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigBuilder.class);
- private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
private final Config config;
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 32c0d082..5aef9f72 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -28,20 +28,10 @@ public class FlinkCommandBuilder implements
CommandBuilder<FlinkCommandArgs> {
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
+ String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
}
- return new FlinkApiCommandBuilder().buildCommand(commandArgs);
- }
-
- /**
- * Used to generate command for engine API.
- */
- private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
- @Override
- public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
- return commandArgs.isCheckConfig() ? new
FlinkApiConfValidateCommand(commandArgs)
- : new FlinkApiTaskExecuteCommand(commandArgs);
- }
+ return commandArgs.isCheckConfig() ? new
FlinkApiConfValidateCommand(commandArgs)
+ : new FlinkApiTaskExecuteCommand(commandArgs);
}
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 65c9582e..4ea8ed1e 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -28,25 +28,10 @@ public class SparkCommandBuilder implements
CommandBuilder<SparkCommandArgs> {
public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
- }
-
- return new SparkApiCommandBuilder().buildCommand(commandArgs);
- }
-
- /**
- * Used to generate command for engine API.
- */
- private static class SparkApiCommandBuilder extends SparkCommandBuilder {
- @Override
- public Command<SparkCommandArgs> buildCommand(SparkCommandArgs
commandArgs) {
- if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
- throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
- }
- return commandArgs.isCheckConfig() ? new
SparkConfValidateCommand(commandArgs)
- : new SparkTaskExecuteCommand(commandArgs);
+ String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
}
+ return commandArgs.isCheckConfig() ? new
SparkConfValidateCommand(commandArgs)
+ : new SparkTaskExecuteCommand(commandArgs);
}
}
diff --git a/seatunnel-core/seatunnel-core-starter/README.md
b/seatunnel-core/seatunnel-core-starter/README.md
new file mode 100644
index 00000000..835a5b19
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/README.md
@@ -0,0 +1,12 @@
+# Introduction
+
+This module is the base starter module for the SeaTunnel new connector API.
+
+
+
+# SeaTunnel Job Execute Process
+
+As a first step, the SeaTunnel runtime engine gets the job definition from the
seatunnel.conf file, then parses the config and loads the
+seatunnel plugins from the classpath/FileSystem. After initializing the seatunnel plugins,
the SeaTunnel runtime engine translates
+the job into a target engine (Flink/Spark) job, then submits the job to the target
engine.
+
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
similarity index 62%
copy from
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
copy to
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
index 51fc752c..346e7348 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
@@ -15,20 +15,15 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.spark.execution;
+package org.apache.seatunnel.core.starter.exception;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+public class TaskExecuteException extends Exception {
-import java.util.List;
+ public TaskExecuteException(String message) {
+ super(message);
+ }
-public interface PluginExecuteProcessor {
-
- /**
- * Execute the current plugins, and return the result data stream.
- *
- * @param upstreamDataStreams the upstream data streams.
- * @return the result data stream
- */
- List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
Exception;
+ public TaskExecuteException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
similarity index 62%
copy from
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
copy to
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
index 51fc752c..a64781e8 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
@@ -15,20 +15,11 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.starter.spark.execution;
+package org.apache.seatunnel.core.starter.execution;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
-import java.util.List;
+public interface TaskExecution {
-public interface PluginExecuteProcessor {
-
- /**
- * Execute the current plugins, and return the result data stream.
- *
- * @param upstreamDataStreams the upstream data streams.
- * @return the result data stream
- */
- List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
Exception;
+ void execute() throws TaskExecuteException;
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
index f674ca12..9fbbca17 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.execution.FlinkTaskExecution;
+import org.apache.seatunnel.core.starter.flink.execution.FlinkExecution;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -37,7 +37,7 @@ import java.nio.file.Path;
*/
public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkApiTaskExecuteCommand.class);
private final FlinkCommandArgs flinkCommandArgs;
@@ -50,7 +50,7 @@ public class FlinkApiTaskExecuteCommand implements
Command<FlinkCommandArgs> {
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
- FlinkTaskExecution seaTunnelTaskExecution = new
FlinkTaskExecution(config);
+ FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
try {
seaTunnelTaskExecution.execute();
} catch (Exception e) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index f6e8e4fb..437ce5d8 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -28,19 +28,10 @@ public class FlinkCommandBuilder implements
CommandBuilder<FlinkCommandArgs> {
public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
if
(Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName())))
{
throw new IllegalArgumentException(
- String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
+ String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
}
- return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+ return commandArgs.isCheckConfig() ? new
FlinkApiConfValidateCommand(commandArgs)
+ : new FlinkApiTaskExecuteCommand(commandArgs);
}
- /**
- * Used to generate command for seaTunnel API.
- */
- private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
- @Override
- public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
- return commandArgs.isCheckConfig() ? new
FlinkApiConfValidateCommand(commandArgs)
- : new FlinkApiTaskExecuteCommand(commandArgs);
- }
- }
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
deleted file mode 100644
index 05bf68d0..00000000
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.core.starter.config.AbstractExecutionContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.PluginType;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.flink.FlinkSourcePluginDiscovery;
-import
org.apache.seatunnel.plugin.discovery.flink.FlinkTransformPluginDiscovery;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class FlinkExecutionContext extends
AbstractExecutionContext<FlinkEnvironment> {
- private final FlinkSourcePluginDiscovery flinkSourcePluginDiscovery;
- private final FlinkTransformPluginDiscovery flinkTransformPluginDiscovery;
- private final FlinkSinkPluginDiscovery flinkSinkPluginDiscovery;
- private final List<URL> pluginJars;
-
- public FlinkExecutionContext(Config config, EngineType engine) {
- super(config, engine);
- this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
- this.flinkTransformPluginDiscovery = new
FlinkTransformPluginDiscovery();
- this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
- List<URL> pluginJars = new ArrayList<>();
- // since we didn't split the transform plugin jars, we just need to
register the source/sink plugin jars
-
pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
-
pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
- this.pluginJars = pluginJars;
- this.getEnvironment().registerPlugin(pluginJars);
- }
-
- @Override
- public List<BaseSource<FlinkEnvironment>> getSources() {
- final String pluginType = PluginType.SOURCE.getType();
- final String engineType = EngineType.FLINK.getEngine();
- final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
- return configList.stream()
- .map(pluginConfig -> {
- PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
- BaseSource<FlinkEnvironment> pluginInstance =
flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
- pluginInstance.setConfig(pluginConfig);
- return pluginInstance;
- }).collect(Collectors.toList());
- }
-
- @Override
- public List<BaseTransform<FlinkEnvironment>> getTransforms() {
- final String pluginType = PluginType.TRANSFORM.getType();
- final String engineType = EngineType.FLINK.getEngine();
- final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
- return configList.stream()
- .map(pluginConfig -> {
- PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
- BaseTransform<FlinkEnvironment> pluginInstance =
flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
- pluginInstance.setConfig(pluginConfig);
- return pluginInstance;
- }).collect(Collectors.toList());
- }
-
- @Override
- public List<BaseSink<FlinkEnvironment>> getSinks() {
- final String pluginType = PluginType.SINK.getType();
- final String engineType = EngineType.FLINK.getEngine();
- final List<? extends Config> configList =
getRootConfig().getConfigList(pluginType);
- return configList.stream()
- .map(pluginConfig -> {
- PluginIdentifier pluginIdentifier =
PluginIdentifier.of(engineType, pluginType,
pluginConfig.getString("plugin_name"));
- BaseSink<FlinkEnvironment> pluginInstance =
flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
- pluginInstance.setConfig(pluginConfig);
- return pluginInstance;
- }).collect(Collectors.toList());
- }
-
- @Override
- public List<URL> getPluginJars() {
- return pluginJars;
- }
-}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
index bd216dfa..f9de514b 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.PipelineOptions;
import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -40,7 +39,6 @@ import
org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.util.TernaryBoolean;
import org.slf4j.Logger;
@@ -62,10 +60,6 @@ public class FlinkEnvironment implements RuntimeEnv {
private StreamTableEnvironment tableEnvironment;
- private ExecutionEnvironment batchEnvironment;
-
- private BatchTableEnvironment batchTableEnvironment;
-
private JobMode jobMode;
private String jobName = "seatunnel";
@@ -91,10 +85,6 @@ public class FlinkEnvironment implements RuntimeEnv {
// Batch/Streaming both use data stream api in SeaTunnel New API
createStreamEnvironment();
createStreamTableEnvironment();
- if (!isStreaming()) {
- createExecutionEnvironment();
- createBatchTableEnvironment();
- }
if (config.hasPath("job.name")) {
jobName = config.getString("job.name");
}
@@ -105,10 +95,6 @@ public class FlinkEnvironment implements RuntimeEnv {
return jobName;
}
- public boolean isStreaming() {
- return JobMode.STREAMING.equals(jobMode);
- }
-
@Override
public FlinkEnvironment setJobMode(JobMode jobMode) {
this.jobMode = jobMode;
@@ -125,16 +111,12 @@ public class FlinkEnvironment implements RuntimeEnv {
pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
Configuration configuration;
try {
- if (isStreaming()) {
- configuration =
- (Configuration)
Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
- "getConfiguration")).orElseThrow(() -> new
RuntimeException("can't find " +
- "method:
getConfiguration")).invoke(this.environment);
- } else {
- configuration = batchEnvironment.getConfiguration();
- }
+ configuration =
+ (Configuration)
Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
+ "getConfiguration")).orElseThrow(() -> new
RuntimeException("can't find " +
+ "method: getConfiguration")).invoke(this.environment);
} catch (Exception e) {
- throw new RuntimeException(e);
+ throw new RuntimeException("Get flink configuration from
environment failed", e);
}
List<String> jars = configuration.get(PipelineOptions.JARS);
if (jars == null) {
@@ -208,27 +190,6 @@ public class FlinkEnvironment implements RuntimeEnv {
}
}
- public ExecutionEnvironment getBatchEnvironment() {
- return batchEnvironment;
- }
-
- public BatchTableEnvironment getBatchTableEnvironment() {
- return batchTableEnvironment;
- }
-
- private void createExecutionEnvironment() {
- batchEnvironment = ExecutionEnvironment.getExecutionEnvironment();
- if (config.hasPath(ConfigKeyName.PARALLELISM)) {
- int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
- batchEnvironment.setParallelism(parallelism);
- }
- EnvironmentUtil.setRestartStrategy(config,
batchEnvironment.getConfig());
- }
-
- private void createBatchTableEnvironment() {
- batchTableEnvironment = BatchTableEnvironment.create(batchEnvironment);
- }
-
private void setTimeCharacteristic() {
if (config.hasPath(ConfigKeyName.TIME_CHARACTERISTIC)) {
String timeType =
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 83%
rename from
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
rename to
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index c7e8759b..8d433c27 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.core.starter.flink.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.TaskExecution;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,9 +37,9 @@ import java.util.List;
/**
* Used to execute a SeaTunnelTask.
*/
-public class FlinkTaskExecution {
+public class FlinkExecution implements TaskExecution {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkTaskExecution.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkExecution.class);
private final Config config;
private final FlinkEnvironment flinkEnvironment;
@@ -45,7 +47,7 @@ public class FlinkTaskExecution {
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
- public FlinkTaskExecution(Config config) {
+ public FlinkExecution(Config config) {
this.config = config;
this.flinkEnvironment = (FlinkEnvironment) new
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
@@ -54,13 +56,17 @@ public class FlinkTaskExecution {
this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
}
- public void execute() throws Exception {
+ public void execute() throws TaskExecuteException {
List<DataStream<Row>> dataStreams = new ArrayList<>();
dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
sinkPluginExecuteProcessor.execute(dataStreams);
LOGGER.info("Flink Execution Plan:{}",
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
-
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
+ try {
+
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
+ } catch (Exception e) {
+ throw new TaskExecuteException("Execute Flink job error", e);
+ }
}
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
index 3a458a79..92501ffa 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.types.Row;
@@ -30,5 +32,5 @@ public interface PluginExecuteProcessor {
* @param upstreamDataStreams the upstream data streams.
* @return the result data stream
*/
- List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
throws Exception;
+ List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams)
throws TaskExecuteException;
}
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 24b8fffc..d207b7eb 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -69,7 +70,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) throws Exception {
+ public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) throws TaskExecuteException {
DataStream<Row> input = upstreamDataStreams.get(0);
FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable,
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
for (int i = 0; i < plugins.size(); i++) {
diff --git
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index e06c761e..ab0200e3 100644
---
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.flink.execution;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -60,20 +61,25 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Fl
}
@Override
- public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) throws Exception {
+ public List<DataStream<Row>> execute(List<DataStream<Row>>
upstreamDataStreams) throws TaskExecuteException {
if (plugins.isEmpty()) {
return upstreamDataStreams;
}
DataStream<Row> input = upstreamDataStreams.get(0);
List<DataStream<Row>> result = new ArrayList<>();
for (int i = 0; i < plugins.size(); i++) {
- FlinkStreamTransform transform = plugins.get(i);
- Config pluginConfig = pluginConfigs.get(i);
- DataStream<Row> stream =
fromSourceTable(pluginConfig).orElse(input);
- input = transform.processStream(flinkEnvironment, stream);
- registerResultTable(pluginConfig, input);
- transform.registerFunction(flinkEnvironment);
- result.add(input);
+ try {
+ FlinkStreamTransform transform = plugins.get(i);
+ Config pluginConfig = pluginConfigs.get(i);
+ DataStream<Row> stream =
fromSourceTable(pluginConfig).orElse(input);
+ input = transform.processStream(flinkEnvironment, stream);
+ registerResultTable(pluginConfig, input);
+ transform.registerFunction(flinkEnvironment);
+ result.add(input);
+ } catch (Exception e) {
+ throw new TaskExecuteException(
+ String.format("SeaTunnel transform task: %s execute
error", plugins.get(i).getPluginName()), e);
+ }
}
return result;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index ecdc8a92..0bcbb372 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.core.starter.command.Command;
import org.apache.seatunnel.core.starter.config.ConfigBuilder;
import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.execution.SparkTaskExecution;
+import org.apache.seatunnel.core.starter.spark.execution.SparkExecution;
import org.apache.seatunnel.core.starter.utils.FileUtils;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +50,7 @@ public class SparkApiTaskExecuteCommand implements
Command<SparkCommandArgs> {
Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
Config config = new ConfigBuilder(configFile).getConfig();
try {
- SparkTaskExecution seaTunnelTaskExecution = new
SparkTaskExecution(config);
+ SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
seaTunnelTaskExecution.execute();
} catch (Exception e) {
LOGGER.error("Run SeaTunnel on spark failed.", e);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
index 51fc752c..198fa43f 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
@@ -17,6 +17,8 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
@@ -30,5 +32,5 @@ public interface PluginExecuteProcessor {
* @param upstreamDataStreams the upstream data streams.
* @return the result data stream
*/
- List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
Exception;
+ List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws
TaskExecuteException;
}
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 2a5a4597..d9016a6e 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
import org.apache.seatunnel.spark.SparkEnvironment;
@@ -61,7 +62,7 @@ public class SinkExecuteProcessor extends
AbstractPluginExecuteProcessor<SeaTunn
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
throws Exception {
+ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
throws TaskExecuteException {
Dataset<Row> input = upstreamDataStreams.get(0);
for (int i = 0; i < plugins.size(); i++) {
Config sinkConfig = pluginConfigs.get(i);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
similarity index 92%
rename from
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
rename to
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 5ddd0474..7b90bd0d 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
import org.apache.seatunnel.api.common.SeaTunnelContext;
import org.apache.seatunnel.core.starter.config.EngineType;
import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.spark.SparkEnvironment;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,9 +33,9 @@ import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-public class SparkTaskExecution {
+public class SparkExecution {
- private static final Logger LOGGER =
LoggerFactory.getLogger(SparkTaskExecution.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SparkExecution.class);
private final Config config;
private final SparkEnvironment sparkEnvironment;
@@ -42,7 +43,7 @@ public class SparkTaskExecution {
private final PluginExecuteProcessor transformPluginExecuteProcessor;
private final PluginExecuteProcessor sinkPluginExecuteProcessor;
- public SparkTaskExecution(Config config) {
+ public SparkExecution(Config config) {
this.config = config;
this.sparkEnvironment = (SparkEnvironment) new
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
@@ -51,7 +52,7 @@ public class SparkTaskExecution {
this.sinkPluginExecuteProcessor = new
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
}
- public void execute() throws Exception {
+ public void execute() throws TaskExecuteException {
List<Dataset<Row>> datasets = new ArrayList<>();
datasets = sourcePluginExecuteProcessor.execute(datasets);
datasets = transformPluginExecuteProcessor.execute(datasets);
diff --git
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index deec43b9..faa2bace 100644
---
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.core.starter.spark.execution;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
import
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
import org.apache.seatunnel.spark.BaseSparkTransform;
@@ -60,7 +61,7 @@ public class TransformExecuteProcessor extends
AbstractPluginExecuteProcessor<Ba
}
@Override
- public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
throws Exception {
+ public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams)
throws TaskExecuteException {
if (plugins.isEmpty()) {
return upstreamDataStreams;
}