This is an automated email from the ASF dual-hosted git repository.

wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/api-draft by this push:
     new 3601f429 Clean code for new API starter (#1962)
3601f429 is described below

commit 3601f4294d79ca8b397005211bfc430d9f8b4f78
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 27 13:26:09 2022 +0800

    Clean code for new API starter (#1962)
    
    * Add TaskExecuteException for new API
    * Remove unused class
---
 .../seatunnel/core/base/config/ConfigBuilder.java  |   1 -
 .../core/flink/command/FlinkCommandBuilder.java    |  16 +---
 .../core/spark/command/SparkCommandBuilder.java    |  21 +----
 seatunnel-core/seatunnel-core-starter/README.md    |  12 +++
 .../starter/exception/TaskExecuteException.java}   |  21 ++---
 .../core/starter/execution/TaskExecution.java}     |  17 +---
 .../flink/command/FlinkApiTaskExecuteCommand.java  |   6 +-
 .../starter/flink/command/FlinkCommandBuilder.java |  15 +--
 .../flink/config/FlinkExecutionContext.java        | 104 ---------------------
 .../core/starter/flink/env/FlinkEnvironment.java   |  49 +---------
 ...FlinkTaskExecution.java => FlinkExecution.java} |  16 +++-
 .../flink/execution/PluginExecuteProcessor.java    |   4 +-
 .../flink/execution/SinkExecuteProcessor.java      |   3 +-
 .../flink/execution/TransformExecuteProcessor.java |  22 +++--
 .../spark/command/SparkApiTaskExecuteCommand.java  |   4 +-
 .../spark/execution/PluginExecuteProcessor.java    |   4 +-
 .../spark/execution/SinkExecuteProcessor.java      |   3 +-
 ...SparkTaskExecution.java => SparkExecution.java} |   9 +-
 .../spark/execution/TransformExecuteProcessor.java |   3 +-
 19 files changed, 85 insertions(+), 245 deletions(-)

diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index b3f7eaff..5f80ae40 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -37,7 +37,6 @@ public class ConfigBuilder {
 
     private static final Logger LOGGER = 
LoggerFactory.getLogger(ConfigBuilder.class);
 
-    private static final String PLUGIN_NAME_KEY = "plugin_name";
     private final Path configFile;
     private final Config config;
 
diff --git 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 32c0d082..5aef9f72 100644
--- 
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -28,20 +28,10 @@ public class FlinkCommandBuilder implements 
CommandBuilder<FlinkCommandArgs> {
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs 
commandArgs) {
         if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
             throw new IllegalArgumentException(
-                    String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
+                String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
         }
 
-        return new FlinkApiCommandBuilder().buildCommand(commandArgs);
-    }
-
-    /**
-     * Used to generate command for engine API.
-     */
-    private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
-        @Override
-        public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs 
commandArgs) {
-            return commandArgs.isCheckConfig() ? new 
FlinkApiConfValidateCommand(commandArgs)
-                    : new FlinkApiTaskExecuteCommand(commandArgs);
-        }
+        return commandArgs.isCheckConfig() ? new 
FlinkApiConfValidateCommand(commandArgs)
+            : new FlinkApiTaskExecuteCommand(commandArgs);
     }
 }
diff --git 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
index 65c9582e..4ea8ed1e 100644
--- 
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
+++ 
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkCommandBuilder.java
@@ -28,25 +28,10 @@ public class SparkCommandBuilder implements 
CommandBuilder<SparkCommandArgs> {
     public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
         if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
             throw new IllegalArgumentException(
-                    String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
-        }
-
-        return new SparkApiCommandBuilder().buildCommand(commandArgs);
-    }
-
-    /**
-     * Used to generate command for engine API.
-     */
-    private static class SparkApiCommandBuilder extends SparkCommandBuilder {
-        @Override
-        public Command<SparkCommandArgs> buildCommand(SparkCommandArgs 
commandArgs) {
-            if (!Common.setDeployMode(commandArgs.getDeployMode().getName())) {
-                throw new IllegalArgumentException(
-                        String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
-            }
-            return commandArgs.isCheckConfig() ? new 
SparkConfValidateCommand(commandArgs)
-                    : new SparkTaskExecuteCommand(commandArgs);
+                String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
         }
+        return commandArgs.isCheckConfig() ? new 
SparkConfValidateCommand(commandArgs)
+            : new SparkTaskExecuteCommand(commandArgs);
     }
 
 }
