This is an automated email from the ASF dual-hosted git repository.

wuchunfu pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 031e8e263 [Improve][Connector-V2][File] Unified exception for file 
source & sink connectors (#3525)
031e8e263 is described below

commit 031e8e263cd73b5ab4270aab614048aa8e86eb01
Author: Tyrantlucifer <[email protected]>
AuthorDate: Sat Nov 26 20:37:36 2022 +0800

    [Improve][Connector-V2][File] Unified exception for file source & sink 
connectors (#3525)
    
    * [Improve][Connector-V2][File] Unified exception for file source & sink 
connector
    
    * [Improve][Connector-V2][File] Update error codes manual
---
 .../connector-v2/Error-Quick-Reference-Manual.md   | 12 ++++++-
 .../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java |  6 +++-
 .../file/hdfs/source/BaseHdfsFileSource.java       | 23 ++++++++----
 .../seatunnel/file/config/BaseTextFileConfig.java  |  5 ++-
 ...nException.java => FileConnectorErrorCode.java} | 26 +++++++++++---
 ...nException.java => FileConnectorException.java} | 17 ++++++---
 .../seatunnel/file/sink/BaseFileSinkWriter.java    |  7 ++--
 .../file/sink/config/TextFileSinkConfig.java       | 14 +++++---
 .../seatunnel/file/sink/util/FileSystemUtils.java  | 17 +++++----
 .../file/sink/writer/AbstractWriteStrategy.java    |  9 +++--
 .../file/sink/writer/JsonWriteStrategy.java        | 17 ++++-----
 .../file/sink/writer/OrcWriteStrategy.java         | 38 +++++++++++++-------
 .../file/sink/writer/ParquetWriteStrategy.java     | 18 ++++++----
 .../file/sink/writer/TextWriteStrategy.java        | 13 ++++---
 .../seatunnel/file/sink/writer/WriteStrategy.java  |  5 +--
 .../file/sink/writer/WriteStrategyFactory.java     |  4 ++-
 .../file/source/BaseFileSourceReader.java          |  5 ++-
 .../file/source/reader/AbstractReadStrategy.java   |  3 +-
 .../file/source/reader/JsonReadStrategy.java       | 13 ++++---
 .../file/source/reader/OrcReadStrategy.java        | 42 +++++++++++++---------
 .../file/source/reader/ParquetReadStrategy.java    | 33 +++++++++--------
 .../seatunnel/file/source/reader/ReadStrategy.java |  6 ++--
 .../file/source/reader/ReadStrategyFactory.java    |  4 ++-
 .../file/source/reader/TextReadStrategy.java       |  9 ++---
 .../seatunnel/file/ftp/sink/FtpFileSink.java       |  6 +++-
 .../seatunnel/file/ftp/source/FtpFileSource.java   | 27 +++++++++-----
 .../file/local/source/LocalFileSource.java         | 23 ++++++++----
 .../seatunnel/file/oss/sink/OssFileSink.java       |  6 +++-
 .../seatunnel/file/oss/source/OssFileSource.java   | 23 ++++++++----
 .../seatunnel/file/s3/sink/S3FileSink.java         |  6 +++-
 .../seatunnel/file/s3/source/S3FileSource.java     | 23 ++++++++----
 .../seatunnel/file/sftp/sink/SftpFileSink.java     |  6 +++-
 .../seatunnel/file/sftp/source/SftpFileSource.java | 26 +++++++++-----
 33 files changed, 335 insertions(+), 157 deletions(-)

diff --git a/docs/en/connector-v2/Error-Quick-Reference-Manual.md 
b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
index 97000da4f..cf47f0cbc 100644
--- a/docs/en/connector-v2/Error-Quick-Reference-Manual.md
+++ b/docs/en/connector-v2/Error-Quick-Reference-Manual.md
@@ -82,12 +82,13 @@ This document records some common error codes and 
corresponding solutions of Sea
 | SOCKET-02 | Failed to send message to socket server                  | When 
the user encounters this error code, it means that there is a problem sending 
data and retry is not enabled, please check |
 | SOCKET-03 | Unable to write; interrupted while doing another attempt | When 
the user encounters this error code, it means that the data writing is 
interrupted abnormally, please check               |
 
-## Tablestore Connector Error Codes
+## TableStore Connector Error Codes
 
 | code           | description                           | solution            
                                                                                
                                |
 
|----------------|---------------------------------------|-------------------------------------------------------------------------------------------------------------------------------------|
 | TABLESTORE-01  | Failed to send these rows of data     | When users 
encounter this error code, it means that failed to write these rows of data, 
please check the rows that failed to import |
 
+
 ## Hive Connector Error Codes
 
 | code    | description                                                   | 
solution                                                                        
                                              |
@@ -122,3 +123,12 @@ This document records some common error codes and 
corresponding solutions of Sea
 | IOTDB-01 | Close IoTDB session failed                               | When 
the user encounters this error code, it indicates that closing the session 
failed. Please check         |
 | IOTDB-02 | Initialize IoTDB client failed                           | When 
the user encounters this error code, it indicates that the client 
initialization failed. Please check   |
 | IOTDB-03 | Close IoTDB client failed                                | When 
the user encounters this error code, it indicates that closing the client 
failed. Please check          |
+
+
+## File Connector Error Codes
+
+| code    | description                 | solution                             
                                                                                
                                            |
