[ 
https://issues.apache.org/jira/browse/BEAM-11527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289608#comment-17289608
 ] 

Tao Li edited comment on BEAM-11527 at 2/24/21, 3:11 AM:
---------------------------------------------------------

Hi, I am using this new feature to solve a problem with reading Spark-created 
Parquet files through ParquetIO. Spark automatically adds “list” and “element” 
wrappers to the schema of array fields, which leads to “list” and “element” 
fields in the Beam schema of the resulting PCollection:
{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":{"type":"record","name":"list","fields":[{"name":"element","type":"int"}]}}},{"name":"numbers_2","type":{"type":"array","items":{"type":"record","name":"list","namespace":"list2","fields":[{"name":"element","type":"int"}]}}}]}

Here is a sample row:
{"first_field": 1, "numbers_1": [{"element": 1}, {"element": 2}], "numbers_2": 
[{"element": 1}, {"element": 2}]}
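
To make the shape of the problem concrete, here is a minimal plain-Java sketch of the post-processing needed to strip the wrappers by hand. It models the row with java.util Maps and Lists rather than Avro or Beam types, and the names ListElementUnwrapper, unwrap and isElementWrapper are hypothetical helpers, not part of any library.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

class ListElementUnwrapper {

    // Hypothetical helper: walks maps and lists recursively and replaces every
    // single-field {"element": x} wrapper found inside a list with the value x.
    static Object unwrap(Object value) {
        if (value instanceof Map) {
            Map<String, Object> out = new LinkedHashMap<>();
            for (Map.Entry<?, ?> e : ((Map<?, ?>) value).entrySet()) {
                out.put(String.valueOf(e.getKey()), unwrap(e.getValue()));
            }
            return out;
        }
        if (value instanceof List) {
            List<Object> out = new ArrayList<>();
            for (Object item : (List<?>) value) {
                Object inner = isElementWrapper(item) ? ((Map<?, ?>) item).get("element") : item;
                out.add(unwrap(inner));
            }
            return out;
        }
        return value; // primitives and strings pass through unchanged
    }

    // A wrapper is assumed to be a map whose only key is "element".
    static boolean isElementWrapper(Object item) {
        return item instanceof Map
                && ((Map<?, ?>) item).size() == 1
                && ((Map<?, ?>) item).containsKey("element");
    }

    public static void main(String[] args) {
        Map<String, Object> row = new LinkedHashMap<>();
        row.put("first_field", 1);
        row.put("numbers_1", Arrays.asList(
                Collections.singletonMap("element", 1),
                Collections.singletonMap("element", 2)));
        row.put("numbers_2", Arrays.asList(
                Collections.singletonMap("element", 1),
                Collections.singletonMap("element", 2)));
        // prints {first_field=1, numbers_1=[1, 2], numbers_2=[1, 2]}
        System.out.println(unwrap(row));
    }
}
```

Note that this sketch would also unwrap a genuine record whose only field happens to be named "element", which is one reason handling it in application code is fragile.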

It’s really annoying, and sometimes really hard, to handle these “list” and 
“element” fields in my Beam app, so I have been trying to find a way to avoid 
them when reading these Spark-created Parquet files. I found one with 
AvroParquetReader: setting “parquet.avro.add-list-element-records” to false 
solves my problem. Please see the code below. The output Avro schema does not 
have “list” and “element” fields. 
 
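        import org.apache.avro.generic.GenericRecord;
        import org.apache.hadoop.conf.Configuration;
        import org.apache.hadoop.fs.Path;
        import org.apache.parquet.avro.AvroParquetReader;
        import org.apache.parquet.avro.AvroSchemaConverter;
        import org.apache.parquet.hadoop.ParquetReader;
        import org.apache.parquet.hadoop.util.HadoopInputFile;
        import org.apache.parquet.io.InputFile;

        Configuration conf = new Configuration();
        // Ask the Avro converter not to wrap array items in list/element records.
        conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
        InputFile inputFile = HadoopInputFile.fromPath(new Path(filePath), conf);
        ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(inputFile)
                .withConf(conf)
                .build();
        GenericRecord nextRecord = reader.read();
        System.out.println(nextRecord.getSchema());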
 
After reading the ParquetIO source code, it looks like the 
ADD_LIST_ELEMENT_RECORDS setting should take effect in AvroParquetReader 
first, and the input Avro schema is then used for encoding. If this 
understanding is correct, the code below should just work. In it, 
“ADD_LIST_ELEMENT_RECORDS” is set to false to avoid the “list” and “element” 
fields (hopefully), and “inputAvroSchema” is set to a normal Avro schema that 
does NOT contain “list” and “element” fields. The input is Spark-created 
Parquet files that contain an array field.
 
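        // avroConfig is a Map<String, String> of Hadoop configuration flags.
        Map<String, String> avroConfig = new HashMap<>();
        avroConfig.put(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
        PCollection<Row> inputData = pipeline().apply(
                ParquetIO.read(inputAvroSchema).from(path).withConfiguration(avroConfig));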
 
However, I see this error:
java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
        at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
        at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
        at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
        at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
        at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
 
This error indicates that the data being written with GenericDatumWriter still 
contains the “list” field, which is a record type. It seems the 
“ADD_LIST_ELEMENT_RECORDS” setting is not taking effect. Am I using the Avro 
config correctly?
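
The mismatch behind that stack trace can be reproduced in miniature without Avro. In the sketch below, writeInt is a hypothetical stand-in for a writer whose schema says the array items are ints: it casts each item to Number, so an item that is still wrapped in a record-like object fails in the same way. A Map stands in for GenericData$Record; none of these names are Avro or Beam API.

```java
import java.util.Collections;

class CastMismatchDemo {

    // Stand-in for a writer that has been told the array items are ints:
    // it assumes every item it receives is a Number.
    static int writeInt(Object datum) {
        return ((Number) datum).intValue();
    }

    // Reports whether the item could be written under the int schema.
    static String tryWrite(Object datum) {
        try {
            return "wrote " + writeInt(datum);
        } catch (ClassCastException e) {
            return "ClassCastException";
        }
    }

    public static void main(String[] args) {
        // A bare item matches the schema without list/element wrappers.
        System.out.println(tryWrite(1)); // prints "wrote 1"
        // An item still wrapped as {"element": 1} does not.
        System.out.println(tryWrite(Collections.singletonMap("element", 1))); // prints "ClassCastException"
    }
}
```

If the reader really had dropped the wrappers, every array item would arrive as a bare number and the first case would apply.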


> Support user configurable Hadoop Configuration flags for ParquetIO
> ------------------------------------------------------------------
>
>                 Key: BEAM-11527
>                 URL: https://issues.apache.org/jira/browse/BEAM-11527
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-parquet
>            Reporter: Anant Damle
>            Assignee: Anant Damle
>            Priority: P2
>              Labels: parquet
>             Fix For: 2.28.0
>
>          Time Spent: 1.5h
>  Remaining Estimate: 0h
>
> Provide a user-configurable interface for passing Hadoop configuration to 
> ParquetIO.
> The current behaviour supports only the 
> AvroReadSupport.AVRO_COMPATIBILITY flag.
>  
> More reader options are now supported, as introduced through 
> [PARQUET-1928|https://issues.apache.org/jira/browse/PARQUET-1928] and 
> [PR/831|https://github.com/apache/parquet-mr/pull/831]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
