[
https://issues.apache.org/jira/browse/BEAM-11527?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17289608#comment-17289608
]
Tao Li edited comment on BEAM-11527 at 2/24/21, 3:25 AM:
---------------------------------------------------------
Hi, I am using this new feature to solve a problem when using ParquetIO to read
Spark-created Parquet files. The problem is that Spark automatically adds “list”
and “element” wrapper records to the schema for array fields, which leads to
“list” and “element” fields in the Beam schema of this PCollection:
{noformat}
{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":{"type":"record","name":"{color:#DE350B}list{color}","fields":[{"name":"{color:#DE350B}element{color}","type":"int"}]}}},{"name":"numbers_2","type":{"type":"array","items":{"type":"record","name":"{color:#DE350B}list{color}","namespace":"list2","fields":[{"name":"{color:#DE350B}element{color}","type":"int"}]}}}]}
{noformat}
Here is a row of data:
{noformat}
{"first_field": 1, "numbers_1": [{"element": 1}, {"element": 2}], "numbers_2": [{"element": 1}, {"element": 2}]}
{noformat}
It’s really annoying and sometimes quite hard to handle these “list” and
“element” fields in my Beam app, so I have been trying to figure out a way to
avoid them when reading these Spark-created Parquet files. I found a way with
AvroParquetReader: setting “parquet.avro.add-list-element-records” to false
solves my problem. Please see the code below; the output Avro schema does not
have “list” and “element” fields.
{noformat}
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;
import org.apache.parquet.io.InputFile;

// Disable the wrapper records that parquet-avro adds around list elements.
Configuration conf = new Configuration();
conf.setBoolean(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, false);
InputFile inputFile = HadoopInputFile.fromPath(new Path(filePath), conf);
ParquetReader<GenericRecord> reader = AvroParquetReader
    .<GenericRecord>builder(inputFile)
    .withConf(conf)
    .build();
GenericRecord nextRecord = reader.read();
System.out.println(nextRecord.getSchema()); // schema without "list"/"element" wrappers
{noformat}
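With the flag set, the printed schema should match the plain Avro array encoding, i.e. something like this (what I expect the output to look like, not pasted from a run):
{noformat}
{"type":"record","name":"spark_schema","fields":[{"name":"first_field","type":"int"},{"name":"numbers_1","type":{"type":"array","items":"int"}},{"name":"numbers_2","type":{"type":"array","items":"int"}}]}
{noformat}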
Then I tried to specify this setting through withConfiguration() for ParquetIO.
In the code below, “ADD_LIST_ELEMENT_RECORDS” is set to false to (hopefully)
avoid the “list” and “element” fields, and “inputAvroSchema” is a normal Avro
schema that does NOT contain “list” and “element” fields. The input is
Spark-created Parquet files that contain an array field.
{noformat}
Map<String, String> avroConfig = new HashMap<>();
avroConfig.put(AvroSchemaConverter.ADD_LIST_ELEMENT_RECORDS, "false");
// ParquetIO.read() yields GenericRecords, so the result is a PCollection<GenericRecord>.
PCollection<GenericRecord> inputData =
    pipeline().apply(ParquetIO.read(inputAvroSchema).from(path).withConfiguration(avroConfig));
{noformat}
However, I see this error:
{noformat}
java.lang.ClassCastException: shaded.org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:192)
    at shaded.org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:120)
    at shaded.org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:75)
{noformat}
This error indicates that the data being written with GenericDatumWriter still
contains the “list” field, which is a record type, while the supplied Avro
schema expects a number for the array items. It seems the
“ADD_LIST_ELEMENT_RECORDS” setting is not taking effect. Am I using
withConfiguration() correctly?
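In the meantime, the workaround I'm leaning toward is unwrapping the wrapper records myself in a DoFn after reading with the wrapped schema. Here is a minimal sketch against the example schema above (the field names “first_field”, “numbers_1”, and “element” come from my example data, and the DoFn class name is just for illustration):
{noformat}
import java.util.ArrayList;
import java.util.List;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Unwraps Spark's {"element": n} wrapper records into a plain list of ints.
class UnwrapListElementsFn extends DoFn<GenericRecord, KV<Integer, List<Integer>>> {
  @ProcessElement
  public void process(@Element GenericRecord record,
      OutputReceiver<KV<Integer, List<Integer>>> out) {
    List<Integer> numbers = new ArrayList<>();
    // "numbers_1" is an array of wrapper records, each with a single "element" field.
    for (Object wrapper : (List<?>) record.get("numbers_1")) {
      numbers.add((Integer) ((GenericRecord) wrapper).get("element"));
    }
    out.output(KV.of((Integer) record.get("first_field"), numbers));
  }
}
{noformat}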
> Support user configurable Hadoop Configuration flags for ParquetIO
> ------------------------------------------------------------------
>
> Key: BEAM-11527
> URL: https://issues.apache.org/jira/browse/BEAM-11527
> Project: Beam
> Issue Type: Improvement
> Components: io-java-parquet
> Reporter: Anant Damle
> Assignee: Anant Damle
> Priority: P2
> Labels: parquet
> Fix For: 2.28.0
>
> Time Spent: 1.5h
> Remaining Estimate: 0h
>
> Provide a user-configurable interface to supply Hadoop configuration for
> ParquetIO.
> The current behaviour supports only the
> {code}AvroReadSupport.AVRO_COMPATIBILITY{code} flag.
>
> More options are now supported for the reader, introduced through
> [PARQUET-1928|https://issues.apache.org/jira/browse/PARQUET-1928] and
> [PR/831|https://github.com/apache/parquet-mr/pull/831].