This is an automated email from the ASF dual-hosted git repository.

corgy pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new 2bdaeb6a07 [Fix][Connector-File] Fix parquet support user config schema (#9596)
2bdaeb6a07 is described below

commit 2bdaeb6a07e3b59b9afd894cde701d85ae447c2a
Author: JeremyXin <[email protected]>
AuthorDate: Wed Jul 23 10:05:48 2025 +0800

    [Fix][Connector-File] Fix parquet support user config schema (#9596)
---
 .../apache/seatunnel/api/table/type/TypeUtil.java  |   5 +-
 .../file/hdfs/source/BaseHdfsFileSource.java       |   8 +-
 .../file/source/reader/ParquetReadStrategy.java    | 131 +++++++++++++----
 .../file/writer/ParquetReadStrategyTest.java       | 159 ++++++++++++++++++++-
 .../resources/test_user_config_read_parquet.conf   |  29 ++++
 5 files changed, 301 insertions(+), 31 deletions(-)

diff --git a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
index b8df6d80e5..84fbfd0670 100644
--- a/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
+++ b/seatunnel-api/src/main/java/org/apache/seatunnel/api/table/type/TypeUtil.java
@@ -37,7 +37,10 @@ public class TypeUtil {
             return to.getSqlType() == SqlType.BIGINT;
         }
         if (from.getSqlType() == SqlType.FLOAT) {
-            return to.getSqlType() == SqlType.DOUBLE;
+            return to.getSqlType() == SqlType.DOUBLE || to.getSqlType() == 
SqlType.DECIMAL;
+        }
+        if (from.getSqlType() == SqlType.DOUBLE) {
+            return to.getSqlType() == SqlType.DECIMAL;
         }
         return false;
     }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
index 29cd0ec39c..7e93e39cec 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base-hadoop/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/hdfs/source/BaseHdfsFileSource.java
@@ -107,19 +107,23 @@ public abstract class BaseHdfsFileSource extends BaseFileSource {
                                 .toUpperCase());
         // only json text csv type support user-defined schema now
         if (pluginConfig.hasPath(ConnectorCommonOptions.SCHEMA.key())) {
+            CatalogTable userDefinedCatalogTable = 
CatalogTableUtil.buildWithConfig(pluginConfig);
             switch (fileFormat) {
                 case CSV:
                 case TEXT:
                 case JSON:
                 case EXCEL:
                 case XML:
-                    CatalogTable userDefinedCatalogTable =
-                            CatalogTableUtil.buildWithConfig(pluginConfig);
                     readStrategy.setCatalogTable(userDefinedCatalogTable);
                     rowType = readStrategy.getActualSeaTunnelRowTypeInfo();
                     break;
                 case ORC:
                 case PARQUET:
+                    rowType =
+                            
readStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(
+                                    filePaths.get(0),
+                                    
userDefinedCatalogTable.getSeaTunnelRowType());
+                    break;
                 case BINARY:
                     throw new FileConnectorException(
                             CommonErrorCodeDeprecated.UNSUPPORTED_OPERATION,
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 37d03b8ab7..2c2e134f4c 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -58,6 +58,8 @@ import org.apache.parquet.schema.Type;
 import lombok.extern.slf4j.Slf4j;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.RoundingMode;
 import java.nio.ByteBuffer;
 import java.sql.Timestamp;
 import java.time.Instant;
@@ -71,6 +73,8 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.stream.IntStream;
 
+import static org.apache.seatunnel.api.table.type.TypeUtil.canConvert;
+
 @Slf4j
 public class ParquetReadStrategy extends AbstractReadStrategy {
     private static final byte[] PARQUET_MAGIC =
@@ -185,14 +189,31 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                                         resolveObject(value, valueType)));
                 return dataMap;
             case BOOLEAN:
+                return Boolean.parseBoolean(field.toString());
             case INT:
+                return Integer.parseInt(field.toString());
             case BIGINT:
+                return Long.parseLong(field.toString());
             case FLOAT:
+                return Float.parseFloat(field.toString());
             case DOUBLE:
+                return Double.parseDouble(field.toString());
             case DECIMAL:
+                if (field instanceof Float || field instanceof Double) {
+                    DecimalType decimalType = (DecimalType) fieldType;
+                    return new BigDecimal(field.toString())
+                            .setScale(decimalType.getScale(), 
RoundingMode.HALF_UP);
+                }
+                return field;
             case DATE:
                 return field;
             case STRING:
+                if (field instanceof ByteBuffer) {
+                    ByteBuffer buffer = (ByteBuffer) field;
+                    byte[] bytes = new byte[buffer.remaining()];
+                    buffer.get(bytes, 0, bytes.length);
+                    return new String(bytes);
+                }
                 return field.toString();
             case TINYINT:
                 return Byte.parseByte(field.toString());
@@ -238,12 +259,18 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(String path) throws 
FileConnectorException {
-        return getSeaTunnelRowTypeInfo(TablePath.DEFAULT, path);
+        return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
     }
 
     @Override
     public SeaTunnelRowType getSeaTunnelRowTypeInfo(TablePath tablePath, 
String path)
             throws FileConnectorException {
+        return getSeaTunnelRowTypeInfoWithUserConfigRowType(path, null);
+    }
+
+    @Override
+    public SeaTunnelRowType getSeaTunnelRowTypeInfoWithUserConfigRowType(
+            String path, SeaTunnelRowType configRowType) throws 
FileConnectorException {
         ParquetMetadata metadata;
         try (ParquetFileReader reader =
                 hadoopFileSystemProxy.doWithHadoopAuth(
@@ -259,6 +286,7 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
             throw new FileConnectorException(
                     CommonErrorCodeDeprecated.READER_OPERATION_FAILED, 
errorMsg, e);
         }
+
         FileMetaData fileMetaData = metadata.getFileMetaData();
         MessageType originalSchema = fileMetaData.getSchema();
         if (readColumns.isEmpty()) {
@@ -270,62 +298,66 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
         SeaTunnelDataType<?>[] types = new 
SeaTunnelDataType[readColumns.size()];
         indexes = new int[readColumns.size()];
         buildColumnsWithErrorCheck(
-                tablePath,
+                TablePath.DEFAULT,
                 IntStream.range(0, readColumns.size()).iterator(),
                 i -> {
                     fields[i] = readColumns.get(i);
                     Type type = originalSchema.getType(fields[i]);
                     int fieldIndex = originalSchema.getFieldIndex(fields[i]);
                     indexes[i] = fieldIndex;
-                    types[i] = parquetType2SeaTunnelType(type, fields[i]);
+                    SeaTunnelDataType<?> configDataType =
+                            getConfigFieldType(configRowType, fields[i]);
+                    types[i] = parquetType2SeaTunnelType(type, configDataType, 
fields[i]);
                 });
+
         seaTunnelRowType = new SeaTunnelRowType(fields, types);
         seaTunnelRowTypeWithPartition = mergePartitionTypes(path, 
seaTunnelRowType);
         return getActualSeaTunnelRowTypeInfo();
     }
 
-    private SeaTunnelDataType<?> parquetType2SeaTunnelType(Type type, String 
name) {
+    private SeaTunnelDataType<?> parquetType2SeaTunnelType(
+            Type type, SeaTunnelDataType<?> configType, String name) {
         if (type.isPrimitive()) {
             switch (type.asPrimitiveType().getPrimitiveTypeName()) {
                 case INT32:
                     OriginalType originalType = 
type.asPrimitiveType().getOriginalType();
                     if (originalType == null) {
-                        return BasicType.INT_TYPE;
+                        return getFinalType(BasicType.INT_TYPE, configType);
                     }
                     switch (type.asPrimitiveType().getOriginalType()) {
                         case INT_8:
-                            return BasicType.BYTE_TYPE;
+                            return getFinalType(BasicType.BYTE_TYPE, 
configType);
                         case INT_16:
-                            return BasicType.SHORT_TYPE;
+                            return getFinalType(BasicType.SHORT_TYPE, 
configType);
                         case INT_32:
-                            return BasicType.INT_TYPE;
+                            return getFinalType(BasicType.INT_TYPE, 
configType);
                         case DATE:
-                            return LocalTimeType.LOCAL_DATE_TYPE;
+                            return getFinalType(LocalTimeType.LOCAL_DATE_TYPE, 
configType);
                         default:
                             throw CommonError.convertToSeaTunnelTypeError(
                                     PARQUET, type.toString(), name);
                     }
                 case INT64:
                     if (type.asPrimitiveType().getOriginalType() == 
OriginalType.TIMESTAMP_MILLIS) {
-                        return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                        return 
getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, configType);
                     }
-                    return BasicType.LONG_TYPE;
+                    return getFinalType(BasicType.LONG_TYPE, configType);
                 case INT96:
-                    return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                    return getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, 
configType);
                 case BINARY:
                     if (type.asPrimitiveType().getOriginalType() == null) {
-                        return PrimitiveByteArrayType.INSTANCE;
+                        return getFinalType(PrimitiveByteArrayType.INSTANCE, 
configType);
                     }
-                    return BasicType.STRING_TYPE;
+                    return getFinalType(BasicType.STRING_TYPE, configType);
                 case FLOAT:
-                    return BasicType.FLOAT_TYPE;
+                    return getFinalType(BasicType.FLOAT_TYPE, configType);
                 case DOUBLE:
-                    return BasicType.DOUBLE_TYPE;
+                    return getFinalType(BasicType.DOUBLE_TYPE, configType);
                 case BOOLEAN:
-                    return BasicType.BOOLEAN_TYPE;
+                    return getFinalType(BasicType.BOOLEAN_TYPE, configType);
                 case FIXED_LEN_BYTE_ARRAY:
                     if (type.getLogicalTypeAnnotation() == null) {
-                        return LocalTimeType.LOCAL_DATE_TIME_TYPE;
+                        return 
getFinalType(LocalTimeType.LOCAL_DATE_TIME_TYPE, configType);
                     }
                     String typeInfo =
                             type.getLogicalTypeAnnotation()
@@ -336,7 +368,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                     String[] splits = typeInfo.split(",");
                     int precision = Integer.parseInt(splits[0]);
                     int scale = Integer.parseInt(splits[1]);
-                    return new DecimalType(precision, scale);
+                    DecimalType decimalType = new DecimalType(precision, 
scale);
+                    return getFinalType(decimalType, configType);
                 default:
                     throw CommonError.convertToSeaTunnelTypeError("Parquet", 
type.toString(), name);
             }
@@ -350,8 +383,15 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                 SeaTunnelDataType<?>[] seaTunnelDataTypes = new 
SeaTunnelDataType<?>[fields.size()];
                 for (int i = 0; i < fields.size(); i++) {
                     Type fieldType = fields.get(i);
+                    SeaTunnelDataType<?> configDataType = null;
+                    if (configType instanceof SeaTunnelRowType) {
+                        SeaTunnelRowType configRowType = (SeaTunnelRowType) 
configType;
+                        if (configRowType.getFieldTypes().length > i) {
+                            configDataType = configRowType.getFieldType(i);
+                        }
+                    }
                     SeaTunnelDataType<?> seaTunnelDataType =
-                            parquetType2SeaTunnelType(fields.get(i), name);
+                            parquetType2SeaTunnelType(fields.get(i), 
configDataType, name);
                     fieldNames[i] = fieldType.getName();
                     seaTunnelDataTypes[i] = seaTunnelDataType;
                 }
@@ -360,11 +400,24 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                 switch (logicalTypeAnnotation.toOriginalType()) {
                     case MAP:
                         GroupType groupType = 
type.asGroupType().getType(0).asGroupType();
-                        SeaTunnelDataType<?> keyType =
-                                
parquetType2SeaTunnelType(groupType.getType(0), name);
-                        SeaTunnelDataType<?> valueType =
-                                
parquetType2SeaTunnelType(groupType.getType(1), name);
-                        return new MapType<>(keyType, valueType);
+                        if (configType instanceof MapType) {
+                            SeaTunnelDataType<?> keyDataType =
+                                    ((MapType<?, ?>) configType).getKeyType();
+                            SeaTunnelDataType<?> valueDataType =
+                                    ((MapType<?, ?>) 
configType).getValueType();
+                            keyDataType =
+                                    parquetType2SeaTunnelType(
+                                            groupType.getType(0), keyDataType, 
name);
+                            valueDataType =
+                                    parquetType2SeaTunnelType(
+                                            groupType.getType(1), 
valueDataType, name);
+
+                            return new MapType<>(keyDataType, valueDataType);
+                        } else {
+                            return new MapType<>(
+                                    
parquetType2SeaTunnelType(groupType.getType(0), null, name),
+                                    
parquetType2SeaTunnelType(groupType.getType(1), null, name));
+                        }
                     case LIST:
                         Type elementType;
                         try {
@@ -373,7 +426,13 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                             elementType = type.asGroupType().getType(0);
                         }
                         SeaTunnelDataType<?> fieldType =
-                                parquetType2SeaTunnelType(elementType, name);
+                                parquetType2SeaTunnelType(elementType, null, 
name);
+                        if (configType instanceof ArrayType) {
+                            SeaTunnelDataType<?> seaTunnelDataType =
+                                    ((ArrayType) configType).getElementType();
+                            fieldType =
+                                    parquetType2SeaTunnelType(elementType, 
seaTunnelDataType, name);
+                        }
                         switch (fieldType.getSqlType()) {
                             case STRING:
                                 return ArrayType.STRING_ARRAY_TYPE;
@@ -420,4 +479,24 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
             throw new 
FileConnectorException(FileConnectorErrorCode.FILE_TYPE_INVALID, errorMsg);
         }
     }
+
+    private SeaTunnelDataType<?> getFinalType(
+            SeaTunnelDataType<?> fileType, SeaTunnelDataType<?> configType) {
+        if (configType == null) {
+            return fileType;
+        }
+        return canConvert(fileType, configType) ? configType : fileType;
+    }
+
+    private SeaTunnelDataType<?> getConfigFieldType(
+            SeaTunnelRowType configRowType, String fieldName) {
+
+        if (configRowType == null) {
+            return null;
+        }
+
+        int fieldIndex = 
Arrays.asList(configRowType.getFieldNames()).indexOf(fieldName);
+
+        return fieldIndex == -1 ? null : 
configRowType.getFieldType(fieldIndex);
+    }
 }
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 1050c4a2d9..e66393e263 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -21,6 +21,8 @@ import org.apache.seatunnel.shade.com.typesafe.config.Config;
 import org.apache.seatunnel.shade.com.typesafe.config.ConfigFactory;
 
 import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.catalog.CatalogTable;
+import org.apache.seatunnel.api.table.catalog.CatalogTableUtil;
 import org.apache.seatunnel.api.table.type.ArrayType;
 import org.apache.seatunnel.api.table.type.SeaTunnelRow;
 import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
@@ -28,9 +30,11 @@ import org.apache.seatunnel.common.exception.SeaTunnelRuntimeException;
 import org.apache.seatunnel.connectors.seatunnel.file.config.HadoopConf;
 import 
org.apache.seatunnel.connectors.seatunnel.file.source.reader.ParquetReadStrategy;
 
+import org.apache.avro.Conversions;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericArray;
 import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericFixed;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
@@ -57,11 +61,17 @@ import lombok.extern.slf4j.Slf4j;
 
 import java.io.File;
 import java.io.IOException;
+import java.math.BigDecimal;
 import java.net.URL;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.nio.file.Paths;
+import java.time.LocalDate;
 import java.time.LocalDateTime;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.TimeZone;
 
 import static 
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
@@ -249,6 +259,71 @@ public class ParquetReadStrategyTest {
         parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "", 
testCollector);
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetWithUserConfigRowType() throws Exception {
+        AutoGenerateParquetData.generateTestData();
+        String path = AutoGenerateParquetData.DATA_FILE_PATH;
+
+        URL conf = 
ParquetReadStrategyTest.class.getResource("/test_user_config_read_parquet.conf");
+        Assertions.assertNotNull(conf);
+        String confPath = Paths.get(conf.toURI()).toString();
+        Config pluginConfig = ConfigFactory.parseFile(new File(confPath));
+        CatalogTable catalogTable = 
CatalogTableUtil.buildWithConfig(pluginConfig);
+
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+
+        SeaTunnelRowType configRowType = catalogTable.getSeaTunnelRowType();
+        parquetReadStrategy.getSeaTunnelRowTypeInfoWithUserConfigRowType(path, 
configRowType);
+
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(path, "default", testCollector);
+        List<SeaTunnelRow> rows = testCollector.getRows();
+        SeaTunnelRow row = rows.get(0);
+
+        // Verify whether the data type and type conversion are correct
+        // id convert to String
+        Assertions.assertEquals(String.class, row.getField(0).getClass());
+        Assertions.assertEquals(String.class, row.getField(1).getClass());
+        // salary convert to Double
+        Assertions.assertEquals(Double.class, row.getField(2).getClass());
+        Assertions.assertTrue(row.getField(3) instanceof String[]);
+        // age convert to Long
+        Assertions.assertEquals(Long.class, row.getField(4).getClass());
+        Assertions.assertEquals(Boolean.class, row.getField(5).getClass());
+        // score convert to Decimal
+        Assertions.assertEquals(BigDecimal.class, row.getField(6).getClass());
+        Assertions.assertEquals(BigDecimal.class, row.getField(7).getClass());
+        Assertions.assertEquals(LocalDate.class, row.getField(8).getClass());
+        Assertions.assertEquals(LocalDateTime.class, 
row.getField(9).getClass());
+        Assertions.assertEquals(HashMap.class, row.getField(10).getClass());
+        Assertions.assertEquals(byte[].class, row.getField(11).getClass());
+        // binary_as_string convert to String
+        Assertions.assertEquals(String.class, row.getField(12).getClass());
+
+        Assertions.assertEquals("1", row.getField(0));
+        Assertions.assertEquals("Alice", row.getField(1));
+        Assertions.assertEquals(50000.0, row.getField(2));
+        String[] skills = (String[]) row.getField(3);
+        Assertions.assertEquals(2, skills.length);
+        Assertions.assertEquals("Java", skills[0]);
+        Assertions.assertEquals("Python", skills[1]);
+        Assertions.assertEquals(30L, row.getField(4));
+        Assertions.assertEquals(true, row.getField(5));
+        Assertions.assertEquals(new BigDecimal("98.50"), row.getField(6));
+        Assertions.assertEquals(new BigDecimal("1198.02"), row.getField(7));
+        Assertions.assertNotNull(row.getField(8));
+        Assertions.assertNotNull(row.getField(9));
+        Assertions.assertTrue(((HashMap<?, ?>) 
row.getField(10)).containsKey("department"));
+        Assertions.assertArrayEquals(
+                "binary data example".getBytes(StandardCharsets.UTF_8), 
(byte[]) row.getField(11));
+        Assertions.assertEquals("binary_as_string", row.getField(12));
+
+        AutoGenerateParquetData.deleteFile();
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -294,12 +369,27 @@ public class ParquetReadStrategyTest {
 
         public static void generateTestData() throws IOException {
             deleteFile();
+
+            // create schema, which includes various data types
             String schemaString =
-                    
"{\"type\":\"record\",\"name\":\"User\",\"fields\":[{\"name\":\"id\",\"type\":\"int\"},{\"name\":\"name\",\"type\":\"string\"},{\"name\":\"salary\",\"type\":\"double\"},{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}}]}";
+                    "{\"type\":\"record\",\"name\":\"User\",\"fields\":["
+                            + "{\"name\":\"id\",\"type\":\"int\"},"
+                            + "{\"name\":\"name\",\"type\":\"string\"},"
+                            + "{\"name\":\"salary\",\"type\":\"float\"},"
+                            + 
"{\"name\":\"skills\",\"type\":{\"type\":\"array\",\"items\":\"string\"}},"
+                            + "{\"name\":\"age\",\"type\":\"int\"},"
+                            + "{\"name\":\"active\",\"type\":\"boolean\"},"
+                            + "{\"name\":\"score\",\"type\":\"double\"},"
+                            + 
"{\"name\":\"budget\",\"type\":{\"type\":\"fixed\",\"name\":\"BudgetDecimal\",\"size\":8,\"logicalType\":\"decimal\",\"precision\":8,\"scale\":2}},"
+                            + 
"{\"name\":\"join_date\",\"type\":{\"type\":\"int\",\"logicalType\":\"date\"}},"
+                            + 
"{\"name\":\"created_at\",\"type\":{\"type\":\"long\",\"logicalType\":\"timestamp-millis\"}},"
+                            + 
"{\"name\":\"properties\",\"type\":{\"type\":\"map\",\"values\":\"string\"}},"
+                            + "{\"name\":\"binary_data\",\"type\":\"bytes\"},"
+                            + 
"{\"name\":\"binary_as_string\",\"type\":\"bytes\"}"
+                            + "]}";
             Schema schema = new Schema.Parser().parse(schemaString);
 
             Configuration conf = new Configuration();
-
             Path file = new Path(DATA_FILE_PATH);
 
             ParquetWriter<GenericRecord> writer =
@@ -309,26 +399,91 @@ public class ParquetReadStrategyTest {
                             .withCompressionCodec(CompressionCodecName.SNAPPY)
                             .build();
 
+            // create first record
             GenericRecord record1 = new GenericData.Record(schema);
             record1.put("id", 1);
             record1.put("name", "Alice");
             record1.put("salary", 50000.0);
+            record1.put("age", 30);
+            record1.put("active", true);
+            record1.put("score", 98.5f);
+            record1.put("created_at", System.currentTimeMillis());
+
+            // Date type
+            record1.put("join_date", 20289);
+
+            // Decimal type
+            BigDecimal budget = new BigDecimal("1198.02");
+            Schema.Field budgetField = schema.getField("budget");
+            Schema budgetSchema = budgetField.schema();
+            Conversions.DecimalConversion decimalConversion = new 
Conversions.DecimalConversion();
+            GenericFixed budgetFixed =
+                    decimalConversion.toFixed(budget, budgetSchema, 
budgetSchema.getLogicalType());
+            record1.put("budget", budgetFixed);
+
+            // Array type
             GenericArray<Utf8> skills1 =
                     new GenericData.Array<>(2, 
schema.getField("skills").schema());
             skills1.add(new Utf8("Java"));
             skills1.add(new Utf8("Python"));
             record1.put("skills", skills1);
+
+            // Map type
+            Map<Utf8, Utf8> properties1 = new HashMap<>();
+            properties1.put(new Utf8("department"), new Utf8("Engineering"));
+            properties1.put(new Utf8("location"), new Utf8("Beijing"));
+            record1.put("properties", properties1);
+
+            // Binary type
+            record1.put(
+                    "binary_data",
+                    ByteBuffer.wrap("binary data 
example".getBytes(StandardCharsets.UTF_8)));
+            record1.put(
+                    "binary_as_string",
+                    
ByteBuffer.wrap("binary_as_string".getBytes(StandardCharsets.UTF_8)));
+
             writer.write(record1);
 
+            // create second record
             GenericRecord record2 = new GenericData.Record(schema);
             record2.put("id", 2);
             record2.put("name", "Bob");
             record2.put("salary", 60000.0);
+            record2.put("age", 35);
+            record2.put("active", false);
+            record2.put("score", 89.2f);
+            record2.put("created_at", System.currentTimeMillis() - 86400000);
+
+            // Date type
+            record2.put("join_date", 20288);
+
+            // Decimal type
+            BigDecimal budget2 = new BigDecimal("2394.13");
+            Schema.Field budgetField2 = schema.getField("budget");
+            Schema budgetSchema2 = budgetField2.schema();
+            GenericFixed budgetFixed2 =
+                    decimalConversion.toFixed(
+                            budget2, budgetSchema2, 
budgetSchema2.getLogicalType());
+            record2.put("budget", budgetFixed2);
+
             GenericArray<Utf8> skills2 =
                     new GenericData.Array<>(2, 
schema.getField("skills").schema());
             skills2.add(new Utf8("C++"));
             skills2.add(new Utf8("Go"));
             record2.put("skills", skills2);
+
+            Map<Utf8, Utf8> properties2 = new HashMap<>();
+            properties2.put(new Utf8("department"), new Utf8("Marketing"));
+            properties2.put(new Utf8("location"), new Utf8("Shanghai"));
+            record2.put("properties", properties2);
+
+            record2.put(
+                    "binary_data",
+                    ByteBuffer.wrap("another binary 
example".getBytes(StandardCharsets.UTF_8)));
+            record2.put(
+                    "binary_as_string",
+                    ByteBuffer.wrap("another 
binary_as_string".getBytes(StandardCharsets.UTF_8)));
+
             writer.write(record2);
 
             writer.close();
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf
new file mode 100644
index 0000000000..bf1183662b
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/resources/test_user_config_read_parquet.conf
@@ -0,0 +1,29 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+{
+schema {
+  fields {
+      id = "string"
+      salary = "double"
+      age = "long"
+      score = "decimal(10,2)"
+      binary_as_string = "string"
+      properties = "map<string,string>"
+    }
+  }
+}

Reply via email to