This is an automated email from the ASF dual-hosted git repository.

gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 58584eefb [Improve][Connector-V2][Neo4j] Unified exception for Neo4j source & sink connector (#3565)
58584eefb is described below

commit 58584eefb1de838f937a0ba7e10b0759cde4ac12
Author: FWLamb <[email protected]>
AuthorDate: Thu Nov 24 16:18:35 2022 +0800

    [Improve][Connector-V2][Neo4j] Unified exception for Neo4j source & sink connector (#3565)
---
 .../seatunnel/neo4j/config/DriverBuilder.java      | 17 +++++-----
 .../neo4j/exception/Neo4jConnectorException.java   | 36 ++++++++++++++++++++++
 .../connectors/seatunnel/neo4j/sink/Neo4jSink.java | 16 +++++++---
 .../seatunnel/neo4j/source/Neo4jSource.java        | 17 +++++++---
 .../seatunnel/neo4j/source/Neo4jSourceReader.java  | 12 +++++---
 .../Neo4jSourceReaderTest.java                     |  5 +--
 6 files changed, 80 insertions(+), 23 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
index ac8914e46..c566f0485 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/config/DriverBuilder.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.neo4j.config;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
+
 import lombok.Getter;
 import lombok.Setter;
 import org.neo4j.driver.AuthTokens;
@@ -51,26 +54,26 @@ public class DriverBuilder implements Serializable {
 
     public Driver build() {
         final Config.ConfigBuilder configBuilder = Config.builder()
-                .withMaxConnectionPoolSize(1);
+            .withMaxConnectionPoolSize(1);
         if (maxConnectionTimeoutSeconds != null) {
             configBuilder
-                    
.withConnectionAcquisitionTimeout(maxConnectionTimeoutSeconds * 2, 
TimeUnit.SECONDS)
-                    .withConnectionTimeout(maxConnectionTimeoutSeconds, 
TimeUnit.SECONDS);
+                .withConnectionAcquisitionTimeout(maxConnectionTimeoutSeconds 
* 2, TimeUnit.SECONDS)
+                .withConnectionTimeout(maxConnectionTimeoutSeconds, 
TimeUnit.SECONDS);
         }
         if (maxTransactionRetryTimeSeconds != null) {
             configBuilder
-                    
.withMaxTransactionRetryTime(maxTransactionRetryTimeSeconds, TimeUnit.SECONDS);
+                .withMaxTransactionRetryTime(maxTransactionRetryTimeSeconds, 
TimeUnit.SECONDS);
         }
         Config config = configBuilder
-                .build();
+            .build();
 
         if (username != null) {
             return GraphDatabase.driver(uri, AuthTokens.basic(username, 
password), config);
         } else if (bearerToken != null) {
             return GraphDatabase.driver(uri, AuthTokens.bearer(bearerToken), 
config);
-        } else if (kerberosTicket != null){
+        } else if (kerberosTicket != null) {
             return GraphDatabase.driver(uri, 
AuthTokens.kerberos(kerberosTicket), config);
         }
-        throw new IllegalArgumentException("Invalid Field");
+        throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED, 
"Invalid Field");
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorException.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorException.java
new file mode 100644
index 000000000..3dc4d501f
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/exception/Neo4jConnectorException.java
@@ -0,0 +1,36 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.neo4j.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class Neo4jConnectorException extends SeaTunnelRuntimeException {
+
+    public Neo4jConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public Neo4jConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public Neo4jConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
index cc71794bf..80c0a53b8 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/sink/Neo4jSink.java
@@ -30,6 +30,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkCo
 import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkConfig.QUERY_PARAM_POSITION;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -40,6 +41,7 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSinkQueryInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -67,7 +69,9 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, 
Void, Void, Void>
         final CheckResult queryConfigCheck =
             CheckConfigUtil.checkAllExists(config, KEY_QUERY.key(), 
QUERY_PARAM_POSITION.key());
         if (!queryConfigCheck.isSuccess()) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, 
queryConfigCheck.getMsg());
+            throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    PLUGIN_NAME, PluginType.SINK, queryConfigCheck.getMsg()));
         }
         neo4JSinkQueryInfo.setQuery(config.getString(KEY_QUERY.key()));
         
