This is an automated email from the ASF dual-hosted git repository.

fanjia pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 273418ddc [Improve][Connector-V2][Kudu] Unified exception for kudu 
source & sink connector (#3564)
273418ddc is described below

commit 273418ddc9ae38ec107250f5f16ae0febfa7ed37
Author: Huan Liang <[email protected]>
AuthorDate: Sat Nov 26 12:08:21 2022 +0800

    [Improve][Connector-V2][Kudu] Unified exception for kudu source & sink 
connector (#3564)
    
    * [Doc]Fix typos in QuickStart Doc
    
    * [Improve][Connector-V2][Kudu] Unified exception for kudu source & sink 
connector
    
    * [Improve][Connector-V2][Kudu] Add license header
    
    * [Improve][Connector-V2][Kudu] fix code style
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 12 +++++
 .../seatunnel/kudu/config/KuduSinkConfig.java      |  7 ++-
 .../kudu/exception/KuduConnectorErrorCode.java     | 52 ++++++++++++++++++++++
 .../kudu/exception/KuduConnectorException.java     | 35 +++++++++++++++
 .../seatunnel/kudu/kuduclient/KuduInputFormat.java | 20 ++++-----
 .../kudu/kuduclient/KuduOutputFormat.java          | 18 ++++----
 .../seatunnel/kudu/kuduclient/KuduTypeMapper.java  |  4 +-
 .../seatunnel/kudu/source/KuduSource.java          | 23 +++++++---
 8 files changed, 143 insertions(+), 28 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 4fc996f3c..9fb6664b2 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -90,3 +90,15 @@ This document records some common error codes and 
corresponding solutions of Sea
 
|-------------|------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------|
 | INFLUXDB-01 | Connect influxdb failed, due to influxdb version info is 
unknown | When the user encounters this error code, it indicates that the 
connection to influxdb failed. Please check |
 | INFLUXDB-02 | Get column index of query result exception                     
  | When the user encounters this error code, it indicates that obtaining the 
column index failed. Please check |
+
+## Kudu Connector Error Codes
+
+| code    | description                                              | 
solution                                                                        
                                                                                
                                  |
+|---------|----------------------------------------------------------|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| KUDU-01 | Get the Kuduscan object for each splice failed           | When 
users encounter this error code, it is usually there are some problems with 
getting the KuduScan Object for each splice, please check your configuration 
whether correct and Kudu is work    |
+| KUDU-02 | Close Kudu client failed                                 | When 
users encounter this error code, it is usually there are some problems with 
closing the Kudu client, please check the Kudu is work                          
                                 |
+| KUDU-03 | Value type does not match column type                    | When 
users encounter this error code, it is usually there are some problems on 
matching the Type between value type and column type, please check if the data 
type is supported                    |
+| KUDU-04 | Upsert data to Kudu failed                               | When 
users encounter this error code, it means that Kudu has some problems, please 
check it whether is work                                                        
                               |
+| KUDU-05 | Insert data to Kudu failed                               | When 
users encounter this error code, it means that Kudu has some problems, please 
check it whether is work                                                        
                               |
+| KUDU-06 | Initialize the Kudu client failed                        | When 
users encounter this error code, it is usually there are some problems with 
initializing the Kudu client, please check your configuration whether correct 
and connector is work              |
+| KUDU-07 | Generate Kudu Parameters in the preparation phase failed | When 
users encounter this error code, it means that there are some problems on Kudu 
parameters generation, please check your configuration                          
                              |
\ No newline at end of file
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
index b00dd3232..82d924d19 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/config/KuduSinkConfig.java
@@ -17,8 +17,11 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.config;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
+import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -75,7 +78,9 @@ public class KuduSinkConfig {
             this.kuduMaster = pluginConfig.getString(KUDU_MASTER.key());
             this.kuduTableName = pluginConfig.getString(KUDU_TABLE_NAME.key());
         } else {
-            throw new RuntimeException("Missing Sink configuration 
parameters");
+            throw new 
KuduConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            "Kudu", PluginType.SINK, "Missing Sink 
configuration parameters"));
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
new file mode 100644
index 000000000..36f1e2658
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorErrorCode.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum KuduConnectorErrorCode implements SeaTunnelErrorCode { // Kudu connector error codes, KUDU-01 through KUDU-07, each paired with a fixed description
+    GET_KUDUSCAN_OBJECT_FAILED("KUDU-01", "Get the Kuduscan object for each 
splice failed"),
+    CLOSE_KUDU_CLIENT_FAILED("KUDU-02", "Close Kudu client failed"),
+    DATA_TYPE_CAST_FILED("KUDU-03", "Value type does not match column type"), // NOTE(review): FILED in this constant name looks like a typo for FAILED; renaming would also touch its callers
+    KUDU_UPSERT_FAILED("KUDU-04", "Upsert data to Kudu failed"),
+    KUDU_INSERT_FAILED("KUDU-05", "Insert data to Kudu failed"),
+    INIT_KUDU_CLIENT_FAILED("KUDU-06", "Initialize the Kudu client failed"),
+    GENERATE_KUDU_PARAMETERS_FAILED("KUDU-07", "Generate Kudu Parameters in 
the preparation phase failed")
+    ;
+
+
+
+    private final String code; // code string such as KUDU-01, returned by getCode()
+
+    private final String description; // description text, returned by getDescription()
+
+    KuduConnectorErrorCode(String code, String description) { // stores both arguments unchanged
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() { // returns the code given to the constructor
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() { // returns the description given to the constructor
+        return this.description;
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
new file mode 100644
index 000000000..e33e501ef
--- /dev/null
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/exception/KuduConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.kudu.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class KuduConnectorException extends SeaTunnelRuntimeException { // Kudu connector exception type; each constructor only forwards its arguments to the superclass
+    public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage); // error code plus message, no cause
+    }
+
+    public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause); // error code, message and the originating throwable
+    }
+
+    public KuduConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause); // error code plus the originating throwable, no extra message
+    }
+}
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
index a70a19f88..3471e07fb 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduInputFormat.java
@@ -17,14 +17,16 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.ExceptionUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
@@ -79,8 +81,7 @@ public class KuduInputFormat implements Serializable {
             keyColumn = schema.getPrimaryKeyColumns().get(0).getName();
             columns = schema.getColumns();
         } catch (KuduException e) {
-            log.warn("get table Columns Schemas Fail.", e);
-            throw new RuntimeException("get table Columns Schemas Fail..", e);
+            throw new 
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "get table 
Columns Schemas Failed");
         }
         return columns;
     }
@@ -116,7 +117,8 @@ public class KuduInputFormat implements Serializable {
             } else if (BasicType.STRING_TYPE.equals(seaTunnelDataType)) {
                 seatunnelField = rs.getString(i);
             } else {
-                throw new IllegalStateException("Unexpected value: " + 
seaTunnelDataType);
+                throw new 
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        "Unsupported data type: " + seaTunnelDataType);
             }
             fields.add(seatunnelField);
         }
@@ -135,8 +137,8 @@ public class KuduInputFormat implements Serializable {
                 
seaTunnelDataTypes.add(KuduTypeMapper.mapping(columnSchemaList, i));
             }
         } catch (Exception e) {
-            log.warn("get row type info exception.", e);
-            throw new PrepareFailException("kudu", PluginType.SOURCE, 
ExceptionUtils.getMessage(e));
+            throw new 
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, 
String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
         }
         return new SeaTunnelRowType(fieldNames.toArray(new 
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
     }
@@ -179,8 +181,7 @@ public class KuduInputFormat implements Serializable {
             kuduScanner = kuduScannerBuilder.addPredicate(lowerPred)
                     .addPredicate(upperPred).build();
         } catch (KuduException e) {
-            log.warn("get the Kuduscan object for each splice exception", e);
-            throw new RuntimeException("get the Kuduscan object for each 
splice exception.", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.GET_KUDUSCAN_OBJECT_FAILED, e);
         }
         return kuduScanner;
     }
@@ -190,8 +191,7 @@ public class KuduInputFormat implements Serializable {
             try {
                 kuduClient.close();
             } catch (KuduException e) {
-                log.warn("Kudu Client close failed.", e);
-                throw new RuntimeException("Kudu Client close failed.", e);
+                throw new 
KuduConnectorException(KuduConnectorErrorCode.CLOSE_KUDU_CLIENT_FAILED, e);
             } finally {
                 kuduClient = null;
             }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
index c356baabf..c8b5e4ca7 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduOutputFormat.java
@@ -18,7 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSinkConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.kudu.ColumnSchema;
@@ -109,10 +112,10 @@ public class KuduOutputFormat
                         row.addDecimal(columnIndex, (BigDecimal) 
element.getField(columnIndex));
                         break;
                     default:
-                        throw new IllegalArgumentException("Unsupported column 
type: " + col.getType());
+                        throw new 
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "Unsupported 
column type: " + col.getType());
                 }
             } catch (ClassCastException e) {
-                throw new IllegalArgumentException(
+                throw new 
KuduConnectorException(KuduConnectorErrorCode.DATA_TYPE_CAST_FILED,
                         "Value type does not match column type " + 
col.getType() +
                                 " for column " + col.getName());
             }
@@ -128,8 +131,7 @@ public class KuduOutputFormat
         try {
             kuduSession.apply(upsert);
         } catch (KuduException e) {
-            log.error("Failed to upsert.", e);
-            throw new RuntimeException("Failed to upsert.", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.KUDU_UPSERT_FAILED, e);
         }
     }
 
@@ -141,8 +143,7 @@ public class KuduOutputFormat
         try {
             kuduSession.apply(insert);
         } catch (KuduException e) {
-            log.error("Failed to insert.", e);
-            throw new RuntimeException("Failed to insert.", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.KUDU_INSERT_FAILED, e);
         }
     }
 