diff --git a/seatunnel-core/seatunnel-core-starter/README.md 
b/seatunnel-core/seatunnel-core-starter/README.md
new file mode 100644
index 00000000..835a5b19
--- /dev/null
+++ b/seatunnel-core/seatunnel-core-starter/README.md
@@ -0,0 +1,12 @@
+# Introduction
+
+This module is the base starter module for the SeaTunnel new connector API.
+
+![seatunnel_architecture.png](../../docs/en/images/seatunnel_architecture.png)
+
+# SeaTunnel Job Execute Process
+
+First, the SeaTunnel runtime engine reads the job definition from the 
seatunnel.conf file, parses the config, and loads
+the SeaTunnel plugins from the classpath/FileSystem. After initializing the plugins, 
the SeaTunnel runtime engine translates
+the job into a target engine (Flink/Spark) job, then submits the job to the target 
engine.
+
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
similarity index 62%
copy from 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
copy to 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
index 51fc752c..346e7348 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/exception/TaskExecuteException.java
@@ -15,20 +15,15 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.spark.execution;
+package org.apache.seatunnel.core.starter.exception;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+public class TaskExecuteException extends Exception {
 
-import java.util.List;
+    public TaskExecuteException(String message) {
+        super(message);
+    }
 
-public interface PluginExecuteProcessor {
-
-    /**
-     * Execute the current plugins, and return the result data stream.
-     *
-     * @param upstreamDataStreams the upstream data streams.
-     * @return the result data stream
-     */
-    List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws 
Exception;
+    public TaskExecuteException(String message, Throwable cause) {
+        super(message, cause);
+    }
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
similarity index 62%
copy from 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
copy to 
seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
index 51fc752c..a64781e8 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-core-starter/src/main/java/org/apache/seatunnel/core/starter/execution/TaskExecution.java
@@ -15,20 +15,11 @@
  * limitations under the License.
  */
 
-package org.apache.seatunnel.core.starter.spark.execution;
+package org.apache.seatunnel.core.starter.execution;
 
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 
-import java.util.List;
+public interface TaskExecution {
 
-public interface PluginExecuteProcessor {
-
-    /**
-     * Execute the current plugins, and return the result data stream.
-     *
-     * @param upstreamDataStreams the upstream data streams.
-     * @return the result data stream
-     */
-    List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws 
Exception;
+    void execute() throws TaskExecuteException;
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
index f674ca12..9fbbca17 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkApiTaskExecuteCommand.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
 import org.apache.seatunnel.core.starter.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.core.starter.flink.execution.FlinkTaskExecution;
+import org.apache.seatunnel.core.starter.flink.execution.FlinkExecution;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -37,7 +37,7 @@ import java.nio.file.Path;
  */
 public class FlinkApiTaskExecuteCommand implements Command<FlinkCommandArgs> {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkApiTaskExecuteCommand.class);
 
     private final FlinkCommandArgs flinkCommandArgs;
 
@@ -50,7 +50,7 @@ public class FlinkApiTaskExecuteCommand implements 
Command<FlinkCommandArgs> {
         Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
 
         Config config = new ConfigBuilder(configFile).getConfig();
-        FlinkTaskExecution seaTunnelTaskExecution = new 
FlinkTaskExecution(config);
+        FlinkExecution seaTunnelTaskExecution = new FlinkExecution(config);
         try {
             seaTunnelTaskExecution.execute();
         } catch (Exception e) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
index f6e8e4fb..437ce5d8 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/command/FlinkCommandBuilder.java
@@ -28,19 +28,10 @@ public class FlinkCommandBuilder implements 
CommandBuilder<FlinkCommandArgs> {
     public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs 
commandArgs) {
         if 
(Boolean.FALSE.equals(Common.setDeployMode(commandArgs.getDeployMode().getName())))
 {
             throw new IllegalArgumentException(
-                    String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
+                String.format("Deploy mode: %s is Illegal", 
commandArgs.getDeployMode()));
         }
-        return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+        return commandArgs.isCheckConfig() ? new 
FlinkApiConfValidateCommand(commandArgs)
+            : new FlinkApiTaskExecuteCommand(commandArgs);
     }
 
-    /**
-     * Used to generate command for seaTunnel API.
-     */
-    private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
-        @Override
-        public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs 
commandArgs) {
-            return commandArgs.isCheckConfig() ? new 
FlinkApiConfValidateCommand(commandArgs)
-                    : new FlinkApiTaskExecuteCommand(commandArgs);
-        }
-    }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
deleted file mode 100644
index 05bf68d0..00000000
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/config/FlinkExecutionContext.java
+++ /dev/null
@@ -1,104 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.core.starter.flink.config;
-
-import org.apache.seatunnel.apis.base.api.BaseSink;
-import org.apache.seatunnel.apis.base.api.BaseSource;
-import org.apache.seatunnel.apis.base.api.BaseTransform;
-import org.apache.seatunnel.core.starter.config.AbstractExecutionContext;
-import org.apache.seatunnel.core.starter.config.EngineType;
-import org.apache.seatunnel.core.starter.config.PluginType;
-import org.apache.seatunnel.flink.FlinkEnvironment;
-import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
-import org.apache.seatunnel.plugin.discovery.flink.FlinkSinkPluginDiscovery;
-import org.apache.seatunnel.plugin.discovery.flink.FlinkSourcePluginDiscovery;
-import 
org.apache.seatunnel.plugin.discovery.flink.FlinkTransformPluginDiscovery;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import java.net.URL;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.stream.Collectors;
-
-public class FlinkExecutionContext extends 
AbstractExecutionContext<FlinkEnvironment> {
-    private final FlinkSourcePluginDiscovery flinkSourcePluginDiscovery;
-    private final FlinkTransformPluginDiscovery flinkTransformPluginDiscovery;
-    private final FlinkSinkPluginDiscovery flinkSinkPluginDiscovery;
-    private final List<URL> pluginJars;
-
-    public FlinkExecutionContext(Config config, EngineType engine) {
-        super(config, engine);
-        this.flinkSourcePluginDiscovery = new FlinkSourcePluginDiscovery();
-        this.flinkTransformPluginDiscovery = new 
FlinkTransformPluginDiscovery();
-        this.flinkSinkPluginDiscovery = new FlinkSinkPluginDiscovery();
-        List<URL> pluginJars = new ArrayList<>();
-        // since we didn't split the transform plugin jars, we just need to 
register the source/sink plugin jars
-        
pluginJars.addAll(flinkSourcePluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SOURCE)));
-        
pluginJars.addAll(flinkSinkPluginDiscovery.getPluginJarPaths(getPluginIdentifiers(PluginType.SINK)));
-        this.pluginJars = pluginJars;
-        this.getEnvironment().registerPlugin(pluginJars);
-    }
-
-    @Override
-    public List<BaseSource<FlinkEnvironment>> getSources() {
-        final String pluginType = PluginType.SOURCE.getType();
-        final String engineType = EngineType.FLINK.getEngine();
-        final List<? extends Config> configList = 
getRootConfig().getConfigList(pluginType);
-        return configList.stream()
-            .map(pluginConfig -> {
-                PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSource<FlinkEnvironment> pluginInstance = 
flinkSourcePluginDiscovery.getPluginInstance(pluginIdentifier);
-                pluginInstance.setConfig(pluginConfig);
-                return pluginInstance;
-            }).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<BaseTransform<FlinkEnvironment>> getTransforms() {
-        final String pluginType = PluginType.TRANSFORM.getType();
-        final String engineType = EngineType.FLINK.getEngine();
-        final List<? extends Config> configList = 
getRootConfig().getConfigList(pluginType);
-        return configList.stream()
-            .map(pluginConfig -> {
-                PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseTransform<FlinkEnvironment> pluginInstance = 
flinkTransformPluginDiscovery.getPluginInstance(pluginIdentifier);
-                pluginInstance.setConfig(pluginConfig);
-                return pluginInstance;
-            }).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<BaseSink<FlinkEnvironment>> getSinks() {
-        final String pluginType = PluginType.SINK.getType();
-        final String engineType = EngineType.FLINK.getEngine();
-        final List<? extends Config> configList = 
getRootConfig().getConfigList(pluginType);
-        return configList.stream()
-            .map(pluginConfig -> {
-                PluginIdentifier pluginIdentifier = 
PluginIdentifier.of(engineType, pluginType, 
pluginConfig.getString("plugin_name"));
-                BaseSink<FlinkEnvironment> pluginInstance = 
flinkSinkPluginDiscovery.getPluginInstance(pluginIdentifier);
-                pluginInstance.setConfig(pluginConfig);
-                return pluginInstance;
-            }).collect(Collectors.toList());
-    }
-
-    @Override
-    public List<URL> getPluginJars() {
-        return pluginJars;
-    }
-}
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
index bd216dfa..f9de514b 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/env/FlinkEnvironment.java
@@ -28,7 +28,6 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.time.Time;
-import org.apache.flink.api.java.ExecutionEnvironment;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.configuration.PipelineOptions;
 import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
@@ -40,7 +39,6 @@ import 
org.apache.flink.streaming.api.environment.CheckpointConfig;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
 import org.apache.flink.table.api.EnvironmentSettings;
 import org.apache.flink.table.api.TableConfig;
-import org.apache.flink.table.api.bridge.java.BatchTableEnvironment;
 import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
 import org.apache.flink.util.TernaryBoolean;
 import org.slf4j.Logger;
@@ -62,10 +60,6 @@ public class FlinkEnvironment implements RuntimeEnv {
 
     private StreamTableEnvironment tableEnvironment;
 
-    private ExecutionEnvironment batchEnvironment;
-
-    private BatchTableEnvironment batchTableEnvironment;
-
     private JobMode jobMode;
 
     private String jobName = "seatunnel";
@@ -91,10 +85,6 @@ public class FlinkEnvironment implements RuntimeEnv {
         // Batch/Streaming both use data stream api in SeaTunnel New API
         createStreamEnvironment();
         createStreamTableEnvironment();
-        if (!isStreaming()) {
-            createExecutionEnvironment();
-            createBatchTableEnvironment();
-        }
         if (config.hasPath("job.name")) {
             jobName = config.getString("job.name");
         }
@@ -105,10 +95,6 @@ public class FlinkEnvironment implements RuntimeEnv {
         return jobName;
     }
 
-    public boolean isStreaming() {
-        return JobMode.STREAMING.equals(jobMode);
-    }
-
     @Override
     public FlinkEnvironment setJobMode(JobMode jobMode) {
         this.jobMode = jobMode;
@@ -125,16 +111,12 @@ public class FlinkEnvironment implements RuntimeEnv {
         pluginPaths.forEach(url -> LOGGER.info("register plugins : {}", url));
         Configuration configuration;
         try {
-            if (isStreaming()) {
-                configuration =
-                        (Configuration) 
Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
-                                "getConfiguration")).orElseThrow(() -> new 
RuntimeException("can't find " +
-                                "method: 
getConfiguration")).invoke(this.environment);
-            } else {
-                configuration = batchEnvironment.getConfiguration();
-            }
+            configuration =
+                (Configuration) 
Objects.requireNonNull(ReflectionUtils.getDeclaredMethod(StreamExecutionEnvironment.class,
+                    "getConfiguration")).orElseThrow(() -> new 
RuntimeException("can't find " +
+                    "method: getConfiguration")).invoke(this.environment);
         } catch (Exception e) {
-            throw new RuntimeException(e);
+            throw new RuntimeException("Get flink configuration from 
environment failed", e);
         }
         List<String> jars = configuration.get(PipelineOptions.JARS);
         if (jars == null) {
@@ -208,27 +190,6 @@ public class FlinkEnvironment implements RuntimeEnv {
         }
     }
 
-    public ExecutionEnvironment getBatchEnvironment() {
-        return batchEnvironment;
-    }
-
-    public BatchTableEnvironment getBatchTableEnvironment() {
-        return batchTableEnvironment;
-    }
-
-    private void createExecutionEnvironment() {
-        batchEnvironment = ExecutionEnvironment.getExecutionEnvironment();
-        if (config.hasPath(ConfigKeyName.PARALLELISM)) {
-            int parallelism = config.getInt(ConfigKeyName.PARALLELISM);
-            batchEnvironment.setParallelism(parallelism);
-        }
-        EnvironmentUtil.setRestartStrategy(config, 
batchEnvironment.getConfig());
-    }
-
-    private void createBatchTableEnvironment() {
-        batchTableEnvironment = BatchTableEnvironment.create(batchEnvironment);
-    }
-
     private void setTimeCharacteristic() {
         if (config.hasPath(ConfigKeyName.TIME_CHARACTERISTIC)) {
             String timeType = 
config.getString(ConfigKeyName.TIME_CHARACTERISTIC);
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
similarity index 83%
rename from 
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
rename to 
seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
index c7e8759b..8d433c27 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkTaskExecution.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/FlinkExecution.java
@@ -20,6 +20,8 @@ package org.apache.seatunnel.core.starter.flink.execution;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+import org.apache.seatunnel.core.starter.execution.TaskExecution;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,9 +37,9 @@ import java.util.List;
 /**
  * Used to execute a SeaTunnelTask.
  */
-public class FlinkTaskExecution {
+public class FlinkExecution implements TaskExecution {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkTaskExecution.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(FlinkExecution.class);
 
     private final Config config;
     private final FlinkEnvironment flinkEnvironment;
@@ -45,7 +47,7 @@ public class FlinkTaskExecution {
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
-    public FlinkTaskExecution(Config config) {
+    public FlinkExecution(Config config) {
         this.config = config;
         this.flinkEnvironment = (FlinkEnvironment) new 
EnvironmentFactory<>(config, EngineType.FLINK).getEnvironment();
         
SeaTunnelContext.getContext().setJobMode(flinkEnvironment.getJobMode());
@@ -54,13 +56,17 @@ public class FlinkTaskExecution {
         this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(flinkEnvironment, config.getConfigList("sink"));
     }
 
-    public void execute() throws Exception {
+    public void execute() throws TaskExecuteException {
         List<DataStream<Row>> dataStreams = new ArrayList<>();
         dataStreams = sourcePluginExecuteProcessor.execute(dataStreams);
         dataStreams = transformPluginExecuteProcessor.execute(dataStreams);
         sinkPluginExecuteProcessor.execute(dataStreams);
 
         LOGGER.info("Flink Execution Plan:{}", 
flinkEnvironment.getStreamExecutionEnvironment().getExecutionPlan());
-        
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
+        try {
+            
flinkEnvironment.getStreamExecutionEnvironment().execute(flinkEnvironment.getJobName());
+        } catch (Exception e) {
+            throw new TaskExecuteException("Execute Flink job error", e);
+        }
     }
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
index 3a458a79..92501ffa 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/PluginExecuteProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.types.Row;
 
@@ -30,5 +32,5 @@ public interface PluginExecuteProcessor {
      * @param upstreamDataStreams the upstream data streams.
      * @return the result data stream
      */
-    List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) 
throws Exception;
+    List<DataStream<Row>> execute(List<DataStream<Row>> upstreamDataStreams) 
throws TaskExecuteException;
 }
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
index 24b8fffc..d207b7eb 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/SinkExecuteProcessor.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowTypeInfo;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
@@ -69,7 +70,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) throws Exception {
+    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) throws TaskExecuteException {
         DataStream<Row> input = upstreamDataStreams.get(0);
         FlinkSinkConverter<SeaTunnelRow, Row, Serializable, Serializable, 
Serializable> flinkSinkConverter = new FlinkSinkConverter<>();
         for (int i = 0; i < plugins.size(); i++) {
diff --git 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
index e06c761e..ab0200e3 100644
--- 
a/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-flink-starter/src/main/java/org/apache/seatunnel/core/starter/flink/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.flink.execution;
 
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.flink.stream.FlinkStreamTransform;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
@@ -60,20 +61,25 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Fl
     }
 
     @Override
-    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) throws Exception {
+    public List<DataStream<Row>> execute(List<DataStream<Row>> 
upstreamDataStreams) throws TaskExecuteException {
         if (plugins.isEmpty()) {
             return upstreamDataStreams;
         }
         DataStream<Row> input = upstreamDataStreams.get(0);
         List<DataStream<Row>> result = new ArrayList<>();
         for (int i = 0; i < plugins.size(); i++) {
-            FlinkStreamTransform transform = plugins.get(i);
-            Config pluginConfig = pluginConfigs.get(i);
-            DataStream<Row> stream = 
fromSourceTable(pluginConfig).orElse(input);
-            input = transform.processStream(flinkEnvironment, stream);
-            registerResultTable(pluginConfig, input);
-            transform.registerFunction(flinkEnvironment);
-            result.add(input);
+            try {
+                FlinkStreamTransform transform = plugins.get(i);
+                Config pluginConfig = pluginConfigs.get(i);
+                DataStream<Row> stream = 
fromSourceTable(pluginConfig).orElse(input);
+                input = transform.processStream(flinkEnvironment, stream);
+                registerResultTable(pluginConfig, input);
+                transform.registerFunction(flinkEnvironment);
+                result.add(input);
+            } catch (Exception e) {
+                throw new TaskExecuteException(
+                    String.format("SeaTunnel transform task: %s execute 
error", plugins.get(i).getPluginName()), e);
+            }
         }
         return result;
     }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
index ecdc8a92..0bcbb372 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/command/SparkApiTaskExecuteCommand.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.core.starter.command.Command;
 import org.apache.seatunnel.core.starter.config.ConfigBuilder;
 import org.apache.seatunnel.core.starter.exception.CommandExecuteException;
 import org.apache.seatunnel.core.starter.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.core.starter.spark.execution.SparkTaskExecution;
+import org.apache.seatunnel.core.starter.spark.execution.SparkExecution;
 import org.apache.seatunnel.core.starter.utils.FileUtils;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -50,7 +50,7 @@ public class SparkApiTaskExecuteCommand implements 
Command<SparkCommandArgs> {
         Path configFile = FileUtils.getConfigPath(sparkCommandArgs);
         Config config = new ConfigBuilder(configFile).getConfig();
         try {
-            SparkTaskExecution seaTunnelTaskExecution = new 
SparkTaskExecution(config);
+            SparkExecution seaTunnelTaskExecution = new SparkExecution(config);
             seaTunnelTaskExecution.execute();
         } catch (Exception e) {
             LOGGER.error("Run SeaTunnel on spark failed.", e);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
index 51fc752c..198fa43f 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/PluginExecuteProcessor.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
+
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 
@@ -30,5 +32,5 @@ public interface PluginExecuteProcessor {
      * @param upstreamDataStreams the upstream data streams.
      * @return the result data stream
      */
-    List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws 
Exception;
+    List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) throws 
TaskExecuteException;
 }
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
index 2a5a4597..d9016a6e 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SinkExecuteProcessor.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSinkPluginDiscovery;
 import org.apache.seatunnel.spark.SparkEnvironment;
@@ -61,7 +62,7 @@ public class SinkExecuteProcessor extends 
AbstractPluginExecuteProcessor<SeaTunn
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) 
throws Exception {
+    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) 
throws TaskExecuteException {
         Dataset<Row> input = upstreamDataStreams.get(0);
         for (int i = 0; i < plugins.size(); i++) {
             Config sinkConfig = pluginConfigs.get(i);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
similarity index 92%
rename from 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
rename to 
seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
index 5ddd0474..7b90bd0d 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkTaskExecution.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/SparkExecution.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.starter.spark.execution;
 import org.apache.seatunnel.api.common.SeaTunnelContext;
 import org.apache.seatunnel.core.starter.config.EngineType;
 import org.apache.seatunnel.core.starter.config.EnvironmentFactory;
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.spark.SparkEnvironment;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -32,9 +33,9 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 
-public class SparkTaskExecution {
+public class SparkExecution {
 
-    private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkTaskExecution.class);
+    private static final Logger LOGGER = 
LoggerFactory.getLogger(SparkExecution.class);
 
     private final Config config;
     private final SparkEnvironment sparkEnvironment;
@@ -42,7 +43,7 @@ public class SparkTaskExecution {
     private final PluginExecuteProcessor transformPluginExecuteProcessor;
     private final PluginExecuteProcessor sinkPluginExecuteProcessor;
 
-    public SparkTaskExecution(Config config) {
+    public SparkExecution(Config config) {
         this.config = config;
         this.sparkEnvironment = (SparkEnvironment) new 
EnvironmentFactory<>(config, EngineType.SPARK).getEnvironment();
         
SeaTunnelContext.getContext().setJobMode(sparkEnvironment.getJobMode());
@@ -51,7 +52,7 @@ public class SparkTaskExecution {
         this.sinkPluginExecuteProcessor = new 
SinkExecuteProcessor(sparkEnvironment, config.getConfigList("sink"));
     }
 
-    public void execute() throws Exception {
+    public void execute() throws TaskExecuteException {
         List<Dataset<Row>> datasets = new ArrayList<>();
         datasets = sourcePluginExecuteProcessor.execute(datasets);
         datasets = transformPluginExecuteProcessor.execute(datasets);
diff --git 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
index deec43b9..faa2bace 100644
--- 
a/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
+++ 
b/seatunnel-core/seatunnel-spark-starter/src/main/java/org/apache/seatunnel/core/starter/spark/execution/TransformExecuteProcessor.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.core.starter.spark.execution;
 
+import org.apache.seatunnel.core.starter.exception.TaskExecuteException;
 import org.apache.seatunnel.plugin.discovery.PluginIdentifier;
 import 
org.apache.seatunnel.plugin.discovery.seatunnel.SeaTunnelSparkTransformPluginDiscovery;
 import org.apache.seatunnel.spark.BaseSparkTransform;
@@ -60,7 +61,7 @@ public class TransformExecuteProcessor extends 
AbstractPluginExecuteProcessor<Ba
     }
 
     @Override
-    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) 
throws Exception {
+    public List<Dataset<Row>> execute(List<Dataset<Row>> upstreamDataStreams) 
throws TaskExecuteException {
         if (plugins.isEmpty()) {
             return upstreamDataStreams;
         }

Reply via email to