neo4JSinkQueryInfo.setQueryParamPosition(config.getObject(QUERY_PARAM_POSITION.key()).unwrapped());
@@ -81,7 +85,9 @@ public class Neo4jSink implements SeaTunnelSink<SeaTunnelRow, 
Void, Void, Void>
                 KEY_KERBEROS_TICKET.key());
         final CheckResult mergedConfigCheck = 
CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
         if (!mergedConfigCheck.isSuccess()) {
-            throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, 
mergedConfigCheck.getMsg());
+            throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    PLUGIN_NAME, PluginType.SINK, mergedConfigCheck.getMsg()));
         }
 
         final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
@@ -90,8 +96,10 @@ public class Neo4jSink implements 
SeaTunnelSink<SeaTunnelRow, Void, Void, Void>
 
         if (config.hasPath(KEY_USERNAME.key())) {
             final CheckResult pwParamCheck = 
CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
-            if (!mergedConfigCheck.isSuccess()) {
-                throw new PrepareFailException(PLUGIN_NAME, PluginType.SINK, 
pwParamCheck.getMsg());
+            if (!pwParamCheck.isSuccess()) {
+                throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                        PLUGIN_NAME, PluginType.SINK, pwParamCheck.getMsg()));
             }
             final String username = config.getString(KEY_USERNAME.key());
             final String password = config.getString(KEY_PASSWORD.key());
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
index 45c87c80c..67172d837 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSource.java
@@ -29,6 +29,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSource
 import static 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig.PLUGIN_NAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -44,6 +45,7 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReader
 import org.apache.seatunnel.connectors.seatunnel.neo4j.config.DriverBuilder;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -71,7 +73,9 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
             CheckConfigUtil.checkAllExists(pluginConfig, KEY_QUERY.key(), 
SeaTunnelSchema.SCHEMA.key());
 
         if (!configCheck.isSuccess()) {
-            throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, 
PluginType.SOURCE, configCheck.getMsg());
+            throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, 
configCheck.getMsg()));
         }
         neo4jSourceQueryInfo.setQuery(pluginConfig.getString(KEY_QUERY.key()));
 
@@ -103,8 +107,9 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
                 KEY_KERBEROS_TICKET.key());
         final CheckResult mergedConfigCheck = 
CheckConfigUtil.mergeCheckResults(uriConfigCheck, authConfigCheck);
         if (!mergedConfigCheck.isSuccess()) {
-            throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, 
PluginType.SOURCE,
-                mergedConfigCheck.getMsg());
+            throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, 
mergedConfigCheck.getMsg()));
         }
 
         final URI uri = URI.create(config.getString(KEY_NEO4J_URI.key()));
