This is an automated email from the ASF dual-hosted git repository.

tyrantlucifer pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 04e1743d9 [Improve][Connector-V2][Clickhouse] Unified exception for Clickhouse source & sink connector (#3563)
04e1743d9 is described below

commit 04e1743d9e9f9675bed5f0813b4c50ab9cba9d15
Author: FWLamb <[email protected]>
AuthorDate: Tue Nov 29 10:51:29 2022 +0800

    [Improve][Connector-V2][Clickhouse] Unified exception for Clickhouse source & sink connector (#3563)
    
    * unified exception
    
    * unified exception
    
    * update
    
    * resolve conflicts
    
    * resolve conflicts
    
    * resolve conflicts
    
    * resolve conflicts
    
    * resolve conflicts
    
    Co-authored-by: yangbinbin <[email protected]>
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 13 +++-
 .../config/ClickhouseFileCopyMethod.java           |  5 +-
 .../exception/ClickhouseConnectorErrorCode.java    | 49 +++++++++++++++
 .../exception/ClickhouseConnectorException.java    | 35 +++++++++++
 .../clickhouse/sink/client/ClickhouseProxy.java    | 54 ++++++++--------
 .../clickhouse/sink/client/ClickhouseSink.java     | 15 +++--
 .../sink/client/ClickhouseSinkWriter.java          | 10 +--
 .../clickhouse/sink/client/ShardRouter.java        | 10 +--
 .../clickhouse/sink/file/ClickhouseFileSink.java   | 38 ++++++-----
 .../sink/file/ClickhouseFileSinkWriter.java        | 73 +++++++++++-----------
 .../clickhouse/sink/file/FileTransferFactory.java  |  4 +-
 .../clickhouse/sink/file/RsyncFileTransfer.java    | 19 +++---
 .../clickhouse/sink/file/ScpFileTransfer.java      | 18 +++---
 .../sink/inject/ArrayInjectFunction.java           |  5 +-
 .../sink/inject/StringInjectFunction.java          |  5 +-
 .../clickhouse/source/ClickhouseSource.java        | 14 +++--
 .../seatunnel/clickhouse/util/TypeConvertUtil.java |  8 ++-
 17 files changed, 260 insertions(+), 115 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 707609d7a..4ed61a0df 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -46,7 +46,7 @@ problems encountered by users.
 | CASSANDRA-02 | Add batch SeaTunnelRow data into a batch failed | When users 
encounter this error code, it means that cassandra has some problems, please 
check it whether is work                                                      |
 | CASSANDRA-03 | Close cql session of cassandra failed           | When users 
encounter this error code, it means that cassandra has some problems, please 
check it whether is work                                                      |
 | CASSANDRA-04 | No data in source table                         | When users 
encounter this error code, it means that source cassandra table has no data, 
please check it                                                               |
-| CASSANDRA-05 | Parse ip address from string field field        | When users 
encounter this error code, it means that upstream data does not match ip 
address format, please check it                                                 
  |
+| CASSANDRA-05 | Parse ip address from string failed             | When users 
encounter this error code, it means that upstream data does not match ip 
address format, please check it                                                 
  |
 
 ## Slack Connector Error Codes
 
@@ -158,3 +158,14 @@ problems encountered by users.
 
|---------|-------------------------------|-----------------------------------------------------------------------------------------------------------|
 | HUDI-01 | Create ParquetMetadata failed | When the user encounters this 
error code, it indicates that ParquetMetadata creation failed. Please check |
 | HUDI-02 | Kerberos Authorized failed    | When the user encounters this 
error code, it indicates that Kerberos authorization failed. Please check   |
+
+## Clickhouse Connector Error Codes
+
+| code          | description                                                  
             | solution                                                         
                                                                                
                       |
+|---------------|---------------------------------------------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| CLICKHOUSE-01 | Field is not existed in target table                         
             | When users encounter this error code, it means that the fields 
of upstream data don't meet with target clickhouse table, please check target 
clickhouse table structure |
+| CLICKHOUSE-02 | Can’t find password of shard node                            
             | When users encounter this error code, it means that no password 
is configured for each node, please check                                       
                        |
+| CLICKHOUSE-03 | Can’t delete directory                                       
             | When users encounter this error code, it means that the 
directory does not exist or does not have permission, please check              
                                |
+| CLICKHOUSE-04 | Ssh operation failed, such as 
(login,connect,authentication,close) etc... | When users encounter this error 
code, it means that the ssh request failed, please check your network 
environment                                                       |
+| CLICKHOUSE-05 | Get cluster list from clickhouse failed                      
             | When users encounter this error code, it means that the 
clickhouse cluster is not configured correctly, please check                    
                                |
+| CLICKHOUSE-06 | Shard key not found in table                                 
             | When users encounter this error code, it means that the shard 
key of the distributed table is not configured, please check                    
                          |
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
index cec1f48bb..251dd31de 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/config/ClickhouseFileCopyMethod.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.config;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
 public enum ClickhouseFileCopyMethod {
     SCP("scp"),
     RSYNC("rsync"),
@@ -37,6 +40,6 @@ public enum ClickhouseFileCopyMethod {
                 return clickhouseFileCopyMethod;
             }
         }
-        throw new IllegalArgumentException("Unknown ClickhouseFileCopyMethod: 
" + name);
+        throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "Unknown 
ClickhouseFileCopyMethod: " + name);
     }
 }
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java
new file mode 100644
index 000000000..3fd0f9f41
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorErrorCode.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum ClickhouseConnectorErrorCode implements SeaTunnelErrorCode {
+
+    FIELD_NOT_IN_TABLE("CLICKHOUSE-01", "Field is not existed in target 
table"),
+    PASSWORD_NOT_FOUND_IN_SHARD_NODE("CLICKHOUSE-02", "Can’t find password of 
shard node"),
+    DELETE_DIRECTORY_FIELD("CLICKHOUSE-03", "Can’t delete directory"),
+    SSH_OPERATION_FAILED("CLICKHOUSE-04", "Ssh operation failed, such as 
(login,connect,authentication,close) etc..."),
+    CLUSTER_LIST_GET_FAILED("CLICKHOUSE-05", "Get cluster list from clickhouse 
failed"),
+    SHARD_KEY_NOT_FOUND("CLICKHOUSE-06", "Shard key not found in table"),
+    FILE_NOT_EXISTS("CLICKHOUSE-07", "Clickhouse local file not exists");
+
+    private final String code;
+    private final String description;
+
+    ClickhouseConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
+    }
+
+    @Override
+    public String getDescription() {
+        return description;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java
new file mode 100644
index 000000000..421a83e72
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/exception/ClickhouseConnectorException.java
@@ -0,0 +1,35 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one or more
+ *     contributor license agreements.  See the NOTICE file distributed with
+ *     this work for additional information regarding copyright ownership.
+ *     The ASF licenses this file to You under the Apache License, Version 2.0
+ *     (the "License"); you may not use this file except in compliance with
+ *     the License.  You may obtain a copy of the License at
+ *
+ *        http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *     Unless required by applicable law or agreed to in writing, software
+ *     distributed under the License is distributed on an "AS IS" BASIS,
+ *     WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *     See the License for the specific language governing permissions and
+ *     limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.clickhouse.exception;
+
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class ClickhouseConnectorException extends SeaTunnelRuntimeException {
+    public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
+    }
+
+    public ClickhouseConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
index 3a3aa082c..67b4db7aa 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseProxy.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
@@ -48,7 +52,7 @@ public class ClickhouseProxy {
     public ClickhouseProxy(ClickHouseNode node) {
         this.client = ClickHouseClient.newInstance(node.getProtocol());
         this.clickhouseRequest =
-                
client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
+            
client.connect(node).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
 
     }
 
@@ -58,7 +62,7 @@ public class ClickhouseProxy {
 
     public ClickHouseRequest<?> getClickhouseConnection(Shard shard) {
         ClickHouseClient c = shardToDataSource
-                .computeIfAbsent(shard, s -> 
ClickHouseClient.newInstance(s.getNode().getProtocol()));
+            .computeIfAbsent(shard, s -> 
ClickHouseClient.newInstance(s.getNode().getProtocol()));
         return 
c.connect(shard.getNode()).format(ClickHouseFormat.RowBinaryWithNamesAndTypes);
     }
 
@@ -77,12 +81,12 @@ public class ClickhouseProxy {
                 // engineFull field will be like : Distributed(cluster, 
database, table[, sharding_key[, policy_name]])
                 String engineFull = record.getValue(0).asString();
                 List<String> infos = 
Arrays.stream(engineFull.substring(12).split(","))
-                        .map(s -> s.replace("'", 
"").trim()).collect(Collectors.toList());
+                    .map(s -> s.replace("'", 
"").trim()).collect(Collectors.toList());
                 return new DistributedEngine(infos.get(0), infos.get(1), 
infos.get(2).replace("\\)", "").trim());
             }
-            throw new RuntimeException("Cannot get distributed table from 
clickhouse, resultSet is empty");
+            throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get distributed table from clickhouse, resultSet is empty");
         } catch (ClickHouseException e) {
-            throw new RuntimeException("Cannot get distributed table from 
clickhouse", e);
+            throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get distributed table from clickhouse", e);
         }
     }
 
@@ -103,7 +107,7 @@ public class ClickhouseProxy {
         try (ClickHouseResponse response = 
request.query(sql).executeAndWait()) {
             response.records().forEach(r -> 
schema.put(r.getValue(0).asString(), r.getValue(1).asString()));
         } catch (ClickHouseException e) {
-            throw new RuntimeException("Cannot get table schema from 
clickhouse", e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, "Cannot 
get table schema from clickhouse", e);
         }
         return schema;
     }
@@ -124,16 +128,16 @@ public class ClickhouseProxy {
         try (ClickHouseResponse response = 
connection.query(sql).executeAndWait()) {
             response.records().forEach(r -> {
                 shardList.add(new Shard(
-                        r.getValue(0).asInteger(),
-                        r.getValue(1).asInteger(),
-                        r.getValue(2).asInteger(),
-                        r.getValue(3).asString(),
-                        r.getValue(4).asString(),
-                        port, database, username, password));
+                    r.getValue(0).asInteger(),
+                    r.getValue(1).asInteger(),
+                    r.getValue(2).asInteger(),
+                    r.getValue(3).asString(),
+                    r.getValue(4).asString(),
+                    port, database, username, password));
             });
             return shardList;
         } catch (ClickHouseException e) {
-            throw new RuntimeException("Cannot get cluster shard list from 
clickhouse", e);
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.CLUSTER_LIST_GET_FAILED,
 "Cannot get cluster shard list from clickhouse", e);
         }
     }
 
@@ -149,7 +153,7 @@ public class ClickhouseProxy {
         try (ClickHouseResponse response = 
clickhouseRequest.query(sql).executeAndWait()) {
             List<ClickHouseRecord> records = 
response.stream().collect(Collectors.toList());
             if (records.isEmpty()) {
-                throw new RuntimeException("Cannot get table from clickhouse, 
resultSet is empty");
+                throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get table from clickhouse, resultSet is empty");
             }
             ClickHouseRecord record = records.get(0);
             String engine = record.getValue(0).asString();
@@ -160,11 +164,11 @@ public class ClickhouseProxy {
             if ("Distributed".equals(engine)) {
                 distributedEngine = 
getClickhouseDistributedTable(clickhouseRequest, database, table);
                 String localTableSQL = String.format("select 
engine,create_table_query from system.tables where database = '%s' and name = 
'%s'",
-                        distributedEngine.getDatabase(), 
distributedEngine.getTable());
+                    distributedEngine.getDatabase(), 
distributedEngine.getTable());
                 try (ClickHouseResponse rs = 
clickhouseRequest.query(localTableSQL).executeAndWait()) {
                     List<ClickHouseRecord> localTableRecords = 
rs.stream().collect(Collectors.toList());
                     if (localTableRecords.isEmpty()) {
-                        throw new RuntimeException("Cannot get table from 
clickhouse, resultSet is empty");
+                        throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get table from clickhouse, resultSet is empty");
                     }
                     String localEngine = 
localTableRecords.get(0).getValue(0).asString();
                     String createLocalTableDDL = 
localTableRecords.get(0).getValue(1).asString();
@@ -172,16 +176,16 @@ public class ClickhouseProxy {
                 }
             }
             return new ClickhouseTable(
-                    database,
-                    table,
-                    distributedEngine,
-                    engine,
-                    createTableDDL,
-                    engineFull,
-                    dataPaths,
-                    getClickhouseTableSchema(clickhouseRequest, table));
+                database,
+                table,
+                distributedEngine,
+                engine,
+                createTableDDL,
+                engineFull,
+                dataPaths,
+                getClickhouseTableSchema(clickhouseRequest, table));
         } catch (ClickHouseException e) {
-            throw new RuntimeException("Cannot get clickhouse table", e);
+            throw new 
ClickhouseConnectorException(SeaTunnelAPIErrorCode.TABLE_NOT_EXISTED, "Cannot 
get clickhouse table", e);
         }
 
     }
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
index 028fe4a5c..b87c8307e 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSink.java
@@ -29,6 +29,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
@@ -40,7 +41,10 @@ import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.config.TypesafeConfigUtils;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file.ClickhouseTable;
@@ -85,12 +89,15 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
         }
 
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new ClickhouseConnectorException(
+                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK, result.getMsg()));
         }
         Map<String, Object> defaultConfig = ImmutableMap.<String, 
Object>builder()
             .put(BULK_SIZE.key(), BULK_SIZE.defaultValue())
             .put(SPLIT_MODE.key(), SPLIT_MODE.defaultValue())
-                .build();
+            .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfig));
 
@@ -123,7 +130,7 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
             ClickhouseTable table = 
proxy.getClickhouseTable(config.getString(DATABASE.key()),
                 config.getString(TABLE.key()));
             if (!"Distributed".equals(table.getEngine())) {
-                throw new IllegalArgumentException("split mode only support 
table which engine is " +
+                throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "split mode only 
support table which engine is " +
                     "'Distributed' engine at now");
             }
             if (config.hasPath(SHARDING_KEY.key())) {
@@ -157,7 +164,7 @@ public class ClickhouseSink implements 
SeaTunnelSink<SeaTunnelRow, ClickhouseSin
             // check if the fields exist in schema
             for (String field : fields) {
                 if (!tableSchema.containsKey(field)) {
-                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE.key()));
+                    throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, 
"Field " + field + " does not exist in table " + config.getString(TABLE.key()));
                 }
             }
         } else {
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
index 79411ed5f..b103ecccc 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ClickhouseSinkWriter.java
@@ -20,7 +20,9 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.ArrayInjectFunction;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject.BigDecimalInjectFunction;
@@ -122,7 +124,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
                     intHolder.setValue(0);
                 }
             } catch (SQLException e) {
-                throw new RuntimeException("Failed to close prepared 
statement.", e);
+                throw new 
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Failed to 
close prepared statement.", e);
             }
         }
     }
