This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new efeed28aea [Improve][Connectors-v2] File sink refactor (#10587)
efeed28aea is described below

commit efeed28aeacf0e7568d0accbd2ec52db48eecb10
Author: Jarvis <[email protected]>
AuthorDate: Wed Mar 25 16:59:46 2026 +0800

    [Improve][Connectors-v2] File sink refactor (#10587)
---
 .../seatunnel/file/hdfs/sink/BaseHdfsFileSink.java |  76 -----------
 .../seatunnel/file/config/BaseFileSinkConfig.java  | 141 +++++----------------
 .../seatunnel/file/sink/BaseFileSink.java          |  58 ++++-----
 .../file/sink/BaseMultipleTableFileSink.java       |   2 +-
 .../seatunnel/file/sink/config/FileSinkConfig.java | 126 ++++++------------
 .../file/writer/CsvWriteStrategyTest.java          |   5 +-
 .../seatunnel/file/writer/FileSinkConfigTest.java  |  10 +-
 .../file/writer/OrcWriteStrategyTest.java          |   5 +-
 .../file/writer/ParquetWriteStrategyTest.java      |   5 +-
 .../seatunnel/file/cos/config/CosConf.java         |  16 +++
 .../seatunnel/file/cos/sink/CosFileSink.java       |  48 ++-----
 .../file/cos/sink/CosFileSinkFactory.java          |   7 +
 .../seatunnel/file/oss/jindo/config/OssConf.java   |  16 +++
 .../seatunnel/file/oss/jindo/sink/OssFileSink.java |  48 ++-----
 .../file/oss/jindo/sink/OssFileSinkFactory.java    |   7 +
 .../seatunnel/file/obs/config/ObsConf.java         |  11 ++
 .../seatunnel/file/obs/sink/ObsFileSink.java       |  48 ++-----
 .../file/obs/sink/ObsFileSinkFactory.java          |   7 +
 .../connectors/seatunnel/hive/sink/HiveSink.java   |   6 +-
 .../seatunnel/redshift/RedshiftJdbcClient.java     |  14 +-
 .../commit/S3RedshiftSinkAggregatedCommitter.java  |  10 +-
 ...nfigOptions.java => S3RedshiftSinkOptions.java} |   2 +-
 .../seatunnel/redshift/sink/S3RedshiftSink.java    |  49 ++-----
 ...hiftFactory.java => S3RedshiftSinkFactory.java} |  19 ++-
 .../e2e/sink/inmemory/InMemorySinkFactory.java     |   4 +-
 25 files changed, 251 insertions(+), 489 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
deleted file mode 100644
index 9304d9c360..0000000000
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/sink/BaseHdfsFileSink.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink;
-
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
-import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
-import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
-
-import java.util.Objects;
-
-import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_KEY;
-
-public abstract class BaseHdfsFileSink extends BaseFileSink {
-
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult result = CheckConfigUtil.checkAllExists(pluginConfig, 
FS_DEFAULT_NAME_KEY);
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        super.prepare(pluginConfig);
-        // Avoid overwriting hadoopConf for subclass initialization. If a 
subclass is initialized,
-        // it is not initialized here.
-        if (Objects.isNull(hadoopConf)) {
-            hadoopConf = new 
HadoopConf(pluginConfig.getString(FS_DEFAULT_NAME_KEY));
-        }
-        if (pluginConfig.hasPath(FileBaseSinkOptions.HDFS_SITE_PATH.key())) {
-            hadoopConf.setHdfsSitePath(
-                    
pluginConfig.getString(FileBaseSinkOptions.HDFS_SITE_PATH.key()));
-        }
-
-        if (pluginConfig.hasPath(FileBaseSinkOptions.REMOTE_USER.key())) {
-            
hadoopConf.setRemoteUser(pluginConfig.getString(FileBaseSinkOptions.REMOTE_USER.key()));
-        }
-
-        if (pluginConfig.hasPath(FileBaseSinkOptions.KRB5_PATH.key())) {
-            
hadoopConf.setKrb5Path(pluginConfig.getString(FileBaseSinkOptions.KRB5_PATH.key()));
-        }
-
-        if 
(pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key())) {
-            hadoopConf.setKerberosPrincipal(
-                    
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_PRINCIPAL.key()));
-        }
-        if 
(pluginConfig.hasPath(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key())) {
-            hadoopConf.setKerberosKeytabPath(
-                    
pluginConfig.getString(FileBaseSinkOptions.KERBEROS_KEYTAB_PATH.key()));
-        }
-    }
-}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
index b32711d413..b5f12771db 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSinkConfig.java
@@ -17,9 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
@@ -29,126 +27,57 @@ import lombok.NonNull;
 
 import java.io.File;
 import java.io.Serializable;