@@ -113,8 +118,10 @@ public class Neo4jSource extends 
AbstractSingleSplitSource<SeaTunnelRow> {
 
         if (config.hasPath(KEY_USERNAME.key())) {
             final CheckResult pwParamCheck = 
CheckConfigUtil.checkAllExists(config, KEY_PASSWORD.key());
-            if (!mergedConfigCheck.isSuccess()) {
-                throw new PrepareFailException(Neo4jSourceConfig.PLUGIN_NAME, 
PluginType.SOURCE, pwParamCheck.getMsg());
+            if (!pwParamCheck.isSuccess()) {
+                throw new 
Neo4jConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                        Neo4jSourceConfig.PLUGIN_NAME, PluginType.SOURCE, 
pwParamCheck.getMsg()));
             }
             final String username = config.getString(KEY_USERNAME.key());
             final String password = config.getString(KEY_PASSWORD.key());
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
index 85dc991c4..9869ae1ed 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/main/java/org/apache/seatunnel/connectors/seatunnel/neo4j/source/Neo4jSourceReader.java
@@ -24,9 +24,11 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
 import 
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
 import 
org.apache.seatunnel.connectors.seatunnel.neo4j.config.Neo4jSourceQueryInfo;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
 import org.neo4j.driver.Driver;
 import org.neo4j.driver.Query;
@@ -92,11 +94,11 @@ public class Neo4jSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
     /**
      * convert {@link SeaTunnelDataType} to java data type
      *
-     * @throws IllegalArgumentException when not supported data type
-     * @throws LossyCoercion            when conversion cannot be achieved 
without losing precision.
+     * @throws Neo4jConnectorException when not supported data type
+     * @throws LossyCoercion           when conversion cannot be achieved 
without losing precision.
      */
     public static Object convertType(SeaTunnelDataType<?> dataType, Value 
value)
-        throws IllegalArgumentException, LossyCoercion {
+        throws Neo4jConnectorException, LossyCoercion {
         Objects.requireNonNull(dataType);
         Objects.requireNonNull(value);
 
@@ -121,7 +123,7 @@ public class Neo4jSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
                 return value.asLocalDateTime();
             case MAP:
                 if (!((MapType<?, ?>) 
dataType).getKeyType().equals(BasicType.STRING_TYPE)) {
-                    throw new IllegalArgumentException("Key Type of MapType 
must String type");
+                    throw new 
Neo4jConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Key Type of MapType 
must String type");
                 }
                 final SeaTunnelDataType<?> valueType = ((MapType<?, ?>) 
dataType).getValueType();
                 return value.asMap(v -> 
valueType.getTypeClass().cast(convertType(valueType, v)));
@@ -138,7 +140,7 @@ public class Neo4jSourceReader extends 
AbstractSingleSplitReader<SeaTunnelRow> {
             case FLOAT:
                 return value.asFloat();
             default:
-                throw new IllegalArgumentException("not supported data type: " 
+ dataType);
+                throw new 
Neo4jConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "not supported 
data type: " + dataType);
         }
 
     }
diff --git 
a/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
 
b/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
index c3f511751..664b03b9b 100644
--- 
a/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
+++ 
b/seatunnel-connectors-v2/connector-neo4j/src/test/java/org.apache.seatunnel.connectors.seatunnel.neo4j.source/Neo4jSourceReaderTest.java
@@ -27,6 +27,7 @@ import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
+import 
org.apache.seatunnel.connectors.seatunnel.neo4j.exception.Neo4jConnectorException;
 
 import org.junit.jupiter.api.Test;
 import org.neo4j.driver.exceptions.value.LossyCoercion;
@@ -66,8 +67,8 @@ class Neo4jSourceReaderTest {
         assertEquals(1, Neo4jSourceReader.convertType(BasicType.INT_TYPE, new 
IntegerValue(1)));
         assertEquals(1.1F, Neo4jSourceReader.convertType(BasicType.FLOAT_TYPE, 
new FloatValue(1.1F)));
 
-        assertThrows(IllegalArgumentException.class, () -> 
Neo4jSourceReader.convertType(BasicType.SHORT_TYPE, new IntegerValue(256)));
+        assertThrows(Neo4jConnectorException.class, () -> 
Neo4jSourceReader.convertType(BasicType.SHORT_TYPE, new IntegerValue(256)));
         assertThrows(LossyCoercion.class, () -> 
Neo4jSourceReader.convertType(BasicType.INT_TYPE, new 
IntegerValue(Integer.MAX_VALUE + 1L)));
-        assertThrows(IllegalArgumentException.class, () -> 
Neo4jSourceReader.convertType(new MapType<>(BasicType.INT_TYPE, 
BasicType.BOOLEAN_TYPE), new MapValue(Collections.singletonMap("1", 
BooleanValue.FALSE))));
+        assertThrows(Neo4jConnectorException.class, () -> 
Neo4jSourceReader.convertType(new MapType<>(BasicType.INT_TYPE, 
BasicType.BOOLEAN_TYPE), new MapValue(Collections.singletonMap("1", 
BooleanValue.FALSE))));
     }
 }

Reply via email to