@@ -155,7 +156,7 @@ public class KuduOutputFormat
                 upsert(element);
                 break;
             default:
-                throw new IllegalArgumentException(String.format("Unsupported 
saveMode: %s.", saveMode.name()));
+                throw new 
KuduConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, 
String.format("Unsupported saveMode: %s.", saveMode.name()));
         }
     }
 
@@ -170,8 +171,7 @@ public class KuduOutputFormat
         try {
             kuduTable = kuduClient.openTable(kuduTableName);
         } catch (KuduException e) {
-            log.error("Failed to initialize the Kudu client.", e);
-            throw new RuntimeException("Failed to initialize the Kudu 
client.", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.INIT_KUDU_CLIENT_FAILED, e);
         }
         log.info("The Kudu client for Master: {} is initialized 
successfully.", kuduMaster);
     }
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
index cea22dfb2..940556bcc 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/kuduclient/KuduTypeMapper.java
@@ -22,6 +22,8 @@ import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 
 import org.apache.kudu.ColumnSchema;
 import org.slf4j.Logger;
@@ -94,7 +96,7 @@ public class KuduTypeMapper {
 
             case KUDU_UNKNOWN:
             default:
-                throw new UnsupportedOperationException(
+                throw new 
KuduConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
                     String.format(
                         "Doesn't support KUDU type '%s' .",
                         kuduType));
diff --git 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
index b4fc5652c..6098e89b1 100644
--- 
a/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
+++ 
b/seatunnel-connectors-v2/connector-kudu/src/main/java/org/apache/seatunnel/connectors/seatunnel/kudu/source/KuduSource.java
@@ -17,7 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.kudu.source;
 
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.source.Boundedness;
@@ -27,8 +27,14 @@ import org.apache.seatunnel.api.source.SourceSplitEnumerator;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.common.utils.ExceptionUtils;
 import org.apache.seatunnel.connectors.seatunnel.kudu.config.KuduSourceConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.kudu.exception.KuduConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduInputFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.kudu.kuduclient.KuduTypeMapper;
 import org.apache.seatunnel.connectors.seatunnel.kudu.state.KuduSourceState;
@@ -103,13 +109,16 @@ public class KuduSource implements 
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
         String kudumaster = "";
         String tableName = "";
         String columnslist = "";
-        if (config.hasPath(KuduSourceConfig.KUDU_MASTER.key()) && 
config.hasPath(KuduSourceConfig.TABLE_NAME.key()) && 
config.hasPath(KuduSourceConfig.COLUMNS_LIST.key())) {
+        CheckResult checkResult = CheckConfigUtil.checkAllExists(config, 
KuduSourceConfig.KUDU_MASTER.key(), KuduSourceConfig.TABLE_NAME.key(), 
KuduSourceConfig.COLUMNS_LIST.key());
+        if (checkResult.isSuccess()) {
             kudumaster = config.getString(KuduSourceConfig.KUDU_MASTER.key());
             tableName = config.getString(KuduSourceConfig.TABLE_NAME.key());
             columnslist = 
config.getString(KuduSourceConfig.COLUMNS_LIST.key());
             kuduInputFormat = new KuduInputFormat(kudumaster, tableName, 
columnslist);
         } else {
-            throw new RuntimeException("Missing Source configuration 
parameters");
+            throw new 
KuduConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
         }
         try {
             KuduClient.KuduClientBuilder kuduClientBuilder = new
@@ -121,7 +130,7 @@ public class KuduSource implements 
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
             SeaTunnelRowType seaTunnelRowType = 
getSeaTunnelRowType(kuduClient.openTable(tableName).getSchema().getColumns());
             rowTypeInfo = seaTunnelRowType;
         } catch (KuduException e) {
-            throw new RuntimeException("Parameters in the preparation phase 
fail to be generated", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, 
e);
         }
     }
 
@@ -158,7 +167,7 @@ public class KuduSource implements 
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
                 }
             }
         } catch (KuduException e) {
-            throw new RuntimeException("Failed to generate upper and lower 
limits for each partition", e);
+            throw new 
KuduConnectorException(KuduConnectorErrorCode.GENERATE_KUDU_PARAMETERS_FAILED, 
"Failed to generate upper and lower limits for each partition");
         }
         return new PartitionParameter(keyColumn, Long.parseLong(minKey + ""), 
Long.parseLong(maxKey + ""));
     }
@@ -174,8 +183,8 @@ public class KuduSource implements 
SeaTunnelSource<SeaTunnelRow, KuduSourceSplit
             }
 
         } catch (Exception e) {
-            log.warn("get row type info exception", e);
-            throw new PrepareFailException("kudu", PluginType.SOURCE, 
e.toString());
+            throw new 
KuduConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, 
String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    "Kudu", PluginType.SOURCE, ExceptionUtils.getMessage(e)));
         }
         return new SeaTunnelRowType(fieldNames.toArray(new 
String[fieldNames.size()]), seaTunnelDataTypes.toArray(new 
SeaTunnelDataType<?>[seaTunnelDataTypes.size()]));
     }

Reply via email to