This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new d1599f8ad9 [Improve][Connector-V2] Support read orc with schema config to cast type (#6531)
d1599f8ad9 is described below

commit d1599f8ad97d9143bce24377bcc61195df232a13
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 29 10:39:35 2024 +0800

    [Improve][Connector-V2] Support read orc with schema config to cast type (#6531)
---
 .../apache/seatunnel/api/table/type/TypeUtil.java  |  44 +++++
 .../file/config/BaseFileSourceConfig.java          |   8 +-
 .../file/source/reader/OrcReadStrategy.java        | 185 ++++++++++++++++-----
 .../seatunnel/file/source/reader/ReadStrategy.java |   5 +
 .../e2e/connector/file/local/LocalFileIT.java      |   5 +
 ...ocal_file_orc_to_assert_with_time_and_cast.conf | 126 ++++++++++++++
 .../src/test/resources/orc/orc_for_cast.orc        | Bin 0 -> 4775 bytes
 7 files changed, 334 insertions(+), 39 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
new file mode 100644
index 0000000000..b8df6d80e5
--- /dev/null
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+public class TypeUtil {
+
+    /** Check if the data type can be converted to another data type. */
+    public static boolean canConvert(SeaTunnelDataType<?> from, SeaTunnelDataType<?> to) {
+        // any type can be converted to string
+        if (from == to || to.getSqlType() == SqlType.STRING) {
+            return true;
+        }
+        if (from.getSqlType() == SqlType.TINYINT) {
+            return to.getSqlType() == SqlType.SMALLINT
+                    || to.getSqlType() == SqlType.INT
+                    || to.getSqlType() == SqlType.BIGINT;
+        }
+        if (from.getSqlType() == SqlType.SMALLINT) {
+            return to.getSqlType() == SqlType.INT || to.getSqlType() == SqlType.BIGINT;
+        }
+        if (from.getSqlType() == SqlType.INT) {
+            return to.getSqlType() == SqlType.BIGINT;
+        }
+        if (from.getSqlType() == SqlType.FLOAT) {
+            return to.getSqlType() == SqlType.DOUBLE;
+        }
+        return false;
+    }
+}
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index c08a7a11de..5c16a7e28b 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -80,7 +80,8 @@ public abstract class BaseFileSourceConfig implements Serializable {
 
     private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
         final CatalogTable catalogTable;
-        if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent()) {
+        boolean configSchema = readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent();
+        if (configSchema) {
             catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(), readonlyConfig);
         } else {
             catalogTable = CatalogTableUtil.buildSimpleTextTable();
@@ -99,7 +100,10 @@ public abstract class BaseFileSourceConfig implements Serializable {
             case ORC:
             case PARQUET:
                 return newCatalogTable(
-                        catalogTable, readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0)));
+                        catalogTable,
+                        readStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+                                filePaths.get(0),
+                                configSchema ? catalogTable.getSeaTunnelRowType() : null));
             default:
                 throw new FileConnectorException(
                         FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 79158a5423..77b02ab03a 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -28,6 +28,7 @@ import org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
 import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
 import org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
 import org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
@@ -55,6 +56,8 @@ import org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
 
 import lombok.extern.slf4j.Slf4j;
 
+import javax.annotation.Nullable;
+
 import java.io.IOException;
 import java.math.BigDecimal;
 import java.nio.ByteBuffer;
@@ -62,12 +65,15 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.sql.Timestamp;
 import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.seatunnel.api.table.type.TypeUtil.canConvert;
 import static org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.buildFieldWithRowType;
 
 @Slf4j
@@ -120,7 +126,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                         if (cols[j] == null) {
                             fields[j] = null;
                         } else {
-                            fields[j] = readColumn(cols[j], children.get(j), 
num);
+                            fields[j] =
+                                    readColumn(
+                                            cols[j],
+                                            children.get(j),
+                                            seaTunnelRowType.getFieldType(j),
+                                            num);
                         }
                     }
                     SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -134,6 +145,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws 