@@ -145,7 +147,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
             }
             clickHouseStatement.addBatch();
         } catch (SQLException e) {
-            throw new RuntimeException("Add row data into batch error", e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Add row 
data into batch error", e);
         }
     }
 
@@ -153,7 +155,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
         try {
             clickHouseStatement.executeBatch();
         } catch (Exception e) {
-            throw new RuntimeException("Clickhouse execute batch statement 
error", e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Clickhouse 
execute batch statement error", e);
         }
     }
 
@@ -169,7 +171,7 @@ public class ClickhouseSinkWriter implements 
SinkWriter<SeaTunnelRow, CKCommitIn
                     new ClickhouseBatchStatement(clickhouseConnection, 
preparedStatement, intHolder);
                 result.put(s, batchStatement);
             } catch (SQLException e) {
-                throw new RuntimeException("Clickhouse prepare statement 
error: " + e.getMessage(), e);
+                throw new 
ClickhouseConnectorException(CommonErrorCode.SQL_OPERATION_FAILED, "Clickhouse 
prepare statement error: " + e.getMessage(), e);
             }
         });
         return result;
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
index 4471f8157..71e6430fc 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/client/ShardRouter.java
@@ -17,6 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client;
 
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.DistributedEngine;
@@ -55,15 +57,15 @@ public class ShardRouter implements Serializable {
         this.splitMode = shardMetadata.getSplitMode();
         this.table = shardMetadata.getTable();
         if (StringUtils.isNotEmpty(shardKey) && 
StringUtils.isEmpty(shardKeyType)) {
-            throw new IllegalArgumentException("Shard key " + shardKey + " not 
found in table " + table);
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SHARD_KEY_NOT_FOUND, 
"Shard key " + shardKey + " not found in table " + table);
         }
         ClickHouseRequest<?> connection = proxy.getClickhouseConnection();
         if (splitMode) {
             DistributedEngine localTable = 
proxy.getClickhouseDistributedTable(connection, shardMetadata.getDatabase(), 
table);
             this.shardTable = localTable.getTable();
             List<Shard> shardList = proxy.getClusterShardList(connection, 
localTable.getClusterName(),
-                    localTable.getDatabase(), 
shardMetadata.getDefaultShard().getNode().getPort(),
-                    shardMetadata.getUsername(), shardMetadata.getPassword());
+                localTable.getDatabase(), 
shardMetadata.getDefaultShard().getNode().getPort(),
+                shardMetadata.getUsername(), shardMetadata.getPassword());
             int weight = 0;
             for (Shard shard : shardList) {
                 shards.put(weight, shard);
@@ -87,7 +89,7 @@ public class ShardRouter implements Serializable {
             return 
shards.lowerEntry(threadLocalRandom.nextInt(shardWeightCount + 1)).getValue();
         }
         int offset = (int) 
(HASH_INSTANCE.hash(ByteBuffer.wrap(shardValue.toString().getBytes(StandardCharsets.UTF_8)),
-                0) & Long.MAX_VALUE % shardWeightCount);
+            0) & Long.MAX_VALUE % shardWeightCount);
         return shards.lowerEntry(offset + 1).getValue();
     }
 
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
index e32ecd2ef..973478a8b 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSink.java
@@ -30,6 +30,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
@@ -40,6 +41,8 @@ import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.ShardMetadata;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
@@ -75,15 +78,18 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
     public void prepare(Config config) throws PrepareFailException {
         CheckResult checkResult = CheckConfigUtil.checkAllExists(config, 
HOST.key(), TABLE.key(), DATABASE.key(), USERNAME.key(), PASSWORD.key(), 
CLICKHOUSE_LOCAL_PATH.key());
         if (!checkResult.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
checkResult.getMsg());
+            throw new ClickhouseConnectorException(
+                SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s",
+                    getPluginName(), PluginType.SINK, checkResult.getMsg()));
         }
         Map<String, Object> defaultConfigs = ImmutableMap.<String, 
