This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 379791d [Feature][core] Add close method in plugin. (#1530)
379791d is described below
commit 379791dd441cefced65315226b769d49a562fb82
Author: Wenjun Ruan <[email protected]>
AuthorDate: Thu Mar 24 12:33:21 2022 +0800
[Feature][core] Add close method in plugin. (#1530)
* Add lifecycle
* fix checkstyle
* Remove lifecycle
* Fix java doc
* Add comment
* Plugin extends with AutoCloseable
* Add UT
* Format error message
* Use suppressed exception
---
.../java/org/apache/seatunnel/plugin/Plugin.java | 34 ++++++++-
.../apache/seatunnel/flink/source/FakeSource.java | 16 ++---
.../org/apache/seatunnel/flink/sink/FileSink.java | 7 ++
.../seatunnel/command/BaseTaskExecuteCommand.java | 30 +++++++-
.../command/flink/FlinkTaskExecuteCommand.java | 3 +-
.../command/spark/SparkTaskExecuteCommand.java | 3 +-
.../command/BaseTaskExecuteCommandTest.java | 80 ++++++++++++++++++++++
7 files changed, 157 insertions(+), 16 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
index 32e65e7..488c4d4 100644
---
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/Plugin.java
@@ -25,9 +25,21 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import java.io.Serializable;
/**
- * a base interface indicates belonging to SeaTunnel.
+ * A base interface indicating that the implementing class belongs to SeaTunnel.
+ * Plugin will be used as follows:
+ * <pre>{@code
+ * Plugin<?> plugin = new PluginA<>();
+ * plugin.setConfig(config);
+ * CheckResult checkResult = plugin.checkConfig();
+ * if (checkResult.getSuccess()) {
+ * plugin.prepare(env);
+ * // plugin execute code
+ * plugin.close();
+ * }
+ *
+ * }</pre>
*/
-public interface Plugin<T extends RuntimeEnv> extends Serializable {
+public interface Plugin<T extends RuntimeEnv> extends Serializable,
AutoCloseable {
String RESULT_TABLE_NAME = "result_table_name";
String SOURCE_TABLE_NAME = "source_table_name";
@@ -39,7 +51,23 @@ public interface Plugin<T extends RuntimeEnv> extends
Serializable {
return CheckResult.success();
}
- default void prepare(T prepareEnv) {
+ /**
+ * This is a lifecycle method; it is executed after the Plugin has been created.
+ *
+ * @param env environment
+ */
+ default void prepare(T env) {
+
+ }
+
+ /**
+ * This is a lifecycle method; it is executed before the Plugin is destroyed.
+ * It is used to release resources.
+ *
+ * @throws Exception if closing fails.
+ */
+ default void close() throws Exception {
+
}
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
index 54e48fd..6a7ab86 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-fake/src/main/java/org/apache/seatunnel/flink/source/FakeSource.java
@@ -53,18 +53,14 @@ public class FakeSource implements FlinkBatchSource {
}
@Override
- public void prepare(FlinkEnvironment prepareEnv) {
-
- }
-
- @Override
public DataSet<Row> getData(FlinkEnvironment env) {
Random random = new Random();
return env.getBatchTableEnvironment().toDataSet(
- env.getBatchTableEnvironment().fromValues(
- DataTypes.ROW(DataTypes.FIELD("name",
DataTypes.STRING()),
- DataTypes.FIELD("age", DataTypes.INT())),
- Arrays.stream(NAME_ARRAY).map(n -> Row.of(n,
random.nextInt(AGE_LIMIT)))
- .collect(Collectors.toList())), Row.class);
+ env.getBatchTableEnvironment().fromValues(
+ DataTypes.ROW(DataTypes.FIELD("name", DataTypes.STRING()),
+ DataTypes.FIELD("age", DataTypes.INT())),
+ Arrays.stream(NAME_ARRAY).map(n -> Row.of(n,
random.nextInt(AGE_LIMIT)))
+ .collect(Collectors.toList())), Row.class);
}
+
}
diff --git
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
index 9e730d0..55dd971 100644
---
a/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
+++
b/seatunnel-connectors/seatunnel-connectors-flink/seatunnel-connector-flink-file/src/main/java/org/apache/seatunnel/flink/sink/FileSink.java
@@ -150,4 +150,11 @@ public class FileSink implements FlinkStreamSink,
FlinkBatchSink {
String path = StringTemplate.substitute(config.getString(PATH),
format);
filePath = new Path(path);
}
+
+ @Override
+ public void close() throws Exception { // releases the output format referenced by this sink
+ if (outputFormat != null) { // nothing to close when no output format has been set
+ outputFormat.close();
+ }
+ }
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index c9bf805..ba46c2c 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -61,7 +61,7 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs, E extends Ru
}
/**
- * Execute prepare method defined in {@link Plugin}.
+ * Execute prepare method defined in {@link
org.apache.seatunnel.plugin.Plugin}.
*
* @param env runtimeEnv
* @param plugins plugin list
@@ -74,6 +74,34 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs, E extends Ru
}
/**
+     * Close all plugins: the first failure is rethrown at the end, later ones are added to it as suppressed.
+     *
+     * @param plugins lists of {@link org.apache.seatunnel.plugin.Plugin} to close; every element is attempted
+     */
+    @SafeVarargs
+    protected final void close(List<? extends Plugin<E>>... plugins) {
+        RuntimeException firstFailure = null;
+        for (List<? extends Plugin<E>> group : plugins) {
+            for (Plugin<E> plugin : group) {
+                try (Plugin<?> resource = plugin) {
+                    // empty on purpose: leaving the block is what invokes close() on the plugin
+                } catch (Throwable cause) {
+                    String message = String.format("plugin %s closed error", plugin.getClass());
+                    RuntimeException failure = new RuntimeException(message, cause);
+                    if (firstFailure != null) {
+                        firstFailure.addSuppressed(failure);
+                    } else {
+                        firstFailure = failure;
+                    }
+                }
+            }
+        }
+        if (firstFailure != null) {
+            throw firstFailure;
+        }
+    }
+
+ /**
* Print the logo.
*/
protected void showAsciiLogo() {
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
index f3f7a06..a31c38d 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/flink/FlinkTaskExecuteCommand.java
@@ -46,11 +46,12 @@ public class FlinkTaskExecuteCommand extends
BaseTaskExecuteCommand<FlinkCommand
execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
- prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
try {
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
+ close(sources, transforms, sinks);
} catch (Exception e) {
throw new RuntimeException("Execute Flink task error", e);
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
index 912023f..0064382 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/spark/SparkTaskExecuteCommand.java
@@ -43,11 +43,12 @@ public class SparkTaskExecuteCommand extends
BaseTaskExecuteCommand<SparkCommand
Execution<BaseSource<SparkEnvironment>,
BaseTransform<SparkEnvironment>, BaseSink<SparkEnvironment>, SparkEnvironment>
execution = configBuilder.createExecution();
baseCheckConfig(sources, transforms, sinks);
- prepare(configBuilder.getEnv(), sources, transforms, sinks);
showAsciiLogo();
try {
+ prepare(configBuilder.getEnv(), sources, transforms, sinks);
execution.start(sources, transforms, sinks);
+ close(sources, transforms, sinks);
} catch (Exception e) {
throw new RuntimeException("Execute Spark task error", e);
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
new file mode 100644
index 0000000..3ddf8d3
--- /dev/null
+++
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.command;
+
+import org.apache.seatunnel.flink.FlinkEnvironment;
+import org.apache.seatunnel.plugin.Plugin;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+public class BaseTaskExecuteCommandTest {
+
+    private static final int TOTAL_PLUGINS = 4;
+    private static int closeTimes = 0;
+
+    /** All plugins must be closed even when every close fails; the failures are chained as suppressed. */
+    @Test
+    public void testClose() {
+        closeTimes = 0;
+        List<MockPlugin> pluginListA = new ArrayList<>();
+        pluginListA.add(new MockPlugin());
+        pluginListA.add(new MockPlugin());
+        List<MockPlugin> pluginListB = new ArrayList<>();
+        pluginListB.add(new MockPlugin());
+        pluginListB.add(new MockPlugin());
+        MockTaskExecutorCommand command = new MockTaskExecutorCommand();
+
+        RuntimeException thrown = Assert.assertThrows(RuntimeException.class,
+            () -> command.close(pluginListA, pluginListB));
+        // one wrapper is rethrown, the wrappers of the remaining plugins hang off it as suppressed
+        Assert.assertEquals(TOTAL_PLUGINS, closeTimes);
+        Assert.assertEquals(TOTAL_PLUGINS - 1, thrown.getSuppressed().length);
+        Assert.assertThrows(RuntimeException.class, () -> command.close(pluginListA));
+    }
+
+    /** Plugin whose close always fails and counts each attempt in {@code closeTimes}. */
+    private static class MockPlugin implements Plugin<FlinkEnvironment> {
+        @Override
+        public void setConfig(Config config) {
+        }
+
+        @Override
+        public Config getConfig() {
+            return null;
+        }
+
+        @Override
+        public void close() {
+            closeTimes++;
+            throw new RuntimeException("Test close with exception, closeTimes:" + closeTimes);
+        }
+    }
+
+    /** Minimal concrete command; it only gives the test access to the inherited close method. */
+    private static class MockTaskExecutorCommand extends BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
+        @Override
+        public void execute(FlinkCommandArgs commandArgs) {
+        }
+    }
+}