This is an automated email from the ASF dual-hosted git repository.
wenjun pushed a commit to branch api-draft
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/api-draft by this push:
new 96bd7031 Add SeaTunnelAPIExample (#1867)
96bd7031 is described below
commit 96bd7031fb59c65dabe31c4a3fe803f43512186c
Author: Wenjun Ruan <[email protected]>
AuthorDate: Fri May 13 15:52:09 2022 +0800
Add SeaTunnelAPIExample (#1867)
* Add command implementation
* Add SeaTunnelAPIExample
---
.../seatunnel/console/sink/ConsoleSink.java | 29 -----
.../org.apache.seatunnel.api.sink.SeaTunnelSink | 17 +++
.../seatunnel/fake/source/FakeSource.java | 5 +-
.../seatunnel/fake/source/FakeSourceReader.java | 22 +++-
...org.apache.seatunnel.api.source.SeaTunnelSource | 17 +++
.../org/apache/seatunnel/core/base/Seatunnel.java | 3 +-
.../core/base/command/AbstractCommandArgs.java | 1 +
.../seatunnel/core/base/command/Command.java | 3 +-
.../{command/Command.java => config/ApiType.java} | 26 ++---
.../seatunnel/core/base/config/ConfigBuilder.java | 19 +---
.../Command.java => config/ConfigChecker.java} | 20 ++--
.../core/base/config/EnvironmentFactory.java | 1 +
.../CommandException.java} | 23 ++--
.../CommandExecuteException.java} | 23 ++--
.../ConfigCheckException.java} | 21 ++--
seatunnel-core/seatunnel-core-flink/pom.xml | 26 +++++
.../seatunnel/core/flink/SeatunnelFlink.java | 3 +-
.../core/flink/args/FlinkCommandArgs.java | 36 ++++++
...mmand.java => FlinkApiConfValidateCommand.java} | 14 ++-
...ommand.java => FlinkApiTaskExecuteCommand.java} | 13 ++-
.../core/flink/command/FlinkCommandBuilder.java | 32 +++++-
...d.java => SeaTunnelApiConfValidateCommand.java} | 27 ++---
.../command/SeaTunnelApiTaskExecuteCommand.java | 121 +++++++++++++++++++++
.../core/flink/config/FlinkApiConfigChecker.java | 46 ++++++++
.../flink/config/SeaTunnelApiConfigChecker.java} | 22 ++--
.../core/flink/config/SeaTunnelApiEnvironment.java | 48 ++++----
.../seatunnel/core/spark/SeatunnelSpark.java | 3 +-
.../apache/seatunnel/core/spark/SparkStarter.java | 2 +-
.../spark/command/SparkConfValidateCommand.java | 8 +-
.../spark/command/SparkTaskExecuteCommand.java | 7 +-
.../spark/config/SeaTunnelApiConfigChecker.java} | 22 ++--
.../core/spark/config/SeaTunnelEnvironment.java | 66 +++++++++++
.../core/spark/config/SparkApiConfigChecker.java | 46 ++++++++
.../seatunnel-flink-examples/pom.xml | 10 ++
.../seatunnel/example/flink/LocalFlinkExample.java | 3 +-
...lFlinkExample.java => SeaTunnelApiExample.java} | 7 +-
.../seatunnel/example/spark/LocalSparkExample.java | 3 +-
.../translation/flink/sink/FlinkSinkWriter.java | 3 +
38 files changed, 589 insertions(+), 209 deletions(-)
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
index 947b3125..d7eab44b 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/java/org/apache/seatunnel/connectors/seatunnel/console/sink/ConsoleSink.java
@@ -17,16 +17,12 @@
package org.apache.seatunnel.connectors.seatunnel.console.sink;
-import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
-import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.api.sink.SinkCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.console.state.ConsoleState;
import java.util.List;
-import java.util.Optional;
public class ConsoleSink implements SeaTunnelSink<SeaTunnelRow, ConsoleState,
ConsoleCommitInfo, ConsoleAggregatedCommitInfo> {
@@ -40,29 +36,4 @@ public class ConsoleSink implements
SeaTunnelSink<SeaTunnelRow, ConsoleState, Co
SinkWriter.Context context, List<ConsoleState> states) {
return restoreWriter(context, states);
}
-
- @Override
- public Optional<Serializer<ConsoleState>> getWriterStateSerializer() {
- return getWriterStateSerializer();
- }
-
- @Override
- public Optional<SinkCommitter<ConsoleCommitInfo>> createCommitter() {
- return createCommitter();
- }
-
- @Override
- public Optional<Serializer<ConsoleCommitInfo>> getCommitInfoSerializer() {
- return getCommitInfoSerializer();
- }
-
- @Override
- public Optional<SinkAggregatedCommitter<ConsoleCommitInfo,
ConsoleAggregatedCommitInfo>> createAggregatedCommitter() {
- return createAggregatedCommitter();
- }
-
- @Override
- public Optional<Serializer<ConsoleAggregatedCommitInfo>>
getAggregatedCommitInfoSerializer() {
- return getAggregatedCommitInfoSerializer();
- }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
new file mode 100644
index 00000000..12b49983
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-console/src/main/resources/META-INF/services/org.apache.seatunnel.api.sink.SeaTunnelSink
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.console.sink.ConsoleSink
\ No newline at end of file
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
index 0c3fb31b..9f18668b 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSource.java
@@ -22,9 +22,10 @@ import org.apache.seatunnel.api.source.Boundedness;
import org.apache.seatunnel.api.source.SeaTunnelSource;
import org.apache.seatunnel.api.source.SourceReader;
import org.apache.seatunnel.api.source.SourceSplitEnumerator;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.connectors.seatunnel.fake.state.FakeState;
-public class FakeSource implements SeaTunnelSource<FakeSourceEvent,
FakeSourceSplit, FakeState> {
+public class FakeSource implements SeaTunnelSource<SeaTunnelRow,
FakeSourceSplit, FakeState> {
@Override
public Boundedness getBoundedness() {
@@ -32,7 +33,7 @@ public class FakeSource implements
SeaTunnelSource<FakeSourceEvent, FakeSourceSp
}
@Override
- public SourceReader<FakeSourceEvent, FakeSourceSplit>
createReader(SourceReader.Context readerContext) {
+ public SourceReader<SeaTunnelRow, FakeSourceSplit>
createReader(SourceReader.Context readerContext) {
return new FakeSourceReader(readerContext);
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
index fa1c28c4..bf2cb2f6 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
@@ -19,18 +19,27 @@ package
org.apache.seatunnel.connectors.seatunnel.fake.source;
import org.apache.seatunnel.api.source.Collector;
import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
+import java.util.Random;
+import java.util.concurrent.ThreadLocalRandom;
-public class FakeSourceReader implements SourceReader<FakeSourceEvent,
FakeSourceSplit> {
+public class FakeSourceReader implements SourceReader<SeaTunnelRow,
FakeSourceSplit> {
private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceReader.class);
private final SourceReader.Context context;
+ private final String[] names = {"Wenjun", "Fanjia", "Zongwen",
"CalvinKirs"};
+ private final int[] ages = {1024, 2048, 4096, 8192};
+ private final Random random = ThreadLocalRandom.current();
+
public FakeSourceReader(SourceReader.Context context) {
this.context = context;
}
@@ -47,8 +56,15 @@ public class FakeSourceReader implements
SourceReader<FakeSourceEvent, FakeSourc
@Override
@SuppressWarnings("magicnumber")
- public void pollNext(Collector<FakeSourceEvent> output) {
- output.collect(new FakeSourceEvent("Tom", 19,
System.currentTimeMillis()));
+ public void pollNext(Collector<SeaTunnelRow> output) throws
InterruptedException {
+ Thread.sleep(1000L);
+ int i = random.nextInt(names.length);
+ Map<String, Object> fieldMap = new HashMap<>(4);
+ fieldMap.put("name", names[i]);
+ fieldMap.put("age", ages[i]);
+ fieldMap.put("timestamp", System.currentTimeMillis());
+ SeaTunnelRow seaTunnelRow = new SeaTunnelRow(new Object[]{names[i],
ages[i], System.currentTimeMillis()}, fieldMap);
+ output.collect(seaTunnelRow);
}
@Override
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
new file mode 100644
index 00000000..b21a0581
--- /dev/null
+++
b/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/resources/META-INF/services/org.apache.seatunnel.api.source.SeaTunnelSource
@@ -0,0 +1,17 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+org.apache.seatunnel.connectors.seatunnel.fake.source.FakeSource
\ No newline at end of file
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
index 644df716..e4587284 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/Seatunnel.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.core.base;
import org.apache.seatunnel.apis.base.command.CommandArgs;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.commons.lang3.exception.ExceptionUtils;
import org.slf4j.Logger;
@@ -34,7 +35,7 @@ public class Seatunnel {
* @param command commandArgs
* @param <T> commandType
*/
- public static <T extends CommandArgs> void run(Command<T> command) {
+ public static <T extends CommandArgs> void run(Command<T> command) throws
CommandException {
try {
command.execute();
} catch (ConfigRuntimeException e) {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
index 7f46fb03..a19fe285 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/AbstractCommandArgs.java
@@ -37,6 +37,7 @@ public abstract class AbstractCommandArgs implements
CommandArgs {
description = "variable substitution, such as -i city=beijing, or -i
date=20190318")
private List<String> variables = Collections.emptyList();
+ // todo: use command type enum
@Parameter(names = {"-t", "--check"},
description = "check config")
private boolean checkConfig = false;
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
index 5cd80fc5..cca16c30 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
@@ -18,6 +18,7 @@
package org.apache.seatunnel.core.base.command;
import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.exception.CommandException;
/**
* Command interface.
@@ -30,6 +31,6 @@ public interface Command<T extends CommandArgs> {
/**
* Execute command
*/
- void execute();
+ void execute() throws CommandException;
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
similarity index 73%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
index 5cd80fc5..5468eee3 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ApiType.java
@@ -15,21 +15,19 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public enum ApiType {
+ ENGINE_API("engine"),
+ SEATUNNEL_API("seatunnel"),
+ ;
+ private final String apiType;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+ ApiType(String apiType) {
+ this.apiType = apiType;
+ }
+ public String getApiType() {
+ return apiType;
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
index 605613b0..758ec8c8 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigBuilder.java
@@ -17,7 +17,6 @@
package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.env.RuntimeEnv;
import org.apache.seatunnel.common.config.ConfigRuntimeException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -35,18 +34,16 @@ import java.nio.file.Path;
*
* @param <ENVIRONMENT> environment type.
*/
-public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
+public class ConfigBuilder {
private static final Logger LOGGER =
LoggerFactory.getLogger(ConfigBuilder.class);
private static final String PLUGIN_NAME_KEY = "plugin_name";
private final Path configFile;
- private final EngineType engine;
private final Config config;
- public ConfigBuilder(Path configFile, EngineType engine) {
+ public ConfigBuilder(Path configFile) {
this.configFile = configFile;
- this.engine = engine;
this.config = load();
}
@@ -75,16 +72,4 @@ public class ConfigBuilder<ENVIRONMENT extends RuntimeEnv> {
return config;
}
- /**
- * check if config is valid.
- **/
- public void checkConfig() {
- // check environment
- ENVIRONMENT environment = new EnvironmentFactory<ENVIRONMENT>(config,
engine).getEnvironment();
- // check plugins
- PluginFactory<ENVIRONMENT> pluginFactory = new PluginFactory<>(config,
engine);
- pluginFactory.createPlugins(PluginType.SOURCE);
- pluginFactory.createPlugins(PluginType.TRANSFORM);
- pluginFactory.createPlugins(PluginType.SINK);
- }
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
similarity index 60%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
index 5cd80fc5..30e4243e 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/ConfigChecker.java
@@ -15,21 +15,25 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
/**
- * Command interface.
+ * Check the config is valid.
*
- * @param <T> args type
+ * @param <ENVIRONMENT> the environment type.
*/
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+public interface ConfigChecker<ENVIRONMENT extends RuntimeEnv> {
/**
- * Execute command
+ * Check if the config is validated, if check fails, throw exception.
+ *
+ * @param config given config.
*/
- void execute();
+ void checkConfig(Config config) throws ConfigCheckException;
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
index ff450836..9daa23b7 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/config/EnvironmentFactory.java
@@ -43,6 +43,7 @@ public class EnvironmentFactory<ENVIRONMENT extends
RuntimeEnv> {
this.engine = engine;
}
+ // todo:put this method into submodule to avoid dependency on the engine
public synchronized ENVIRONMENT getEnvironment() {
Config envConfig = config.getConfig("env");
boolean enableHive = checkIsContainHive();
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
similarity index 73%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
index 5cd80fc5..33f2013b 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandException.java
@@ -15,21 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+public class CommandException extends Exception {
+ public CommandException(String message) {
+ super(message);
+ }
+ public CommandException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
similarity index 72%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
index 5cd80fc5..a3a8ee42 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/CommandExecuteException.java
@@ -15,21 +15,14 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
-
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
-
- /**
- * Execute command
- */
- void execute();
+public class CommandExecuteException extends CommandException {
+ public CommandExecuteException(String message) {
+ super(message);
+ }
+ public CommandExecuteException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
similarity index 73%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
index 5cd80fc5..22d193bf 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/exception/ConfigCheckException.java
@@ -15,21 +15,16 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.base.exception;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+public class ConfigCheckException extends CommandException {
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+ public ConfigCheckException(String message) {
+ super(message);
+ }
- /**
- * Execute command
- */
- void execute();
+ public ConfigCheckException(String message, Throwable cause) {
+ super(message, cause);
+ }
}
diff --git a/seatunnel-core/seatunnel-core-flink/pom.xml
b/seatunnel-core/seatunnel-core-flink/pom.xml
index d985628a..d5101a41 100644
--- a/seatunnel-core/seatunnel-core-flink/pom.xml
+++ b/seatunnel-core/seatunnel-core-flink/pom.xml
@@ -42,6 +42,32 @@
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-api</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-translation-flink</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <!-- todo: use another module to execute new API? -->
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-java</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-streaming-java_${scala.binary.version}</artifactId>
+ </dependency>
+
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>seatunnel-api-flink</artifactId>
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
index 45792d00..f1db4a15 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/SeatunnelFlink.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
import org.apache.seatunnel.core.flink.config.FlinkJobType;
@@ -26,7 +27,7 @@ import org.apache.seatunnel.core.flink.utils.CommandLineUtils;
public class SeatunnelFlink {
- public static void main(String[] args) {
+ public static void main(String[] args) throws CommandException {
FlinkCommandArgs flinkCommandArgs =
CommandLineUtils.parseCommandArgs(args, FlinkJobType.JAR);
Command<FlinkCommandArgs> flinkCommand = new FlinkCommandBuilder()
.buildCommand(flinkCommandArgs);
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
index fb892f9c..5b27fcfd 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/args/FlinkCommandArgs.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.core.flink.args;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.command.AbstractCommandArgs;
+import org.apache.seatunnel.core.base.config.ApiType;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.flink.config.FlinkRunMode;
@@ -34,6 +35,11 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
description = "job run mode, run or run-application")
private FlinkRunMode runMode = FlinkRunMode.RUN;
+ @Parameter(names = {"-api", "--api-type"},
+ converter = ApiTypeConverter.class,
+ description = "Api type, engine or seatunnel")
+ private ApiType apiType = ApiType.ENGINE_API;
+
/**
* Undefined parameters parsed will be stored here as flink command
parameters.
*/
@@ -65,6 +71,14 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
this.flinkParams = flinkParams;
}
+ public ApiType getApiType() {
+ return apiType;
+ }
+
+ public void setApiType(ApiType apiType) {
+ this.apiType = apiType;
+ }
+
/**
* Used to convert the run mode string to the enum value.
*/
@@ -86,4 +100,26 @@ public class FlinkCommandArgs extends AbstractCommandArgs {
}
}
+ /**
+ * Used to convert the api type string to the enum value.
+ */
+ private static class ApiTypeConverter implements IStringConverter<ApiType>
{
+
+ /**
+ * If the '-api' is not set, then will not go into this convert method.
+ *
+ * @param value input value set by '-api' or '--api-type'
+ * @return api type enum value
+ */
+ @Override
+ public ApiType convert(String value) {
+ for (ApiType apiType : ApiType.values()) {
+ if (apiType.getApiType().equalsIgnoreCase(value)) {
+ return apiType;
+ }
+ }
+ throw new IllegalArgumentException(String.format("API type %s not
supported", value));
+ }
+ }
+
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
similarity index 74%
rename from
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
rename to
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
index 5868e252..f03522a0 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkConfValidateCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.flink.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.core.flink.config.FlinkApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,20 +32,21 @@ import java.nio.file.Path;
/**
* Used to check the Flink conf is validated.
*/
-public class FlinkConfValidateCommand implements Command<FlinkCommandArgs> {
+public class FlinkApiConfValidateCommand implements Command<FlinkCommandArgs> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
private final FlinkCommandArgs flinkCommandArgs;
- public FlinkConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
+ public FlinkApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
+ public void execute() throws ConfigCheckException {
Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
- new ConfigBuilder<FlinkEnvironment>(configPath,
flinkCommandArgs.getEngineType()).checkConfig();
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new FlinkApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
similarity index 84%
rename from
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
rename to
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
index 32d13440..345f8414 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkApiTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.flink.FlinkEnvironment;
@@ -36,22 +37,22 @@ import java.nio.file.Path;
import java.util.List;
/**
- * Used to execute Flink Job.
+ * Used to execute Flink Job by Flink API.
*/
-public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+public class FlinkApiTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
private final FlinkCommandArgs flinkCommandArgs;
- public FlinkTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ public FlinkApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
+ public void execute() throws CommandExecuteException {
EngineType engine = flinkCommandArgs.getEngineType();
Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
- Config config = new ConfigBuilder<>(configFile, engine).getConfig();
+ Config config = new ConfigBuilder(configFile).getConfig();
ExecutionContext<FlinkEnvironment> executionContext = new
ExecutionContext<>(config, engine);
List<BaseSource<FlinkEnvironment>> sources =
executionContext.getSources();
List<BaseTransform<FlinkEnvironment>> transforms =
executionContext.getTransforms();
@@ -68,7 +69,7 @@ public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommand
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
} catch (Exception e) {
- throw new RuntimeException("Execute Flink task error", e);
+ throw new CommandExecuteException("Execute Flink task error", e);
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
index 595f01a5..590e7d09 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/FlinkCommandBuilder.java
@@ -30,7 +30,35 @@ public class FlinkCommandBuilder implements
CommandBuilder<FlinkCommandArgs> {
throw new IllegalArgumentException(
String.format("Deploy mode: %s is Illegal",
commandArgs.getDeployMode()));
}
- return commandArgs.isCheckConfig() ? new
FlinkConfValidateCommand(commandArgs)
- : new FlinkTaskExecuteCommand(commandArgs);
+ switch (commandArgs.getApiType()) {
+ case ENGINE_API:
+ return new FlinkApiCommandBuilder().buildCommand(commandArgs);
+ case SEATUNNEL_API:
+ return new
SeaTunnelApiCommandBuilder().buildCommand(commandArgs);
+ default:
+ throw new IllegalArgumentException("Unsupported API type: " +
commandArgs.getApiType());
+ }
+ }
+
+ /**
+ * Used to generate command for engine API.
+ */
+ private static class FlinkApiCommandBuilder extends FlinkCommandBuilder {
+ @Override
+ public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
+ return commandArgs.isCheckConfig() ? new
FlinkApiConfValidateCommand(commandArgs)
+ : new FlinkApiTaskExecuteCommand(commandArgs);
+ }
+ }
+
+ /**
+ * Used to generate command for seaTunnel API.
+ */
+ private static class SeaTunnelApiCommandBuilder extends
FlinkCommandBuilder {
+ @Override
+ public Command<FlinkCommandArgs> buildCommand(FlinkCommandArgs
commandArgs) {
+ return commandArgs.isCheckConfig() ? new
SeaTunnelApiConfValidateCommand(commandArgs)
+ : new SeaTunnelApiTaskExecuteCommand(commandArgs);
+ }
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
similarity index 64%
rename from
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
rename to
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
index c58884f5..e9c93411 100644
---
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiConfValidateCommand.java
@@ -19,11 +19,10 @@ package org.apache.seatunnel.core.flink.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
-import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
+import org.apache.seatunnel.core.flink.config.SeaTunnelApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,26 +30,24 @@ import org.slf4j.LoggerFactory;
import java.nio.file.Path;
/**
- * Used to execute a SeaTunnelTask. This command is used to execute the Flink
job by new API.
+ * Use to validate the configuration of the SeaTunnel API.
*/
-public class SeaTunnelTaskExecuteCommand implements Command<FlinkCommandArgs> {
+public class SeaTunnelApiConfValidateCommand implements
Command<FlinkCommandArgs> {
- private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkConfValidateCommand.class);
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeaTunnelApiConfValidateCommand.class);
private final FlinkCommandArgs flinkCommandArgs;
- public SeaTunnelTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ public SeaTunnelApiConfValidateCommand(FlinkCommandArgs flinkCommandArgs) {
this.flinkCommandArgs = flinkCommandArgs;
}
@Override
- public void execute() {
- EngineType engine = flinkCommandArgs.getEngineType();
- Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
-
- Config config = new ConfigBuilder<>(configFile, engine).getConfig();
- // initialize the new plugin.
- // translate plugin to flink source/sink
- // execute the flink job
+ public void execute() throws ConfigCheckException {
+ Path configPath = FileUtils.getConfigPath(flinkCommandArgs);
+ // todo: validate the config by new api
+ ConfigBuilder configBuilder = new ConfigBuilder(configPath);
+ new SeaTunnelApiConfigChecker().checkConfig(configBuilder.getConfig());
+ LOGGER.info("config OK !");
}
}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
new file mode 100644
index 00000000..e767df5f
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/command/SeaTunnelApiTaskExecuteCommand.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.command;
+
+import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.source.SourceSplit;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.constants.JobMode;
+import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
+import org.apache.seatunnel.core.base.utils.FileUtils;
+import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
+import org.apache.seatunnel.translation.flink.sink.FlinkSinkConverter;
+import org.apache.seatunnel.translation.flink.source.SeaTunnelParallelSource;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.streaming.api.datastream.DataStreamSource;
+import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.nio.file.Path;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.ServiceLoader;
+
+/**
+ * This command is used to execute the Flink job by SeaTunnel new API.
+ */
+public class SeaTunnelApiTaskExecuteCommand implements
Command<FlinkCommandArgs> {
+
+ private static final Logger LOGGER =
LoggerFactory.getLogger(FlinkApiConfValidateCommand.class);
+
+ private final FlinkCommandArgs flinkCommandArgs;
+
+ public SeaTunnelApiTaskExecuteCommand(FlinkCommandArgs flinkCommandArgs) {
+ this.flinkCommandArgs = flinkCommandArgs;
+ }
+
+ @Override
+ public void execute() throws CommandExecuteException {
+ EngineType engine = flinkCommandArgs.getEngineType();
+ Path configFile = FileUtils.getConfigPath(flinkCommandArgs);
+
+ Config config = new ConfigBuilder(configFile).getConfig();
+ // todo: add basic type
+ SeaTunnelParallelSource source = getSource();
+ Sink<WrappedRow, Object, Object, Object> flinkSink = getSink();
+ // execute the flink job
+ FlinkEnvironment flinkEnvironment = getFlinkEnvironment(config);
+ StreamExecutionEnvironment streamExecutionEnvironment =
flinkEnvironment.getStreamExecutionEnvironment();
+ DataStreamSource<WrappedRow> dataStream =
streamExecutionEnvironment.addSource(source);
+ dataStream.sinkTo(flinkSink);
+ try {
+
streamExecutionEnvironment.execute("SeaTunnelAPITaskExecuteCommand");
+ } catch (Exception e) {
+ throw new CommandExecuteException("SeaTunnelAPITaskExecuteCommand
execute failed", e);
+ }
+ }
+
+ private SeaTunnelParallelSource getSource() {
+ return new SeaTunnelParallelSource(loadSourcePlugin());
+ }
+
+ private Sink<WrappedRow, Object, Object, Object> getSink() {
+ SeaTunnelSink<SeaTunnelRow, Object, Object, Object> sink =
loadSinkPlugin();
+ FlinkSinkConverter<SeaTunnelRow, WrappedRow, Object, Object, Object>
flinkSinkConverter = new FlinkSinkConverter<>();
+ return flinkSinkConverter.convert(sink, Collections.emptyMap());
+ }
+
+ private <T, SplitT extends SourceSplit, StateT> SeaTunnelSource<T, SplitT,
StateT> loadSourcePlugin() {
+ // todo: use FactoryUtils to load the plugin
+ ServiceLoader<SeaTunnelSource> serviceLoader =
ServiceLoader.load(SeaTunnelSource.class);
+ Iterator<SeaTunnelSource> iterator = serviceLoader.iterator();
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ throw new IllegalArgumentException("Cannot find the plugin.");
+ }
+
+ private <IN, StateT, CommitInfoT, AggregatedCommitInfoT> SeaTunnelSink<IN,
StateT, CommitInfoT, AggregatedCommitInfoT> loadSinkPlugin() {
+ // todo: use FactoryUtils to load the plugin
+ ServiceLoader<SeaTunnelSink> serviceLoader =
ServiceLoader.load(SeaTunnelSink.class);
+ Iterator<SeaTunnelSink> iterator = serviceLoader.iterator();
+ if (iterator.hasNext()) {
+ return iterator.next();
+ }
+ throw new IllegalArgumentException("Cannot find the plugin.");
+ }
+
+ private FlinkEnvironment getFlinkEnvironment(Config config) {
+ FlinkEnvironment flinkEnvironment = new FlinkEnvironment();
+ flinkEnvironment.setJobMode(JobMode.STREAMING);
+ flinkEnvironment.setConfig(config);
+ flinkEnvironment.prepare();
+
+ return flinkEnvironment;
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
new file mode 100644
index 00000000..7dca74ec
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/FlinkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.flink.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class FlinkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ try {
+ // check environment
+ FlinkEnvironment environment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ // check plugins
+ PluginFactory<FlinkEnvironment> pluginFactory = new
PluginFactory<>(config, EngineType.FLINK);
+ pluginFactory.createPlugins(PluginType.SOURCE);
+ pluginFactory.createPlugins(PluginType.TRANSFORM);
+ pluginFactory.createPlugins(PluginType.SINK);
+ } catch (Exception ex) {
+ throw new ConfigCheckException("Config check fail", ex);
+ }
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..7f0a722f 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.flink.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
- /**
- * Execute command
- */
- void execute();
+public class SeaTunnelApiConfigChecker implements
ConfigChecker<SeaTunnelApiEnvironment> {
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ // todo: implement
+ }
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
similarity index 55%
copy from
seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
copy to
seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
index fa1c28c4..7d0d5956 100644
---
a/seatunnel-connectors/seatunnel-connectors-seatunnel/seatunnel-connectors-seatunnel-fake/src/main/java/org/apache/seatunnel/connectors/seatunnel/fake/source/FakeSourceReader.java
+++
b/seatunnel-core/seatunnel-core-flink/src/main/java/org/apache/seatunnel/core/flink/config/SeaTunnelApiEnvironment.java
@@ -15,59 +15,61 @@
* limitations under the License.
*/
-package org.apache.seatunnel.connectors.seatunnel.fake.source;
+package org.apache.seatunnel.core.flink.config;
-import org.apache.seatunnel.api.source.Collector;
-import org.apache.seatunnel.api.source.SourceReader;
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.net.URL;
import java.util.List;
-public class FakeSourceReader implements SourceReader<FakeSourceEvent,
FakeSourceSplit> {
-
- private static final Logger LOGGER =
LoggerFactory.getLogger(FakeSourceReader.class);
+public class SeaTunnelApiEnvironment implements RuntimeEnv {
- private final SourceReader.Context context;
+ private static final Logger LOGGER =
LoggerFactory.getLogger(SeaTunnelApiEnvironment.class);
- public FakeSourceReader(SourceReader.Context context) {
- this.context = context;
- }
+ private Config config;
@Override
- public void open() {
-
+ public SeaTunnelApiEnvironment setConfig(Config config) {
+ this.config = config;
+ return this;
}
@Override
- public void close() {
-
+ public Config getConfig() {
+ return config;
}
@Override
- @SuppressWarnings("magicnumber")
- public void pollNext(Collector<FakeSourceEvent> output) {
- output.collect(new FakeSourceEvent("Tom", 19,
System.currentTimeMillis()));
+ public CheckResult checkConfig() {
+ // todo
+ return null;
}
@Override
- public List<FakeSourceSplit> snapshotState(long checkpointId) {
+ public SeaTunnelApiEnvironment prepare() {
+ // todo
return null;
}
@Override
- public void addSplits(List<FakeSourceSplit> splits) {
-
+ public SeaTunnelApiEnvironment setJobMode(JobMode mode) {
+ return null;
}
@Override
- public void handleNoMoreSplits() {
-
+ public JobMode getJobMode() {
+ return null;
}
@Override
- public void notifyCheckpointComplete(long checkpointId) {
+ public void registerPlugin(List<URL> pluginPaths) {
}
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
index d88cc7ec..05a1a1d1 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SeatunnelSpark.java
@@ -19,13 +19,14 @@ package org.apache.seatunnel.core.spark;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
import org.apache.seatunnel.core.spark.utils.CommandLineUtils;
public class SeatunnelSpark {
- public static void main(String[] args) {
+ public static void main(String[] args) throws CommandException {
SparkCommandArgs sparkArgs = CommandLineUtils.parseSparkArgs(args);
Command<SparkCommandArgs> sparkCommand =
new SparkCommandBuilder().buildCommand(sparkArgs);
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
index 81f44ba9..41b53080 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/SparkStarter.java
@@ -220,7 +220,7 @@ public class SparkStarter implements Starter {
if (!Files.exists(pluginRootDir) || !Files.isDirectory(pluginRootDir))
{
return Collections.emptyList();
}
- Config config = new
ConfigBuilder<>(Paths.get(commandArgs.getConfigFile()),
EngineType.SPARK).getConfig();
+ Config config = new
ConfigBuilder(Paths.get(commandArgs.getConfigFile())).getConfig();
PluginFactory<RuntimeEnv> pluginFactory = new PluginFactory<>(config,
EngineType.SPARK);
return pluginFactory.getPluginJarPaths().stream().map(url -> new
File(url.getPath()).toPath()).collect(Collectors.toList());
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
index 5629e51d..0f4a9307 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkConfValidateCommand.java
@@ -19,9 +19,10 @@ package org.apache.seatunnel.core.spark.command;
import org.apache.seatunnel.core.base.command.Command;
import org.apache.seatunnel.core.base.config.ConfigBuilder;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
-import org.apache.seatunnel.spark.SparkEnvironment;
+import org.apache.seatunnel.core.spark.config.SparkApiConfigChecker;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,9 +43,10 @@ public class SparkConfValidateCommand implements
Command<SparkCommandArgs> {
}
@Override
- public void execute() {
+ public void execute() throws ConfigCheckException {
Path confPath = FileUtils.getConfigPath(sparkCommandArgs);
- new ConfigBuilder<SparkEnvironment>(confPath,
sparkCommandArgs.getEngineType()).checkConfig();
+ ConfigBuilder configBuilder = new ConfigBuilder(confPath);
+ new SparkApiConfigChecker().checkConfig(configBuilder.getConfig());
LOGGER.info("config OK !");
}
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
index c8aebd4b..9a051416 100644
---
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/command/SparkTaskExecuteCommand.java
@@ -26,6 +26,7 @@ import org.apache.seatunnel.core.base.config.ConfigBuilder;
import org.apache.seatunnel.core.base.config.EngineType;
import org.apache.seatunnel.core.base.config.ExecutionContext;
import org.apache.seatunnel.core.base.config.ExecutionFactory;
+import org.apache.seatunnel.core.base.exception.CommandExecuteException;
import org.apache.seatunnel.core.base.utils.FileUtils;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.spark.SparkEnvironment;
@@ -44,11 +45,11 @@ public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommand
}
@Override
- public void execute() {
+ public void execute() throws CommandExecuteException {
EngineType engine = sparkCommandArgs.getEngineType();
Path confFile = FileUtils.getConfigPath(sparkCommandArgs);
- Config config = new ConfigBuilder<>(confFile, engine).getConfig();
+ Config config = new ConfigBuilder(confFile).getConfig();
ExecutionContext<SparkEnvironment> executionContext = new
ExecutionContext<>(config, engine);
List<BaseSource<SparkEnvironment>> sources =
executionContext.getSources();
@@ -66,7 +67,7 @@ public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommand
execution.start(sources, transforms, sinks);
close(sources, transforms, sinks);
} catch (Exception e) {
- throw new RuntimeException("Execute Spark task error", e);
+ throw new CommandExecuteException("Execute Spark task error", e);
}
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
similarity index 64%
copy from
seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
copy to
seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
index 5cd80fc5..3715c14b 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/core/base/command/Command.java
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelApiConfigChecker.java
@@ -15,21 +15,17 @@
* limitations under the License.
*/
-package org.apache.seatunnel.core.base.command;
+package org.apache.seatunnel.core.spark.config;
-import org.apache.seatunnel.apis.base.command.CommandArgs;
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
-/**
- * Command interface.
- *
- * @param <T> args type
- */
-@FunctionalInterface
-public interface Command<T extends CommandArgs> {
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
- /**
- * Execute command
- */
- void execute();
+public class SeaTunnelApiConfigChecker implements
ConfigChecker<SeaTunnelEnvironment> {
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ // todo: implement
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
new file mode 100644
index 00000000..6f9e0135
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SeaTunnelEnvironment.java
@@ -0,0 +1,66 @@
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.apis.base.env.RuntimeEnv;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.JobMode;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import java.net.URL;
+import java.util.List;
+
+public class SeaTunnelEnvironment implements RuntimeEnv {
+
+ @Override
+ public SeaTunnelEnvironment setConfig(Config config) {
+ return null;
+ }
+
+ @Override
+ public Config getConfig() {
+ return null;
+ }
+
+ @Override
+ public CheckResult checkConfig() {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelEnvironment prepare() {
+ return null;
+ }
+
+ @Override
+ public SeaTunnelEnvironment setJobMode(JobMode mode) {
+ return null;
+ }
+
+ @Override
+ public JobMode getJobMode() {
+ return null;
+ }
+
+ @Override
+ public void registerPlugin(List<URL> pluginPaths) {
+
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
new file mode 100644
index 00000000..26de5794
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-spark/src/main/java/org/apache/seatunnel/core/spark/config/SparkApiConfigChecker.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.core.spark.config;
+
+import org.apache.seatunnel.core.base.config.ConfigChecker;
+import org.apache.seatunnel.core.base.config.EngineType;
+import org.apache.seatunnel.core.base.config.EnvironmentFactory;
+import org.apache.seatunnel.core.base.config.PluginFactory;
+import org.apache.seatunnel.core.base.config.PluginType;
+import org.apache.seatunnel.core.base.exception.ConfigCheckException;
+import org.apache.seatunnel.flink.FlinkEnvironment;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+public class SparkApiConfigChecker implements ConfigChecker<FlinkEnvironment> {
+
+ @Override
+ public void checkConfig(Config config) throws ConfigCheckException {
+ try {
+ // check environment
+ FlinkEnvironment environment = new
EnvironmentFactory<FlinkEnvironment>(config, EngineType.FLINK).getEnvironment();
+ // check plugins
+ PluginFactory<FlinkEnvironment> pluginFactory = new
PluginFactory<>(config, EngineType.FLINK);
+ pluginFactory.createPlugins(PluginType.SOURCE);
+ pluginFactory.createPlugins(PluginType.TRANSFORM);
+ pluginFactory.createPlugins(PluginType.SINK);
+ } catch (Exception ex) {
+ throw new ConfigCheckException("Config check fail", ex);
+ }
+ }
+}
diff --git a/seatunnel-examples/seatunnel-flink-examples/pom.xml
b/seatunnel-examples/seatunnel-flink-examples/pom.xml
index 79fbe8f1..bd5caf5e 100644
--- a/seatunnel-examples/seatunnel-flink-examples/pom.xml
+++ b/seatunnel-examples/seatunnel-flink-examples/pom.xml
@@ -56,6 +56,16 @@
<artifactId>seatunnel-connector-flink-console</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-fake</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-connectors-seatunnel-console</artifactId>
+ <version>${project.version}</version>
+ </dependency>
<!-- seatunnel connectors -->
<!--flink-->
diff --git
a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
index a740b8ee..b41236e5 100644
---
a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++
b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.example.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
@@ -29,7 +30,7 @@ import java.nio.file.Paths;
public class LocalFlinkExample {
- public static void main(String[] args) throws FileNotFoundException,
URISyntaxException {
+ public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
String configFile =
getTestConfigFile("/examples/fake_to_console.conf");
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
diff --git
a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
similarity index 88%
copy from
seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
copy to
seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
index a740b8ee..3f13e4b5 100644
---
a/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/LocalFlinkExample.java
+++
b/seatunnel-examples/seatunnel-flink-examples/src/main/java/org/apache/seatunnel/example/flink/SeaTunnelApiExample.java
@@ -19,6 +19,8 @@ package org.apache.seatunnel.example.flink;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.config.ApiType;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.flink.args.FlinkCommandArgs;
import org.apache.seatunnel.core.flink.command.FlinkCommandBuilder;
@@ -27,14 +29,15 @@ import java.net.URISyntaxException;
import java.net.URL;
import java.nio.file.Paths;
-public class LocalFlinkExample {
+public class SeaTunnelApiExample {
- public static void main(String[] args) throws FileNotFoundException,
URISyntaxException {
+ public static void main(String[] args) throws FileNotFoundException,
URISyntaxException, CommandException {
String configFile =
getTestConfigFile("/examples/fake_to_console.conf");
FlinkCommandArgs flinkCommandArgs = new FlinkCommandArgs();
flinkCommandArgs.setConfigFile(configFile);
flinkCommandArgs.setCheckConfig(false);
flinkCommandArgs.setVariables(null);
+ flinkCommandArgs.setApiType(ApiType.SEATUNNEL_API);
Command<FlinkCommandArgs> flinkCommand =
new FlinkCommandBuilder().buildCommand(flinkCommandArgs);
Seatunnel.run(flinkCommand);
diff --git
a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
index 5469ec73..aa121a90 100644
---
a/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
+++
b/seatunnel-examples/seatunnel-spark-examples/src/main/java/org/apache/seatunnel/example/spark/LocalSparkExample.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.example.spark;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.core.base.Seatunnel;
import org.apache.seatunnel.core.base.command.Command;
+import org.apache.seatunnel.core.base.exception.CommandException;
import org.apache.seatunnel.core.spark.args.SparkCommandArgs;
import org.apache.seatunnel.core.spark.command.SparkCommandBuilder;
@@ -30,7 +31,7 @@ import java.nio.file.Paths;
public class LocalSparkExample {
- public static void main(String[] args) throws URISyntaxException,
FileNotFoundException {
+ public static void main(String[] args) throws URISyntaxException,
FileNotFoundException, CommandException {
String configFile = getTestConfigFile("/examples/spark.batch.conf");
SparkCommandArgs sparkArgs = new SparkCommandArgs();
sparkArgs.setConfigFile(configFile);
diff --git
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
index 354de014..bfdb2db4 100644
---
a/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
+++
b/seatunnel-translation/seatunnel-translation-flink/src/main/java/org/apache/seatunnel/translation/flink/sink/FlinkSinkWriter.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.translation.flink.sink;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import
org.apache.seatunnel.translation.flink.serialization.FlinkRowSerialization;
+import org.apache.seatunnel.translation.flink.serialization.WrappedRow;
import org.apache.flink.api.connector.sink.SinkWriter;
import org.apache.flink.types.Row;
@@ -41,6 +42,8 @@ public class FlinkSinkWriter<InputT, CommT, WriterStateT>
implements SinkWriter<
public void write(InputT element,
org.apache.flink.api.connector.sink.SinkWriter.Context context) throws
IOException {
if (element instanceof Row) {
sinkWriter.write(rowSerialization.deserialize((Row) element));
+ } else if (element instanceof WrappedRow) {
+ sinkWriter.write(rowSerialization.deserialize(((WrappedRow)
element).getRow()));
} else {
throw new InvalidClassException("only support Flink Row at now,
the element Class is " + element.getClass());
}