[ https://issues.apache.org/jira/browse/BEAM-11721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17275732#comment-17275732 ]
Tao Li commented on BEAM-11721:
-------------------------------
Then I used this schema to update schema.avsc and things are working fine. I
think this is great progress.
I have two questions for the ParquetIO owners:
# It's not quite intuitive to have to create an Avro schema for ParquetIO that
contains Spark-defined fields ("list", "element", etc.) when ingesting
Spark-created Parquet files. Is it possible to support the standard Avro
definition for an array (like "type":"array","elementType":"integer","containsNull":true)?
Can Beam help do the schema translation under the hood? (A sketch of the two
schema shapes I mean is below, after this list.)
# Taking a step back, why does ParquetIO require an Avro schema
specification, while AvroParquetReader does not? I briefly looked at the
ParquetIO source code but have not figured it out yet. (See the
AvroParquetReader sketch below.)
Thanks so much!
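For reference, here is roughly what I mean by the two schema shapes in question 1. These are illustrative sketches with made-up field names, not copied from the attached schema.avsc. The first is the plain Avro way to declare a nullable array of nullable ints; the second is the Spark/parquet-avro style I had to spell out by hand, where the array items are a wrapper record carrying the "list"/"element" names:
{noformat}
// Plain Avro array field (what I would expect to write):
{"name": "numbers", "type": ["null", {"type": "array", "items": ["null", "int"]}]}

// Spark-style shape with the wrapper record (what I actually needed):
{"name": "numbers", "type": ["null", {"type": "array", "items":
  {"type": "record", "name": "list", "fields": [
    {"name": "element", "type": ["null", "int"]}
  ]}}]}
{noformat}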
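And for question 2, this is the kind of usage I have in mind with AvroParquetReader, where no read schema is supplied at all and it is picked up from the Parquet file footer. A minimal sketch; the file path is just a placeholder:
{noformat}
import java.io.IOException;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class ReadWithoutSchema {
  public static void main(String[] args) throws IOException {
    // No Avro schema is passed in: AvroParquetReader derives one from the
    // schema stored in the Parquet file footer.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(
            new Path("/tmp/from-spark.snappy.parquet")).build()) {
      GenericRecord record;
      while ((record = reader.read()) != null) {
        System.out.println(record);
      }
    }
  }
}
{noformat}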
> Cannot read array values with ParquetIO
> ---------------------------------------
>
> Key: BEAM-11721
> URL: https://issues.apache.org/jira/browse/BEAM-11721
> Project: Beam
> Issue Type: Bug
> Components: io-java-parquet
> Affects Versions: 2.25.0
> Reporter: Tao Li
> Priority: P0
> Attachments: from-spark.snappy.parquet, schema.avsc
>
>
> Hi Beam community,
> I am seeing an error when reading an array field using ParquetIO. I was
> using Beam 2.25. Both direct runner and Spark runner testing hit this
> issue. This is a blocker for my adoption of Beam, so prompt help
> would be appreciated.
> Below is the schema tree as a quick visualization. The array field name is
> "numbers" and the element type is int.
>
> {noformat}
> root
>  |-- numbers: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
> {noformat}
>
> The Beam code is very simple:
> pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
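> For completeness, the surrounding code is essentially the following (a minimal sketch; the schema file and Parquet path are placeholders, and the schema itself is the attached schema.avsc):
> {noformat}
> import java.io.File;
> import java.io.IOException;
> import org.apache.avro.Schema;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.parquet.ParquetIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
>
> public class ReadArrayWithParquetIO {
>   public static void main(String[] args) throws IOException {
>     // Avro read schema (the attached schema.avsc).
>     Schema avroSchema = new Schema.Parser().parse(new File("schema.avsc"));
>     String parquetPath = "/tmp/from-spark.snappy.parquet";
>
>     Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>     // ParquetIO requires the Avro read schema up front.
>     pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {noformat}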
>
> Below is the error when running that code:
>
> {noformat}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
> {noformat}