This is an automated email from the ASF dual-hosted git repository. lzljs3620320 pushed a commit to branch release-1.1 in repository https://gitbox.apache.org/repos/asf/paimon.git
commit d20512c1440a4568df126f7ffd63921a55d5d331 Author: YeJunHao <[email protected]> AuthorDate: Tue May 13 14:58:54 2025 +0800 [parquet] Support the non-standard list and map parquet message type (#5595) --- .../format/parquet/ParquetReaderFactory.java | 28 ++++++++++++++++++---- .../format/parquet/ParquetSchemaConverter.java | 23 +++++++++--------- 2 files changed, 35 insertions(+), 16 deletions(-) diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java index 15db2d113a..73da2ccb44 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetReaderFactory.java @@ -45,6 +45,7 @@ import org.apache.paimon.types.MapType; import org.apache.paimon.types.RowType; import org.apache.paimon.utils.Pair; import org.apache.paimon.utils.Pool; +import org.apache.paimon.utils.Preconditions; import org.apache.parquet.ParquetReadOptions; import org.apache.parquet.column.ColumnDescriptor; @@ -57,6 +58,7 @@ import org.apache.parquet.io.MessageColumnIO; import org.apache.parquet.schema.ConversionPatterns; import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; +import org.apache.parquet.schema.OriginalType; import org.apache.parquet.schema.Type; import org.apache.parquet.schema.Types; import org.slf4j.Logger; @@ -71,7 +73,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; -import static org.apache.paimon.format.parquet.ParquetSchemaConverter.MAP_REPEATED_NAME; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.PAIMON_SCHEMA; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetListElementType; import static org.apache.paimon.format.parquet.ParquetSchemaConverter.parquetMapKeyValueType; @@ -233,21 +234,38 @@ public class ParquetReaderFactory implements FormatReaderFactory { case MAP: MapType mapType = (MapType) readType; GroupType mapGroup = (GroupType) parquetType; + int mapSubFields = mapGroup.getFieldCount(); + Preconditions.checkArgument( + mapSubFields == 1, + "Parquet map group type should only have one middle level REPEATED field."); Pair<Type, Type> keyValueType = parquetMapKeyValueType(mapGroup); return ConversionPatterns.mapType( mapGroup.getRepetition(), mapGroup.getName(), - MAP_REPEATED_NAME, + mapGroup.getType(0).getName(), clipParquetType(mapType.getKeyType(), keyValueType.getLeft()), clipParquetType(mapType.getValueType(), keyValueType.getRight())); case ARRAY: ArrayType arrayType = (ArrayType) readType; GroupType arrayGroup = (GroupType) parquetType; - return ConversionPatterns.listOfElements( + int listSubFields = arrayGroup.getFieldCount(); + Preconditions.checkArgument( + listSubFields == 1, + "Parquet list group type should only have one middle level REPEATED field."); + Type elementType = + clipParquetType( + arrayType.getElementType(), parquetListElementType(arrayGroup)); + // In case that the name in middle level is not "list". + Type groupMiddle = + new GroupType( + Type.Repetition.REPEATED, + arrayGroup.getType(0).getName(), + elementType); + return new GroupType( arrayGroup.getRepetition(), arrayGroup.getName(), - clipParquetType( - arrayType.getElementType(), parquetListElementType(arrayGroup))); + OriginalType.LIST, + groupMiddle); default: return parquetType; } diff --git a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java index cb9415d50a..6b02ed6f7d 100644 --- a/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java +++ b/paimon-format/src/main/java/org/apache/paimon/format/parquet/ParquetSchemaConverter.java @@ -51,20 +51,19 @@ import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; /** Schema converter converts Parquet schema to and from Paimon internal types. */ public class ParquetSchemaConverter { - static final String PAIMON_SCHEMA = "paimon_schema"; + public static final String PAIMON_SCHEMA = "paimon_schema"; - static final String MAP_REPEATED_NAME = "key_value"; - static final String MAP_KEY_NAME = "key"; - static final String MAP_VALUE_NAME = "value"; - static final String LIST_REPEATED_NAME = "list"; - static final String LIST_ELEMENT_NAME = "element"; + public static final String MAP_REPEATED_NAME = "key_value"; + public static final String MAP_KEY_NAME = "key"; + public static final String MAP_VALUE_NAME = "value"; + public static final String LIST_ELEMENT_NAME = "element"; /** Convert paimon {@link RowType} to parquet {@link MessageType}. */ public static MessageType convertToParquetMessageType(RowType rowType) { return new MessageType(PAIMON_SCHEMA, convertToParquetTypes(rowType)); } - private static Type[] convertToParquetTypes(RowType rowType) { + public static Type[] convertToParquetTypes(RowType rowType) { return rowType.getFields().stream() .map(ParquetSchemaConverter::convertToParquetType) .toArray(Type[]::new); @@ -75,7 +74,7 @@ public class ParquetSchemaConverter { return convertToParquetType(field.name(), field.type(), field.id(), 0); } - private static Type convertToParquetType(String name, DataType type, int fieldId, int depth) { + public static Type convertToParquetType(String name, DataType type, int fieldId, int depth) { Type.Repetition repetition = type.isNullable() ? Type.Repetition.OPTIONAL : Type.Repetition.REQUIRED; switch (type.getTypeRoot()) { @@ -234,7 +233,7 @@ public class ParquetSchemaConverter { } } - private static Type createTimestampWithLogicalType( + public static Type createTimestampWithLogicalType( String name, int precision, Type.Repetition repetition, boolean isAdjustToUTC) { if (precision <= 3) { return Types.primitive(INT64, repetition) @@ -402,11 +401,13 @@ public class ParquetSchemaConverter { } public static Type parquetListElementType(GroupType listType) { - return listType.getType(LIST_REPEATED_NAME).asGroupType().getType(LIST_ELEMENT_NAME); + // List type should only have one middle group type, which is repeated, and one element + // type, which is optional. + return listType.getType(0).asGroupType().getType(0); } public static Pair<Type, Type> parquetMapKeyValueType(GroupType mapType) { - GroupType keyValue = mapType.getType(MAP_REPEATED_NAME).asGroupType(); + GroupType keyValue = mapType.getType(0).asGroupType(); return Pair.of(keyValue.getType(MAP_KEY_NAME), keyValue.getType(MAP_VALUE_NAME)); } }
