This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new d1599f8ad9 [Improve][Connector-V2] Support read orc with schema config
to cast type (#6531)
d1599f8ad9 is described below
commit d1599f8ad97d9143bce24377bcc61195df232a13
Author: Jia Fan <[email protected]>
AuthorDate: Fri Mar 29 10:39:35 2024 +0800
[Improve][Connector-V2] Support read orc with schema config to cast type
(#6531)
---
.../apache/seatunnel/api/table/type/TypeUtil.java | 44 +++++
.../file/config/BaseFileSourceConfig.java | 8 +-
.../file/source/reader/OrcReadStrategy.java | 185 ++++++++++++++++-----
.../seatunnel/file/source/reader/ReadStrategy.java | 5 +
.../e2e/connector/file/local/LocalFileIT.java | 5 +
...ocal_file_orc_to_assert_with_time_and_cast.conf | 126 ++++++++++++++
.../src/test/resources/orc/orc_for_cast.orc | Bin 0 -> 4775 bytes
7 files changed, 334 insertions(+), 39 deletions(-)
diff --git
a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
new file mode 100644
index 0000000000..b8df6d80e5
--- /dev/null
+++
b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.api.table.type;
+
+public class TypeUtil {
+
+ /** Check if the data type can be converted to another data type. */
+ public static boolean canConvert(SeaTunnelDataType<?> from,
SeaTunnelDataType<?> to) {
+ // any type can be converted to string
+ if (from == to || to.getSqlType() == SqlType.STRING) {
+ return true;
+ }
+ if (from.getSqlType() == SqlType.TINYINT) {
+ return to.getSqlType() == SqlType.SMALLINT
+ || to.getSqlType() == SqlType.INT
+ || to.getSqlType() == SqlType.BIGINT;
+ }
+ if (from.getSqlType() == SqlType.SMALLINT) {
+ return to.getSqlType() == SqlType.INT || to.getSqlType() ==
SqlType.BIGINT;
+ }
+ if (from.getSqlType() == SqlType.INT) {
+ return to.getSqlType() == SqlType.BIGINT;
+ }
+ if (from.getSqlType() == SqlType.FLOAT) {
+ return to.getSqlType() == SqlType.DOUBLE;
+ }
+ return false;
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
index c08a7a11de..5c16a7e28b 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/config/BaseFileSourceConfig.java
@@ -80,7 +80,8 @@ public abstract class BaseFileSourceConfig implements
Serializable {
private CatalogTable parseCatalogTable(ReadonlyConfig readonlyConfig) {
final CatalogTable catalogTable;
- if (readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent())
{
+ boolean configSchema =
readonlyConfig.getOptional(TableSchemaOptions.SCHEMA).isPresent();
+ if (configSchema) {
catalogTable = CatalogTableUtil.buildWithConfig(getPluginName(),
readonlyConfig);
} else {
catalogTable = CatalogTableUtil.buildSimpleTextTable();
@@ -99,7 +100,10 @@ public abstract class BaseFileSourceConfig implements
Serializable {
case ORC:
case PARQUET:
return newCatalogTable(
- catalogTable,
readStrategy.getSeaTunnelRowTypeInfo(filePaths.get(0)));
+ catalogTable,
+
readStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+ filePaths.get(0),
+ configSchema ?
catalogTable.getSeaTunnelRowType() : null));
default:
throw new FileConnectorException(
FileConnectorErrorCode.FORMAT_NOT_SUPPORT,
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
index 79158a5423..77b02ab03a 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/OrcReadStrategy.java
@@ -28,6 +28,7 @@ import
org.apache.seatunnel.api.table.type.PrimitiveByteArrayType;
import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
import org.apache.seatunnel.api.table.type.SeaTunnelRow;
import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.api.table.type.SqlType;
import org.apache.seatunnel.common.exception.CommonErrorCodeDeprecated;
import
org.apache.seatunnel.connectors.seatunnel.file.config.BaseSourceConfigOptions;
import
org.apache.seatunnel.connectors.seatunnel.file.exception.FileConnectorErrorCode;
@@ -55,6 +56,8 @@ import
org.apache.orc.storage.ql.exec.vector.VectorizedRowBatch;
import lombok.extern.slf4j.Slf4j;
+import javax.annotation.Nullable;
+
import java.io.IOException;
import java.math.BigDecimal;
import java.nio.ByteBuffer;
@@ -62,12 +65,15 @@ import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.sql.Timestamp;
import java.time.LocalDate;
+import java.time.LocalDateTime;
+import java.time.LocalTime;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.seatunnel.api.table.type.TypeUtil.canConvert;
import static
org.apache.seatunnel.connectors.seatunnel.file.sink.writer.OrcWriteStrategy.buildFieldWithRowType;
@Slf4j
@@ -120,7 +126,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
if (cols[j] == null) {
fields[j] = null;
} else {
- fields[j] = readColumn(cols[j], children.get(j),
num);
+ fields[j] =
+ readColumn(
+ cols[j],
+ children.get(j),
+ seaTunnelRowType.getFieldType(j),
+ num);
}
}
SeaTunnelRow seaTunnelRow = new SeaTunnelRow(fields);
@@ -134,6 +145,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
@Override
public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException {
+ // No user schema is supplied (null), so every column keeps the type derived from the ORC file.
+ return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
+ }
+
+ @Override
+ public SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+ String path, SeaTunnelRowType configRowType) throws
FileConnectorException {
try (Reader reader =
hadoopFileSystemProxy.doWithHadoopAuth(
((configuration, userGroupInformation) -> {
@@ -158,7 +175,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
"Column [%s] does not exists in table
schema [%s]",
readColumns.get(i), String.join(",",
fieldNames)));
}
- types[i] =
orcDataType2SeaTunnelDataType(schema.getChildren().get(index));
+ types[i] =
+ orcDataType2SeaTunnelDataType(
+ schema.getChildren().get(index),
+ configRowType != null &&
configRowType.getTotalFields() > i
+ ? configRowType.getFieldType(i)
+ : null);
}
seaTunnelRowType = new SeaTunnelRowType(fields, types);
seaTunnelRowTypeWithPartition = mergePartitionTypes(path,
seaTunnelRowType);
@@ -209,39 +231,57 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
}
}
- private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(TypeDescription
typeDescription) {
+    /**
+     * Chooses the type exposed for a column: the user-configured type when it is a supported
+     * conversion target of the file type, otherwise the file type itself.
+     *
+     * @param fileType type derived from the ORC schema
+     * @param configType type from the user's schema config, or {@code null} when none applies
+     * @return {@code configType} if present and convertible from {@code fileType}, else
+     *     {@code fileType}
+     */
+    private SeaTunnelDataType<?> getFinalType(
+            SeaTunnelDataType<?> fileType, SeaTunnelDataType<?> configType) {
+        boolean useConfigured = configType != null && canConvert(fileType, configType);
+        if (useConfigured) {
+            return configType;
+        }
+        return fileType;
+    }
+
+ private SeaTunnelDataType<?> orcDataType2SeaTunnelDataType(
+ TypeDescription typeDescription, SeaTunnelDataType<?> configType) {
switch (typeDescription.getCategory()) {
case BOOLEAN:
- return BasicType.BOOLEAN_TYPE;
+ return getFinalType(BasicType.BOOLEAN_TYPE, configType);
case INT:
- return BasicType.INT_TYPE;
+ return getFinalType(BasicType.INT_TYPE, configType);
case BYTE:
- return BasicType.BYTE_TYPE;
+ return getFinalType(BasicType.BYTE_TYPE, configType);
case SHORT:
- return BasicType.SHORT_TYPE;
+ return getFinalType(BasicType.SHORT_TYPE, configType);
case LONG:
- return BasicType.LONG_TYPE;
+ return getFinalType(BasicType.LONG_TYPE, configType);
case FLOAT:
- return BasicType.FLOAT_TYPE;
+ return getFinalType(BasicType.FLOAT_TYPE, configType);
case DOUBLE:
- return BasicType.DOUBLE_TYPE;
+ return getFinalType(BasicType.DOUBLE_TYPE, configType);
case BINARY:
- return PrimitiveByteArrayType.INSTANCE;
+ return getFinalType(PrimitiveByteArrayType.INSTANCE,
configType);
case STRING:
case VARCHAR:
case CHAR:
- return BasicType.STRING_TYPE;
+ return getFinalType(BasicType.STRING_TYPE, configType);
case DATE:
- return LocalTimeType.LOCAL_DATE_TYPE;
+ return getFinalType(LocalTimeType.LOCAL_DATE_TYPE, configType);
case TIMESTAMP:
- return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+ // A TIMESTAMP column may be narrowed to TIME (time of day only) when the schema config requests it
+ if (configType != null &&
configType.getSqlType().equals(SqlType.TIME)) {
+ return LocalTimeType.LOCAL_TIME_TYPE;
+ }
+ return getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE,
configType);
case DECIMAL:
int precision = typeDescription.getPrecision();
int scale = typeDescription.getScale();
- return new DecimalType(precision, scale);
+ return getFinalType(new DecimalType(precision, scale),
configType);
case LIST:
TypeDescription listType =
typeDescription.getChildren().get(0);
- SeaTunnelDataType<?> seaTunnelDataType =
orcDataType2SeaTunnelDataType(listType);
+ SeaTunnelDataType<?> seaTunnelDataType =
+ orcDataType2SeaTunnelDataType(listType, null);
+ if (configType instanceof ArrayType) {
+ SeaTunnelDataType<?> elementType = ((ArrayType)
configType).getElementType();
+ seaTunnelDataType =
orcDataType2SeaTunnelDataType(listType, elementType);
+ }
switch (seaTunnelDataType.getSqlType()) {
case STRING:
return ArrayType.STRING_ARRAY_TYPE;
@@ -270,16 +310,35 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
case MAP:
TypeDescription keyType = typeDescription.getChildren().get(0);
TypeDescription valueType =
typeDescription.getChildren().get(1);
- return new MapType<>(
- orcDataType2SeaTunnelDataType(keyType),
- orcDataType2SeaTunnelDataType(valueType));
+ if (configType instanceof MapType) {
+ SeaTunnelDataType<?> keyDataType = ((MapType<?, ?>)
configType).getKeyType();
+ SeaTunnelDataType<?> valueDataType =
+ ((MapType<?, ?>) configType).getValueType();
+ keyDataType = orcDataType2SeaTunnelDataType(keyType,
keyDataType);
+ valueDataType = orcDataType2SeaTunnelDataType(valueType,
valueDataType);
+ return new MapType<>(keyDataType, valueDataType);
+ } else {
+ return new MapType<>(
+ orcDataType2SeaTunnelDataType(keyType, null),
+ orcDataType2SeaTunnelDataType(valueType, null));
+ }
case STRUCT:
List<TypeDescription> children = typeDescription.getChildren();
String[] fieldNames =
typeDescription.getFieldNames().toArray(TYPE_ARRAY_STRING);
- SeaTunnelDataType<?>[] fieldTypes =
- children.stream()
- .map(this::orcDataType2SeaTunnelDataType)
- .toArray(SeaTunnelDataType<?>[]::new);
+ SeaTunnelDataType<?>[] fieldTypes = new
SeaTunnelDataType[children.size()];
+ if (configType instanceof SeaTunnelRowType) {
+ for (int i = 0; i < children.size(); i++) {
+ fieldTypes[i] =
+ orcDataType2SeaTunnelDataType(
+ children.get(i),
+ ((SeaTunnelRowType)
configType).getFieldType(i));
+ }
+ } else {
+ fieldTypes =
+ children.stream()
+ .map(f -> orcDataType2SeaTunnelDataType(f,
null))
+ .toArray(SeaTunnelDataType<?>[]::new);
+ }
return new SeaTunnelRowType(fieldNames, fieldTypes);
default:
// do nothing
@@ -293,30 +352,37 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
}
}
- private Object readColumn(ColumnVector colVec, TypeDescription colType,
int rowNum) {
+ private Object readColumn(
+ ColumnVector colVec,
+ TypeDescription colType,
+ @Nullable SeaTunnelDataType<?> dataType,
+ int rowNum) {
Object columnObj = null;
if (!colVec.isNull[rowNum]) {
switch (colVec.type) {
case LONG:
- columnObj = readLongVal(colVec, colType, rowNum);
+ columnObj = readLongVal(colVec, colType, dataType, rowNum);
break;
case DOUBLE:
columnObj = ((DoubleColumnVector) colVec).vector[rowNum];
if (colType.getCategory() ==
TypeDescription.Category.FLOAT) {
columnObj = ((Double) columnObj).floatValue();
}
+ if (dataType != null &&
dataType.getSqlType().equals(SqlType.STRING)) {
+ columnObj = columnObj.toString();
+ }
break;
case BYTES:
- columnObj = readBytesVal(colVec, colType, rowNum);
+ columnObj = readBytesVal(colVec, colType, dataType,
rowNum);
break;
case DECIMAL:
- columnObj = readDecimalVal(colVec, rowNum);
+ columnObj = readDecimalVal(colVec, dataType, rowNum);
break;
case TIMESTAMP:
- columnObj = readTimestampVal(colVec, colType, rowNum);
+ columnObj = readTimestampVal(colVec, colType, dataType,
rowNum);
break;
case STRUCT:
- columnObj = readStructVal(colVec, colType, rowNum);
+ columnObj = readStructVal(colVec, colType, dataType,
rowNum);
break;
case LIST:
columnObj = readListVal(colVec, colType, rowNum);
@@ -336,7 +402,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
return columnObj;
}
- private Object readLongVal(ColumnVector colVec, TypeDescription colType,
int rowNum) {
+ private Object readLongVal(
+ ColumnVector colVec,
+ TypeDescription colType,
+ SeaTunnelDataType<?> dataType,
+ int rowNum) {
Object colObj = null;
if (!colVec.isNull[rowNum]) {
LongColumnVector longVec = (LongColumnVector) colVec;
@@ -353,11 +423,18 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
} else if (colType.getCategory() ==
TypeDescription.Category.SHORT) {
colObj = (short) longVal;
}
+ if (dataType != null &&
dataType.getSqlType().equals(SqlType.STRING)) {
+ colObj = colObj.toString();
+ }
}
return colObj;
}
- private Object readBytesVal(ColumnVector colVec, TypeDescription
typeDescription, int rowNum) {
+ private Object readBytesVal(
+ ColumnVector colVec,
+ TypeDescription typeDescription,
+ SeaTunnelDataType<?> dataType,
+ int rowNum) {
Charset charset = StandardCharsets.UTF_8;
if (pluginConfig != null) {
charset =
@@ -375,6 +452,11 @@ public class OrcReadStrategy extends AbstractReadStrategy {
&& bytesObj != null) {
bytesObj = ((String) bytesObj).getBytes(charset);
}
+ if (dataType != null
+ && dataType.getSqlType().equals(SqlType.STRING)
+ && bytesObj != null) {
+ bytesObj = bytesObj.toString();
+ }
}
return bytesObj;
}
@@ -400,16 +482,25 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
charset);
}
- private Object readDecimalVal(ColumnVector colVec, int rowNum) {
+    /**
+     * Reads a DECIMAL cell as a {@link BigDecimal}, or as its plain decimal string when
+     * {@code dataType} is STRING. Returns {@code null} for a null cell.
+     */
+    private Object readDecimalVal(
+            ColumnVector colVec, @Nullable SeaTunnelDataType<?> dataType, int rowNum) {
Object decimalObj = null;
if (!colVec.isNull[rowNum]) {
DecimalColumnVector decimalVec = (DecimalColumnVector) colVec;
decimalObj =
decimalVec.vector[rowNum].getHiveDecimal().bigDecimalValue();
+            if (dataType != null
+                    && dataType.getSqlType().equals(SqlType.STRING)
+                    && decimalObj != null) {
+                // toPlainString, not toString: BigDecimal#toString switches to scientific
+                // notation for small magnitudes (e.g. "1E-7"), which a decimal-to-string cast
+                // should not produce.
+                decimalObj = ((BigDecimal) decimalObj).toPlainString();
+            }
}
return decimalObj;
}
- private Object readTimestampVal(ColumnVector colVec, TypeDescription
colType, int rowNum) {
+ private Object readTimestampVal(
+ ColumnVector colVec,
+ TypeDescription colType,
+ SeaTunnelDataType<?> dataType,
+ int rowNum) {
Object timestampVal = null;
if (!colVec.isNull[rowNum]) {
TimestampColumnVector timestampVec = (TimestampColumnVector)
colVec;
@@ -420,12 +511,28 @@ public class OrcReadStrategy extends AbstractReadStrategy
{
timestampVal = timestamp.toLocalDateTime();
if (colType.getCategory() == TypeDescription.Category.DATE) {
timestampVal = LocalDate.ofEpochDay(timestamp.getTime());
+ } else if (dataType != null && dataType.getSqlType() ==
SqlType.TIME) {
+ timestampVal =
+ LocalTime.of(
+ ((LocalDateTime) timestampVal).getHour(),
+ ((LocalDateTime) timestampVal).getMinute(),
+ ((LocalDateTime) timestampVal).getSecond(),
+ ((LocalDateTime) timestampVal).getNano());
+ }
+ if (dataType != null
+ && dataType.getSqlType().equals(SqlType.STRING)
+ && timestampVal != null) {
+ timestampVal = timestampVal.toString();
}
}
return timestampVal;
}
- private Object readStructVal(ColumnVector colVec, TypeDescription colType,
int rowNum) {
+ private Object readStructVal(
+ ColumnVector colVec,
+ TypeDescription colType,
+ SeaTunnelDataType<?> dataType,
+ int rowNum) {
Object structObj = null;
if (!colVec.isNull[rowNum]) {
StructColumnVector structVector = (StructColumnVector) colVec;
@@ -433,8 +540,12 @@ public class OrcReadStrategy extends AbstractReadStrategy {
Object[] fieldValues = new Object[fieldVec.length];
List<TypeDescription> fieldTypes = colType.getChildren();
for (int i = 0; i < fieldVec.length; i++) {
- Object fieldObj = readColumn(fieldVec[i], fieldTypes.get(i),
rowNum);
- fieldValues[i] = fieldObj;
+ if (dataType instanceof SeaTunnelRowType) {
+ SeaTunnelDataType<?> fieldType = ((SeaTunnelRowType)
dataType).getFieldType(i);
+ fieldValues[i] = readColumn(fieldVec[i],
fieldTypes.get(i), fieldType, rowNum);
+ } else {
+ fieldValues[i] = readColumn(fieldVec[i],
fieldTypes.get(i), null, rowNum);
+ }
}
structObj = new SeaTunnelRow(fieldValues);
}
@@ -521,7 +632,7 @@ public class OrcReadStrategy extends AbstractReadStrategy {
TypeDescription fieldType = unionFieldTypes.get(tagVal);
if (tagVal < unionVector.fields.length) {
ColumnVector fieldVector = unionVector.fields[tagVal];
- Object unionValue = readColumn(fieldVector, fieldType, rowNum);
+ Object unionValue = readColumn(fieldVector, fieldType, null,
rowNum);
columnValuePair = Pair.of(fieldType, unionValue);
} else {
throw new FileConnectorException(
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
index a269594e1f..d3a210f56e 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ReadStrategy.java
@@ -38,6 +38,11 @@ public interface ReadStrategy extends Serializable,
Closeable {
SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws
FileConnectorException;
+    /**
+     * Resolves the row type of the file at {@code path}, optionally reconciled with a
+     * user-configured row type.
+     *
+     * <p>This default ignores {@code rowType} and simply returns
+     * {@link #getSeaTunnelRowTypeInfo(String)}. Strategies that can cast column types (such as the
+     * ORC strategy) override it.
+     *
+     * @param path file whose schema is inspected
+     * @param rowType row type from the user's schema config, or {@code null} when no schema is
+     *     configured
+     * @return the resolved row type
+     * @throws FileConnectorException if the file schema cannot be read
+     */
+    default SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+            String path, SeaTunnelRowType rowType) throws FileConnectorException {
+        SeaTunnelRowType fileRowType = this.getSeaTunnelRowTypeInfo(path);
+        return fileRowType;
+    }
+
// todo: use CatalogTable
void setSeaTunnelRowTypeInfo(SeaTunnelRowType seaTunnelRowType);
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
index d06dc9f890..46213dde0f 100644
---
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/java/org/apache/seatunnel/e2e/connector/file/local/LocalFileIT.java
@@ -98,6 +98,9 @@ public class LocalFileIT extends TestSuiteBase {
"/seatunnel/read/orc/name=tyrantlucifer/hobby=coding/e2e.orc",
container);
+ ContainerUtil.copyFileIntoContainers(
+ "/orc/orc_for_cast.orc",
"/seatunnel/read/orc_cast/e2e.orc", container);
+
ContainerUtil.copyFileIntoContainers(
"/parquet/e2e.parquet",
"/seatunnel/read/parquet/name=tyrantlucifer/hobby=coding/e2e.parquet",
@@ -150,6 +153,8 @@ public class LocalFileIT extends TestSuiteBase {
helper.execute("/orc/local_file_orc_to_assert.conf");
// test read local orc file with projection
helper.execute("/orc/local_file_orc_projection_to_assert.conf");
+ // test read local orc file with a schema config that casts column types
+
helper.execute("/orc/local_file_orc_to_assert_with_time_and_cast.conf");
// test write local parquet file
helper.execute("/parquet/fake_to_local_file_parquet.conf");
// test read local parquet file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf
new file mode 100644
index 0000000000..476e21d923
--- /dev/null
+++
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/local_file_orc_to_assert_with_time_and_cast.conf
@@ -0,0 +1,126 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+env {
+ parallelism = 1
+ job.mode = "BATCH"
+
+ # You can set spark configuration here
+ spark.app.name = "SeaTunnel"
+ spark.executor.instances = 2
+ spark.executor.cores = 1
+ spark.executor.memory = "1g"
+ spark.master = local
+}
+
+source {
+ LocalFile {
+ path = "/seatunnel/read/orc_cast"
+ file_format_type = "orc"
+ result_table_name = "fake"
+ schema = {
+ fields {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ // change smallint to bigint
+ c_smallint = bigint
+ // change int to bigint
+ c_int = bigint
+ c_bigint = bigint
+ // change float value to string
+ c_float = string
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ // change timestamp value to time
+ c_timestamp = time
+ c_row = {
+ c_map = "map<string, string>"
+ c_array = "array<int>"
+ c_string = string
+ c_boolean = boolean
+ c_tinyint = tinyint
+ c_smallint = smallint
+ // change int value to string in c_row
+ c_int = string
+ c_bigint = bigint
+ c_float = float
+ c_double = double
+ c_bytes = bytes
+ c_date = date
+ c_decimal = "decimal(38, 18)"
+ c_timestamp = timestamp
+ }
+ }
+ }
+ }
+}
+
+sink {
+ Assert {
+ rules {
+ row_rules = [
+ {
+ rule_type = MAX_ROW
+ rule_value = 5
+ }
+ ],
+ field_rules = [
+ {
+ field_name = c_string
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_float
+ field_type = string
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_boolean
+ field_type = boolean
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ },
+ {
+ field_name = c_timestamp
+ field_type = time
+ field_value = [
+ {
+ rule_type = NOT_NULL
+ }
+ ]
+ }
+ ]
+ }
+ }
+}
\ No newline at end of file
diff --git
a/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc
new file mode 100644
index 0000000000..9ccbdc02cb
Binary files /dev/null and
b/seatunnel-e2e/seatunnel-connector-v2-e2e/connector-file-local-e2e/src/test/resources/orc/orc_for_cast.orc
differ