Object>builder()
-                .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
-                .build();
+            .put(COPY_METHOD.key(), COPY_METHOD.defaultValue().getName())
+            .build();
 
         config = config.withFallback(ConfigFactory.parseMap(defaultConfigs));
         List<ClickHouseNode> nodes = 
ClickhouseUtil.createNodes(config.getString(HOST.key()),
-                config.getString(DATABASE.key()), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+            config.getString(DATABASE.key()), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
 
         ClickhouseProxy proxy = new ClickhouseProxy(nodes.get(0));
         Map<String, String> tableSchema = 
proxy.getClickhouseTableSchema(config.getString(TABLE.key()));
@@ -94,34 +100,34 @@ public class ClickhouseFileSink implements 
SeaTunnelSink<SeaTunnelRow, Clickhous
             shardKeyType = tableSchema.get(shardKey);
         }
         ShardMetadata shardMetadata = new ShardMetadata(
-                shardKey,
-                shardKeyType,
-                config.getString(DATABASE.key()),
-                config.getString(TABLE.key()),
-                false, // we don't need to set splitMode in clickhouse file 
mode.
-                new Shard(1, 1, nodes.get(0)), 
config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+            shardKey,
+            shardKeyType,
+            config.getString(DATABASE.key()),
+            config.getString(TABLE.key()),
+            false, // we don't need to set splitMode in clickhouse file mode.
+            new Shard(1, 1, nodes.get(0)), config.getString(USERNAME.key()), 
config.getString(PASSWORD.key()));
         List<String> fields;
         if (config.hasPath(FIELDS.key())) {
             fields = config.getStringList(FIELDS.key());
             // check if the fields exist in schema
             for (String field : fields) {
                 if (!tableSchema.containsKey(field)) {
-                    throw new RuntimeException("Field " + field + " does not 
exist in table " + config.getString(TABLE.key()));
+                    throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FIELD_NOT_IN_TABLE, 
"Field " + field + " does not exist in table " + config.getString(TABLE.key()));
                 }
             }
         } else {
             fields = new ArrayList<>(tableSchema.keySet());
         }
         Map<String, String> nodeUser = 
