This is an automated email from the ASF dual-hosted git repository.

lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 6b5f74e524 [Improve][Connector-V2] Change File Read/WriteStrategy `setSeaTunnelRowTypeInfo` to `setCatalogTable` (#7829)
6b5f74e524 is described below

commit 6b5f74e5247b8b7464b740e0f3cffe0ab2080db9
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 16 23:34:23 2024 +0800

    [Improve][Connector-V2] Change File Read/WriteStrategy `setSeaTunnelRowTypeInfo` to `setCatalogTable` (#7829)
---
 .../file/hdfs/source/BaseHdfsFileSource.java        |  8 ++++----
 .../seatunnel/file/config/BaseFileSourceConfig.java |  2 +-
 .../seatunnel/file/sink/BaseFileSink.java           |  6 +++++-
 .../file/sink/BaseMultipleTableFileSink.java        |  2 +-
 .../file/sink/writer/AbstractWriteStrategy.java     |  7 ++++---
 .../file/sink/writer/BinaryWriteStrategy.java       |  8 ++++----
 .../file/sink/writer/JsonWriteStrategy.java         | 10 ++++++----
 .../file/sink/writer/TextWriteStrategy.java         |  9 +++++----
 .../seatunnel/file/sink/writer/WriteStrategy.java   |  8 ++++----
 .../file/source/reader/AbstractReadStrategy.java    |  7 ++++---
 .../file/source/reader/ExcelReadStrategy.java       | 17 +++++++++--------
 .../file/source/reader/JsonReadStrategy.java        |  5 +++--
 .../seatunnel/file/source/reader/ReadStrategy.java  |  4 ++--
 .../file/source/reader/TextReadStrategy.java        | 16 +++++++++-------
 .../file/source/reader/XmlReadStrategy.java         | 21 +++++++++++----------
 .../file/writer/ExcelReadStrategyTest.java          |  7 +++----
 .../file/writer/ParquetWriteStrategyTest.java       |  4 +++-
 .../file/writer/ReadStrategyEncodingTest.java       | 11 +++++------
 .../seatunnel/file/writer/XmlReadStrategyTest.java  |  7 +++----
 .../seatunnel/file/cos/source/CosFileSource.java    |  8 ++++----
 .../file/oss/jindo/source/OssFileSource.java        |  8 ++++----
 .../seatunnel/file/obs/source/ObsFileSource.java    |  8 ++++----
 .../connectors/seatunnel/hive/sink/HiveSink.java    |  2 +-
 .../hive/source/config/HiveSourceConfig.java        |  4 +++-
 24 files changed, 102 insertions(+), 87 deletions(-)

diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9af2721e22..2b71980935 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -109,9 +109,9 @@ public abstract class BaseHdfsFileSource extends 
BaseFileSource {
                 case JSON:
                 case EXCEL:
                 case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    CatalogTable userDefinedCatalogTable =
+                            CatalogTableUtil.buildWithConfig(pluginConfig);
+                    readStrategy.setCatalogTable(userDefinedCatalogTable);
                     rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index 373ada564a..10b969b008 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -95,7 +95,7 @@ public abstract class BaseFileSourceConfig implements 
Serializable {
             case JSON:
             case EXCEL:
             case XML:
-                
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+                readStrategy.setCatalogTable(catalogTable);
                 return newCatalogTable(catalogTable, 
readStrategy.getActualSeaTunnelRowTypeInfo());
             case ORC:
             case PARQUET:
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 6686da9880..af6003c79c 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -26,6 +26,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
 import org.apache.seatunnel.api.sink.SeaTunnelSink;
 import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
 import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -110,7 +112,9 @@ public abstract class BaseFileSink
     protected WriteStrategy createWriteStrategy() {
         WriteStrategy writeStrategy =
                 WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), 
fileSinkConfig);
-        writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        writeStrategy.setCatalogTable(
+                CatalogTableUtil.getCatalogTable(
+                        "file", null, null, TablePath.DEFAULT.getTableName(), 
seaTunnelRowType));
         return writeStrategy;
     }
 }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index a48368be44..b35c113f8d 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -112,7 +112,7 @@ public abstract class BaseMultipleTableFileSink
     protected WriteStrategy createWriteStrategy() {
         WriteStrategy writeStrategy =
                 WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), 
fileSinkConfig);
-        
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+        writeStrategy.setCatalogTable(catalogTable);
         return writeStrategy;
     }
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 68476488a5..dd49c7f2d0 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -17,6 +17,7 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -150,11 +151,11 @@ public abstract class AbstractWriteStrategy implements 
WriteStrategy {
     /**
      * set seaTunnelRowTypeInfo in writer
      *
-     * @param seaTunnelRowType seaTunnelRowType
+     * @param catalogTable catalog table whose SeaTunnelRowType is stored in the writer
      */
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public void setCatalogTable(CatalogTable catalogTable) {
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
     }
 
     /**
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
index 7f496b2927..06d05d6250 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
@@ -46,9 +46,9 @@ public class BinaryWriteStrategy extends 
AbstractWriteStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
-        if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
+    public void setCatalogTable(CatalogTable catalogTable) {
+        super.setCatalogTable(catalogTable);
+        if 
(!catalogTable.getSeaTunnelRowType().equals(BinaryReadStrategy.binaryRowType)) {
             throw new FileConnectorException(
                     FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
                     "BinaryWriteStrategy only supports binary format, please 
read file with `BINARY` format, and do not change schema in the transform.");
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index f95973f4cf..23fb7893a8 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.utils.EncodingUtils;
@@ -55,11 +55,13 @@ public class JsonWriteStrategy extends 
AbstractWriteStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+    public void setCatalogTable(CatalogTable catalogTable) {
+        super.setCatalogTable(catalogTable);
         this.serializationSchema =
                 new JsonSerializationSchema(
-                        buildSchemaWithRowType(seaTunnelRowType, 
sinkColumnsIndexInRow), charset);
+                        buildSchemaWithRowType(
+                                catalogTable.getSeaTunnelRowType(), 
sinkColumnsIndexInRow),
+                        charset);
     }
 
     @Override
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 621048fb39..77e2eb5c5b 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -18,8 +18,8 @@
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
 import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonError;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -71,12 +71,13 @@ public class TextWriteStrategy extends 
AbstractWriteStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+    public void setCatalogTable(CatalogTable catalogTable) {
+        super.setCatalogTable(catalogTable);
         this.serializationSchema =
                 TextSerializationSchema.builder()
                         .seaTunnelRowType(
-                                buildSchemaWithRowType(seaTunnelRowType, 
sinkColumnsIndexInRow))
+                                buildSchemaWithRowType(
+                                        catalogTable.getSeaTunnelRowType(), 
sinkColumnsIndexInRow))
                         .delimiter(fieldDelimiter)
                         .dateFormatter(dateFormat)
                         .dateTimeFormatter(dateTimeFormat)
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 6a1b1840b4..24b23c9bfc 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -17,8 +17,8 @@
 
 package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
 
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
 import 
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
@@ -56,11 +56,11 @@ public interface WriteStrategy extends Transaction, 
Serializable, Closeable {
     void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException;
 
     /**
-     * set seaTunnelRowTypeInfo in writer
+     * set catalog table to write strategy
      *
-     * @param seaTunnelRowType seaTunnelRowType
+     * @param catalogTable catalogTable
      */
-    void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+    void setCatalogTable(CatalogTable catalogTable);
 
     /**
      * use seaTunnelRow generate partition directory
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 3e71a3b293..00d90d8419 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -92,10 +93,10 @@ public abstract class AbstractReadStrategy implements 
ReadStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        this.seaTunnelRowType = seaTunnelRowType;
+    public void setCatalogTable(CatalogTable catalogTable) {
+        this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
         this.seaTunnelRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(fileNames.get(0), 
catalogTable.getSeaTunnelRowType());
     }
 
     boolean checkFileType(String path) {
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index c90b6d6659..d7dfe206ab 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -21,6 +21,7 @@ import 
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
 
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -145,15 +146,15 @@ public class ExcelReadStrategy extends 
AbstractReadStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        if (isNullOrEmpty(seaTunnelRowType.getFieldNames())
-                || isNullOrEmpty(seaTunnelRowType.getFieldTypes())) {
+    public void setCatalogTable(CatalogTable catalogTable) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        if (isNullOrEmpty(rowType.getFieldNames()) || 
isNullOrEmpty(rowType.getFieldTypes())) {
             throw new FileConnectorException(
                     CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
                     "Schema information is not set or incorrect Schema 
settings");
         }
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(fileNames.get(0), rowType);
         // column projection
         if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
             // get the read column index from user-defined row type
@@ -161,15 +162,15 @@ public class ExcelReadStrategy extends 
AbstractReadStrategy {
             String[] fields = new String[readColumns.size()];
             SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
             for (int i = 0; i < indexes.length; i++) {
-                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
-                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
-                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+                indexes[i] = rowType.indexOf(readColumns.get(i));
+                fields[i] = rowType.getFieldName(indexes[i]);
+                types[i] = rowType.getFieldType(indexes[i]);
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
                     mergePartitionTypes(fileNames.get(0), 
this.seaTunnelRowType);
         } else {
-            this.seaTunnelRowType = seaTunnelRowType;
+            this.seaTunnelRowType = rowType;
             this.seaTunnelRowTypeWithPartition = 
userDefinedRowTypeWithPartition;
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 982419266f..dfd57363d9 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -62,8 +63,8 @@ public class JsonReadStrategy extends AbstractReadStrategy {
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+    public void setCatalogTable(CatalogTable catalogTable) {
+        super.setCatalogTable(catalogTable);
         if (isMergePartition) {
             deserializationSchema =
                     new JsonDeserializationSchema(false, false, 
this.seaTunnelRowTypeWithPartition);
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index c5bdf28124..9389223814 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -20,6 +20,7 @@ package 
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
 import org.apache.seatunnel.shade.com.typesafe.config.Config;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.TablePath;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -56,8 +57,7 @@ public interface ReadStrategy extends Serializable, Closeable 
{
         return getSeaTunnelRowTypeInfo(path);
     }
 
-    // todo: use CatalogTable
-    void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+    void setCatalogTable(CatalogTable catalogTable);
 
     List<String> getFileNamesByPath(String path) throws IOException;
 
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 2b72259377..1a7a7398a4 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.serialization.DeserializationSchema;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -170,9 +171,10 @@ public class TextReadStrategy extends AbstractReadStrategy 
{
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+    public void setCatalogTable(CatalogTable catalogTable) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
         SeaTunnelRowType userDefinedRowTypeWithPartition =
-                mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+                mergePartitionTypes(fileNames.get(0), rowType);
         Optional<String> fieldDelimiterOptional =
                 ReadonlyConfig.fromConfig(pluginConfig)
                         .getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
@@ -201,7 +203,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
             deserializationSchema =
                     
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
         } else {
-            deserializationSchema = 
builder.seaTunnelRowType(seaTunnelRowType).build();
+            deserializationSchema = builder.seaTunnelRowType(rowType).build();
         }
         // column projection
         if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
@@ -210,15 +212,15 @@ public class TextReadStrategy extends 
AbstractReadStrategy {
             String[] fields = new String[readColumns.size()];
             SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
             for (int i = 0; i < indexes.length; i++) {
-                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
-                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
-                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+                indexes[i] = rowType.indexOf(readColumns.get(i));
+                fields[i] = rowType.getFieldName(indexes[i]);
+                types[i] = rowType.getFieldType(indexes[i]);
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
                     mergePartitionTypes(fileNames.get(0), 
this.seaTunnelRowType);
         } else {
-            this.seaTunnelRowType = seaTunnelRowType;
+            this.seaTunnelRowType = rowType;
             this.seaTunnelRowTypeWithPartition = 
userDefinedRowTypeWithPartition;
         }
     }
diff --git 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index a553a4f9d0..e012c46bdf 100644
--- 
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++ 
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.configuration.Option;
 import org.apache.seatunnel.api.configuration.ReadonlyConfig;
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -173,20 +174,20 @@ public class XmlReadStrategy extends AbstractReadStrategy 
{
     }
 
     @Override
-    public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
-        if (ArrayUtils.isEmpty(seaTunnelRowType.getFieldNames())
-                || ArrayUtils.isEmpty(seaTunnelRowType.getFieldTypes())) {
+    public void setCatalogTable(CatalogTable catalogTable) {
+        SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+        if (ArrayUtils.isEmpty(rowType.getFieldNames())
+                || ArrayUtils.isEmpty(rowType.getFieldTypes())) {
             throw new FileConnectorException(
                     CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
                     "Schema information is undefined or misconfigured, please 
check your configuration file.");
         }
 
         if (readColumns.isEmpty()) {
-            this.seaTunnelRowType = seaTunnelRowType;
-            this.seaTunnelRowTypeWithPartition =
-                    mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+            this.seaTunnelRowType = rowType;
+            this.seaTunnelRowTypeWithPartition = 
mergePartitionTypes(fileNames.get(0), rowType);
         } else {
-            if 
(readColumns.retainAll(Arrays.asList(seaTunnelRowType.getFieldNames()))) {
+            if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames()))) 
{
                 log.warn(
                         "The read columns configuration will be filtered by 
the schema configuration, this may cause the actual results to be inconsistent 
with expectations. This is due to read columns not being a subset of the 
schema, "
                                 + "maybe you should check the schema and 
read_columns!");
@@ -195,9 +196,9 @@ public class XmlReadStrategy extends AbstractReadStrategy {
             String[] fields = new String[readColumns.size()];
             SeaTunnelDataType<?>[] types = new SeaTunnelDataType[readColumns.size()];
             for (int i = 0; i < readColumns.size(); i++) {
-                indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
-                fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
-                types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+                indexes[i] = rowType.indexOf(readColumns.get(i));
+                fields[i] = rowType.getFieldName(indexes[i]);
+                types[i] = rowType.getFieldType(indexes[i]);
             }
             this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
             this.seaTunnelRowTypeWithPartition =
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
index 8aa43a03bd..149ee7648d 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
@@ -72,9 +72,8 @@ public class ExcelReadStrategyTest {
         excelReadStrategy.init(localConf);
 
         List<String> fileNamesByPath = excelReadStrategy.getFileNamesByPath(excelFilePath);
-        SeaTunnelRowType userDefinedSchema =
-                CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        excelReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+        CatalogTable userDefinedCatalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+        excelReadStrategy.setCatalogTable(userDefinedCatalogTable);
         TestCollector testCollector = new TestCollector();
         excelReadStrategy.read(fileNamesByPath.get(0), "", testCollector);
         for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index 236d6f5a03..e692d7294b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.BasicType;
 import org.apache.seatunnel.api.table.type.LocalTimeType;
 import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
@@ -82,7 +83,8 @@ public class ParquetWriteStrategyTest {
         ParquetWriteStrategy writeStrategy = new ParquetWriteStrategy(writeSinkConfig);
         ParquetReadStrategyTest.LocalConf hadoopConf =
                 new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
-        writeStrategy.setSeaTunnelRowTypeInfo(writeRowType);
+        writeStrategy.setCatalogTable(
+                CatalogTableUtil.getCatalogTable("test", null, null, "test", writeRowType));
         writeStrategy.init(hadoopConf, "test1", "test1", 0);
         writeStrategy.beginTransaction(1L);
         writeStrategy.write(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
index 736ae59096..ad23dd0186 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
 import org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
@@ -121,11 +121,10 @@ public class ReadStrategyEncodingTest {
         readStrategy.init(localConf);
         readStrategy.getFileNamesByPath(sourceFilePath);
         testCollector = new TestCollector();
-        SeaTunnelRowType seaTunnelRowTypeInfo =
-                CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        Assertions.assertNotNull(seaTunnelRowTypeInfo);
-        readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowTypeInfo);
-        log.info(seaTunnelRowTypeInfo.toString());
+        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+        Assertions.assertNotNull(catalogTable.getSeaTunnelRowType());
+        readStrategy.setCatalogTable(catalogTable);
+        log.info(catalogTable.getSeaTunnelRowType().toString());
         readStrategy.read(sourceFilePath, "", testCollector);
         assertRows(testCollector);
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
index 8bb2e48389..fca8f68fd2 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.utils.DateTimeUtils;
 import org.apache.seatunnel.common.utils.DateUtils;
 import org.apache.seatunnel.common.utils.TimeUtils;
@@ -66,9 +66,8 @@ public class XmlReadStrategyTest {
         xmlReadStrategy.setPluginConfig(pluginConfig);
         xmlReadStrategy.init(localConf);
         List<String> fileNamesByPath = xmlReadStrategy.getFileNamesByPath(xmlFilePath);
-        SeaTunnelRowType userDefinedSchema =
-                CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-        xmlReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+        CatalogTable catalogTable = CatalogTableUtil.buildWithConfig(pluginConfig);
+        xmlReadStrategy.setCatalogTable(catalogTable);
         TestCollector testCollector = new TestCollector();
         xmlReadStrategy.read(fileNamesByPath.get(0), "", testCollector);
         for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) {
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index 0690b2aceb..bd8df0261c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -95,9 +95,9 @@ public class CosFileSource extends BaseFileSource {
                 case JSON:
                 case EXCEL:
                 case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    CatalogTable userDefinedCatalogTable =
+                            CatalogTableUtil.buildWithConfig(pluginConfig);
+                    readStrategy.setCatalogTable(userDefinedCatalogTable);
                     rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
index 335e396780..ed9807729f 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -96,9 +96,9 @@ public class OssFileSource extends BaseFileSource {
                 case JSON:
                 case EXCEL:
                 case XML:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    CatalogTable userDefinedCatalogTable =
+                            CatalogTableUtil.buildWithConfig(pluginConfig);
+                    readStrategy.setCatalogTable(userDefinedCatalogTable);
                     rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
index cf3061a44a..8d2ae3d90b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.api.common.PrepareFailException;
 import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
 import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
 import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
 import org.apache.seatunnel.common.config.CheckConfigUtil;
 import org.apache.seatunnel.common.config.CheckResult;
 import org.apache.seatunnel.common.constants.PluginType;
@@ -91,9 +91,9 @@ public class ObsFileSource extends BaseFileSource {
                 case TEXT:
                 case JSON:
                 case EXCEL:
-                    SeaTunnelRowType userDefinedSchema =
-                            CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
-                    readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+                    CatalogTable userDefinedCatalogTable =
+                            CatalogTableUtil.buildWithConfig(pluginConfig);
+                    readStrategy.setCatalogTable(userDefinedCatalogTable);
                     rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 997c42f9fa..6e91baf001 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -240,7 +240,7 @@ public class HiveSink
     private WriteStrategy getWriteStrategy() {
         if (writeStrategy == null) {
             writeStrategy = WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-            writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+            writeStrategy.setCatalogTable(catalogTable);
         }
         return writeStrategy;
     }
diff --git a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index e98143fcf0..eba9b5a15b 100644
--- a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -279,7 +279,9 @@ public class HiveSourceConfig implements Serializable {
         }
 
         SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames, fieldTypes);
-        readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+        readStrategy.setCatalogTable(
+                CatalogTableUtil.getCatalogTable(
+                        "hive", table.getDbName(), null, table.getTableName(), seaTunnelRowType));
         final SeaTunnelRowType finalSeatunnelRowType = readStrategy.getActualSeaTunnelRowTypeInfo();
 
         CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig, table);

Reply via email to