+|---------|-----------------------------|------------------------------------------------------------------------------------------------------------------------------------------------------------------|
+| FILE-01 | File type is invalid        | When users encounter this error 
code, it means that this file is not the format that user assigned, please 
check it                                              |
+| FILE-02 | Data deserialization failed | When users encounter this error 
code, it means that data from files not satisfied the schema that user 
assigned, please check data from files whether is correct |
+| FILE-03 | Get file list failed        | When users encounter this error 
code, it means that connector try to traverse the path and get file list 
failed, please check file system whether is work        |
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
index e8cbad443..5870394a0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
@@ -20,10 +20,12 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
@@ -34,7 +36,9 @@ public abstract class BaseHdfsFileSink extends BaseFileSink {
     public void prepare(Config pluginConfig) throws PrepareFailException {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
FS_DEFAULT_NAME_KEY);
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
result.getMsg()));
         }
         super.prepare(pluginConfig);
         hadoopConf = new 
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 38e9fa03b..75e9cea1d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -18,14 +18,17 @@
 package org.apache.seatunnel.connectors.seatunnel.file.hdfs.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.source.config.HdfsSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategyFactory;
@@ -41,7 +44,9 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
HdfsSourceConfig.FILE_PATH.key(),
                 HdfsSourceConfig.FILE_TYPE.key(), 
HdfsSourceConfig.DEFAULT_FS.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         readStrategy = 
ReadStrategyFactory.of(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -50,7 +55,8 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] 
failed", path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, 
e);
         }
         // support user-defined schema
         FileFormat fileFormat = 
