This is an automated email from the ASF dual-hosted git repository.
lidongdai pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 6b5f74e524 [Improve][Connector-V2] Change File Read/WriteStrategy
`setSeaTunnelRowTypeInfo` to `setCatalogTable` (#7829)
6b5f74e524 is described below
commit 6b5f74e5247b8b7464b740e0f3cffe0ab2080db9
Author: Jia Fan <[email protected]>
AuthorDate: Wed Oct 16 23:34:23 2024 +0800
[Improve][Connector-V2] Change File Read/WriteStrategy
`setSeaTunnelRowTypeInfo` to `setCatalogTable` (#7829)
---
.../file/hdfs/source/BaseHdfsFileSource.java | 8 ++++----
.../seatunnel/file/config/BaseFileSourceConfig.java | 2 +-
.../seatunnel/file/sink/BaseFileSink.java | 6 +++++-
.../file/sink/BaseMultipleTableFileSink.java | 2 +-
.../file/sink/writer/AbstractWriteStrategy.java | 7 ++++---
.../file/sink/writer/BinaryWriteStrategy.java | 8 ++++----
.../file/sink/writer/JsonWriteStrategy.java | 10 ++++++----
.../file/sink/writer/TextWriteStrategy.java | 9 +++++----
.../seatunnel/file/sink/writer/WriteStrategy.java | 8 ++++----
.../file/source/reader/AbstractReadStrategy.java | 7 ++++---
.../file/source/reader/ExcelReadStrategy.java | 17 +++++++++--------
.../file/source/reader/JsonReadStrategy.java | 5 +++--
.../seatunnel/file/source/reader/ReadStrategy.java | 4 ++--
.../file/source/reader/TextReadStrategy.java | 16 +++++++++-------
.../file/source/reader/XmlReadStrategy.java | 21 +++++++++++----------
.../file/writer/ExcelReadStrategyTest.java | 7 +++----
.../file/writer/ParquetWriteStrategyTest.java | 4 +++-
.../file/writer/ReadStrategyEncodingTest.java | 11 +++++------
.../seatunnel/file/writer/XmlReadStrategyTest.java | 7 +++----
.../seatunnel/file/cos/source/CosFileSource.java | 8 ++++----
.../file/oss/jindo/source/OssFileSource.java | 8 ++++----
.../seatunnel/file/obs/source/ObsFileSource.java | 8 ++++----
.../connectors/seatunnel/hive/sink/HiveSink.java | 2 +-
.../hive/source/config/HiveSourceConfig.java | 4 +++-
24 files changed, 102 insertions(+), 87 deletions(-)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 9af2721e22..2b71980935 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -109,9 +109,9 @@ public abstract class BaseHdfsFileSource extends
BaseFileSource {
case JSON:
case EXCEL:
case XML:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable userDefinedCatalogTable =
+ CatalogTableUtil.buildWithConfig(pluginConfig);
+ readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index 373ada564a..10b969b008 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -95,7 +95,7 @@ public abstract class BaseFileSourceConfig implements
Serializable {
case JSON:
case EXCEL:
case XML:
-
readStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ readStrategy.setCatalogTable(catalogTable);
return newCatalogTable(catalogTable,
readStrategy.getActualSeaTunnelRowTypeInfo());
case ORC:
case PARQUET:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
index 6686da9880..af6003c79c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseFileSink.java
@@ -26,6 +26,8 @@ import org.apache.seatunnel.api.serialization.Serializer;
import org.apache.seatunnel.api.sink.SeaTunnelSink;
import org.apache.seatunnel.api.sink.SinkAggregatedCommitter;
import org.apache.seatunnel.api.sink.SinkWriter;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
+import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
@@ -110,7 +112,9 @@ public abstract class BaseFileSink
protected WriteStrategy createWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
- writeStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ writeStrategy.setCatalogTable(
+ CatalogTableUtil.getCatalogTable(
+ "file", null, null, TablePath.DEFAULT.getTableName(),
seaTunnelRowType));
return writeStrategy;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
index a48368be44..b35c113f8d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/BaseMultipleTableFileSink.java
@@ -112,7 +112,7 @@ public abstract class BaseMultipleTableFileSink
protected WriteStrategy createWriteStrategy() {
WriteStrategy writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(),
fileSinkConfig);
-
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ writeStrategy.setCatalogTable(catalogTable);
return writeStrategy;
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
index 68476488a5..dd49c7f2d0 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/AbstractWriteStrategy.java
@@ -17,6 +17,7 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -150,11 +151,11 @@ public abstract class AbstractWriteStrategy implements
WriteStrategy {
/**
* set seaTunnelRowTypeInfo in writer
*
- * @param seaTunnelRowType seaTunnelRowType
+ * @param catalogTable catalog table whose SeaTunnelRowType is used by the writer
*/
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public void setCatalogTable(CatalogTable catalogTable) {
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
}
/**
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
index 7f496b2927..06d05d6250 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/BinaryWriteStrategy.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
@@ -46,9 +46,9 @@ public class BinaryWriteStrategy extends
AbstractWriteStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
- if (!seaTunnelRowType.equals(BinaryReadStrategy.binaryRowType)) {
+ public void setCatalogTable(CatalogTable catalogTable) {
+ super.setCatalogTable(catalogTable);
+ if
(!catalogTable.getSeaTunnelRowType().equals(BinaryReadStrategy.binaryRowType)) {
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
"BinaryWriteStrategy only supports binary format, please
read file with `BINARY` format, and do not change schema in the transform.");
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
index f95973f4cf..23fb7893a8 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/JsonWriteStrategy.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.EncodingUtils;
@@ -55,11 +55,13 @@ public class JsonWriteStrategy extends
AbstractWriteStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ public void setCatalogTable(CatalogTable catalogTable) {
+ super.setCatalogTable(catalogTable);
this.serializationSchema =
new JsonSerializationSchema(
- buildSchemaWithRowType(seaTunnelRowType,
sinkColumnsIndexInRow), charset);
+ buildSchemaWithRowType(
+ catalogTable.getSeaTunnelRowType(),
sinkColumnsIndexInRow),
+ charset);
}
@Override
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
index 621048fb39..77e2eb5c5b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/TextWriteStrategy.java
@@ -18,8 +18,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
import org.apache.seatunnel.api.serialization.SerializationSchema;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonError;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import org.apache.seatunnel.common.utils.DateTimeUtils;
@@ -71,12 +71,13 @@ public class TextWriteStrategy extends
AbstractWriteStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ public void setCatalogTable(CatalogTable catalogTable) {
+ super.setCatalogTable(catalogTable);
this.serializationSchema =
TextSerializationSchema.builder()
.seaTunnelRowType(
- buildSchemaWithRowType(seaTunnelRowType,
sinkColumnsIndexInRow))
+ buildSchemaWithRowType(
+ catalogTable.getSeaTunnelRowType(),
sinkColumnsIndexInRow))
.delimiter(fieldDelimiter)
.dateFormatter(dateFormat)
.dateTimeFormatter(dateTimeFormat)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
index 6a1b1840b4..24b23c9bfc 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/sink/writer/WriteStrategy.java
@@ -17,8 +17,8 @@
package org.apache.seatunnel.connectors.seatunnel.file.sink.writer;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorException;
import
org.apache.seatunnel.connectors.seatunnel.file.hadoop.HadoopFileSystemProxy;
@@ -56,11 +56,11 @@ public interface WriteStrategy extends Transaction,
Serializable, Closeable {
void write(SeaTunnelRow seaTunnelRow) throws FileConnectorException;
/**
- * set seaTunnelRowTypeInfo in writer
+ * set catalog table to write strategy
*
- * @param seaTunnelRowType seaTunnelRowType
+ * @param catalogTable catalogTable
*/
- void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+ void setCatalogTable(CatalogTable catalogTable);
/**
* use seaTunnelRow generate partition directory
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
index 3e71a3b293..00d90d8419 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/AbstractReadStrategy.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -92,10 +93,10 @@ public abstract class AbstractReadStrategy implements
ReadStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- this.seaTunnelRowType = seaTunnelRowType;
+ public void setCatalogTable(CatalogTable catalogTable) {
+ this.seaTunnelRowType = catalogTable.getSeaTunnelRowType();
this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ mergePartitionTypes(fileNames.get(0),
catalogTable.getSeaTunnelRowType());
}
boolean checkFileType(String path) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
index c90b6d6659..d7dfe206ab 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ExcelReadStrategy.java
@@ -21,6 +21,7 @@ import
org.apache.seatunnel.shade.com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -145,15 +146,15 @@ public class ExcelReadStrategy extends
AbstractReadStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- if (isNullOrEmpty(seaTunnelRowType.getFieldNames())
- || isNullOrEmpty(seaTunnelRowType.getFieldTypes())) {
+ public void setCatalogTable(CatalogTable catalogTable) {
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ if (isNullOrEmpty(rowType.getFieldNames()) ||
isNullOrEmpty(rowType.getFieldTypes())) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
"Schema information is not set or incorrect Schema
settings");
}
SeaTunnelRowType userDefinedRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ mergePartitionTypes(fileNames.get(0), rowType);
// column projection
if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
// get the read column index from user-defined row type
@@ -161,15 +162,15 @@ public class ExcelReadStrategy extends
AbstractReadStrategy {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < indexes.length; i++) {
- indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
- fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
- types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+ indexes[i] = rowType.indexOf(readColumns.get(i));
+ fields[i] = rowType.getFieldName(indexes[i]);
+ types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
} else {
- this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
index 982419266f..dfd57363d9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/JsonReadStrategy.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
@@ -62,8 +63,8 @@ public class JsonReadStrategy extends AbstractReadStrategy {
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- super.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ public void setCatalogTable(CatalogTable catalogTable) {
+ super.setCatalogTable(catalogTable);
if (isMergePartition) {
deserializationSchema =
new JsonDeserializationSchema(false, false,
this.seaTunnelRowTypeWithPartition);
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index c5bdf28124..9389223814 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -20,6 +20,7 @@ package
org.apache.seatunnel.connectors.seatunnel.file.source.reader;
import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.TablePath;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -56,8 +57,7 @@ public interface ReadStrategy extends Serializable, Closeable
{
return getSeaTunnelRowTypeInfo(path);
}
- // todo: use CatalogTable
- void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
+ void setCatalogTable(CatalogTable catalogTable);
List<String> getFileNamesByPath(String path) throws IOException;
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
index 2b72259377..1a7a7398a4 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/TextReadStrategy.java
@@ -21,6 +21,7 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.serialization.DeserializationSchema;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
@@ -170,9 +171,10 @@ public class TextReadStrategy extends AbstractReadStrategy
{
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
+ public void setCatalogTable(CatalogTable catalogTable) {
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
SeaTunnelRowType userDefinedRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ mergePartitionTypes(fileNames.get(0), rowType);
Optional<String> fieldDelimiterOptional =
ReadonlyConfig.fromConfig(pluginConfig)
.getOptional(BaseSourceConfigOptions.FIELD_DELIMITER);
@@ -201,7 +203,7 @@ public class TextReadStrategy extends AbstractReadStrategy {
deserializationSchema =
builder.seaTunnelRowType(userDefinedRowTypeWithPartition).build();
} else {
- deserializationSchema =
builder.seaTunnelRowType(seaTunnelRowType).build();
+ deserializationSchema = builder.seaTunnelRowType(rowType).build();
}
// column projection
if (pluginConfig.hasPath(BaseSourceConfigOptions.READ_COLUMNS.key())) {
@@ -210,15 +212,15 @@ public class TextReadStrategy extends
AbstractReadStrategy {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < indexes.length; i++) {
- indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
- fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
- types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+ indexes[i] = rowType.indexOf(readColumns.get(i));
+ fields[i] = rowType.getFieldName(indexes[i]);
+ types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0),
this.seaTunnelRowType);
} else {
- this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowType = rowType;
this.seaTunnelRowTypeWithPartition =
userDefinedRowTypeWithPartition;
}
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
index a553a4f9d0..e012c46bdf 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/XmlReadStrategy.java
@@ -23,6 +23,7 @@ import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.configuration.Option;
import org.apache.seatunnel.api.configuration.ReadonlyConfig;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -173,20 +174,20 @@ public class XmlReadStrategy extends AbstractReadStrategy
{
}
@Override
- public void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType) {
- if (ArrayUtils.isEmpty(seaTunnelRowType.getFieldNames())
- || ArrayUtils.isEmpty(seaTunnelRowType.getFieldTypes())) {
+ public void setCatalogTable(CatalogTable catalogTable) {
+ SeaTunnelRowType rowType = catalogTable.getSeaTunnelRowType();
+ if (ArrayUtils.isEmpty(rowType.getFieldNames())
+ || ArrayUtils.isEmpty(rowType.getFieldTypes())) {
throw new FileConnectorException(
CommonErrorCodeDeprecated.ILLEGAL_ARGUMENT,
"Schema information is undefined or misconfigured, please
check your configuration file.");
}
if (readColumns.isEmpty()) {
- this.seaTunnelRowType = seaTunnelRowType;
- this.seaTunnelRowTypeWithPartition =
- mergePartitionTypes(fileNames.get(0), seaTunnelRowType);
+ this.seaTunnelRowType = rowType;
+ this.seaTunnelRowTypeWithPartition =
mergePartitionTypes(fileNames.get(0), rowType);
} else {
- if
(readColumns.retainAll(Arrays.asList(seaTunnelRowType.getFieldNames()))) {
+ if (readColumns.retainAll(Arrays.asList(rowType.getFieldNames())))
{
log.warn(
"The read columns configuration will be filtered by
the schema configuration, this may cause the actual results to be inconsistent
with expectations. This is due to read columns not being a subset of the
schema, "
+ "maybe you should check the schema and
read_columns!");
@@ -195,9 +196,9 @@ public class XmlReadStrategy extends AbstractReadStrategy {
String[] fields = new String[readColumns.size()];
SeaTunnelDataType<?>[] types = new
SeaTunnelDataType[readColumns.size()];
for (int i = 0; i < readColumns.size(); i++) {
- indexes[i] = seaTunnelRowType.indexOf(readColumns.get(i));
- fields[i] = seaTunnelRowType.getFieldName(indexes[i]);
- types[i] = seaTunnelRowType.getFieldType(indexes[i]);
+ indexes[i] = rowType.indexOf(readColumns.get(i));
+ fields[i] = rowType.getFieldName(indexes[i]);
+ types[i] = rowType.getFieldType(indexes[i]);
}
this.seaTunnelRowType = new SeaTunnelRowType(fields, types);
this.seaTunnelRowTypeWithPartition =
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
index 8aa43a03bd..149ee7648d 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ExcelReadStrategyTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
@@ -72,9 +72,8 @@ public class ExcelReadStrategyTest {
excelReadStrategy.init(localConf);
List<String> fileNamesByPath =
excelReadStrategy.getFileNamesByPath(excelFilePath);
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- excelReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable userDefinedCatalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
+ excelReadStrategy.setCatalogTable(userDefinedCatalogTable);
TestCollector testCollector = new TestCollector();
excelReadStrategy.read(fileNamesByPath.get(0), "", testCollector);
for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
index 236d6f5a03..e692d7294b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetWriteStrategyTest.java
@@ -20,6 +20,7 @@ package org.apache.seatunnel.connectors.seatunnel.file.writer;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.BasicType;
import org.apache.seatunnel.api.table.type.LocalTimeType;
import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
@@ -82,7 +83,8 @@ public class ParquetWriteStrategyTest {
ParquetWriteStrategy writeStrategy = new
ParquetWriteStrategy(writeSinkConfig);
ParquetReadStrategyTest.LocalConf hadoopConf =
new ParquetReadStrategyTest.LocalConf(FS_DEFAULT_NAME_DEFAULT);
- writeStrategy.setSeaTunnelRowTypeInfo(writeRowType);
+ writeStrategy.setCatalogTable(
+ CatalogTableUtil.getCatalogTable("test", null, null, "test",
writeRowType));
writeStrategy.init(hadoopConf, "test1", "test1", 0);
writeStrategy.beginTransaction(1L);
writeStrategy.write(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
index 736ae59096..ad23dd0186 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ReadStrategyEncodingTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.AbstractReadStrategy;
import
org.apache.seatunnel.connectors.seatunnel.file.source.reader.JsonReadStrategy;
@@ -121,11 +121,10 @@ public class ReadStrategyEncodingTest {
readStrategy.init(localConf);
readStrategy.getFileNamesByPath(sourceFilePath);
testCollector = new TestCollector();
- SeaTunnelRowType seaTunnelRowTypeInfo =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- Assertions.assertNotNull(seaTunnelRowTypeInfo);
- readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowTypeInfo);
- log.info(seaTunnelRowTypeInfo.toString());
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
+ Assertions.assertNotNull(catalogTable.getSeaTunnelRowType());
+ readStrategy.setCatalogTable(catalogTable);
+ log.info(catalogTable.getSeaTunnelRowType().toString());
readStrategy.read(sourceFilePath, "", testCollector);
assertRows(testCollector);
}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
index 8bb2e48389..fca8f68fd2 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/XmlReadStrategyTest.java
@@ -21,9 +21,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.utils.DateTimeUtils;
import org.apache.seatunnel.common.utils.DateUtils;
import org.apache.seatunnel.common.utils.TimeUtils;
@@ -66,9 +66,8 @@ public class XmlReadStrategyTest {
xmlReadStrategy.setPluginConfig(pluginConfig);
xmlReadStrategy.init(localConf);
List<String> fileNamesByPath =
xmlReadStrategy.getFileNamesByPath(xmlFilePath);
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- xmlReadStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable catalogTable =
CatalogTableUtil.buildWithConfig(pluginConfig);
+ xmlReadStrategy.setCatalogTable(catalogTable);
TestCollector testCollector = new TestCollector();
xmlReadStrategy.read(fileNamesByPath.get(0), "", testCollector);
for (SeaTunnelRow seaTunnelRow : testCollector.getRows()) {
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
index 0690b2aceb..bd8df0261c 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-cos/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/cos/source/CosFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -95,9 +95,9 @@ public class CosFileSource extends BaseFileSource {
case JSON:
case EXCEL:
case XML:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable userDefinedCatalogTable =
+ CatalogTableUtil.buildWithConfig(pluginConfig);
+ readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
index 335e396780..ed9807729f 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-jindo-oss/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/oss/jindo/source/OssFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -96,9 +96,9 @@ public class OssFileSource extends BaseFileSource {
case JSON:
case EXCEL:
case XML:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable userDefinedCatalogTable =
+ CatalogTableUtil.buildWithConfig(pluginConfig);
+ readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
index cf3061a44a..8d2ae3d90b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-obs/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/obs/source/ObsFileSource.java
@@ -22,9 +22,9 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
import org.apache.seatunnel.api.common.PrepareFailException;
import org.apache.seatunnel.api.common.SeaTunnelAPIErrorCode;
import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
import org.apache.seatunnel.api.table.catalog.schema.TableSchemaOptions;
-import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
import org.apache.seatunnel.common.config.CheckConfigUtil;
import org.apache.seatunnel.common.config.CheckResult;
import org.apache.seatunnel.common.constants.PluginType;
@@ -91,9 +91,9 @@ public class ObsFileSource extends BaseFileSource {
case TEXT:
case JSON:
case EXCEL:
- SeaTunnelRowType userDefinedSchema =
-
CatalogTableUtil.buildWithConfig(pluginConfig).getSeaTunnelRowType();
- readStrategy.setSeaTunnelRowTypeInfo(userDefinedSchema);
+ CatalogTable userDefinedCatalogTable =
+ CatalogTableUtil.buildWithConfig(pluginConfig);
+ readStrategy.setCatalogTable(userDefinedCatalogTable);
rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
break;
case ORC:
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
index 997c42f9fa..6e91baf001 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/sink/HiveSink.java
@@ -240,7 +240,7 @@ public class HiveSink
private WriteStrategy getWriteStrategy() {
if (writeStrategy == null) {
writeStrategy =
WriteStrategyFactory.of(fileSinkConfig.getFileFormat(), fileSinkConfig);
-
writeStrategy.setSeaTunnelRowTypeInfo(catalogTable.getSeaTunnelRowType());
+ writeStrategy.setCatalogTable(catalogTable);
}
return writeStrategy;
}
diff --git
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
index e98143fcf0..eba9b5a15b 100644
---
a/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-hive/src/main/java/org/apache/seatunnel/connectors/seatunnel/hive/source/config/HiveSourceConfig.java
@@ -279,7 +279,9 @@ public class HiveSourceConfig implements Serializable {
}
SeaTunnelRowType seaTunnelRowType = new SeaTunnelRowType(fieldNames,
fieldTypes);
- readStrategy.setSeaTunnelRowTypeInfo(seaTunnelRowType);
+ readStrategy.setCatalogTable(
+ CatalogTableUtil.getCatalogTable(
+ "hive", table.getDbName(), null, table.getTableName(),
seaTunnelRowType));
final SeaTunnelRowType finalSeatunnelRowType =
readStrategy.getActualSeaTunnelRowTypeInfo();
CatalogTable catalogTable = buildEmptyCatalogTable(readonlyConfig,
table);