config.getObjectList(NODE_PASS.key()).stream()
-                .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
-                    configObject -> 
configObject.toConfig().hasPath(USERNAME.key()) ? 
configObject.toConfig().getString(USERNAME.key()) : "root"));
+            .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
+                configObject -> 
configObject.toConfig().hasPath(USERNAME.key()) ? 
configObject.toConfig().getString(USERNAME.key()) : "root"));
         Map<String, String> nodePassword = 
config.getObjectList(NODE_PASS.key()).stream()
-                .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
-                    configObject -> 
configObject.toConfig().getString(PASSWORD.key())));
+            .collect(Collectors.toMap(configObject -> 
configObject.toConfig().getString(NODE_ADDRESS),
+                configObject -> 
configObject.toConfig().getString(PASSWORD.key())));
 
         proxy.close();
         this.readerOption = new FileReaderOption(shardMetadata, tableSchema, 
fields, config.getString(CLICKHOUSE_LOCAL_PATH.key()),
-                
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, 
nodePassword);
+            
ClickhouseFileCopyMethod.from(config.getString(COPY_METHOD.key())), nodeUser, 
nodePassword);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
index ac5900915..6f24e2c79 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ClickhouseFileSinkWriter.java
@@ -20,7 +20,10 @@ package 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.common.config.Common;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.FileReaderOption;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.clickhouse.shard.Shard;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ClickhouseProxy;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.client.ShardRouter;
@@ -70,18 +73,18 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         proxy = new 
ClickhouseProxy(this.readerOption.getShardMetadata().getDefaultShard().getNode());
         shardRouter = new ShardRouter(proxy, 
this.readerOption.getShardMetadata());
         clickhouseTable = 