-import java.util.Locale;
 
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkNotNull;
 
 @Data
 public class BaseFileSinkConfig implements DelimiterConfig, Serializable {
     private static final long serialVersionUID = 1L;
-    protected CompressFormat compressFormat = 
FileBaseSinkOptions.COMPRESS_CODEC.defaultValue();
+    protected CompressFormat compressFormat;
     protected String fieldDelimiter;
-    protected int sheetMaxRows = 
FileBaseSinkOptions.SHEET_MAX_ROWS.defaultValue();
-    protected String rowDelimiter = 
FileBaseSinkOptions.ROW_DELIMITER.defaultValue();
-    protected int batchSize = FileBaseSinkOptions.BATCH_SIZE.defaultValue();
+    protected int sheetMaxRows;
+    protected String rowDelimiter;
+    protected int batchSize;
     protected String path;
-    protected String fileNameExpression = 
FileBaseSinkOptions.FILE_NAME_EXPRESSION.defaultValue();
-    protected boolean singleFileMode = 
FileBaseSinkOptions.SINGLE_FILE_MODE.defaultValue();
-    protected boolean createEmptyFileWhenNoData =
-            FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.defaultValue();
+    protected String fileNameExpression;
+    protected boolean singleFileMode;
+    protected boolean createEmptyFileWhenNoData;
     protected FileFormat fileFormat;
-    protected String filenameExtension = 
FileBaseSinkOptions.FILENAME_EXTENSION.defaultValue();
-    protected DateUtils.Formatter dateFormat = DateUtils.Formatter.YYYY_MM_DD;
-    protected DateTimeUtils.Formatter datetimeFormat = 
DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS;
-    protected TimeUtils.Formatter timeFormat = TimeUtils.Formatter.HH_MM_SS;
+    protected String filenameExtension;
+    protected DateUtils.Formatter dateFormat;
+    protected DateTimeUtils.Formatter datetimeFormat;
+    protected TimeUtils.Formatter timeFormat;
     protected Boolean enableHeaderWriter = false;
 
-    public BaseFileSinkConfig(@NonNull Config config) {
-        if (config.hasPath(FileBaseSinkOptions.COMPRESS_CODEC.key())) {
-            String compressCodec = 
config.getString(FileBaseSinkOptions.COMPRESS_CODEC.key());
-            this.compressFormat = 
CompressFormat.valueOf(compressCodec.toUpperCase());
-        }
-        if (config.hasPath(FileBaseSinkOptions.BATCH_SIZE.key())) {
-            this.batchSize = 
config.getInt(FileBaseSinkOptions.BATCH_SIZE.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.SHEET_MAX_ROWS.key())
-                && StringUtils.isNotEmpty(
-                        
config.getString(FileBaseSinkOptions.SHEET_MAX_ROWS.key()))) {
-            this.sheetMaxRows = 
config.getInt(FileBaseSinkOptions.SHEET_MAX_ROWS.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.ROW_DELIMITER.key())) {
-            this.rowDelimiter = 
config.getString(FileBaseSinkOptions.ROW_DELIMITER.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.FILE_PATH.key())
-                && 
!StringUtils.isBlank(config.getString(FileBaseSinkOptions.FILE_PATH.key()))) {
-            this.path = config.getString(FileBaseSinkOptions.FILE_PATH.key());
-        }
+    public BaseFileSinkConfig(@NonNull ReadonlyConfig pluginConfig) {
+        this.compressFormat = 
pluginConfig.get(FileBaseSinkOptions.COMPRESS_CODEC);
+        this.batchSize = pluginConfig.get(FileBaseSinkOptions.BATCH_SIZE);
+        this.sheetMaxRows = 
pluginConfig.get(FileBaseSinkOptions.SHEET_MAX_ROWS);
+        this.rowDelimiter = 
pluginConfig.get(FileBaseSinkOptions.ROW_DELIMITER);
+        this.path = pluginConfig.get(FileBaseSinkOptions.FILE_PATH);
         checkNotNull(path);
-
         if (path.equals(File.separator)) {
             this.path = "";
         }
-
-        if (config.hasPath(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key())
-                && !StringUtils.isBlank(
-                        
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key()))) {
-            this.fileNameExpression =
-                    
config.getString(FileBaseSinkOptions.FILE_NAME_EXPRESSION.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())) {
-            this.singleFileMode = 
config.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key());
-        }
-
-        if 
(config.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())) {
-            this.createEmptyFileWhenNoData =
-                    
config.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
-                && !StringUtils.isBlank(
-                        
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key()))) {
-            this.fileFormat =
-                    FileFormat.valueOf(
-                            
config.getString(FileBaseSinkOptions.FILE_FORMAT_TYPE.key())
-                                    .toUpperCase(Locale.ROOT));
-        } else {
-            // fall back to the default
-            this.fileFormat = 
FileBaseSinkOptions.FILE_FORMAT_TYPE.defaultValue();
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.FIELD_DELIMITER.key())
-                && StringUtils.isNotEmpty(
-                        
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key()))) {
-            this.fieldDelimiter = 
config.getString(FileBaseSinkOptions.FIELD_DELIMITER.key());
+        this.fileNameExpression = 
pluginConfig.get(FileBaseSinkOptions.FILE_NAME_EXPRESSION);
+        this.singleFileMode = 
pluginConfig.get(FileBaseSinkOptions.SINGLE_FILE_MODE);
+        this.createEmptyFileWhenNoData =
+                
pluginConfig.get(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA);
+        this.fileFormat = 
pluginConfig.get(FileBaseSinkOptions.FILE_FORMAT_TYPE);
+        // if set, use user config, if not set, when format is csv, use "," 
otherwise use default
+        // delimiter
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.FIELD_DELIMITER).isPresent()) {
+            this.fieldDelimiter = 
pluginConfig.get(FileBaseSinkOptions.FIELD_DELIMITER);
+        } else if (FileFormat.CSV.equals(this.fileFormat)) {
+            this.fieldDelimiter = ",";
         } else {
-            if (FileFormat.CSV.equals(this.fileFormat)) {
-                this.fieldDelimiter = ",";
-            } else {
-                this.fieldDelimiter = 
FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
-            }
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.FILENAME_EXTENSION.key())
-                && !StringUtils.isBlank(
-                        
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key()))) {
-            this.filenameExtension = 
config.getString(FileBaseSinkOptions.FILENAME_EXTENSION.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key())) {
-            dateFormat =
-                    DateUtils.Formatter.parse(
-                            
config.getString(FileBaseSinkOptions.DATE_FORMAT_LEGACY.key()));
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key())) {
-            datetimeFormat =
-                    DateTimeUtils.Formatter.parse(
-                            
config.getString(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY.key()));
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key())) {
-            timeFormat =
-                    TimeUtils.Formatter.parse(
-                            
config.getString(FileBaseSinkOptions.TIME_FORMAT_LEGACY.key()));
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key())) {
-            enableHeaderWriter = 
config.getBoolean(FileBaseSinkOptions.ENABLE_HEADER_WRITE.key());
+            this.fieldDelimiter = 
FileBaseSinkOptions.FIELD_DELIMITER.defaultValue();
         }
+        this.filenameExtension = 
pluginConfig.get(FileBaseSinkOptions.FILENAME_EXTENSION);
+        this.dateFormat = 
pluginConfig.get(FileBaseSinkOptions.DATE_FORMAT_LEGACY);
+        this.datetimeFormat = 
pluginConfig.get(FileBaseSinkOptions.DATETIME_FORMAT_LEGACY);
+        this.timeFormat = 
pluginConfig.get(FileBaseSinkOptions.TIME_FORMAT_LEGACY);
+        this.enableHeaderWriter = 
pluginConfig.get(FileBaseSinkOptions.ENABLE_HEADER_WRITE);
     }
 
     public BaseFileSinkConfig() {}
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 79c5e2424f..abe8e41ace 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -17,19 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
 import org.apache.seatunnel.api.common.JobContext;
