[
https://issues.apache.org/jira/browse/FLINK-29579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17615641#comment-17615641
]
Tiansu Yu commented on FLINK-29579:
-----------------------------------
The source code that reproduces the issue:
{code:java}
StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
String schemaString =
        "message spark_schema {\n"
                + " optional binary field_a (STRING);\n"
                + " optional binary field_b (STRING);\n"
                + " optional int96 field_c;\n"
                + "}";
MessageType schema = MessageTypeParser.parseMessageType(schemaString);
ParquetInputFormat format =
        new ParquetRowInputFormat(new Path(<some-local-parquet-file.parquet>), schema);
var stream = env.createInput(format);
stream.print();
env.execute();
{code}
For privacy reasons, I cannot share the source data itself.
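To make the failure easier to reason about without the data, here is a minimal, dependency-free sketch of the check quoted in the issue description below. It is only a model: ListElementCheckSketch, Field, Repetition, and rejectedFields are hypothetical names, not Flink or Parquet classes, and the field list is copied from the attribute_values schema in the description.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Dependency-free model of the repetition check; these are NOT Flink or Parquet classes.
class ListElementCheckSketch {

    // Mirrors the three Parquet repetition levels.
    enum Repetition { REQUIRED, OPTIONAL, REPEATED }

    // Hypothetical stand-in for a Parquet field: a name plus its repetition.
    static final class Field {
        final String name;
        final Repetition repetition;

        Field(String name, Repetition repetition) {
            this.name = name;
            this.repetition = repetition;
        }
    }

    // Returns the names of all element fields that the quoted check would reject,
    // i.e. every field whose repetition is not REQUIRED.
    static List<String> rejectedFields(List<Field> elementFields) {
        List<String> rejected = new ArrayList<>();
        for (Field field : elementFields) {
            if (field.repetition != Repetition.REQUIRED) {
                rejected.add(field.name);
            }
        }
        return rejected;
    }

    public static void main(String[] args) {
        // The element group of attribute_values from the issue description.
        List<Field> element = Arrays.asList(
                new Field("attribute_key_id", Repetition.OPTIONAL),
                new Field("attribute_value_id", Repetition.OPTIONAL),
                new Field("pos", Repetition.OPTIONAL));

        // Prints the fields that would trip the check.
        System.out.println(rejectedFields(element));
    }
}
```

Every field of the element group is optional, so the sketch reports all three. The actual converter throws on the first non-required field it meets, which matches the attribute_key_id named in the exception message.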
> Flink parquet reader cannot read fully optional elements in a repeated list
> ---------------------------------------------------------------------------
>
> Key: FLINK-29579
> URL: https://issues.apache.org/jira/browse/FLINK-29579
> Project: Flink
> Issue Type: Bug
> Components: Formats (JSON, Avro, Parquet, ORC, SequenceFile)
> Affects Versions: 1.13.2
> Reporter: Tiansu Yu
> Priority: Blocker
> Labels: SchemaValidation, parquet, parquetReader
>
> While trying to read a parquet file containing the following field as part of
> the schema,
> {code:java}
> optional group attribute_values (LIST) {
>   repeated group list {
>     optional group element {
>       optional binary attribute_key_id (STRING);
>       optional binary attribute_value_id (STRING);
>       optional int32 pos;
>     }
>   }
> }
> {code}
> I encountered the following problem:
> {code:java}
> Exception in thread "main" java.lang.UnsupportedOperationException: List field [optional binary attribute_key_id (STRING)] in List [attribute_values] has to be required.
>     at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertGroupElementToArrayTypeInfo(ParquetSchemaConverter.java:338)
>     at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertParquetTypeToTypeInfo(ParquetSchemaConverter.java:271)
>     at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.convertFields(ParquetSchemaConverter.java:81)
>     at org.apache.flink.formats.parquet.utils.ParquetSchemaConverter.fromParquetType(ParquetSchemaConverter.java:61)
>     at org.apache.flink.formats.parquet.ParquetInputFormat.<init>(ParquetInputFormat.java:120)
>     at org.apache.flink.formats.parquet.ParquetRowInputFormat.<init>(ParquetRowInputFormat.java:39)
> {code}
> The main code that raises the problem goes as follows:
> {code:java}
> private static ObjectArrayTypeInfo convertGroupElementToArrayTypeInfo(
>         GroupType arrayFieldType, GroupType elementType) {
>     for (Type type : elementType.getFields()) {
>         if (!type.isRepetition(Type.Repetition.REQUIRED)) {
>             throw new UnsupportedOperationException(
>                     String.format(
>                             "List field [%s] in List [%s] has to be required. ",
>                             type.toString(), arrayFieldType.getName()));
>         }
>     }
>     return ObjectArrayTypeInfo.getInfoFor(convertParquetTypeToTypeInfo(elementType));
> }
> {code}
> I am not very familiar with the internals of the Parquet schema, but it
> looks to me like Flink is too restrictive about repetition types inside
> certain nested fields. I would love to hear some feedback on this
> (improvements, corrections, or workarounds).