This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 9e362c85d9 [parquet] Support parquet two level list representations (#5602)
9e362c85d9 is described below

commit 9e362c85d92ef60b1e59f64ec8da2e1f7783d1fd
Author: YeJunHao <[email protected]>
AuthorDate: Wed May 14 15:26:46 2025 +0800

    [parquet] Support parquet two level list representations (#5602)
---
 .../format/parquet/ParquetReaderFactory.java       |  38 ++++--
 .../format/parquet/ParquetSchemaConverter.java     |  21 +++-
 .../reader/FileTypeNotMatchReadTypeTest.java       |  40 +++++++
 .../parquet/reader/SimpleGroupWriteSupport.java    | 130 +++++++++++++++++++++
 4 files changed, 212 insertions(+), 17 deletions(-)

diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
index 65577fba30..6af5e0c01a 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java
@@ -196,20 +196,34 @@ public class ParquetReaderFactory implements FormatReaderFactory {
                 Preconditions.checkArgument(
                         listSubFields == 1,
                         "Parquet list group type should only have one middle 
level REPEATED field.");
+                // There are two representations for array type in parquet.
+                // See link:
+                // https://impala.apache.org/docs/build/html/topics/impala_parquet_array_resolution.html.
+                int level = arrayGroup.getType(0) instanceof GroupType ? 3 : 2;
                 Type elementType =
                         clipParquetType(
-                                arrayType.getElementType(), parquetListElementType(arrayGroup));
-                // In case that the name in middle level is not "list".
-                Type groupMiddle =
-                        new GroupType(
-                                Type.Repetition.REPEATED,
-                                arrayGroup.getType(0).getName(),
-                                elementType);
-                return new GroupType(
-                        arrayGroup.getRepetition(),
-                        arrayGroup.getName(),
-                        OriginalType.LIST,
-                        groupMiddle);
+                                arrayType.getElementType(),
+                                parquetListElementType(arrayGroup, level));
+
+                if (level == 3) {
+                    // In case that the name in middle level is not "list".
+                    Type groupMiddle =
+                            new GroupType(
+                                    Type.Repetition.REPEATED,
+                                    arrayGroup.getType(0).getName(),
+                                    elementType);
+                    return new GroupType(
+                            arrayGroup.getRepetition(),
+                            arrayGroup.getName(),
+                            OriginalType.LIST,
+                            groupMiddle);
+                } else {
+                    return new GroupType(
+                            arrayGroup.getRepetition(),
+                            arrayGroup.getName(),
+                            OriginalType.LIST,
+                            elementType);
+                }
             default:
                 return parquetType;
         }
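
For context, the two list encodings that the new "level" check in clipParquetType distinguishes can be sketched as follows. This is an illustrative snippet, not part of the patch; the schema strings and field names are made up, and only MessageTypeParser from parquet-mr is assumed:

import org.apache.parquet.schema.GroupType;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ListLevelSketch {
    public static void main(String[] args) {
        // Standard three-level encoding: LIST group -> repeated middle group -> element.
        MessageType threeLevel =
                MessageTypeParser.parseMessageType(
                        "message Record { required group ints (LIST) {"
                                + " repeated group list { optional int32 element; } } }");
        // Legacy two-level encoding: LIST group -> repeated primitive element, no middle group.
        MessageType twoLevel =
                MessageTypeParser.parseMessageType(
                        "message Record { required group ints (LIST) {"
                                + " repeated int32 ints_tuple; } }");

        for (MessageType schema : new MessageType[] {threeLevel, twoLevel}) {
            GroupType arrayGroup = schema.getType(0).asGroupType();
            // Same check as in the patch: a GroupType child means the three-level form.
            int level = arrayGroup.getType(0) instanceof GroupType ? 3 : 2;
            System.out.println(arrayGroup.getName() + " uses the " + level + "-level encoding");
        }
    }
}
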
diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
index 6b02ed6f7d..e2fe63618c 100644
--- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
+++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java
@@ -373,9 +373,11 @@ public class ParquetSchemaConverter {
         } else {
             GroupType groupType = parquetType.asGroupType();
            if (logicalType instanceof LogicalTypeAnnotation.ListLogicalTypeAnnotation) {
+                int level = groupType.getType(0) instanceof GroupType ? 3 : 2;
                 paimonDataType =
                         new ArrayType(
-                                convertToPaimonField(parquetListElementType(groupType)).type());
+                                convertToPaimonField(parquetListElementType(groupType, level))
+                                        .type());
            } else if (logicalType instanceof LogicalTypeAnnotation.MapLogicalTypeAnnotation) {
                 Pair<Type, Type> keyValueType = 
parquetMapKeyValueType(groupType);
                 paimonDataType =
@@ -400,10 +402,19 @@ public class ParquetSchemaConverter {
        return new DataField(parquetType.getId().intValue(), parquetType.getName(), paimonDataType);
     }
 
-    public static Type parquetListElementType(GroupType listType) {
-        // List type should only have one middle group type, which is repeated, and one element
-        // type, which is optional.
-        return listType.getType(0).asGroupType().getType(0);
+    public static Type parquetListElementType(GroupType listType, int level) {
+        if (level == 3) {
+            // Level 3 representation of list type.
+            // List type should only have one middle group type, which is repeated, and one element
+            // type, which is optional.
+            return listType.getType(0).asGroupType().getType(0);
+        } else if (level == 2) {
+            // Level 2 representation of list type
+            return listType.getType(0);
+        } else {
+            throw new UnsupportedOperationException(
+                    "Parquet list type only has two-level and three-level representations.");
+        }
     }
 
     public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) {
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
index 71bcf7af03..bae1c98a8c 100644
--- a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/FileTypeNotMatchReadTypeTest.java
@@ -32,6 +32,7 @@ import org.apache.paimon.format.parquet.ParquetSchemaConverter;
 import org.apache.paimon.format.parquet.writer.ParquetRowDataBuilder;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.reader.FileRecordIterator;
 import org.apache.paimon.reader.FileRecordReader;
 import org.apache.paimon.table.SpecialFields;
 import org.apache.paimon.types.ArrayType;
@@ -53,6 +54,7 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.nio.file.Path;
+import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.Random;
@@ -193,6 +195,44 @@ public class FileTypeNotMatchReadTypeTest {
         file.delete();
     }
 
+    @Test
+    public void testArray2() throws Exception {
+        String fileName = "test.parquet";
+        String fileWholePath = tempDir + "/" + fileName;
+
+        SimpleGroupWriteSupport simpleGroupWriteSupport = new SimpleGroupWriteSupport();
+        simpleGroupWriteSupport.writeTest(
+                fileWholePath,
+                Arrays.asList(
+                        new SimpleGroupWriteSupport.SimpleGroup(Arrays.asList(1, 21, 242)),
+                        new SimpleGroupWriteSupport.SimpleGroup(Arrays.asList(4, 221, 12))));
+
+        RowType rowType =
+                RowType.of(new DataField(0, "list_of_ints", DataTypes.ARRAY(DataTypes.INT())));
+
+        ParquetReaderFactory parquetReaderFactory =
+                new ParquetReaderFactory(new Options(), rowType, 100, null);
+
+        File file = new File(fileWholePath);
+        FileRecordReader<InternalRow> fileRecordReader =
+                parquetReaderFactory.createReader(
+                        new FormatReaderContext(
+                                LocalFileIO.create(),
+                                new org.apache.paimon.fs.Path(tempDir.toString(), fileName),
+                                file.length()));
+
+        FileRecordIterator<InternalRow> batch = fileRecordReader.readBatch();
+        InternalRow row = batch.next();
+        assertThat(row.getArray(0).getInt(0)).isEqualTo(1);
+        assertThat(row.getArray(0).getInt(1)).isEqualTo(21);
+        assertThat(row.getArray(0).getInt(2)).isEqualTo(242);
+        row = batch.next();
+        assertThat(row.getArray(0).getInt(0)).isEqualTo(4);
+        assertThat(row.getArray(0).getInt(1)).isEqualTo(221);
+        assertThat(row.getArray(0).getInt(2)).isEqualTo(12);
+        file.delete();
+    }
+
     @Test
     public void testMap() throws Exception {
         String fileName = "test.parquet";
diff --git a/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java
new file mode 100644
index 0000000000..6b995392ac
--- /dev/null
+++ b/paimon-format/src/test/java/org/apache/paimon/format/parquet/reader/SimpleGroupWriteSupport.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.format.parquet.reader;
+
+import org.apache.paimon.format.parquet.writer.StreamOutputFile;
+import org.apache.paimon.fs.local.LocalFileIO;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.api.RecordConsumer;
+import org.apache.parquet.schema.GroupType;
+import org.apache.parquet.schema.MessageTypeParser;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+
+/** For writing two level representations for parquet list type. */
+class SimpleGroupWriteSupport extends WriteSupport<SimpleGroupWriteSupport.SimpleGroup> {
+
+    private org.apache.parquet.schema.MessageType schema;
+    private RecordConsumer recordConsumer;
+
+    static class SimpleGroup {
+        List<Integer> numbers;
+
+        public SimpleGroup(List<Integer> numbers) {
+            this.numbers = numbers;
+        }
+    }
+
+    @Override
+    public WriteContext init(Configuration configuration) {
+        String schemaString =
+                "message Record { required group list_of_ints (LIST) { repeated int32 list_of_ints_tuple; } }";
+        this.schema = MessageTypeParser.parseMessageType(schemaString);
+        return new WriteContext(schema, Collections.emptyMap());
+    }
+
+    @Override
+    public void prepareForWrite(RecordConsumer recordConsumer) {
+        this.recordConsumer = recordConsumer;
+    }
+
+    @Override
+    public void write(SimpleGroup record) {
+        recordConsumer.startMessage();
+
+        int listGroupIndex = schema.getFieldIndex("list_of_ints");
+        String listGroupName = schema.getFieldName(listGroupIndex);
+        GroupType listGroupType = schema.getType(listGroupIndex).asGroupType();
+
+        int elementsFieldIndex = listGroupType.getFieldIndex("list_of_ints_tuple");
+        String elementsFieldName = listGroupType.getFieldName(elementsFieldIndex);
+        recordConsumer.startField(listGroupName, listGroupIndex);
+
+        if (record != null && record.numbers != null) {
+            recordConsumer.startField(elementsFieldName, elementsFieldIndex);
+            for (Integer number : record.numbers) {
+                recordConsumer.addInteger(number);
+            }
+            recordConsumer.endField(elementsFieldName, elementsFieldIndex);
+        }
+        recordConsumer.endField(listGroupName, listGroupIndex);
+        recordConsumer.endMessage();
+    }
+
+    static class Builder
+            extends ParquetWriter.Builder<SimpleGroupWriteSupport.SimpleGroup, Builder> {
+
+        protected Builder(OutputFile path) {
+            super(path);
+        }
+
+        @Override
+        protected Builder self() {
+            return this;
+        }
+
+        @Override
+        protected WriteSupport<SimpleGroupWriteSupport.SimpleGroup> getWriteSupport(
+                Configuration conf) {
+            return new SimpleGroupWriteSupport();
+        }
+    }
+
+    public void writeTest(String path, List<SimpleGroupWriteSupport.SimpleGroup> records)
+            throws IOException {
+        Configuration conf = new Configuration();
+
+        ParquetWriter<SimpleGroupWriteSupport.SimpleGroup> writer = null;
+        try {
+
+            writer =
+                    new Builder(
+                                    new StreamOutputFile(
+                                            new LocalFileIO.LocalPositionOutputStream(
+                                                    new File(path))))
+                            .withConf(conf)
+                            .build();
+            for (SimpleGroupWriteSupport.SimpleGroup record : records) {
+                writer.write(record);
+            }
+
+        } finally {
+            if (writer != null) {
+                writer.close();
+            }
+        }
+    }
+}
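
The write() method above shreds each list in the legacy two-level layout: one repeated primitive field, no wrapper group. For contrast, a hypothetical writer for the same column in the standard three-level layout would need an extra repeated group around every element, roughly as sketched below. This is illustrative only; the "list" and "element" names come from the standard layout convention, not from this patch:

import org.apache.parquet.io.api.RecordConsumer;

import java.util.List;

/** Illustrative contrast: shredding the same values in the standard three-level layout. */
class ThreeLevelListWriteSketch {
    // Assumes a schema like:
    //   required group list_of_ints (LIST) { repeated group list { optional int32 element; } }
    static void writeList(RecordConsumer recordConsumer, List<Integer> numbers) {
        recordConsumer.startField("list_of_ints", 0);
        for (Integer number : numbers) {
            recordConsumer.startGroup(); // the repeated middle "list" group
            recordConsumer.startField("element", 0);
            recordConsumer.addInteger(number);
            recordConsumer.endField("element", 0);
            recordConsumer.endGroup();
        }
        recordConsumer.endField("list_of_ints", 0);
    }
}
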