-import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DefaultSerializer;
 import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
-import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
-import org.apache.seatunnel.api.table.catalog.TablePath;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
@@ -46,22 +42,36 @@ import java.util.Optional;
 public abstract class BaseFileSink
         implements SeaTunnelSink<
                 SeaTunnelRow, FileSinkState, FileCommitInfo, 
FileAggregatedCommitInfo> {
-    protected SeaTunnelRowType seaTunnelRowType;
-    protected Config pluginConfig;
-    protected HadoopConf hadoopConf;
+    protected ReadonlyConfig pluginConfig;
+    protected CatalogTable catalogTable;
     protected FileSinkConfig fileSinkConfig;
+    protected HadoopConf hadoopConf;
     protected JobContext jobContext;
     protected String jobId;
 
+    public BaseFileSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
+        this.pluginConfig = pluginConfig;
+        this.catalogTable = catalogTable;
+        this.fileSinkConfig = new FileSinkConfig(pluginConfig, 
catalogTable.getSeaTunnelRowType());
+        this.hadoopConf = initHadoopConf();
+    }
+
+    protected abstract HadoopConf initHadoopConf();
+
+    @Override
+    public Optional<CatalogTable> getWriteCatalogTable() {
+        return Optional.of(catalogTable);
+    }
+
     public void preCheckConfig() {
-        if (pluginConfig.hasPath(FileBaseSinkOptions.SINGLE_FILE_MODE.key())
-                && 
pluginConfig.getBoolean(FileBaseSinkOptions.SINGLE_FILE_MODE.key())
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.SINGLE_FILE_MODE).isPresent()
+                && pluginConfig.get(FileBaseSinkOptions.SINGLE_FILE_MODE)
                 && jobContext.isEnableCheckpoint()) {
             throw new IllegalArgumentException(
                     "Single file mode is not supported when checkpoint is 
enabled or in streaming mode.");
         }
-        if 
(pluginConfig.hasPath(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
-                && 
pluginConfig.getBoolean(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA.key())
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA).isPresent()
+                && 
pluginConfig.get(FileBaseSinkOptions.CREATE_EMPTY_FILE_WHEN_NO_DATA)
                 && !fileSinkConfig.getPartitionFieldList().isEmpty()) {
             throw new IllegalArgumentException(
                     "Generate empty file when no data is not supported when 
partition is enabled.");
@@ -75,12 +85,6 @@ public abstract class BaseFileSink
         preCheckConfig();
     }
 
-    @Override
-    public void setTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
-        this.fileSinkConfig = new FileSinkConfig(pluginConfig, 
seaTunnelRowType);
-    }
-
     @Override
     public SinkWriter<SeaTunnelRow, FileCommitInfo, FileSinkState> 
restoreWriter(
             SinkWriter.Context context, List<FileSinkState> states) {
@@ -114,24 +118,10 @@ public abstract class BaseFileSink
         return Optional.of(new DefaultSerializer<>());
     }
 
-    /**
-     * Use the pluginConfig to do some initialize operation.
-     *
-     * @param pluginConfig plugin config.
-     * @throws PrepareFailException if plugin prepare failed, the {@link 
PrepareFailException} will
-     *     throw.
-     */
-    @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        this.pluginConfig = pluginConfig;
-    }
-
     protected WriteStrategy createWriteStrategy() {
         WriteStrategy writeStrategy =
                 WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), 
fileSinkConfig);
-        writeStrategy.setCatalogTable(
-                CatalogTableUtil.getCatalogTable(
-                        "file", null, null, TablePath.DEFAULT.getTableName(), 
seaTunnelRowType));
+        writeStrategy.setCatalogTable(catalogTable);
         return writeStrategy;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index 8ad7f4ab2d..cd2f9b1f10 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -69,7 +69,7 @@ public abstract class BaseMultipleTableFileSink
         this.readonlyConfig = readonlyConfig;
         this.hadoopConf = hadoopConf;
         this.fileSinkConfig =
-                new FileSinkConfig(readonlyConfig.toConfig(), 
catalogTable.getSeaTunnelRowType());
+                new FileSinkConfig(readonlyConfig, 
catalogTable.getSeaTunnelRowType());
         this.catalogTable = catalogTable;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
index 8efa59e6ba..3a884c6e4b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/config/FileSinkConfig.java
@@ -17,9 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.config;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.BaseFileSinkConfig;
@@ -54,16 +54,15 @@ public class FileSinkConfig extends BaseFileSinkConfig 
implements PartitionConfi
 
     private String partitionDirExpression;
 
-    private boolean isPartitionFieldWriteInFile =
-            
FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.defaultValue();
+    private boolean isPartitionFieldWriteInFile;
 
-    private String tmpPath = FileBaseSinkOptions.TMP_PATH.defaultValue();
+    private String tmpPath;
 
-    private String fileNameTimeFormat = 
FileBaseSinkOptions.FILENAME_TIME_FORMAT.defaultValue();
+    private String fileNameTimeFormat;
 
-    private boolean isEnableTransaction = 
FileBaseSinkOptions.IS_ENABLE_TRANSACTION.defaultValue();
+    private boolean isEnableTransaction;
 
-    private String encoding = FileBaseSinkOptions.ENCODING.defaultValue();
+    private String encoding;
 
     // ---------------------generator by config params-------------------
 
@@ -75,32 +74,28 @@ public class FileSinkConfig extends BaseFileSinkConfig 
implements PartitionConfi
 
     private String sheetName;
 
-    private String xmlRootTag = 
FileBaseSinkOptions.XML_ROOT_TAG.defaultValue();
+    private String xmlRootTag;
 
-    private String xmlRowTag = FileBaseSinkOptions.XML_ROW_TAG.defaultValue();
+    private String xmlRowTag;
 
     private Boolean xmlUseAttrFormat;
 
-    private Boolean parquetWriteTimestampAsInt96 =
-            
FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.defaultValue();
+    private Boolean parquetWriteTimestampAsInt96;
 
-    private List<String> parquetAvroWriteFixedAsInt96 =
-            
FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.defaultValue();
+    private List<String> parquetAvroWriteFixedAsInt96;
 
-    private CsvStringQuoteMode csvStringQuoteMode =
-            FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.defaultValue();
+    private CsvStringQuoteMode csvStringQuoteMode;
 
