This is an automated email from the ASF dual-hosted git repository.
wanghailin pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new e6413c388e [Fix][connector-file-base] fix parquet int32 convert error
(#9142)
e6413c388e is described below
commit e6413c388e9536130199c23e3fc793524099e08f
Author: JeremyXin <[email protected]>
AuthorDate: Thu Apr 17 22:27:13 2025 +0800
[Fix][connector-file-base] fix parquet int32 convert error (#9142)
Co-authored-by: JeremyXin <[email protected]>
---
.../file/source/reader/ParquetReadStrategy.java | 2 +
.../file/writer/ParquetReadStrategyTest.java | 82 ++++++++++++++++++++++
2 files changed, 84 insertions(+)
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
index 1264df9807..37d03b8ab7 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/main/java/org/apache/seatunnel/connectors/seatunnel/file/source/reader/ParquetReadStrategy.java
@@ -297,6 +297,8 @@ public class ParquetReadStrategy extends
AbstractReadStrategy {
return BasicType.BYTE_TYPE;
case INT_16:
return BasicType.SHORT_TYPE;
+ case INT_32:
+ return BasicType.INT_TYPE;
case DATE:
return LocalTimeType.LOCAL_DATE_TYPE;
default:
diff --git
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
index 66af39f18d..1050c4a2d9 100644
---
a/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
+++
b/seatunnel-connectors-v2/connector-file/connector-file-base/src/test/java/org/apache/seatunnel/connectors/seatunnel/file/writer/ParquetReadStrategyTest.java
@@ -36,8 +36,17 @@ import org.apache.avro.util.Utf8;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
+import org.apache.parquet.example.data.Group;
+import org.apache.parquet.example.data.simple.SimpleGroup;
+import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.example.ExampleParquetWriter;
+import org.apache.parquet.hadoop.example.GroupWriteSupport;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.schema.LogicalTypeAnnotation;
+import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -56,6 +65,7 @@ import java.util.List;
import java.util.TimeZone;
import static
org.apache.hadoop.fs.CommonConfigurationKeysPublic.FS_DEFAULT_NAME_DEFAULT;
+import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT32;
@Slf4j
public class ParquetReadStrategyTest {
@@ -211,6 +221,34 @@ public class ParquetReadStrategyTest {
AutoGenerateParquetData.deleteFile();
}
+    @DisabledOnOs(OS.WINDOWS)
+    @Test
+    public void testParquetTypeInt32WithLogicalTypeAnnotation() throws IOException {
+        // INT32 + intType(32, true) must map to INT_TYPE; the file holds a single row with id = 1.
+        NativeParquetWriter.generateTestData();
+        try (ParquetFileReader reader =
+                ParquetFileReader.open(
+                        HadoopInputFile.fromPath(
+                                new Path(NativeParquetWriter.DATA_FILE_PATH),
+                                new Configuration()))) {
+            MessageType schema = reader.getFileMetaData().getSchema();
+            LogicalTypeAnnotation type = schema.getType("id").getLogicalTypeAnnotation();
+            Assertions.assertTrue(type instanceof LogicalTypeAnnotation.IntLogicalTypeAnnotation);
+        }
+
+        ParquetReadStrategy parquetReadStrategy = new ParquetReadStrategy();
+        LocalConf localConf = new LocalConf(FS_DEFAULT_NAME_DEFAULT);
+        parquetReadStrategy.init(localConf);
+        SeaTunnelRowType seaTunnelRowTypeInfo =
+                parquetReadStrategy.getSeaTunnelRowTypeInfo(NativeParquetWriter.DATA_FILE_PATH);
+        Assertions.assertNotNull(seaTunnelRowTypeInfo);
+        Assertions.assertEquals(Integer.class, seaTunnelRowTypeInfo.getFieldType(0).getTypeClass());
+        TestCollector testCollector = new TestCollector();
+        parquetReadStrategy.read(NativeParquetWriter.DATA_FILE_PATH, "", testCollector);
+        Assertions.assertEquals(1, testCollector.getRows().size());
+        Assertions.assertEquals(1, testCollector.getRows().get(0).getField(0));
+    }
+
public static class TestCollector implements Collector<SeaTunnelRow> {
private final List<SeaTunnelRow> rows = new ArrayList<>();
@@ -344,4 +382,48 @@ public class ParquetReadStrategyTest {
}
}
}
+
+    /**
+     * Writes a one-row Parquet file via Parquet's example {@link Group} API rather than Avro, so
+     * that the INT32 column keeps an explicit {@code intType(32, true)} logical annotation.
+     */
+    public static class NativeParquetWriter {
+
+        public static final String DATA_FILE_PATH = "/tmp/data_native.parquet";
+
+        /** Schema {@code User} with one required INT32 column {@code id} (signed 32-bit int). */
+        public static MessageType createSchema() {
+            return Types.buildMessage()
+                    .required(INT32)
+                    .as(LogicalTypeAnnotation.intType(32, true))
+                    .named("id")
+                    .named("User");
+        }
+
+        /** (Re)creates {@link #DATA_FILE_PATH} containing a single record with {@code id = 1}. */
+        public static void generateTestData() throws IOException {
+            deleteFile();
+            MessageType schema = createSchema();
+            Configuration conf = new Configuration();
+            GroupWriteSupport.setSchema(schema, conf);
+            Path file = new Path(DATA_FILE_PATH);
+            try (ParquetWriter<Group> writer =
+                    ExampleParquetWriter.builder(file)
+                            .withConf(conf)
+                            .withCompressionCodec(CompressionCodecName.SNAPPY)
+                            .build()) {
+                Group record1 = new SimpleGroup(schema);
+                record1.add("id", 1);
+                writer.write(record1);
+            }
+        }
+
+        /** Removes a stale file from a previous run; fails loudly if it cannot be removed. */
+        private static void deleteFile() throws IOException {
+            File parquetFile = new File(DATA_FILE_PATH);
+            if (parquetFile.exists() && !parquetFile.delete()) {
+                throw new IOException("Failed to delete stale test file: " + DATA_FILE_PATH);
+            }
+        }
+    }
}