proxy.getClickhouseTable(this.readerOption.getShardMetadata().getDatabase(),
-                this.readerOption.getShardMetadata().getTable());
+            this.readerOption.getShardMetadata().getTable());
         rowCache = new HashMap<>(Common.COLLECTION_SIZE);
 
         nodePasswordCheck();
 
         // find file local save path of each node
         shardLocalDataPaths = shardRouter.getShards().values().stream()
-                .collect(Collectors.toMap(Function.identity(), shard -> {
-                    ClickhouseTable shardTable = 
proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
-                            clickhouseTable.getLocalTableName());
-                    return shardTable.getDataPaths();
-                }));
+            .collect(Collectors.toMap(Function.identity(), shard -> {
+                ClickhouseTable shardTable = 
proxy.getClickhouseTable(shard.getNode().getDatabase().get(),
+                    clickhouseTable.getLocalTableName());
+                return shardTable.getDataPaths();
+            }));
     }
 
     @Override
@@ -94,8 +97,8 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         if (!this.readerOption.isNodeFreePass()) {
             shardRouter.getShards().values().forEach(shard -> {
                 if 
(!this.readerOption.getNodePassword().containsKey(shard.getNode().getAddress().getHostName())
-                        && 
!this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
-                    throw new RuntimeException("Cannot find password of shard 
" + shard.getNode().getAddress().getHostName());
+                    && 
!this.readerOption.getNodePassword().containsKey(shard.getNode().getHost())) {
+                    throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.PASSWORD_NOT_FOUND_IN_SHARD_NODE,
 "Cannot find password of shard " + shard.getNode().getAddress().getHostName());
                 }
             });
         }
@@ -126,12 +129,12 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
             // clear local file
             clearLocalFileDirectory(clickhouseLocalFiles);
         } catch (Exception e) {
-            throw new RuntimeException("Flush data into clickhouse file 
error", e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.FLUSH_DATA_FAILED, "Flush data 
into clickhouse file error", e);
         }
     }
 
     private List<String> generateClickhouseLocalFiles(List<SeaTunnelRow> rows) 