FileFormat.valueOf(pluginConfig.getString(HdfsSourceConfig.FILE_TYPE.key()).toUpperCase());
@@ -69,16 +75,19 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does 
not support user-defined schema for [parquet, orc] files");
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema 
for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does 
not supported this file format");
+                    throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, 
filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file 
[%s] failed", filePaths.get(0));
+                throw new 
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
index 3fb4c9989..9fcd01319 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseTextFileConfig.java
@@ -19,9 +19,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.config;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -47,7 +49,8 @@ public class BaseTextFileConfig implements DelimiterConfig, 
CompressConfig, Seri
 
     public BaseTextFileConfig(@NonNull Config config) {
         if (config.hasPath(BaseSinkConfig.COMPRESS_CODEC.key())) {
-            throw new RuntimeException("compress not support now");
+            throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                    "Compress not supported by SeaTunnel file connector now");
         }
 
         if (config.hasPath(BaseSinkConfig.FIELD_DELIMITER.key()) &&
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
similarity index 56%
copy from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
copy to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
index e747c6aea..e966ebcd5 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorErrorCode.java
@@ -17,12 +17,28 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.exception;
 
-public class FilePluginException extends Exception {
-    public FilePluginException(String message) {
-        super(message);
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+
+public enum FileConnectorErrorCode implements SeaTunnelErrorCode {
+    FILE_TYPE_INVALID("FILE-01", "File type is invalid"),
+    DATA_DESERIALIZE_FAILED("FILE-02", "Data deserialization failed"),
+    FILE_LIST_GET_FAILED("FILE-03", "Get file list failed");
+
+    private final String code;
+    private final String description;
+
+    FileConnectorErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return code;
     }
 
-    public FilePluginException(String message, Throwable cause) {
-        super(message, cause);
+    @Override
+    public String getDescription() {
+        return description;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
similarity index 57%
rename from 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
rename to 
seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
index e747c6aea..39404b01b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FilePluginException.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/exception/FileConnectorException.java
@@ -17,12 +17,19 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.exception;
 
-public class FilePluginException extends Exception {
-    public FilePluginException(String message) {
-        super(message);
+import org.apache.seatunnel.common.exception.SeaTunnelErrorCode;
+import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
+
+public class FileConnectorException extends SeaTunnelRuntimeException {
+    public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage) {
+        super(seaTunnelErrorCode, errorMessage);
+    }
+
+    public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
String errorMessage, Throwable cause) {
+        super(seaTunnelErrorCode, errorMessage, cause);
     }
 
-    public FilePluginException(String message, Throwable cause) {
-        super(message, cause);
+    public FileConnectorException(SeaTunnelErrorCode seaTunnelErrorCode, 
Throwable cause) {
+        super(seaTunnelErrorCode, cause);
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
index 1a57947a4..3f8a203d2 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSinkWriter.java
@@ -19,7 +19,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
 import org.apache.seatunnel.api.sink.SinkWriter;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.WriteStrategy;
@@ -61,8 +63,9 @@ public class BaseFileSinkWriter implements 
SinkWriter<SeaTunnelRow, FileCommitIn
     public void write(SeaTunnelRow element) throws IOException {
         try {
             writeStrategy.write(element);
-        } catch (Exception e) {
-            throw new RuntimeException("Write data error, please check", e);
+        } catch (FileConnectorException e) {
+            String errorMsg = String.format("Write this data [%s] to file 
failed", element);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
index c56171172..968df88df 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/TextFileSinkConfig.java
@@ -20,9 +20,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.config;
 import static com.google.common.base.Preconditions.checkArgument;
 
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseTextFileConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.PartitionConfig;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -107,7 +109,8 @@ public class TextFileSinkConfig extends BaseTextFileConfig 
implements PartitionC
 
         if (this.isEnableTransaction &&
                 
!this.fileNameExpression.contains(BaseSinkConfig.TRANSACTION_EXPRESSION)) {
-            throw new RuntimeException("file_name_expression must contains " +
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "file_name_expression must contains " +
                     BaseSinkConfig.TRANSACTION_EXPRESSION + " when 
is_enable_transaction is true");
         }
 
@@ -115,17 +118,20 @@ public class TextFileSinkConfig extends 
BaseTextFileConfig implements PartitionC
         if (!CollectionUtils.isEmpty(this.partitionFieldList)
             && (CollectionUtils.isEmpty(this.sinkColumnList) ||
                 !new 
HashSet<>(this.sinkColumnList).containsAll(this.partitionFieldList))) {
-            throw new RuntimeException("partition fields must in sink 
columns");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "partition fields must in sink columns");
         }
 
         if (!CollectionUtils.isEmpty(this.partitionFieldList) && 
!isPartitionFieldWriteInFile) {
             if (!this.sinkColumnList.removeAll(this.partitionFieldList)) {
-                throw new RuntimeException("remove partition field from sink 
columns error");
+                throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                        "remove partition field from sink columns error");
             }
         }
 
         if (CollectionUtils.isEmpty(this.sinkColumnList)) {
-            throw new RuntimeException("sink columns can not be empty");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "sink columns can not be empty");
         }
 
         Map<String, Integer> columnsMap = new 
HashMap<>(seaTunnelRowTypeInfo.getFieldNames().length);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
index dff9ba57e..a3e3aa1c3 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/util/FileSystemUtils.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.util;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import lombok.NonNull;
 import lombok.extern.slf4j.Slf4j;
@@ -28,7 +30,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.net.URI;
 import java.util.ArrayList;
@@ -65,7 +66,8 @@ public class FileSystemUtils {
         FileSystem fileSystem = getFileSystem(filePath);
         Path path = new Path(filePath);
         if (!fileSystem.createNewFile(path)) {
-            throw new IOException("create file " + filePath + " error");
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    "create file " + filePath + " error");
         }
     }
 
@@ -74,7 +76,8 @@ public class FileSystemUtils {
         Path path = new Path(file);
         if (fileSystem.exists(path)) {
             if (!fileSystem.delete(path, true)) {
-                throw new IOException("delete file " + file + " error");
+                throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                        "delete file " + file + " error");
             }
         }
     }
@@ -113,7 +116,8 @@ public class FileSystemUtils {
         if (fileSystem.rename(oldPath, newPath)) {
             log.info("rename file :[" + oldPath + "] to [" + newPath + "] 
finish");
         } else {
-            throw new IOException("rename file :[" + oldPath + "] to [" + 
newPath + "] error");
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    "rename file :[" + oldPath + "] to [" + newPath + "] 
error");
         }
     }
 
@@ -121,7 +125,8 @@ public class FileSystemUtils {
         FileSystem fileSystem = getFileSystem(filePath);
         Path dfs = new Path(filePath);
         if (!fileSystem.mkdirs(dfs)) {
-            throw new IOException("create dir " + filePath + " error");
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    "create dir " + filePath + " error");
         }
     }
 
@@ -134,7 +139,7 @@ public class FileSystemUtils {
     /**
      * get the dir in filePath
      */
-    public static List<Path> dirList(@NonNull String filePath) throws 
FileNotFoundException, IOException {
+    public static List<Path> dirList(@NonNull String filePath) throws 
IOException {
         FileSystem fileSystem = getFileSystem(filePath);
         List<Path> pathList = new ArrayList<>();
         if (!fileExist(filePath)) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 1eafcb36e..f6baf69cd 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -25,10 +25,12 @@ import static 
org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.Constants;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.VariablesSubstitute;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.state.FileSinkState;
@@ -222,7 +224,8 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
         try {
             FileSystemUtils.deleteFile(getTransactionDir(transactionId));
         } catch (IOException e) {
-            throw new RuntimeException("abort transaction " + transactionId + 
" error.", e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    "Abort transaction " + transactionId + " error, delete 
transaction directory failed", e);
         }
     }
 
@@ -257,7 +260,9 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
                     .map(dir -> dir.replaceAll(jobDir, ""))
                     .collect(Collectors.toList());
         } catch (IOException e) {
-            throw new RuntimeException(e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    String.format("Get transaction id from states failed," +
+                            "it seems that can not get directory list from 
[%s]", jobDir), e);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index d03a5ae6b..a53163c79 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -20,6 +20,8 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.format.json.JsonSerializationSchema;
@@ -63,8 +65,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
             }
             fsDataOutputStream.write(rowBytes);
         } catch (IOException e) {
-            log.error("write data to file {} error", filePath);
-            throw new RuntimeException(e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    String.format("Write data to file [%s] failed", filePath), 
e);
         }
     }
 
@@ -74,16 +76,15 @@ public class JsonWriteStrategy extends 
AbstractWriteStrategy {
             try {
                 value.flush();
             } catch (IOException e) {
-                log.error("error when flush file {}", key);
-                throw new RuntimeException(e);
+                throw new 
FileConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+                        String.format("Flush data to this file [%s] failed", 
key), e);
             } finally {
                 try {
                     value.close();
                 } catch (IOException e) {
-                    log.error("error when close output stream {}", key, e);
+                    log.warn("Close file output stream {} failed", key, e);
                 }
             }
-
             needMoveFiles.put(key, getTargetLocation(key));
         });
     }
@@ -96,8 +97,8 @@ public class JsonWriteStrategy extends AbstractWriteStrategy {
                 beingWrittenOutputStream.put(filePath, fsDataOutputStream);
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
-                log.error("can not get output file stream");
-                throw new RuntimeException(e);
+                throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                        String.format("Open file output stream [%s] failed", 
filePath), e);
             }
         }
         return fsDataOutputStream;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
index 42dedca14..116eeb5b8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/OrcWriteStrategy.java
@@ -24,6 +24,8 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 
 import lombok.NonNull;
@@ -84,7 +86,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
             rowBatch.reset();
         } catch (IOException e) {
             String errorMsg = String.format("Write data to orc file [%s] 
error", filePath);
-            throw new RuntimeException(errorMsg, e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
         }
     }
 
@@ -95,7 +97,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy {
                 v.close();
             } catch (IOException e) {
                 String errorMsg = String.format("Close file [%s] orc writer 
failed, error msg: [%s]", k, e.getMessage());
-                throw new RuntimeException(errorMsg, e);
+                throw new 
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
             }
             needMoveFiles.put(k, getTargetLocation(k));
         });
@@ -120,7 +122,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
                 return newWriter;
             } catch (IOException e) {
                 String errorMsg = String.format("Get orc writer for file [%s] 
error", filePath);
-                throw new RuntimeException(errorMsg, e);
+                throw new 
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
             }
         }
         return writer;
@@ -172,7 +174,7 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
             case NULL:
             default:
                 String errorMsg = String.format("Orc file not support this 
type [%s]", type.getSqlType());
-                throw new UnsupportedOperationException(errorMsg);
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
         }
     }
 
@@ -224,7 +226,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
                     setStructColumnVector(value, structColumnVector, row);
                     break;
                 default:
-                    throw new RuntimeException("Unexpected ColumnVector 
subtype " + vector.type);
+                    throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "Unsupported ColumnVector subtype " + vector.type); // trailing space separates the label from the type
             }
         }
     }
@@ -237,7 +240,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
                 setColumn(fields[i], structColumnVector.fields[i], row);
             }
         } else {
-            throw new RuntimeException("SeaTunnelRow type expected for field");
+            String errorMsg = String.format("SeaTunnelRow type expected for 
field, " +
+                    "not support this data type: [%s]", value.getClass());
+            throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
         }
 
     }