FileConnectorException {
+        return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+            String path, SeaTunnelRowType configRowType) throws 
FileConnectorException {
         try (Reader reader =
                 hadoopFileSystemProxy.doWithHadoopAuth(
                         ((configuration, userGroupInformation) -> {
@@ -158,7 +175,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                                     "Column [%s] does not exists in table schema [%s]",
                                     readColumns.get(i), String.join(",", fieldNames)));
                 }
-                types[i] = 
orcDataType2SeaTunnelDataType(schema.getChildren().get(index));
+                types[i] =
+                        orcDataType2SeaTunnelDataType(
+                                schema.getChildren().get(index),
+                                configRowType != null && 
configRowType.getTotalFields() > i
+                                        ? configRowType.getFieldType(i)
+                                        : null);
             }
             seaTunnelRowType = new SeaTunnelRowType(fields, types);
             seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
@@ -209,39 +231,57 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
         }
     }
 
-    private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription 
typeDescription) {
+    private SeaTunnelDataType<?> getFinalType(
+            SeaTunnelDataType<?> fileType, SeaTunnelDataType<?> configType) {
+        if (configType == null) {
+            return fileType;
+        }
+        return canConvert(fileType, configType) ? configType : fileType;
+    }
+
+    private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(
+            TypeDescription typeDescription, SeaTunnelDataType<?> configType) {
         switch (typeDescription.getCategory()) {
             case BOOLEAN:
-                return BasicType.BOOLEAN_TYPE;
+                return getFinalType(BasicType.BOOLEAN_TYPE, configType);
             case INT:
-                return BasicType.INT_TYPE;
+                return getFinalType(BasicType.INT_TYPE, configType);
             case BYTE:
-                return BasicType.BYTE_TYPE;
+                return getFinalType(BasicType.BYTE_TYPE, configType);
             case SHORT:
-                return BasicType.SHORT_TYPE;
+                return getFinalType(BasicType.SHORT_TYPE, configType);
             case LONG:
-                return BasicType.LONG_TYPE;
+                return getFinalType(BasicType.LONG_TYPE, configType);
             case FLOAT:
-                return BasicType.FLOAT_TYPE;
+                return getFinalType(BasicType.FLOAT_TYPE, configType);
             case DOUBLE:
-                return BasicType.DOUBLE_TYPE;
+                return getFinalType(BasicType.DOUBLE_TYPE, configType);
             case BINARY:
-                return PrimitiveByteArrayType.INSTANCE;
+                return getFinalType(PrimitiveByteArrayType.INSTANCE, 
configType);
             case STRING:
             case VARCHAR:
             case CHAR:
-                return BasicType.STRING_TYPE;
+                return getFinalType(BasicType.STRING_TYPE, configType);
             case DATE:
-                return LocalTimeType.LOCAL_DATE_TYPE;
+                return getFinalType(LocalTimeType.LOCAL_DATE_TYPE, configType);
             case TIMESTAMP:
-                return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                // Support only return time when the type is timestamps
+                if (configType != null && 
configType.getSqlType().equals(SqlType.TIME)) {
+                    return LocalTimeType.LOCAL_TIME_TYPE;
+                }
+                return getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
configType);
             case DECIMAL:
                 int precision = typeDescription.getPrecision();
                 int scale = typeDescription.getScale();
-                return new DecimalType(precision, scale);
+                return getFinalType(new DecimalType(precision, scale), 
configType);
             case LIST:
                 TypeDescription listType = 
typeDescription.getChildren().get(0);
-                SeaTunnelDataType<?> seaTunnelDataType = 
orcDataType2SeaTunnelDataType(listType);
+                SeaTunnelDataType<?> seaTunnelDataType =
+                        orcDataType2SeaTunnelDataType(listType, null);
+                if (configType instanceof ArrayType) {
+                    SeaTunnelDataType<?> elementType = ((ArrayType) 
configType).getElementType();
+                    seaTunnelDataType = 
orcDataType2SeaTunnelDataType(listType, elementType);
+                }
                 switch (seaTunnelDataType.getSqlType()) {
                     case STRING:
                         return ArrayType.STRING_ARRAY_TYPE;
@@ -270,16 +310,35 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
             case MAP:
                 TypeDescription keyType = typeDescription.getChildren().get(0);
                 TypeDescription valueType = 
typeDescription.getChildren().get(1);
-                return new MapType<>(
-                        orcDataType2SeaTunnelDataType(keyType),
-                        orcDataType2SeaTunnelDataType(valueType));
+                if (configType instanceof MapType) {
+                    SeaTunnelDataType<?> keyDataType = ((MapType<?, ?>) 
configType).getKeyType();
+                    SeaTunnelDataType<?> valueDataType =
+                            ((MapType<?, ?>) configType).getValueType();
+                    keyDataType = orcDataType2SeaTunnelDataType(keyType, 
keyDataType);
+                    valueDataType = orcDataType2SeaTunnelDataType(valueType, 
valueDataType);
+                    return new MapType<>(keyDataType, valueDataType);
+                } else {
+                    return new MapType<>(
+                            orcDataType2SeaTunnelDataType(keyType, null),
+                            orcDataType2SeaTunnelDataType(valueType, null));
+                }
             case STRUCT:
                 List<TypeDescription> children = typeDescription.getChildren();
                 String[] fieldNames = 
typeDescription.getFieldNames().toArray(TYPE_ARRAY_STRING);
-                SeaTunnelDataType<?>[] fieldTypes =
-                        children.stream()
-                                .map(this::orcDataType2SeaTunnelDataType)
-                                .toArray(SeaTunnelDataType<?>[]::new);
+                SeaTunnelDataType<?>[] fieldTypes = new 
SeaTunnelDataType[children.size()];
+                if (configType instanceof SeaTunnelRowType) {
+                    for (int i = 0; i < children.size(); i++) {
+                        fieldTypes[i] =
+                                orcDataType2SeaTunnelDataType(
+                                        children.get(i),
+                                        ((SeaTunnelRowType) 
configType).getFieldType(i));
+                    }
+                } else {
+                    fieldTypes =
+                            children.stream()
+                                    .map(f -> orcDataType2SeaTunnelDataType(f, 
null))
+                                    .toArray(SeaTunnelDataType<?>[]::new);
+                }
                 return new SeaTunnelRowType(fieldNames, fieldTypes);
             default:
                 // do nothing
@@ -293,30 +352,37 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
         }
     }
 
-    private Object readColumn(ColumnVector colVec, TypeDescription colType, 
int rowNum) {
+    private Object readColumn(
+            ColumnVector colVec,
+            TypeDescription colType,
+            @Nullable SeaTunnelDataType<?> dataType,
+            int rowNum) {
         Object columnObj = null;
         if (!colVec.isNull[rowNum]) {
             switch (colVec.type) {
                 case LONG:
-                    columnObj = readLongVal(colVec, colType, rowNum);
+                    columnObj = readLongVal(colVec, colType, dataType, rowNum);
                     break;
                 case DOUBLE:
                     columnObj = ((DoubleColumnVector) colVec).vector[rowNum];
                     if (colType.getCategory() == 
TypeDescription.Category.FLOAT) {
                         columnObj = ((Double) columnObj).floatValue();
                     }
+                    if (dataType != null && 
dataType.getSqlType().equals(SqlType.STRING)) {
+                        columnObj = columnObj.toString();
+                    }
                     break;
                 case BYTES:
-                    columnObj = readBytesVal(colVec, colType, rowNum);
+                    columnObj = readBytesVal(colVec, colType, dataType, 
rowNum);
                     break;
                 case DECIMAL:
-                    columnObj = readDecimalVal(colVec, rowNum);
+                    columnObj = readDecimalVal(colVec, dataType, rowNum);
                     break;
                 case TIMESTAMP:
-                    columnObj = readTimestampVal(colVec, colType, rowNum);
+                    columnObj = readTimestampVal(colVec, colType, dataType, 
rowNum);
                     break;
                 case STRUCT:
-                    columnObj = readStructVal(colVec, colType, rowNum);
+                    columnObj = readStructVal(colVec, colType, dataType, 
rowNum);
                     break;
                 case LIST:
                     columnObj = readListVal(colVec, colType, rowNum);
@@ -336,7 +402,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
         return columnObj;
     }
 
-    private Object readLongVal(ColumnVector colVec, TypeDescription colType, 
int rowNum) {
+    private Object readLongVal(
+            ColumnVector colVec,
+            TypeDescription colType,
+            SeaTunnelDataType<?> dataType,
+            int rowNum) {
         Object colObj = null;
         if (!colVec.isNull[rowNum]) {
             LongColumnVector longVec = (LongColumnVector) colVec;
@@ -353,11 +423,18 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
             } else if (colType.getCategory() == 
TypeDescription.Category.SHORT) {
                 colObj = (short) longVal;
             }
+            if (dataType != null && 
dataType.getSqlType().equals(SqlType.STRING)) {
+                colObj = colObj.toString();
+            }
         }
         return colObj;
     }
 
-    private Object readBytesVal(ColumnVector colVec, TypeDescription 
typeDescription, int rowNum) {
+    private Object readBytesVal(
+            ColumnVector colVec,
+            TypeDescription typeDescription,
+            SeaTunnelDataType<?> dataType,
+            int rowNum) {
         Charset charset = StandardCharsets.UTF_8;
         if (pluginConfig != null) {
             charset =
@@ -375,6 +452,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
                     && bytesObj != null) {
                 bytesObj = ((String) bytesObj).getBytes(charset);
             }
+            if (dataType != null
+                    && dataType.getSqlType().equals(SqlType.STRING)
+                    && bytesObj != null) {
+                bytesObj = bytesObj.toString();
+            }
         }
         return bytesObj;
     }
@@ -400,16 +482,25 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
                         charset);
     }
 
-    private Object readDecimalVal(ColumnVector colVec, int rowNum) {
+    private Object readDecimalVal(ColumnVector colVec, SeaTunnelDataType<?> 
dataType, int rowNum) {
         Object decimalObj = null;
         if (!colVec.isNull[rowNum]) {
             DecimalColumnVector decimalVec = (DecimalColumnVector) colVec;
             decimalObj = 
decimalVec.vector[rowNum].getHiveDecimal().bigDecimalValue();
+            if (dataType != null
+                    && dataType.getSqlType().equals(SqlType.STRING)
+                    && decimalObj != null) {
+                decimalObj = decimalObj.toString();
+            }
         }
         return decimalObj;
     }
 
-    private Object readTimestampVal(ColumnVector colVec, TypeDescription 
colType, int rowNum) {
+    private Object readTimestampVal(
+            ColumnVector colVec,
+            TypeDescription colType,
+            SeaTunnelDataType<?> dataType,
+            int rowNum) {
         Object timestampVal = null;
         if (!colVec.isNull[rowNum]) {
             TimestampColumnVector timestampVec = (TimestampColumnVector) 
colVec;
@@ -420,12 +511,28 @@ public class OrcReadStrategy extends AbstractReadStrategy 
{
             timestampVal = timestamp.toLocalDateTime();
             if (colType.getCategory() == TypeDescription.Category.DATE) {
                 timestampVal = LocalDate.ofEpochDay(timestamp.getTime());
+            } else if (dataType != null && dataType.getSqlType() == 
SqlType.TIME) {
+                timestampVal =
+                        LocalTime.of(
+                                ((LocalDateTime) timestampVal).getHour(),
+                                ((LocalDateTime) timestampVal).getMinute(),
+                                ((LocalDateTime) timestampVal).getSecond(),
+                                ((LocalDateTime) timestampVal).getNano());
+            }
+            if (dataType != null
+                    && dataType.getSqlType().equals(SqlType.STRING)
+                    && timestampVal != null) {
+                timestampVal = timestampVal.toString();
             }
         }
         return timestampVal;
     }
 
-    private Object readStructVal(ColumnVector colVec, TypeDescription colType, 
int rowNum) {
+    private Object readStructVal(
+            ColumnVector colVec,
+            TypeDescription colType,
+            SeaTunnelDataType<?> dataType,
+            int rowNum) {
         Object structObj = null;
         if (!colVec.isNull[rowNum]) {
             StructColumnVector structVector = (StructColumnVector) colVec;
@@ -433,8 +540,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             Object[] fieldValues = new Object[fieldVec.length];
             List<TypeDescription> fieldTypes = colType.getChildren();
             for (int i = 0; i < fieldVec.length; i++) {
-                Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i), 
rowNum);
-                fieldValues[i] = fieldObj;
+                if (dataType instanceof SeaTunnelRowType) {
+                    SeaTunnelDataType<?> fieldType = ((SeaTunnelRowType) 
dataType).getFieldType(i);
+                    fieldValues[i] = readColumn(fieldVec[i], 
fieldTypes.get(i), fieldType, rowNum);
+                } else {
+                    fieldValues[i] = readColumn(fieldVec[i], 
fieldTypes.get(i), null, rowNum);
+                }
             }
             structObj = new SeaTunnelRow(fieldValues);
         }
@@ -521,7 +632,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
             TypeDescription fieldType = unionFieldTypes.get(tagVal);
             if (tagVal < unionVector.fields.length) {
                 ColumnVector fieldVector = unionVector.fields[tagVal];
-                Object unionValue = readColumn(fieldVector, fieldType, rowNum);
+                Object unionValue = readColumn(fieldVector, fieldType, null, 
rowNum);
                 columnValuePair = Pair.of(fieldType, unionValue);
             } else {
                 throw new FileConnectorException(
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index a269594e1f..d3a210f56e 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -38,6 +38,11 @@ public interface ReadStrategy extends Serializable, Closeable {
 
     SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws FileConnectorException;
 
+    default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+            String path, SeaTunnelRowType rowType) throws FileConnectorException {
+        return getSeaTunnelRowTypeInfo(path);
+    }
+
     // todo: use CatalogTable
     void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
 
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index d06dc9f890..46213dde0f 100644
--- a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -98,6 +98,9 @@ public class LocalFileIT extends TestSuiteBase {
                         "/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
                         container);
 
+                ContainerUtil.copyFileIntoContainers(
+                        "/orc/orc_for_cast.orc", "/seatunnel/read/orc_cast/e2e.orc", container);
+
                 ContainerUtil.copyFileIntoContainers(
                         "/parquet/e2e.parquet",
                         "/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
@@ -150,6 +153,8 @@ public class LocalFileIT extends TestSuiteBase {
         helper.execute("/orc/local_file_orc_to_assert.conf");
         // test read local orc file with projection
         helper.execute("/orc/local_file_orc_projection_to_assert.conf");
+        // test read local orc file with projection and type cast
+        helper.execute("/orc/local_file_orc_to_assert_with_time_and_cast.conf");
         // test write local parquet file
         helper.execute("/parquet/fake_to_local_file_parquet.conf");
         // test read local parquet file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf
new file mode 100644
index 0000000000..476e21d923
--- /dev/null
+++ b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+  parallelism = 1
+  job.mode = "BATCH"
+
+  # You can set spark configuration here
+  spark.app.name = "SeaTunnel"
+  spark.executor.instances = 2
+  spark.executor.cores = 1
+  spark.executor.memory = "1g"
+  spark.master = local
+}
+
+source {
+  LocalFile {
+    path = "/seatunnel/read/orc_cast"
+    file_format_type = "orc"
+    result_table_name = "fake"
+    schema = {
+      fields {
+        c_map = "map<string, string>"
+        c_array = "array<int>"
+        c_string = string
+        c_boolean = boolean
+        c_tinyint = tinyint
+        // change smallint to bigint
+        c_smallint = bigint
+        // change int to bigint
+        c_int = bigint
+        c_bigint = bigint
+        // change float value to string
+        c_float = string
+        c_double = double
+        c_bytes = bytes
+        c_date = date
+        c_decimal = "decimal(38, 18)"
+        // change timestamp value to time
+        c_timestamp = time
+        c_row = {
+          c_map = "map<string, string>"
+          c_array = "array<int>"
+          c_string = string
+          c_boolean = boolean
+          c_tinyint = tinyint
+          c_smallint = smallint
+          // change int value to string in c_row
+          c_int = string
+          c_bigint = bigint
+          c_float = float
+          c_double = double
+          c_bytes = bytes
+          c_date = date
+          c_decimal = "decimal(38, 18)"
+          c_timestamp = timestamp
+        }
+      }
+    }
+  }
+}
+
+sink {
+  Assert {
+    rules {
+      row_rules = [
+        {
+          rule_type = MAX_ROW
+          rule_value = 5
+        }
+      ],
+      field_rules = [
+        {
+          field_name = c_string
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_float
+          field_type = string
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_boolean
+          field_type = boolean
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        },
+        {
+          field_name = c_timestamp
+          field_type = time
+          field_value = [
+            {
+              rule_type = NOT_NULL
+            }
+          ]
+        }
+      ]
+    }
+  }
+}
\ No newline at end of file
diff --git a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc
new file mode 100644
index 0000000000..9ccbdc02cb
Binary files /dev/null and b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc differ

Reply via email to