throws IOException,
-            InterruptedException {
+        InterruptedException {
         if (rows.isEmpty()) {
             return Collections.emptyList();
         }
@@ -140,18 +143,18 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         FileUtils.forceMkdir(new File(clickhouseLocalFile));
         String clickhouseLocalFileTmpFile = clickhouseLocalFile + 
"/local_data.log";
         try (FileChannel fileChannel = 
FileChannel.open(Paths.get(clickhouseLocalFileTmpFile), 
StandardOpenOption.WRITE,
-                StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
+            StandardOpenOption.READ, StandardOpenOption.CREATE_NEW)) {
             String data = rows.stream()
-                    .map(row -> 
this.readerOption.getFields().stream().map(field -> 
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
-                            .collect(Collectors.joining("\t")))
-                    .collect(Collectors.joining("\n"));
+                .map(row -> this.readerOption.getFields().stream().map(field 
-> 
row.getField(this.readerOption.getSeaTunnelRowType().indexOf(field)).toString())
+                    .collect(Collectors.joining("\t")))
+                .collect(Collectors.joining("\n"));
             MappedByteBuffer buffer = 
fileChannel.map(FileChannel.MapMode.READ_WRITE, fileChannel.size(),
-                    data.getBytes(StandardCharsets.UTF_8).length);
+                data.getBytes(StandardCharsets.UTF_8).length);
             buffer.put(data.getBytes(StandardCharsets.UTF_8));
         }
 
         List<String> localPaths = 
Arrays.stream(this.readerOption.getClickhouseLocalPath().trim().split(" "))
-                .collect(Collectors.toList());
+            .collect(Collectors.toList());
         List<String> command = new ArrayList<>(localPaths);
         if (localPaths.size() == 1) {
             command.add("local");
@@ -164,17 +167,17 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         command.add("\"" + "temp_table" + uuid + "\"");
         command.add("-q");
         command.add(String.format(
-                "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
-                
clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + 
".", "").replaceAll("`", ""),
-                clickhouseTable.getLocalTableName(),
-                readerOption.getTableSchema().keySet().stream().map(s -> {
-                    if (readerOption.getFields().contains(s)) {
-                        return s;
-                    } else {
-                        return "NULL";
-                    }
-                }).collect(Collectors.joining(",")),
-                uuid));
+            "\"%s; INSERT INTO TABLE %s SELECT %s FROM temp_table%s;\"",
+            
clickhouseTable.getCreateTableDDL().replace(clickhouseTable.getDatabase() + 
".", "").replaceAll("`", ""),
+            clickhouseTable.getLocalTableName(),
+            readerOption.getTableSchema().keySet().stream().map(s -> {
+                if (readerOption.getFields().contains(s)) {
+                    return s;
+                } else {
+                    return "NULL";
+                }
+            }).collect(Collectors.joining(",")),
+            uuid));
         command.add("--path");
         command.add("\"" + clickhouseLocalFile + "\"");
         log.info("Generate clickhouse local file command: {}", String.join(" 
", command));
@@ -192,16 +195,16 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         start.waitFor();
         File file = new File(clickhouseLocalFile + "/data/_local/" + 
clickhouseTable.getLocalTableName());
         if (!file.exists()) {
-            throw new RuntimeException("clickhouse local file not exists");
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, 
"clickhouse local file not exists");
         }
         File[] files = file.listFiles();
         if (files == null) {
-            throw new RuntimeException("clickhouse local file not exists");
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.FILE_NOT_EXISTS, 
"clickhouse local file not exists");
         }
         return Arrays.stream(files)
-                .filter(File::isDirectory)
-                .filter(f -> !"detached".equals(f.getName()))
-                .map(File::getAbsolutePath).collect(Collectors.toList());
+            .filter(File::isDirectory)
+            .filter(f -> !"detached".equals(f.getName()))
+            .map(File::getAbsolutePath).collect(Collectors.toList());
     }
 
     private void attachClickhouseLocalFileToServer(Shard shard, List<String> 
clickhouseLocalFiles) throws ClickHouseException {
@@ -215,8 +218,8 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
         ClickHouseRequest<?> request = proxy.getClickhouseConnection(shard);
         for (String clickhouseLocalFile : clickhouseLocalFiles) {
             ClickHouseResponse response = request.query(String.format("ALTER 
TABLE %s ATTACH PART '%s'",
-                    clickhouseTable.getLocalTableName(),
-                    
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 
1))).executeAndWait();
+                clickhouseTable.getLocalTableName(),
+                
clickhouseLocalFile.substring(clickhouseLocalFile.lastIndexOf("/") + 
1))).executeAndWait();
             response.close();
         }
     }
