This is an automated email from the ASF dual-hosted git repository.
kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 7b44c01 [Improve][core] Add PluginClosedException type for the
Plugin's close method to make the exception more clearly and elegant (#1572)
7b44c01 is described below
commit 7b44c012783f53938b851bba3c605c3fe23fa72e
Author: xuankun zheng <[email protected]>
AuthorDate: Sat Mar 26 18:23:35 2022 +0800
[Improve][core] Add PluginClosedException type for the Plugin's close
method to make the exception more clearly and elegant (#1572)
* [Improve][core] Add PluginClosedException for the close method in plugin
to make the exception more clearly and elegant.
* [fix][core] Add the license for PluginClosedException.
* [Improve][core] Needn't use Optional.
---
.../seatunnel/plugin/PluginClosedException.java | 43 ++++++++++++++++++
.../seatunnel/command/BaseTaskExecuteCommand.java | 14 +++---
.../command/BaseTaskExecuteCommandTest.java | 51 +++++++++++++++++++---
3 files changed, 95 insertions(+), 13 deletions(-)
diff --git
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
new file mode 100644
index 0000000..74b5a3f
--- /dev/null
+++
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin;
+
+/**
+ * an Exception used for the scenes when plugin closed error.
+ */
+public class PluginClosedException extends RuntimeException {
+
+ public PluginClosedException() {
+ }
+
+ public PluginClosedException(String message) {
+ super(message);
+ }
+
+ public PluginClosedException(String message, Throwable cause) {
+ super(message, cause);
+ }
+
+ public PluginClosedException(Throwable cause) {
+ super(cause);
+ }
+
+ public PluginClosedException(String message, Throwable cause, boolean
enableSuppression, boolean writableStackTrace) {
+ super(message, cause, enableSuppression, writableStackTrace);
+ }
+}
diff --git
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index 0627821..dc8b966 100644
---
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.Common;
import org.apache.seatunnel.common.config.DeployMode;
import org.apache.seatunnel.env.RuntimeEnv;
import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.plugin.PluginClosedException;
import org.apache.seatunnel.utils.AsciiArtUtils;
import org.apache.seatunnel.utils.CompressionUtils;
@@ -80,19 +81,16 @@ public abstract class BaseTaskExecuteCommand<T extends
CommandArgs, E extends Ru
*/
@SafeVarargs
protected final void close(List<? extends Plugin<E>>... plugins) {
- RuntimeException exceptionHolder = null;
+ PluginClosedException exceptionHolder = null;
for (List<? extends Plugin<E>> pluginList : plugins) {
for (Plugin<E> plugin : pluginList) {
try (Plugin<?> closed = plugin) {
// ignore
} catch (Exception e) {
- RuntimeException wrapperException = new RuntimeException(
- String.format("plugin %s closed error",
plugin.getClass()), e);
- if (exceptionHolder == null) {
- exceptionHolder = wrapperException;
- } else {
- exceptionHolder.addSuppressed(wrapperException);
- }
+ exceptionHolder = exceptionHolder == null ?
+ new PluginClosedException("below plugins closed
error:") : exceptionHolder;
+ exceptionHolder.addSuppressed(new PluginClosedException(
+ String.format("plugin %s closed error",
plugin.getClass()), e));
}
}
}
diff --git
a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
index 3ddf8d3..bec6836 100644
---
a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
+++
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
@@ -17,12 +17,16 @@
package org.apache.seatunnel.command;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
import org.apache.seatunnel.flink.FlinkEnvironment;
import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.plugin.PluginClosedException;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.junit.Assert;
+import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
@@ -32,6 +36,11 @@ public class BaseTaskExecuteCommandTest {
private static int CLOSE_TIMES = 0;
+ @Before
+ public void setUp() {
+ CLOSE_TIMES = 0;
+ }
+
@Test
public void testClose() {
List<MockPlugin> pluginListA = new ArrayList<>();
@@ -41,15 +50,27 @@ public class BaseTaskExecuteCommandTest {
pluginListB.add(new MockPlugin());
pluginListB.add(new MockPlugin());
MockTaskExecutorCommand mockTaskExecutorCommand = new
MockTaskExecutorCommand();
+ mockTaskExecutorCommand.close(pluginListA, pluginListB);
+ assertEquals(Integer.parseInt("0"), CLOSE_TIMES);
+ }
+
+ @Test
+ public void testExceptionClose() {
+ List<MockExceptionPlugin> pluginListA = new ArrayList<>();
+ pluginListA.add(new MockExceptionPlugin());
+ pluginListA.add(new MockExceptionPlugin());
+ List<MockExceptionPlugin> pluginListB = new ArrayList<>();
+ pluginListB.add(new MockExceptionPlugin());
+ pluginListB.add(new MockExceptionPlugin());
+ MockTaskExecutorCommand mockTaskExecutorCommand = new
MockTaskExecutorCommand();
try {
mockTaskExecutorCommand.close(pluginListA, pluginListB);
} catch (Exception ex) {
// just print into console
ex.printStackTrace();
}
- Assert.assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
- Assert.assertThrows(RuntimeException.class, () ->
mockTaskExecutorCommand.close(pluginListA));
-
+ assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
+ assertThrows(PluginClosedException.class, () ->
mockTaskExecutorCommand.close(pluginListA));
}
private static class MockPlugin implements Plugin<FlinkEnvironment> {
@@ -65,9 +86,28 @@ public class BaseTaskExecuteCommandTest {
@Override
public void close() {
+
+ }
+
+ }
+
+ private static class MockExceptionPlugin implements
Plugin<FlinkEnvironment> {
+
+ @Override
+ public void setConfig(Config config) {
+ }
+
+ @Override
+ public Config getConfig() {
+ return null;
+ }
+
+ @Override
+ public void close() {
CLOSE_TIMES++;
- throw new RuntimeException("Test close with exception,
closeTimes:" + CLOSE_TIMES);
+ throw new PluginClosedException("Test close with exception,
closeTimes:" + CLOSE_TIMES);
}
+
}
private static class MockTaskExecutorCommand extends
BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
@@ -76,5 +116,6 @@ public class BaseTaskExecuteCommandTest {
public void execute(FlinkCommandArgs commandArgs) {
}
+
}
}