This is an automated email from the ASF dual-hosted git repository.

wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git


The following commit(s) were added to refs/heads/dev by this push:
     new e6413c388e [Fix][connector-file-base] fix parquet int32 convert error (#9142)
e6413c388e is described below

commit e6413c388e9536130199c23e3fc793524099e08f
Author: JeremyXin <[email protected]>
AuthorDate: Thu Apr 17 22:27:13 2025 +0800

    [Fix][connector-file-base] fix parquet int32 convert error (#9142)
    
    Co-authored-by: JeremyXin <[email protected]>
---
 .../file/source/reader/ParquetReadStrategy.java    |  2 +
 .../file/writer/ParquetReadStrategyTest.java       | 82 ++++++++++++++++++++++
 2 files changed, 84 insertions(+)

diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 1264df9807..37d03b8ab7 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -297,6 +297,8 @@ public class ParquetReadStrategy extends AbstractReadStrategy {
                             return BasicType.BYTE_TYPE;
                         case INT_16:
                             return BasicType.SHORT_TYPE;
+                        case INT_32:
+                            return BasicType.INT_TYPE;
                         case DATE:
                             return LocalTimeType.LOCAL_DATE_TYPE;
                         default:
diff --git a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 66af39f18d..1050c4a2d9 100644
--- a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++ b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -36,8 +36,17 @@ import org.apache.avro.util.Utf8;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
 
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -56,6 +65,7 @@ import java.util.List;
 import java.util.TimeZone;
 
 import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
 
 @Slf4j
 public class ParquetReadStrategyTest {
@@ -211,6 +221,34 @@ public class ParquetReadStrategyTest {
         AutoGenerateParquetData.deleteFile();
     }
 
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetTypeInt32WithLogicalTypeAnnotation() throws IOException {
+
+        NativeParquetWriter.generateTestData();
+
+        try (ParquetFileReader reader =
+                ParquetFileReader.open(
+                        HadoopInputFile.fromPath(
+                                new Path(NativeParquetWriter.DATA_FILE_PATH),
+                                new Configuration()))) {
+
+            MessageType schema = reader.getFileMetaData().getSchema();
+            LogicalTypeAnnotation type = schema.getType("id").getLogicalTypeAnnotation();
+            Assertions.assertTrue(type instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation);
+        }
+
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        Assertions.assertEquals(seaTunnelRowTypeInfo.getFieldType(0).getTypeClass(), Integer.class);
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "", testCollector);
+    }
+
     public static class TestCollector implements Collector<SeaTunnelRow> {
 
         private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -344,4 +382,48 @@ public class ParquetReadStrategyTest {
             }
         }
     }
+
+    /** Write data based on the Parquet native api */
+    public static class NativeParquetWriter {
+
+        public static final String DATA_FILE_PATH = "/tmp/data_native.parquet";
+
+        // 1. Define Parquet Native Schema (MessageType)
+        public static MessageType createSchema() {
+            return Types.buildMessage()
+                    .required(INT32)
+                    .as(LogicalTypeAnnotation.intType(32, true))
+                    .named("id")
+                    .named("User");
+        }
+
+        // 2. write data
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            MessageType schema = createSchema();
+            Configuration conf = new Configuration();
+
+            GroupWriteSupport.setSchema(schema, conf);
+
+            Path file = new Path(DATA_FILE_PATH);
+            try (ParquetWriter<Group> writer =
+                    ExampleParquetWriter.builder(file)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build()) {
+
+                Group record1 = new SimpleGroup(schema);
+                record1.add("id", 1);
+
+                writer.write(record1);
+            }
+        }
+
+        private static void deleteFile() {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists()) {
+                parquetFile.delete();
+            }
+        }
+    }
 }

Reply via email to