@@ -230,7 +233,7 @@ public class ClickhouseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, CKComm
                 FileUtils.deleteDirectory(new File(localFileDir));
             }
         } catch (IOException e) {
-            throw new RuntimeException("Unable to delete directory " + 
localFileDir, e);
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.DELETE_DIRECTORY_FIELD,
 "Unable to delete directory " + localFileDir, e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
index 14455b45f..3af640c50 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/FileTransferFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseFileCopyMethod;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 
 public class FileTransferFactory {
     public static FileTransfer createFileTransfer(ClickhouseFileCopyMethod 
type, String host, String user, String password) {
@@ -27,7 +29,7 @@ public class FileTransferFactory {
             case RSYNC:
                 return new RsyncFileTransfer(host, user, password);
             default:
-                throw new RuntimeException("unsupported clickhouse file copy 
method:" + type);
+                throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "unsupported 
clickhouse file copy method:" + type);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
index 4dd8bc766..9cbf176ef 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/RsyncFileTransfer.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
@@ -58,10 +62,11 @@ public class RsyncFileTransfer implements FileTransfer {
             }
             // TODO support add publicKey to identity
             if (!clientSession.auth().verify().isSuccess()) {
-                throw new IOException("ssh host " + host + "authentication 
failed");
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"ssh host " + host + "authentication failed");
             }
         } catch (IOException e) {
-            throw new RuntimeException("Failed to connect to host: " + host + 
" by user: " + user + " on port 22", e);
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED,
+                "Failed to connect to host: " + host + " by user: " + user + " 
on port 22", e);
         }
     }
 
@@ -96,7 +101,7 @@ public class RsyncFileTransfer implements FileTransfer {
             }
             start.waitFor();
         } catch (IOException | InterruptedException ex) {
-            throw new RuntimeException("Rsync failed to transfer file: " + 
sourcePath + " to: " + targetPath, ex);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Rsync 
failed to transfer file: " + sourcePath + " to: " + targetPath, ex);
         }
         // remote exec command to change file owner. Only file owner equal 
with server's clickhouse user can
         // make ATTACH command work.
@@ -104,7 +109,7 @@ public class RsyncFileTransfer implements FileTransfer {
         command.add("ls");
         command.add("-l");
         command.add(targetPath.substring(0,
-                StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
+            StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R 
{}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
@@ -118,7 +123,7 @@ public class RsyncFileTransfer implements FileTransfer {
     @Override
     public void transferAndChown(List<String> sourcePaths, String targetPath) {
         if (sourcePaths == null) {
-            throw new IllegalArgumentException("sourcePath is null");
+            throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "sourcePath is 
null");
         }
         sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, 
targetPath));
     }
@@ -129,7 +134,7 @@ public class RsyncFileTransfer implements FileTransfer {
             try {
                 clientSession.close();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to close ssh session", e);
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"Failed to close ssh session", e);
             }
         }
         if (sshClient != null && sshClient.isOpen()) {
@@ -137,7 +142,7 @@ public class RsyncFileTransfer implements FileTransfer {
             try {
                 sshClient.close();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to close ssh client", e);
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"Failed to close ssh client", e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
index 1c207276a..1c39815d8 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/file/ScpFileTransfer.java
@@ -17,6 +17,10 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.file;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.sshd.client.SshClient;
@@ -58,11 +62,11 @@ public class ScpFileTransfer implements FileTransfer {
             }
             // TODO support add publicKey to identity
             if (!clientSession.auth().verify().isSuccess()) {
-                throw new IOException("ssh host " + host + "authentication 
failed");
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"ssh host " + host + "authentication failed");
             }
             scpClient = 
ScpClientCreator.instance().createScpClient(clientSession);
         } catch (IOException e) {
-            throw new RuntimeException("Failed to connect to host: " + host + 
" by user: " + user + " on port 22", e);
+            throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"Failed to connect to host: " + host + " by user: " + user + " on port 22", e);
         }
     }
 
@@ -76,7 +80,7 @@ public class ScpFileTransfer implements FileTransfer {
                 ScpClient.Option.TargetIsDirectory,
                 ScpClient.Option.PreserveAttributes);
         } catch (IOException e) {
-            throw new RuntimeException("Scp failed to transfer file: " + 
sourcePath + " to: " + targetPath, e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, "Scp failed 
to transfer file: " + sourcePath + " to: " + targetPath, e);
         }
         // remote exec command to change file owner. Only file owner equal 
with server's clickhouse user can
         // make ATTACH command work.
@@ -84,7 +88,7 @@ public class ScpFileTransfer implements FileTransfer {
         command.add("ls");
         command.add("-l");
         command.add(targetPath.substring(0,
-                StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
+            StringUtils.stripEnd(targetPath, "/").lastIndexOf("/")) + "/");
         command.add("| tail -n 1 | awk '{print $3}' | xargs -t -i chown -R 
{}:{} " + targetPath);
         try {
             String finalCommand = String.join(" ", command);
@@ -98,7 +102,7 @@ public class ScpFileTransfer implements FileTransfer {
     @Override
     public void transferAndChown(List<String> sourcePaths, String targetPath) {
         if (sourcePaths == null) {
-            throw new IllegalArgumentException("sourcePath is null");
+            throw new 
ClickhouseConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, "sourcePath is 
null");
         }
         sourcePaths.forEach(sourcePath -> transferAndChown(sourcePath, 
targetPath));
     }
@@ -109,7 +113,7 @@ public class ScpFileTransfer implements FileTransfer {
             try {
                 clientSession.close();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to close ssh session", e);
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"Failed to close ssh session", e);
             }
         }
         if (sshClient != null && sshClient.isOpen()) {
@@ -117,7 +121,7 @@ public class ScpFileTransfer implements FileTransfer {
             try {
                 sshClient.close();
             } catch (IOException e) {
-                throw new RuntimeException("Failed to close ssh client", e);
+                throw new 
ClickhouseConnectorException(ClickhouseConnectorErrorCode.SSH_OPERATION_FAILED, 
"Failed to close ssh client", e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
index 644e61994..606e7f80d 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/ArrayInjectFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
 import java.sql.PreparedStatement;
 import java.sql.SQLException;
 import java.util.Arrays;
@@ -74,7 +77,7 @@ public class ArrayInjectFunction implements 
ClickhouseFieldInjectFunction {
                 elements = Arrays.copyOf(elements, elements.length, 
Boolean[].class);
                 break;
             default:
-                throw new IllegalArgumentException("array inject error, not 
supported data type: " + type);
+                throw new 
ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "array 
inject error, unsupported data type: " + type);
         }
         statement.setArray(index, 
statement.getConnection().createArrayOf(sqlType, elements));
     }
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
index 96d1f5193..a8ae28eba 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/sink/inject/StringInjectFunction.java
@@ -17,6 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.clickhouse.sink.inject;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
+
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 
@@ -43,7 +46,7 @@ public class StringInjectFunction implements 
ClickhouseFieldInjectFunction {
                 statement.setString(index, value.toString());
             }
         } catch (JsonProcessingException e) {
-            throw new RuntimeException(e);
+            throw new 
ClickhouseConnectorException(CommonErrorCode.JSON_OPERATION_FAILED, 
e.getMessage());
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
index 59a24cac9..f9d857b9c 100644
--- 
a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
+++ 
b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/source/ClickhouseSource.java
@@ -24,6 +24,7 @@ import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.Clickh
 import static 
org.apache.seatunnel.connectors.seatunnel.clickhouse.config.ClickhouseConfig.USERNAME;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.Boundedness;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.source.SourceReader;
@@ -34,6 +35,7 @@ import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.state.ClickhouseSourceState;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.ClickhouseUtil;
 import 
org.apache.seatunnel.connectors.seatunnel.clickhouse.util.TypeConvertUtil;
@@ -66,17 +68,18 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
     public void prepare(Config config) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(config, HOST.key(), DATABASE.key(), SQL.key(), USERNAME.key(), PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         servers = ClickhouseUtil.createNodes(config.getString(HOST.key()), config.getString(DATABASE.key()),
-                config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
+            config.getString(USERNAME.key()), config.getString(PASSWORD.key()));
 
         sql = config.getString(SQL.key());
         ClickHouseNode currentServer = servers.get(ThreadLocalRandom.current().nextInt(servers.size()));
         try (ClickHouseClient client = ClickHouseClient.newInstance(currentServer.getProtocol());
              ClickHouseResponse response =
-                     client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
-                             .query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
+                 client.connect(currentServer).format(ClickHouseFormat.RowBinaryWithNamesAndTypes)
+                     .query(modifySQLToLimit1(config.getString(SQL.key()))).executeAndWait()) {
 
             int columnSize = response.getColumns().size();
             String[] fieldNames = new String[columnSize];
@@ -90,7 +93,8 @@ public class ClickhouseSource implements SeaTunnelSource<SeaTunnelRow, Clickhous
             this.rowTypeInfo = new SeaTunnelRowType(fieldNames, seaTunnelDataTypes);
 
         } catch (ClickHouseException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, e.getMessage());
+            throw new ClickhouseConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                String.format("PluginName: %s, PluginType: %s, Message: %s", getPluginName(), PluginType.SOURCE, e.getMessage()));
         }
 
     }
diff --git a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
index abfe96809..4fedc083e 100644
--- a/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
+++ b/seatunnel-connectors-v2/connector-clickhouse/src/main/java/org/apache/seatunnel/connectors/seatunnel/clickhouse/util/TypeConvertUtil.java
@@ -23,6 +23,8 @@ import org.apache.seatunnel.api.table.type.DecimalType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.clickhouse.exception.ClickhouseConnectorException;
 
 import com.clickhouse.client.ClickHouseColumn;
 import com.clickhouse.client.ClickHouseValue;
@@ -59,7 +61,7 @@ public class TypeConvertUtil {
             } else if (BasicType.BYTE_TYPE.equals(dataType)) {
                 return ArrayType.BYTE_ARRAY_TYPE;
             } else {
-                throw new IllegalArgumentException("data type in array is not supported: " + subArrayDataType.getDataType());
+                throw new ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "data type in array is not supported: " + subArrayDataType.getDataType());
             }
         }
         Class<?> type = column.getDataType().getObjectClass();
@@ -99,7 +101,7 @@ public class TypeConvertUtil {
             return BasicType.STRING_TYPE;
         } else {
             // TODO support pojo
-            throw new IllegalArgumentException("not supported data type: " + column.getDataType());
+            throw new ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "unsupported data type: " + column.getDataType());
         }
     }
 
@@ -151,7 +153,7 @@ public class TypeConvertUtil {
             }
         } else {
             // TODO support pojo
-            throw new IllegalArgumentException("not supported data type: " + dataType);
+            throw new ClickhouseConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "unsupported data type: " + dataType);
         }
     }
 

Reply via email to