-    private Boolean mergeUpdateEvent = 
FileBaseSinkOptions.MERGE_UPDATE_EVENT.defaultValue();
+    private Boolean mergeUpdateEvent;
 
-    public FileSinkConfig(@NonNull Config config, @NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo) {
-        super(config);
+    public FileSinkConfig(
+            @NonNull ReadonlyConfig pluginConfig, @NonNull SeaTunnelRowType 
seaTunnelRowTypeInfo) {
+        super(pluginConfig);
         checkArgument(
                 
!CollectionUtils.isEmpty(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames())));
 
-        if (config.hasPath(FileBaseSinkOptions.SINK_COLUMNS.key())
-                && !CollectionUtils.isEmpty(
-                        
config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key()))) {
-            this.sinkColumnList = 
config.getStringList(FileBaseSinkOptions.SINK_COLUMNS.key());
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.SINK_COLUMNS).isPresent()) {
+            this.sinkColumnList = 
pluginConfig.get(FileBaseSinkOptions.SINK_COLUMNS);
         }
 
         // if the config sink_columns is empty, all fields in 
SeaTunnelRowTypeInfo will being write
@@ -111,44 +106,27 @@ public class FileSinkConfig extends BaseFileSinkConfig 
implements PartitionConfi
                     new 
ArrayList<>(Arrays.asList(seaTunnelRowTypeInfo.getFieldNames()));
         }
 
-        if (config.hasPath(FileBaseSinkOptions.PARTITION_BY.key())) {
-            this.partitionFieldList = 
config.getStringList(FileBaseSinkOptions.PARTITION_BY.key());
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.PARTITION_BY).isPresent()) {
+            this.partitionFieldList = 
pluginConfig.get(FileBaseSinkOptions.PARTITION_BY);
         } else {
             this.partitionFieldList = Collections.emptyList();
         }
 
-        if (config.hasPath(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key())
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION).isPresent()
                 && !StringUtils.isBlank(
-                        
config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key()))) {
+                        
pluginConfig.get(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION))) {
             this.partitionDirExpression =
-                    
config.getString(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION.key());
+                    
pluginConfig.get(FileBaseSinkOptions.PARTITION_DIR_EXPRESSION);
         }
 
-        if 
(config.hasPath(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key())) {
-            this.isPartitionFieldWriteInFile =
-                    
config.getBoolean(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.TMP_PATH.key())
-                && 
!StringUtils.isBlank(config.getString(FileBaseSinkOptions.TMP_PATH.key()))) {
-            this.tmpPath = 
config.getString(FileBaseSinkOptions.TMP_PATH.key());
-        }
-
-        if (config.hasPath(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key())
-                && !StringUtils.isBlank(
-                        
config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key()))) {
-            this.fileNameTimeFormat =
-                    
config.getString(FileBaseSinkOptions.FILENAME_TIME_FORMAT.key());
-        }
+        this.isPartitionFieldWriteInFile =
+                
pluginConfig.get(FileBaseSinkOptions.IS_PARTITION_FIELD_WRITE_IN_FILE);
 
-        if (config.hasPath(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key())) {
-            this.isEnableTransaction =
-                    
config.getBoolean(FileBaseSinkOptions.IS_ENABLE_TRANSACTION.key());
-        }
+        this.tmpPath = pluginConfig.get(FileBaseSinkOptions.TMP_PATH);
 
-        if (config.hasPath(FileBaseSinkOptions.ENCODING.key())) {
-            this.encoding = 
config.getString(FileBaseSinkOptions.ENCODING.key());
-        }
+        this.fileNameTimeFormat = 
pluginConfig.get(FileBaseSinkOptions.FILENAME_TIME_FORMAT);
+        this.isEnableTransaction = 
pluginConfig.get(FileBaseSinkOptions.IS_ENABLE_TRANSACTION);
+        this.encoding = pluginConfig.get(FileBaseSinkOptions.ENCODING);
 
         if (this.isEnableTransaction
                 && 
!this.fileNameExpression.contains(FileBaseSinkOptions.TRANSACTION_EXPRESSION)) {
@@ -204,60 +182,40 @@ public class FileSinkConfig extends BaseFileSinkConfig 
implements PartitionConfi
                             .collect(Collectors.toList());
         }
 
-        if (config.hasPath(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key())) {
-            this.maxRowsInMemory = 
config.getInt(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY.key());
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY).isPresent()) {
+            this.maxRowsInMemory = 
pluginConfig.get(FileBaseSinkOptions.MAX_ROWS_IN_MEMORY);
         }
 
