This is an automated email from the ASF dual-hosted git repository.

kirs pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 7b44c01  [Improve][core] Add PluginClosedException type for the 
Plugin's close method to make the exception more clearly and elegant (#1572)
7b44c01 is described below

commit 7b44c012783f53938b851bba3c605c3fe23fa72e
Author: xuankun zheng <[email protected]>
AuthorDate: Sat Mar 26 18:23:35 2022 +0800

    [Improve][core] Add PluginClosedException type for the Plugin's close 
method to make the exception more clearly and elegant (#1572)
    
    * [Improve][core] Add PluginClosedException for the plugin's close method 
to make the exception clearer and more elegant.
    
    * [fix][core] Add the license header to PluginClosedException.
    
    * [Improve][core] Remove the unnecessary use of Optional.
---
 .../seatunnel/plugin/PluginClosedException.java    | 43 ++++++++++++++++++
 .../seatunnel/command/BaseTaskExecuteCommand.java  | 14 +++---
 .../command/BaseTaskExecuteCommandTest.java        | 51 +++++++++++++++++++---
 3 files changed, 95 insertions(+), 13 deletions(-)

diff --git 
a/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
new file mode 100644
index 0000000..74b5a3f
--- /dev/null
+++ 
b/seatunnel-apis/seatunnel-api-base/src/main/java/org/apache/seatunnel/plugin/PluginClosedException.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.plugin;
+
+/**
+ * An exception that signals an error while closing a plugin.
+ */
+public class PluginClosedException extends RuntimeException {
+
+    public PluginClosedException() {
+    }
+
+    public PluginClosedException(String message) {
+        super(message);
+    }
+
+    public PluginClosedException(String message, Throwable cause) {
+        super(message, cause);
+    }
+
+    public PluginClosedException(Throwable cause) {
+        super(cause);
+    }
+
+    public PluginClosedException(String message, Throwable cause, boolean 
enableSuppression, boolean writableStackTrace) {
+        super(message, cause, enableSuppression, writableStackTrace);
+    }
+}
diff --git 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
index 0627821..dc8b966 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/main/java/org/apache/seatunnel/command/BaseTaskExecuteCommand.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.common.config.Common;
 import org.apache.seatunnel.common.config.DeployMode;
 import org.apache.seatunnel.env.RuntimeEnv;
 import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.plugin.PluginClosedException;
 import org.apache.seatunnel.utils.AsciiArtUtils;
 import org.apache.seatunnel.utils.CompressionUtils;
 
@@ -80,19 +81,16 @@ public abstract class BaseTaskExecuteCommand<T extends 
CommandArgs, E extends Ru
      */
     @SafeVarargs
     protected final void close(List<? extends Plugin<E>>... plugins) {
-        RuntimeException exceptionHolder = null;
+        PluginClosedException exceptionHolder = null;
         for (List<? extends Plugin<E>> pluginList : plugins) {
             for (Plugin<E> plugin : pluginList) {
                 try (Plugin<?> closed = plugin) {
                     // ignore
                 } catch (Exception e) {
-                    RuntimeException wrapperException = new RuntimeException(
-                            String.format("plugin %s closed error", 
plugin.getClass()), e);
-                    if (exceptionHolder == null) {
-                        exceptionHolder = wrapperException;
-                    } else {
-                        exceptionHolder.addSuppressed(wrapperException);
-                    }
+                    exceptionHolder = exceptionHolder == null ?
+                            new PluginClosedException("below plugins closed 
error:") : exceptionHolder;
+                    exceptionHolder.addSuppressed(new PluginClosedException(
+                            String.format("plugin %s closed error", 
plugin.getClass()), e));
                 }
             }
         }
diff --git 
a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
 
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
index 3ddf8d3..bec6836 100644
--- 
a/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
+++ 
b/seatunnel-core/seatunnel-core-base/src/test/java/org/apache/seatunnel/command/BaseTaskExecuteCommandTest.java
@@ -17,12 +17,16 @@
 
 package org.apache.seatunnel.command;
 
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertThrows;
+
 import org.apache.seatunnel.flink.FlinkEnvironment;
 import org.apache.seatunnel.plugin.Plugin;
+import org.apache.seatunnel.plugin.PluginClosedException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
-import org.junit.Assert;
+import org.junit.Before;
 import org.junit.Test;
 
 import java.util.ArrayList;
@@ -32,6 +36,11 @@ public class BaseTaskExecuteCommandTest {
 
     private static int CLOSE_TIMES = 0;
 
+    @Before
+    public void setUp() {
+        CLOSE_TIMES = 0;
+    }
+
     @Test
     public void testClose() {
         List<MockPlugin> pluginListA = new ArrayList<>();
@@ -41,15 +50,27 @@ public class BaseTaskExecuteCommandTest {
         pluginListB.add(new MockPlugin());
         pluginListB.add(new MockPlugin());
         MockTaskExecutorCommand mockTaskExecutorCommand = new 
MockTaskExecutorCommand();
+        mockTaskExecutorCommand.close(pluginListA, pluginListB);
+        assertEquals(Integer.parseInt("0"), CLOSE_TIMES);
+    }
+
+    @Test
+    public void testExceptionClose() {
+        List<MockExceptionPlugin> pluginListA = new ArrayList<>();
+        pluginListA.add(new MockExceptionPlugin());
+        pluginListA.add(new MockExceptionPlugin());
+        List<MockExceptionPlugin> pluginListB = new ArrayList<>();
+        pluginListB.add(new MockExceptionPlugin());
+        pluginListB.add(new MockExceptionPlugin());
+        MockTaskExecutorCommand mockTaskExecutorCommand = new 
MockTaskExecutorCommand();
         try {
             mockTaskExecutorCommand.close(pluginListA, pluginListB);
         } catch (Exception ex) {
             // just print into console
             ex.printStackTrace();
         }
-        Assert.assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
-        Assert.assertThrows(RuntimeException.class, () -> 
mockTaskExecutorCommand.close(pluginListA));
-
+        assertEquals(Integer.parseInt("4"), CLOSE_TIMES);
+        assertThrows(PluginClosedException.class, () -> 
mockTaskExecutorCommand.close(pluginListA));
     }
 
     private static class MockPlugin implements Plugin<FlinkEnvironment> {
@@ -65,9 +86,28 @@ public class BaseTaskExecuteCommandTest {
 
         @Override
         public void close() {
+
+        }
+
+    }
+
+    private static class MockExceptionPlugin implements 
Plugin<FlinkEnvironment> {
+
+        @Override
+        public void setConfig(Config config) {
+        }
+
+        @Override
+        public Config getConfig() {
+            return null;
+        }
+
+        @Override
+        public void close() {
             CLOSE_TIMES++;
-            throw new RuntimeException("Test close with exception, 
closeTimes:" + CLOSE_TIMES);
+            throw new PluginClosedException("Test close with exception, 
closeTimes:" + CLOSE_TIMES);
         }
+
     }
 
     private static class MockTaskExecutorCommand extends 
BaseTaskExecuteCommand<FlinkCommandArgs, FlinkEnvironment> {
@@ -76,5 +116,6 @@ public class BaseTaskExecuteCommandTest {
         public void execute(FlinkCommandArgs commandArgs) {
 
         }
+
     }
 }

Reply via email to