@@ -258,7 +263,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
                 ++i;
             }
         } else {
-            throw new RuntimeException("Map type expected for field");
+            String errorMsg = String.format("Map type expected for field, this 
field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 
@@ -269,7 +275,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         } else if (value instanceof List) {
             valueArray = ((List<?>) value).toArray();
         } else {
-            throw new RuntimeException("List and Array type expected for 
field");
+            String errorMsg = String.format("List and Array type expected for 
field, " +
+                    "this field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
         listColumnVector.offsets[row] = listColumnVector.childCount;
         listColumnVector.lengths[row] = valueArray.length;
@@ -285,7 +293,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         if (value instanceof BigDecimal) {
             decimalColumnVector.set(row, HiveDecimal.create((BigDecimal) 
value));
         } else {
-            throw new RuntimeException("BigDecimal type expected for field");
+            String errorMsg = String.format("BigDecimal type expected for 
field, this field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 
@@ -297,7 +306,8 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         } else if (value instanceof LocalTime) {
             timestampColumnVector.set(row, Timestamp.valueOf(((LocalTime) 
value).atDate(LocalDate.ofEpochDay(0))));
         } else {
-            throw new RuntimeException("Time series type expected for field");
+            String errorMsg = String.format("Time series type expected for 
field, this field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 
@@ -319,7 +329,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
         } else if (value instanceof LocalDate) {
             longVector.vector[row] = ((LocalDate) 
value).getLong(ChronoField.EPOCH_DAY);
         } else {
-            throw new RuntimeException("Long or Integer type expected for 
field");
+            String errorMsg = String.format("Long or Integer type expected for 
field, " +
+                    "this field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 
@@ -341,7 +353,9 @@ public class OrcWriteStrategy extends AbstractWriteStrategy 
{
             Float floatValue = (Float) value;
             doubleVector.vector[rowNum] = floatValue.doubleValue();
         } else {
-            throw new RuntimeException("Double or Float type expected for 
field ");
+            String errorMsg = String.format("Double or Float type expected for 
field, " +
+                    "this field is [%s]", value.getClass());
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
index fb9b139ad..0e04ce650 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/ParquetWriteStrategy.java
@@ -24,7 +24,9 @@ import org.apache.seatunnel.api.table.type.MapType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 
 import lombok.NonNull;
@@ -102,7 +104,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
             writer.write(record);
         } catch (IOException e) {
             String errorMsg = String.format("Write data to file [%s] error", 
filePath);
-            throw new RuntimeException(errorMsg, e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
         }
     }
 
@@ -113,7 +115,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
                 v.close();
             } catch (IOException e) {
                 String errorMsg = String.format("Close file [%s] parquet 
writer failed, error msg: [%s]", k, e.getMessage());
-                throw new RuntimeException(errorMsg, e);
+                throw new 
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
             }
             needMoveFiles.put(k, getTargetLocation(k));
         });
@@ -147,7 +149,7 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
                 return newWriter;
             } catch (IOException e) {
                 String errorMsg = String.format("Get parquet writer for file 
[%s] error", filePath);
-                throw new RuntimeException(errorMsg, e);
+                throw new 
FileConnectorException(CommonErrorCode.WRITER_OPERATION_FAILED, errorMsg, e);
             }
         }
         return writer;
@@ -197,8 +199,9 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
                 }
                 return recordBuilder.build();
             default:
-                String errorMsg = String.format("SeaTunnel file connector is 
not supported for this data type [%s]", seaTunnelDataType.getSqlType());
-                throw new UnsupportedOperationException(errorMsg);
+                String errorMsg = String.format("SeaTunnel file connector is 
not supported for this data type [%s]",
+                        seaTunnelDataType.getSqlType());
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
         }
     }
 
@@ -278,8 +281,9 @@ public class ParquetWriteStrategy extends 
AbstractWriteStrategy {
                 return Types.optionalGroup().addFields(types).named(fieldName);
             case NULL:
             default:
-                String errorMsg = String.format("SeaTunnel file connector is 
not supported for this data type [%s]", seaTunnelDataType.getSqlType());
-                throw new UnsupportedOperationException(errorMsg);
+                String errorMsg = String.format("SeaTunnel file connector is 
not supported for this data type [%s]",
+                        seaTunnelDataType.getSqlType());
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index e9579723d..3545acc88 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -20,9 +20,11 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 import org.apache.seatunnel.api.serialization.SerializationSchema;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.util.FileSystemUtils;
 import org.apache.seatunnel.format.text.TextSerializationSchema;
@@ -79,8 +81,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
             }
             
fsDataOutputStream.write(serializationSchema.serialize(seaTunnelRow));
         } catch (IOException e) {
-            log.error("write data to file {} error", filePath);
-            throw new RuntimeException(e);
+            throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                    String.format("Write data to file [%s] failed", filePath), 
e);
         }
     }
 
@@ -90,8 +92,8 @@ public class TextWriteStrategy extends AbstractWriteStrategy {
             try {
                 value.flush();
             } catch (IOException e) {
-                log.error("error when flush file {}", key);
-                throw new RuntimeException(e);
+                throw new 
FileConnectorException(CommonErrorCode.FLUSH_DATA_FAILED,
+                        String.format("Flush data to this file [%s] failed", 
key), e);
             } finally {
                 try {
                     value.close();
@@ -114,7 +116,8 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
                 isFirstWrite.put(filePath, true);
             } catch (IOException e) {
                 log.error("can not get output file stream");
-                throw new RuntimeException(e);
+                throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED,
+                        String.format("Open file output stream [%s] failed", 
filePath), e);
             }
         }
         return fsDataOutputStream;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 55ac378db..c1370345c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.hadoop.conf.Configuration;
 
@@ -44,9 +45,9 @@ public interface WriteStrategy extends Transaction, 
Serializable {
     /**
      * write seaTunnelRow to target datasource
      * @param seaTunnelRow seaTunnelRow
-     * @throws Exception Exceptions
+     * @throws FileConnectorException if writing the row to the target fails
      */
-    void write(SeaTunnelRow seaTunnelRow) throws Exception;
+    void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException;
 
     /**
      * set seaTunnelRowTypeInfo in writer
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
index 92467ca92..03388cf58 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategyFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.config.TextFileSinkConfig;
 
 import lombok.extern.slf4j.Slf4j;
@@ -33,7 +35,7 @@ public class WriteStrategyFactory {
             return fileFormat.getWriteStrategy(textFileSinkConfig);
         } catch (IllegalArgumentException e) {
             String errorMsg = String.format("File sink connector not support 
this file type [%s], please check your config", fileType);
-            throw new RuntimeException(errorMsg, e);
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg, e); // keep the cause, as the replaced RuntimeException did
         }
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
index 3b2088b4d..02c213377 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/BaseFileSourceReader.java
@@ -20,7 +20,9 @@ package org.apache.seatunnel.connectors.seatunnel.file.source;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.source.SourceReader;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ReadStrategy;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.split.FileSourceSplit;
 
@@ -59,7 +61,8 @@ public class BaseFileSourceReader implements 
SourceReader<SeaTunnelRow, FileSour
             try {
                 readStrategy.read(source.splitId(), output);
             } catch (Exception e) {
-                throw new RuntimeException("File source read error", e);
+                String errorMsg = String.format("Read data from this file [%s] 
failed", source.splitId());
+                throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e);
             }
         });
         context.signalNoMoreElement();
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 94218250a..5dc3b3207 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -27,7 +27,6 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -93,7 +92,7 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
         return configuration;
     }
 
-    Configuration getConfiguration() throws FilePluginException {
+    Configuration getConfiguration() {
         return getConfiguration(hadoopConf);
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index e0ff36c14..68106604f 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -21,8 +21,9 @@ import 
org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.json.JsonDeserializationSchema;
 
 import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,7 @@ public class JsonReadStrategy extends AbstractReadStrategy {
     }
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
Exception {
+    public void read(String path, Collector<SeaTunnelRow> output) throws 
FileConnectorException, IOException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
@@ -66,14 +67,16 @@ public class JsonReadStrategy extends AbstractReadStrategy {
                     }
                     output.collect(seaTunnelRow);
                 } catch (IOException e) {
-                    throw new RuntimeException(e);
+                    String errorMsg = String.format("Read data from this file 
[%s] failed", filePath);
+                    throw new 
FileConnectorException(CommonErrorCode.FILE_OPERATION_FAILED, errorMsg, e); // chain the IOException instead of discarding it
                 }
             });
         }
     }
 
     @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FilePluginException {
-        throw new UnsupportedOperationException("User must defined schema for 
json file type");
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FileConnectorException {
+        throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                "User must defined schema for json file type");
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 52b2abdd2..032f91f5a 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -27,8 +27,10 @@ import 
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.commons.lang3.tuple.Pair;
@@ -69,9 +71,10 @@ public class OrcReadStrategy extends AbstractReadStrategy {
     private static final long MIN_SIZE = 16 * 1024;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
Exception {
+    public void read(String path, Collector<SeaTunnelRow> output) throws 
FileConnectorException, IOException {
         if (Boolean.FALSE.equals(checkFileType(path))) {
-            throw new Exception("Please check file type");
+            String errorMsg = String.format("This file [%s] is not a orc file, 
please check the format of this file", path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
         }
         Configuration configuration = getConfiguration();
         Path filePath = new Path(path);
@@ -113,7 +116,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
     }
 
     @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FilePluginException {
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FileConnectorException {
         Configuration configuration = getConfiguration(hadoopConf);
         OrcFile.ReaderOptions readerOptions = 
OrcFile.readerOptions(configuration);
         Path dstDir = new Path(path);
@@ -129,7 +132,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
             return getActualSeaTunnelRowTypeInfo();
         } catch (IOException e) {
-            throw new FilePluginException("Create OrcReader Fail", e);
+            String errorMsg = String.format("Create orc reader for this file 
[%s] failed", path);
+            throw new 
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e); // keep the cause, as the replaced FilePluginException did
         }
     }
 
@@ -169,9 +173,9 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             }
             in.close();
             return checkResult;
-        } catch (FilePluginException | IOException e) {
-            String errorMsg = String.format("Check orc file [%s] error", path);
-            throw new UnsupportedOperationException(errorMsg, e);
+        } catch (IOException e) {
+            String errorMsg = String.format("Check orc file [%s] failed", 
path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg, e);
         }
     }
 
@@ -227,7 +231,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                         return ArrayType.DOUBLE_ARRAY_TYPE;
                     default:
                         String errorMsg = String.format("SeaTunnel array type 
not supported this genericType [%s] yet", seaTunnelDataType);
-                        throw new UnsupportedOperationException(errorMsg);
+                        throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
                 }
             case MAP:
                 TypeDescription keyType = typeDescription.getChildren().get(0);
@@ -242,7 +246,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 // do nothing
                 // never get in there
                 String errorMsg = String.format("SeaTunnel file connector not 
supported this orc type [%s] yet", typeDescription.getCategory());
-                throw new UnsupportedOperationException(errorMsg);
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
         }
     }
 
@@ -281,7 +285,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     columnObj = readUnionVal(colVec, colType, rowNum);
                     break;
                 default:
-                    throw new UnsupportedOperationException("ReadColumn: 
unsupported ORC file column type: " + colVec.type.name());
+                    throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "ReadColumn: unsupported ORC file column type: " + 
colVec.type.name());
             }
         }
         return columnObj;
@@ -378,7 +383,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                 objMap.put(keyList[i], valueList[i]);
             }
         } else {
-            throw new UnsupportedOperationException("readMapVal: unsupported 
key or value types");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "readMapVal: unsupported key or value types");
         }
         return objMap;
     }
@@ -446,7 +452,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     );
                 break;
             default:
-                throw new UnsupportedOperationException(mapVector.type.name() 
+ " is not supported for MapColumnVectors");
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                        mapVector.type.name() + " is not supported for 
MapColumnVectors");
         }
         return mapList;
     }
@@ -463,10 +470,12 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
                 Object unionValue = readColumn(fieldVector, fieldType, rowNum);
                 columnValuePair = Pair.of(fieldType, unionValue);
             } else {
-                throw new UnsupportedOperationException("readUnionVal: union 
tag value out of range for union column vectors");
+                throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                        "readUnionVal: union tag value out of range for union 
column vectors");
             }
         } else {
-            throw new UnsupportedOperationException("readUnionVal: union tag 
value out of range for union types");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "readUnionVal: union tag value out of range for union 
types");
         }
         return columnValuePair;
     }
@@ -494,7 +503,8 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     listValues = readTimestampListValues(listVector, 
childType, rowNum);
                     break;
                 default:
-                    throw new 
UnsupportedOperationException(listVector.type.name() + " is not supported for 
ListColumnVectors");
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                            listVector.type.name() + " is not supported for 
ListColumnVectors");
             }
         }
         return listValues;
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 212f79821..beddcec88 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -28,8 +28,10 @@ import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.api.table.type.SqlType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 import org.apache.avro.Conversions;
@@ -75,9 +77,10 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
     private static final long JULIAN_DAY_NUMBER_FOR_UNIX_EPOCH = 2440588;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
Exception {
+    public void read(String path, Collector<SeaTunnelRow> output) throws 
FileConnectorException, IOException {
         if (Boolean.FALSE.equals(checkFileType(path))) {
-            throw new Exception("please check file type");
+            String errorMsg = String.format("This file [%s] is not a parquet 
file, please check the format of this file", path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
         }
         Path filePath = new Path(path);
         Map<String, String> partitionsMap = parsePartitionsByPath(path);
@@ -141,7 +144,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                         return origArray.toArray(TYPE_ARRAY_DOUBLE);
                     default:
                         String errorMsg = String.format("SeaTunnel array type 
not support this type [%s] now", fieldType.getSqlType());
-                        throw new UnsupportedOperationException(errorMsg);
+                        throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
                 }
             case MAP:
                 HashMap<Object, Object> dataMap = new HashMap<>();
@@ -193,12 +196,12 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
             default:
                 // do nothing
                 // never got in there
-                throw new UnsupportedOperationException("SeaTunnel not support 
this data type now");
+                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, "SeaTunnel not 
support this data type now");
         }
     }
 
     @Override
-    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FilePluginException {
+    public SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, 
String path) throws FileConnectorException {
         Path filePath = new Path(path);
         ParquetMetadata metadata;
         try {
@@ -207,7 +210,8 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
             metadata = reader.getFooter();
             reader.close();
         } catch (IOException e) {
-            throw new FilePluginException("Create parquet reader failed", e);
+            String errorMsg = String.format("Create parquet reader for this 
file [%s] failed", path);
+            throw new 
FileConnectorException(CommonErrorCode.READER_OPERATION_FAILED, errorMsg, e);
         }
         FileMetaData fileMetaData = metadata.getFileMetaData();
         MessageType schema = fileMetaData.getSchema();
@@ -242,7 +246,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                             return LocalTimeType.LOCAL_DATE_TYPE;
                         default:
                             String errorMsg = String.format("Not support this 
type [%s]", type);
-                            throw new UnsupportedOperationException(errorMsg);
+                            throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
                     }
                 case INT64:
                     if (type.asPrimitiveType().getOriginalType() == 
OriginalType.TIMESTAMP_MILLIS) {
@@ -276,7 +280,7 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                     return new DecimalType(precision, scale);
                 default:
                     String errorMsg = String.format("Not support this type 
[%s]", type);
-                    throw new UnsupportedOperationException(errorMsg);
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
             }
         } else {
             LogicalTypeAnnotation logicalTypeAnnotation = 
type.asGroupType().getLogicalTypeAnnotation();
@@ -326,10 +330,11 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
                                 return ArrayType.DOUBLE_ARRAY_TYPE;
                             default:
                                 String errorMsg = String.format("SeaTunnel 
array type not supported this genericType [%s] yet", fieldType);
-                                throw new 
UnsupportedOperationException(errorMsg);
+                                throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE, errorMsg);
                         }
                     default:
-                        throw new UnsupportedOperationException("SeaTunnel 
file connector not support this nest type");
+                        throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_DATA_TYPE,
+                                "SeaTunnel file connector not support this 
nest type");
                 }
             }
         }
@@ -350,9 +355,9 @@ public class ParquetReadStrategy extends 
AbstractReadStrategy {
             checkResult = Arrays.equals(magic, PARQUET_MAGIC);
             in.close();
             return checkResult;
-        } catch (FilePluginException | IOException e) {
-            String errorMsg = String.format("Check parquet file [%s] error", 
path);
-            throw new RuntimeException(errorMsg, e);
+        } catch (IOException e) {
+            String errorMsg = String.format("Check parquet file [%s] failed", 
path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index 3d1063c51..d8811a40c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
@@ -36,9 +36,9 @@ public interface ReadStrategy extends Serializable {
 
     Configuration getConfiguration(HadoopConf conf);
 
-    void read(String path, Collector<SeaTunnelRow> output) throws Exception;
+    void read(String path, Collector<SeaTunnelRow> output) throws IOException, 
FileConnectorException;
 
-    SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String 
path) throws FilePluginException;
+    SeaTunnelRowType getSeaTunnelRowTypeInfo(HadoopConf hadoopConf, String 
path) throws FileConnectorException;
 
     void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
index f12f2c2dc..9e9a47075 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategyFactory.java
@@ -17,7 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 
 import lombok.extern.slf4j.Slf4j;
 
@@ -32,7 +34,7 @@ public class ReadStrategyFactory {
             return fileFormat.getReadStrategy();
         } catch (IllegalArgumentException e) {
             String errorMsg = String.format("File source connector not support 
this file type [%s], please check your config", fileType);
-            throw new RuntimeException(errorMsg);
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT, 
errorMsg);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index e77273a7f..8df21c9d4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -28,7 +28,8 @@ import 
org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.format.text.TextDeserializationSchema;
 
 import org.apache.hadoop.conf.Configuration;
@@ -49,7 +50,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
     private TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
 
     @Override
-    public void read(String path, Collector<SeaTunnelRow> output) throws 
IOException, FilePluginException {
+    public void read(String path, Collector<SeaTunnelRow> output) throws 
FileConnectorException, IOException {
         Configuration conf = getConfiguration();
         FileSystem fs = FileSystem.get(conf);
         Path filePath = new Path(path);
@@ -66,8 +67,8 @@ public class TextReadStrategy extends AbstractReadStrategy {
                     }
                     output.collect(seaTunnelRow);
                 } catch (IOException e) {
-                    String errorMsg = String.format("Deserialize this data 
[%s] error, please check the origin data", line);
-                    throw new RuntimeException(errorMsg);
+                    String errorMsg = String.format("Deserialize this data 
[%s] failed, please check the origin data", line);
+                    throw new 
FileConnectorException(FileConnectorErrorCode.DATA_DESERIALIZE_FAILED, 
errorMsg, e);
                 }
             });
         }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
index 2f0446374..881b7c6c7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/sink/FtpFileSink.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -44,7 +46,9 @@ public class FtpFileSink extends BaseFileSink {
                 FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
                 FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
result.getMsg()));
         }
         super.prepare(pluginConfig);
         hadoopConf = FtpConf.buildWithConfig(pluginConfig);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
index 681cc6fb2..c17e13ba8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-ftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/ftp/source/FtpFileSource.java
@@ -18,15 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.ftp.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConf;
 import org.apache.seatunnel.connectors.seatunnel.file.ftp.config.FtpConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,11 +55,13 @@ public class FtpFileSource extends BaseFileSource {
                 FtpConfig.FTP_HOST.key(), FtpConfig.FTP_PORT.key(),
                 FtpConfig.FTP_USERNAME.key(), FtpConfig.FTP_PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
-        }
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));        }
         FileFormat fileFormat = 
FileFormat.valueOf(pluginConfig.getString(FtpConfig.FILE_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Ftp file source connector only support read [text, csv, json] files");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "Ftp file source connector only support read [text, csv, 
json] files");
         }
         readStrategy = 
ReadStrategyFactory.of(pluginConfig.getString(FtpConfig.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -65,7 +70,8 @@ public class FtpFileSource extends BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] 
failed", path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, 
e);
         }
         // support user-defined schema
         // only json type support user-defined schema now
@@ -83,16 +89,19 @@ public class FtpFileSource extends BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does 
not support user-defined schema for [parquet, orc] files");
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema 
for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does 
not supported this file format");
+                    throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, 
filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file 
[%s] failed", filePaths.get(0));
+                throw new 
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
index 836610c36..925fb2539 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-local/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/local/source/LocalFileSource.java
@@ -18,15 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.local.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.local.config.LocalConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.local.source.config.LocalSourceConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,7 +55,9 @@ public class LocalFileSource extends BaseFileSource {
         CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
LocalSourceConfig.FILE_PATH.key(),
                 LocalSourceConfig.FILE_TYPE.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
result.getMsg());
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SOURCE, 
result.getMsg()));
         }
         readStrategy = 
ReadStrategyFactory.of(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -61,7 +66,8 @@ public class LocalFileSource extends BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, 
"Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] 
failed", path);
+            throw new 
FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, 
e);
         }
         // support user-defined schema
         FileFormat fileFormat = 
FileFormat.valueOf(pluginConfig.getString(LocalSourceConfig.FILE_TYPE.key()).toUpperCase());
@@ -80,16 +86,19 @@ public class LocalFileSource extends BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does 
not support user-defined schema for [parquet, orc] files");
+                    throw new 
FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema 
for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does 
not supported this file format");
+                    throw new 
FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, 
filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), 
PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file 
[%s] failed", filePaths.get(0));
+                throw new 
FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
index dc7dc1bdc..08cf65757 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/sink/OssFileSink.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.oss.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -46,7 +48,9 @@ public class OssFileSink extends BaseFileSink {
                 OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
                 OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, 
result.getMsg());
+            throw new 
FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: 
%s",
+                            getPluginName(), PluginType.SINK, 
result.getMsg()));
         }
         hadoopConf = OssConf.buildWithConfig(pluginConfig);
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
index d8a98cd0c..452b2c065 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/source/OssFileSource.java
@@ -18,15 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.oss.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConf;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.config.OssConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,7 +55,9 @@ public class OssFileSource extends BaseFileSource {
                 OssConfig.BUCKET.key(), OssConfig.ACCESS_KEY.key(),
                 OssConfig.ACCESS_SECRET.key(), OssConfig.BUCKET.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(OssConfig.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -61,7 +66,8 @@ public class OssFileSource extends BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] failed", path);
+            throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
         FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(OssConfig.FILE_TYPE.key()).toUpperCase());
@@ -80,16 +86,19 @@ public class OssFileSource extends BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                    throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+                    throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file [%s] failed", filePaths.get(0));
+                throw new FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
index e38e264a0..5f7622575 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/sink/S3FileSink.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.s3.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
 import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -45,7 +47,9 @@ public class S3FileSink extends BaseFileSink {
                 S3Config.FILE_PATH.key(), S3Config.S3_BUCKET.key(),
                 S3Config.S3_ACCESS_KEY.key(), S3Config.S3_SECRET_KEY.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, result.getMsg()));
         }
         hadoopConf = S3Conf.buildWithConfig(pluginConfig);
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
index ca1792e73..46995347c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-s3/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/s3/source/S3FileSource.java
@@ -18,15 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.s3.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Conf;
 import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3Config;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -51,7 +54,9 @@ public class S3FileSource extends BaseFileSource {
                 S3Config.FILE_PATH.key(), S3Config.FILE_TYPE.key(), S3Config.S3_BUCKET.key(),
                 S3Config.S3_ACCESS_KEY.key(), S3Config.S3_SECRET_KEY.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(S3Config.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -60,7 +65,8 @@ public class S3FileSource extends BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] failed", path);
+            throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
         FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(S3Config.FILE_TYPE.key()).toUpperCase());
@@ -79,16 +85,19 @@ public class S3FileSource extends BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                    throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+                    throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file [%s] failed", filePaths.get(0));
+                throw new FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
index c6833bc0b..4fa76054a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/sink/SftpFileSink.java
@@ -18,11 +18,13 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sftp.sink;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
@@ -44,7 +46,9 @@ public class SftpFileSink extends BaseFileSink {
                 SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
                 SftpConfig.SFTP_USERNAME.key(), SftpConfig.SFTP_PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SINK, result.getMsg());
+            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SINK, result.getMsg()));
         }
         super.prepare(pluginConfig);
         hadoopConf = SftpConf.buildWithConfig(pluginConfig);
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
index d03028184..2b5c3cacb 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-sftp/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sftp/source/SftpFileSource.java
@@ -18,15 +18,18 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sftp.source;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.common.exception.CommonErrorCode;
 import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import org.apache.seatunnel.connectors.seatunnel.file.exception.FilePluginException;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
+import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConf;
 import org.apache.seatunnel.connectors.seatunnel.file.sftp.config.SftpConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.source.BaseFileSource;
@@ -52,11 +55,14 @@ public class SftpFileSource extends BaseFileSource {
                 SftpConfig.SFTP_HOST.key(), SftpConfig.SFTP_PORT.key(),
                 SftpConfig.SFTP_USERNAME.key(), SftpConfig.SFTP_PASSWORD.key());
         if (!result.isSuccess()) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, result.getMsg());
+            throw new FileConnectorException(SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
+                    String.format("PluginName: %s, PluginType: %s, Message: %s",
+                            getPluginName(), PluginType.SOURCE, result.getMsg()));
         }
         FileFormat fileFormat = FileFormat.valueOf(pluginConfig.getString(SftpConfig.FILE_TYPE.key()).toUpperCase());
         if (fileFormat == FileFormat.ORC || fileFormat == FileFormat.PARQUET) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Sftp file source connector only support read [text, csv, json] files");
+            throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                    "Sftp file source connector only support read [text, csv, json] files");
         }
         readStrategy = ReadStrategyFactory.of(pluginConfig.getString(SftpConfig.FILE_TYPE.key()));
         readStrategy.setPluginConfig(pluginConfig);
@@ -65,7 +71,8 @@ public class SftpFileSource extends BaseFileSource {
         try {
             filePaths = readStrategy.getFileNamesByPath(hadoopConf, path);
         } catch (IOException e) {
-            throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Check file path fail.");
+            String errorMsg = String.format("Get file list from this path [%s] failed", path);
+            throw new FileConnectorException(FileConnectorErrorCode.FILE_LIST_GET_FAILED, errorMsg, e);
         }
         // support user-defined schema
         // only json csv text type support user-defined schema now
@@ -83,16 +90,19 @@ public class SftpFileSource extends BaseFileSource {
                     break;
                 case ORC:
                 case PARQUET:
-                    throw new UnsupportedOperationException("SeaTunnel does not support user-defined schema for [parquet, orc] files");
+                    throw new FileConnectorException(CommonErrorCode.UNSUPPORTED_OPERATION,
+                            "SeaTunnel does not support user-defined schema for [parquet, orc] files");
                 default:
                     // never got in there
-                    throw new UnsupportedOperationException("SeaTunnel does not supported this file format");
+                    throw new FileConnectorException(CommonErrorCode.ILLEGAL_ARGUMENT,
+                            "SeaTunnel does not supported this file format");
             }
         } else {
             try {
                 rowType = readStrategy.getSeaTunnelRowTypeInfo(hadoopConf, filePaths.get(0));
-            } catch (FilePluginException e) {
-                throw new PrepareFailException(getPluginName(), PluginType.SOURCE, "Read file schema error.", e);
+            } catch (FileConnectorException e) {
+                String errorMsg = String.format("Get table schema from file [%s] failed", filePaths.get(0));
+                throw new FileConnectorException(CommonErrorCode.TABLE_SCHEMA_GET_FAILED, errorMsg, e);
             }
         }
     }

Reply via email to