-        if (config.hasPath(FileBaseSinkOptions.SHEET_NAME.key())) {
-            this.sheetName = 
config.getString(FileBaseSinkOptions.SHEET_NAME.key());
+        if 
(pluginConfig.getOptional(FileBaseSinkOptions.SHEET_NAME).isPresent()) {
+            this.sheetName = pluginConfig.get(FileBaseSinkOptions.SHEET_NAME);
         }
 
         if (FileFormat.XML.equals(this.fileFormat)) {
-            if 
(!config.hasPath(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key())) {
+            if 
(!pluginConfig.getOptional(FileBaseSinkOptions.XML_USE_ATTR_FORMAT).isPresent())
 {
                 throw new FileConnectorException(
                         CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                         "User must define xml_use_attr_format when 
file_format_type is xml");
             }
 
-            this.xmlUseAttrFormat =
-                    
config.getBoolean(FileBaseSinkOptions.XML_USE_ATTR_FORMAT.key());
-
-            if (config.hasPath(FileBaseSinkOptions.XML_ROOT_TAG.key())) {
-                this.xmlRootTag = 
config.getString(FileBaseSinkOptions.XML_ROOT_TAG.key());
-            }
-
-            if (config.hasPath(FileBaseSinkOptions.XML_ROW_TAG.key())) {
-                this.xmlRowTag = 
config.getString(FileBaseSinkOptions.XML_ROW_TAG.key());
-            }
+            this.xmlUseAttrFormat = 
pluginConfig.get(FileBaseSinkOptions.XML_USE_ATTR_FORMAT);
+            this.xmlRootTag = 
pluginConfig.get(FileBaseSinkOptions.XML_ROOT_TAG);
+            this.xmlRowTag = pluginConfig.get(FileBaseSinkOptions.XML_ROW_TAG);
         }
 
         if (FileFormat.PARQUET.equals(this.fileFormat)) {
-            if 
(config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key()))
 {
-                this.parquetWriteTimestampAsInt96 =
-                        config.getBoolean(
-                                
FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96.key());
-            }
-            if 
(config.hasPath(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key())) {
-                this.parquetAvroWriteFixedAsInt96 =
-                        config.getStringList(
-                                
FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96.key());
-            }
+            this.parquetWriteTimestampAsInt96 =
+                    
pluginConfig.get(FileBaseSinkOptions.PARQUET_AVRO_WRITE_TIMESTAMP_AS_INT96);
+            this.parquetAvroWriteFixedAsInt96 =
+                    
pluginConfig.get(FileBaseSinkOptions.PARQUET_AVRO_WRITE_FIXED_AS_INT96);
         }
 
         if (FileFormat.CSV.equals(this.fileFormat)) {
-            if 
(config.hasPath(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key())) {
-                this.csvStringQuoteMode =
-                        CsvStringQuoteMode.valueOf(
-                                
config.getString(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE.key()));
-            }
+            this.csvStringQuoteMode = 
pluginConfig.get(FileBaseSinkOptions.CSV_STRING_QUOTE_MODE);
         }
         if (FileFormat.DEBEZIUM_JSON.equals(this.fileFormat)
                 || FileFormat.CANAL_JSON.equals(this.fileFormat)
                 || FileFormat.MAXWELL_JSON.equals(this.fileFormat)) {
-            if (config.hasPath(FileBaseSinkOptions.MERGE_UPDATE_EVENT.key())) {
-                this.mergeUpdateEvent =
-                        
config.getBoolean(FileBaseSinkOptions.MERGE_UPDATE_EVENT.key());
-            }
+            this.mergeUpdateEvent = 
pluginConfig.get(FileBaseSinkOptions.MERGE_UPDATE_EVENT);
         }
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
index 7202da3e01..55e33e194c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/CsvWriteStrategyTest.java
@@ -19,6 +19,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -63,7 +64,7 @@ public class CsvWriteStrategyTest {
                             BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.INT_TYPE
                         });
         FileSinkConfig writeSinkConfig =
-                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), 
writeRowType);
+                new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
         ParquetReadStrategyTest.LocalConf hadoopConf =
                 new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
@@ -126,7 +127,7 @@ public class CsvWriteStrategyTest {
                             BasicType.INT_TYPE, BasicType.STRING_TYPE, 
BasicType.INT_TYPE
                         });
         FileSinkConfig writeSinkConfig =
-                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), 
writeRowType);
+                new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         CsvWriteStrategy writeStrategy = new CsvWriteStrategy(writeSinkConfig);
         ParquetReadStrategyTest.LocalConf hadoopConf =
                 new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
index 16cf021c2c..db048b01b8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/FileSinkConfigTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -46,7 +47,8 @@ public class FileSinkConfigTest {
                 new SeaTunnelRowType(
                         new String[] {"data", "ts"},
                         new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
-        Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config, 
rowType));
+        Assertions.assertDoesNotThrow(
+                () -> new FileSinkConfig(ReadonlyConfig.fromConfig(config), 
rowType));
     }
 
     @Test
@@ -60,7 +62,8 @@ public class FileSinkConfigTest {
                 new SeaTunnelRowType(
                         new String[] {"data", "ts"},
                         new SeaTunnelDataType[] {BasicType.STRING_TYPE, 
BasicType.STRING_TYPE});
-        Assertions.assertDoesNotThrow(() -> new FileSinkConfig(config, 
rowType));
+        Assertions.assertDoesNotThrow(
+                () -> new FileSinkConfig(ReadonlyConfig.fromConfig(config), 
rowType));
     }
 
     @Test
@@ -76,7 +79,8 @@ public class FileSinkConfigTest {
                         new SeaTunnelDataType[] {
                             BasicType.STRING_TYPE, BasicType.INT_TYPE, 
BasicType.STRING_TYPE
                         });
-        FileSinkConfig fileSinkConfig = new FileSinkConfig(config, 
seaTunnelRowTypeInfo);
+        FileSinkConfig fileSinkConfig =
+                new FileSinkConfig(ReadonlyConfig.fromConfig(config), 
seaTunnelRowTypeInfo);
         List<Integer> sinkColumnsIndexInRow = 
fileSinkConfig.getSinkColumnsIndexInRow();
         Assertions.assertEquals(
                 sinkColumnsIndexInRow.size(), 
seaTunnelRowTypeInfo.getFieldNames().length);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
index 7dbc8bd9de..fa179d5097 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/OrcWriteStrategyTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -64,7 +63,7 @@ public class OrcWriteStrategyTest {
                             BasicType.STRING_TYPE,
                         });
         FileSinkConfig writeSinkConfig =
-                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), 
writeRowType);
+                new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         OrcWriteStrategy writeStrategy = new OrcWriteStrategy(writeSinkConfig);
 
         OrcReadStrategyTest.LocalConf hadoopConf =
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index e692d7294b..7fa6ced60d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -17,8 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.writer;
 
-import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
@@ -79,7 +78,7 @@ public class ParquetWriteStrategyTest {
                             PrimitiveByteArrayType.INSTANCE
                         });
         FileSinkConfig writeSinkConfig =
-                new FileSinkConfig(ConfigFactory.parseMap(writeConfig), 
writeRowType);
+                new FileSinkConfig(ReadonlyConfig.fromMap(writeConfig), 
writeRowType);
         ParquetWriteStrategy writeStrategy = new 
ParquetWriteStrategy(writeSinkConfig);
         ParquetReadStrategyTest.LocalConf hadoopConf =
                 new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
index c85c755566..0f58007acd 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/config/CosConf.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.cos.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 import org.apache.hadoop.fs.CosNConfigKeys;
@@ -57,4 +58,19 @@ public class CosConf extends HadoopConf {
         hadoopConf.setExtraOptions(cosOptions);
         return hadoopConf;
     }
+
+    public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig 
readonlyConfig) {
+        HadoopConf hadoopConf = new 
CosConf(readonlyConfig.get(CosFileBaseOptions.BUCKET));
+        HashMap<String, String> cosOptions = new HashMap<>();
+        cosOptions.put(
+                CosNConfigKeys.COSN_USERINFO_SECRET_ID_KEY,
+                readonlyConfig.get(CosFileBaseOptions.SECRET_ID));
+        cosOptions.put(
+                CosNConfigKeys.COSN_USERINFO_SECRET_KEY_KEY,
+                readonlyConfig.get(CosFileBaseOptions.SECRET_KEY));
+        cosOptions.put(
+                CosNConfigKeys.COSN_REGION_KEY, 
readonlyConfig.get(CosFileBaseOptions.REGION));
+        hadoopConf.setExtraOptions(cosOptions);
+        return hadoopConf;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
index da1ed22197..eb7a875a91 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSink.java
@@ -17,56 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.cos.config.CosFileSinkOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
 public class CosFileSink extends BaseFileSink {
-    @Override
-    public String getPluginName() {
-        return FileSystemType.COS.getFileSystemPluginName();
+
+    public CosFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) 
{
+        super(pluginConfig, catalogTable);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        FileBaseOptions.FILE_PATH.key(),
-                        CosFileSinkOptions.REGION.key(),
-                        CosFileSinkOptions.SECRET_ID.key(),
-                        CosFileSinkOptions.SECRET_KEY.key(),
-                        CosFileSinkOptions.BUCKET.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        hadoopConf = CosConf.buildWithConfig(pluginConfig);
+    protected HadoopConf initHadoopConf() {
+        return CosConf.buildWithReadonlyConfig(pluginConfig);
     }
 
     @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+    public String getPluginName() {
+        return FileSystemType.COS.getFileSystemPluginName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
index bdf82d54f9..df4b78f833 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/sink/CosFileSinkFactory.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.file.cos.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -110,4 +112,9 @@ public class CosFileSinkFactory implements TableSinkFactory 
{
                 .optional(FileBaseSinkOptions.TMP_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new CosFileSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
index 938a638484..8741087d31 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/config/OssConf.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 import java.util.HashMap;
@@ -55,4 +56,19 @@ public class OssConf extends HadoopConf {
         hadoopConf.setExtraOptions(ossOptions);
         return hadoopConf;
     }
+
+    public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig 
readonlyConfig) {
+        HadoopConf hadoopConf = new 
OssConf(readonlyConfig.get(OssFileBaseOptions.BUCKET));
+        HashMap<String, String> ossOptions = new HashMap<>();
+        ossOptions.put("fs.AbstractFileSystem.oss.impl", 
"com.aliyun.emr.fs.oss.OSS");
+        ossOptions.put("fs.oss.impl", 
"com.aliyun.emr.fs.oss.JindoOssFileSystem");
+        ossOptions.put("fs.oss.accessKeyId", 
readonlyConfig.get(OssFileBaseOptions.ACCESS_KEY));
+        ossOptions.put(
+                "fs.oss.accessKeySecret", 
readonlyConfig.get(OssFileBaseOptions.ACCESS_SECRET));
+        ossOptions.put("fs.oss.endpoint", 
readonlyConfig.get(OssFileBaseOptions.ENDPOINT));
+        ossOptions.put("fs.oss.upload.thread.concurrency", "20");
+        ossOptions.put("fs.oss.upload.queue.size", "100");
+        hadoopConf.setExtraOptions(ossOptions);
+        return hadoopConf;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
index 2405e34db2..f644fcac3c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSink.java
@@ -17,56 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.config.OssFileSinkOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.exception.OssJindoConnectorException;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
 public class OssFileSink extends BaseFileSink {
-    @Override
-    public String getPluginName() {
-        return FileSystemType.OSS_JINDO.getFileSystemPluginName();
+
+    public OssFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) 
{
+        super(pluginConfig, catalogTable);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        FileBaseOptions.FILE_PATH.key(),
-                        OssFileSinkOptions.ENDPOINT.key(),
-                        OssFileSinkOptions.ACCESS_KEY.key(),
-                        OssFileSinkOptions.ACCESS_SECRET.key(),
-                        OssFileSinkOptions.BUCKET.key());
-        if (!result.isSuccess()) {
-            throw new OssJindoConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        hadoopConf = OssConf.buildWithConfig(pluginConfig);
+    protected HadoopConf initHadoopConf() {
+        return OssConf.buildWithReadonlyConfig(pluginConfig);
     }
 
     @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+    public String getPluginName() {
+        return FileSystemType.OSS_JINDO.getFileSystemPluginName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
index 41ced858e8..a4088712c7 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/sink/OssFileSinkFactory.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.file.oss.jindo.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -110,4 +112,9 @@ public class OssFileSinkFactory implements TableSinkFactory 
{
                 .optional(FileBaseSinkOptions.TMP_PATH)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new OssFileSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
index e331146e55..9ca85e10b4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/config/ObsConf.java
@@ -19,6 +19,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.obs.config;
 
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 
 import org.apache.hadoop.fs.obs.Constants;
@@ -53,4 +54,14 @@ public class ObsConf extends HadoopConf {
         hadoopConf.setExtraOptions(ossOptions);
         return hadoopConf;
     }
+
+    public static HadoopConf buildWithReadonlyConfig(ReadonlyConfig 
readonlyConfig) {
+        HadoopConf hadoopConf = new 
ObsConf(readonlyConfig.get(ObsFileBaseOptions.BUCKET));
+        HashMap<String, String> ossOptions = new HashMap<>();
+        ossOptions.put(Constants.ACCESS_KEY, 
readonlyConfig.get(ObsFileBaseOptions.ACCESS_KEY));
+        ossOptions.put(Constants.SECRET_KEY, 
readonlyConfig.get(ObsFileBaseOptions.ACCESS_SECRET));
+        ossOptions.put(Constants.ENDPOINT, 
readonlyConfig.get(ObsFileBaseOptions.ENDPOINT));
+        hadoopConf.setExtraOptions(ossOptions);
+        return hadoopConf;
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
index 6212213200..bba854b1a9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSink.java
@@ -17,56 +17,26 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.table.catalog.CatalogTable;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileSystemType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsConf;
-import 
org.apache.seatunnel.connectors.seatunnel.file.obs.config.ObsFileSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 
-import com.google.auto.service.AutoService;
-
-import java.util.Optional;
-
-@AutoService(SeaTunnelSink.class)
 public class ObsFileSink extends BaseFileSink {
-    @Override
-    public String getPluginName() {
-        return FileSystemType.OBS.getFileSystemPluginName();
+
+    public ObsFileSink(ReadonlyConfig pluginConfig, CatalogTable catalogTable) 
{
+        super(pluginConfig, catalogTable);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        super.prepare(pluginConfig);
-        CheckResult result =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        FileBaseOptions.FILE_PATH.key(),
-                        ObsFileSinkOptions.BUCKET.key(),
-                        ObsFileSinkOptions.ACCESS_KEY.key(),
-                        ObsFileSinkOptions.ACCESS_SECRET.key(),
-                        ObsFileSinkOptions.BUCKET.key());
-        if (!result.isSuccess()) {
-            throw new FileConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
result.getMsg()));
-        }
-        hadoopConf = ObsConf.buildWithConfig(pluginConfig);
+    protected HadoopConf initHadoopConf() {
+        return ObsConf.buildWithReadonlyConfig(pluginConfig);
     }
 
     @Override
-    public Optional<CatalogTable> getWriteCatalogTable() {
-        return super.getWriteCatalogTable();
+    public String getPluginName() {
+        return FileSystemType.OBS.getFileSystemPluginName();
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
index a1cf8354ae..27c322e682 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/sink/ObsFileSinkFactory.java
@@ -18,8 +18,10 @@
 package org.apache.seatunnel.connectors.seatunnel.file.obs.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
@@ -92,4 +94,9 @@ public class ObsFileSinkFactory implements TableSinkFactory {
                 .optional(FileBaseSinkOptions.FILENAME_EXTENSION)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new ObsFileSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index d693e98c9b..772acef2ef 100644
--- 
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ 
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -151,7 +151,8 @@ public class HiveSink
                                     ConfigValueFactory.fromAnyRef(
                                             
getDefaultTableLocation(readonlyConfig)));
 
-            return new FileSinkConfig(pluginConfig, 
catalogTable.getSeaTunnelRowType());
+            return new FileSinkConfig(
+                    ReadonlyConfig.fromConfig(pluginConfig), 
catalogTable.getSeaTunnelRowType());
         }
 
         List<String> sinkFields =
@@ -220,7 +221,8 @@ public class HiveSink
                             ConfigValueFactory.fromAnyRef("${transactionId}"));
         }
 
-        return new FileSinkConfig(pluginConfig, 
catalogTable.getSeaTunnelRowType());
+        return new FileSinkConfig(
+                ReadonlyConfig.fromConfig(pluginConfig), 
catalogTable.getSeaTunnelRowType());
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
index 19b48f72b4..2dfc1ab448 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/RedshiftJdbcClient.java
@@ -17,10 +17,9 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redshift;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
-import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
 
 import java.sql.Connection;
@@ -36,7 +35,7 @@ public class RedshiftJdbcClient {
 
     private final Connection connection;
 
-    public static RedshiftJdbcClient getInstance(Config config)
+    public static RedshiftJdbcClient getInstance(ReadonlyConfig config)
             throws S3RedshiftJdbcConnectorException {
         if (INSTANCE == null) {
             synchronized (RedshiftJdbcClient.class) {
@@ -45,10 +44,9 @@ public class RedshiftJdbcClient {
                     try {
                         INSTANCE =
                                 new RedshiftJdbcClient(
-                                        
config.getString(S3RedshiftConfigOptions.JDBC_URL.key()),
-                                        
config.getString(S3RedshiftConfigOptions.JDBC_USER.key()),
-                                        config.getString(
-                                                
S3RedshiftConfigOptions.JDBC_PASSWORD.key()));
+                                        
config.get(S3RedshiftSinkOptions.JDBC_URL),
+                                        
config.get(S3RedshiftSinkOptions.JDBC_USER),
+                                        
config.get(S3RedshiftSinkOptions.JDBC_PASSWORD));
                     } catch (SQLException | ClassNotFoundException e) {
                         throw new S3RedshiftJdbcConnectorException(
                                 CommonErrorCodeDeprecated.SQL_OPERATION_FAILED,
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
index 659282a924..a309b8c98c 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/commit/S3RedshiftSinkAggregatedCommitter.java
@@ -17,15 +17,15 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redshift.commit;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.org.apache.commons.lang3.StringUtils;
 
+import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileSinkAggregatedCommitter;
 import org.apache.seatunnel.connectors.seatunnel.redshift.RedshiftJdbcClient;
-import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftConnectorErrorCode;
 import 
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
 
@@ -43,12 +43,12 @@ public class S3RedshiftSinkAggregatedCommitter extends 
FileSinkAggregatedCommitt
 
     private final String executeSql;
 
-    private Config pluginConfig;
+    private final ReadonlyConfig pluginConfig;
 
-    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, Config 
pluginConfig) {
+    public S3RedshiftSinkAggregatedCommitter(HadoopConf hadoopConf, 
ReadonlyConfig pluginConfig) {
         super(hadoopConf);
         this.pluginConfig = pluginConfig;
-        this.executeSql = 
pluginConfig.getString(S3RedshiftConfigOptions.EXECUTE_SQL.key());
+        this.executeSql = pluginConfig.get(S3RedshiftSinkOptions.EXECUTE_SQL);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
similarity index 96%
rename from 
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
rename to 
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
index f1a8240703..8a83a8167d 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftConfigOptions.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/config/S3RedshiftSinkOptions.java
@@ -21,7 +21,7 @@ import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.Options;
 import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
 
-public class S3RedshiftConfigOptions extends S3FileBaseOptions {
+public class S3RedshiftSinkOptions extends S3FileBaseOptions {
 
     public static final Option<String> JDBC_URL =
             Options.key("jdbc_url")
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
index 88bdeb9c7f..b56a9a9a7f 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSink.java
@@ -17,57 +17,32 @@
 
 package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
 
-import org.apache.seatunnel.shade.com.typesafe.config.Config;
-
-import org.apache.seatunnel.api.common.PrepareFailException;
-import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
-import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
-import org.apache.seatunnel.common.config.CheckConfigUtil;
-import org.apache.seatunnel.common.config.CheckResult;
-import org.apache.seatunnel.common.constants.PluginType;
-import 
org.apache.seatunnel.connectors.seatunnel.file.hdfs.sink.BaseHdfsFileSink;
-import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3HadoopConf;
+import org.apache.seatunnel.connectors.seatunnel.file.sink.BaseFileSink;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileAggregatedCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.file.sink.commit.FileCommitInfo;
 import 
org.apache.seatunnel.connectors.seatunnel.redshift.commit.S3RedshiftSinkAggregatedCommitter;
-import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.redshift.exception.S3RedshiftJdbcConnectorException;
-
-import com.google.auto.service.AutoService;
 
 import java.util.Optional;
 
-@AutoService(SeaTunnelSink.class)
-public class S3RedshiftSink extends BaseHdfsFileSink {
+public class S3RedshiftSink extends BaseFileSink {
+
+    public S3RedshiftSink(ReadonlyConfig pluginConfig, CatalogTable 
catalogTable) {
+        super(pluginConfig, catalogTable);
+    }
 
     @Override
-    public String getPluginName() {
-        return "S3Redshift";
+    protected HadoopConf initHadoopConf() {
+        return S3HadoopConf.buildWithReadOnlyConfig(pluginConfig);
     }
 
     @Override
-    public void prepare(Config pluginConfig) throws PrepareFailException {
-        CheckResult checkResult =
-                CheckConfigUtil.checkAllExists(
-                        pluginConfig,
-                        S3FileBaseOptions.S3_BUCKET.key(),
-                        S3FileBaseOptions.S3A_AWS_CREDENTIALS_PROVIDER.key(),
-                        S3RedshiftConfigOptions.JDBC_URL.key(),
-                        S3RedshiftConfigOptions.JDBC_USER.key(),
-                        S3RedshiftConfigOptions.JDBC_PASSWORD.key(),
-                        S3RedshiftConfigOptions.EXECUTE_SQL.key());
-        if (!checkResult.isSuccess()) {
-            throw new S3RedshiftJdbcConnectorException(
-                    SeaTunnelAPIErrorCode.CONFIG_VALIDATION_FAILED,
-                    String.format(
-                            "PluginName: %s, PluginType: %s, Message: %s",
-                            getPluginName(), PluginType.SINK, 
checkResult.getMsg()));
-        }
-        this.pluginConfig = pluginConfig;
-        hadoopConf = 
S3HadoopConf.buildWithReadOnlyConfig(ReadonlyConfig.fromConfig(pluginConfig));
+    public String getPluginName() {
+        return "S3Redshift";
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
similarity index 83%
rename from 
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
rename to 
seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
index 816fef5c6e..bbd7f66cda 100644
--- 
a/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftFactory.java
+++ 
b/seatunnel-connectors-v2/connector-s3-redshift/src/main/java/org/apache/seatunnel/connectors/seatunnel/redshift/sink/S3RedshiftSinkFactory.java
@@ -18,18 +18,20 @@
 package org.apache.seatunnel.connectors.seatunnel.redshift.sink;
 
 import org.apache.seatunnel.api.configuration.util.OptionRule;
+import org.apache.seatunnel.api.table.connector.TableSink;
 import org.apache.seatunnel.api.table.factory.Factory;
 import org.apache.seatunnel.api.table.factory.TableSinkFactory;
+import org.apache.seatunnel.api.table.factory.TableSinkFactoryContext;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSinkOptions;
 import 
org.apache.seatunnel.connectors.seatunnel.file.config.FileBaseSourceOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.config.FileFormat;
 import 
org.apache.seatunnel.connectors.seatunnel.file.s3.config.S3FileBaseOptions;
-import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftConfigOptions;
+import 
org.apache.seatunnel.connectors.seatunnel.redshift.config.S3RedshiftSinkOptions;
 
 import com.google.auto.service.AutoService;
 
 @AutoService(Factory.class)
-public class S3RedshiftFactory implements TableSinkFactory {
+public class S3RedshiftSinkFactory implements TableSinkFactory {
 
     @Override
     public String factoryIdentifier() {
@@ -41,10 +43,10 @@ public class S3RedshiftFactory implements TableSinkFactory {
         return OptionRule.builder()
                 .required(
                         S3FileBaseOptions.S3_BUCKET,
-                        S3RedshiftConfigOptions.JDBC_URL,
-                        S3RedshiftConfigOptions.JDBC_USER,
-                        S3RedshiftConfigOptions.JDBC_PASSWORD,
-                        S3RedshiftConfigOptions.EXECUTE_SQL,
+                        S3RedshiftSinkOptions.JDBC_URL,
+                        S3RedshiftSinkOptions.JDBC_USER,
+                        S3RedshiftSinkOptions.JDBC_PASSWORD,
+                        S3RedshiftSinkOptions.EXECUTE_SQL,
                         FileBaseSourceOptions.FILE_PATH,
                         S3FileBaseOptions.S3A_AWS_CREDENTIALS_PROVIDER)
                 .conditional(
@@ -71,4 +73,9 @@ public class S3RedshiftFactory implements TableSinkFactory {
                 .optional(FileBaseSinkOptions.FILE_NAME_EXPRESSION)
                 .build();
     }
+
+    @Override
+    public TableSink createSink(TableSinkFactoryContext context) {
+        return () -> new S3RedshiftSink(context.getOptions(), 
context.getCatalogTable());
+    }
 }
diff --git 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
index e8722bb74b..674d69584c 100644
--- 
a/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
+++ 
b/seatunnel-e2e/seatunnel-e2e-common/src/test/java/org/apache/seatunnel/e2e/sink/inmemory/InMemorySinkFactory.java
@@ -30,6 +30,7 @@ import com.google.auto.service.AutoService;
 
 import java.util.List;
 
+import static 
org.apache.seatunnel.api.options.SinkConnectorCommonOptions.MULTI_TABLE_SINK_REPLICA;
 import static 
org.apache.seatunnel.shade.com.google.common.base.Preconditions.checkArgument;
 
 @AutoService(Factory.class)
@@ -73,7 +74,8 @@ public class InMemorySinkFactory
                         CHECKPOINT_SLEEP,
                         THROW_EXCEPTION_OF_COMMITTER,
                         ASSERT_OPTIONS_KEY,
-                        ASSERT_OPTIONS_VALUE)
+                        ASSERT_OPTIONS_VALUE,
+                        MULTI_TABLE_SINK_REPLICA)
                 .build();
     }
 

Reply via email to