[ https://issues.apache.org/jira/browse/BEAM-11721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276550#comment-17276550 ]

Tao Li commented on BEAM-11721:
-------------------------------

[~iemejia] Sure, I'd like to contribute to this work. We can sync up on the details later.

It looks like I just ran into another blocker issue with ParquetIO. As I mentioned, I was able to use the Avro schema below to read Spark-created Parquet files that contain a single field of array type.

{noformat}
{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "list",
            "fields": [
              {
                "name": "element",
                "type": ["null", "int"],
                "default": null
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}
{noformat}

{code}
pipeline.apply(ParquetIO.read(avroSchema).from(inputPath));
{code}
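
For reference, here is roughly how that read is wired up end to end (a minimal sketch; the class name, schema file location, and input glob are placeholders, not my actual code):

{code}
import java.nio.file.Files;
import java.nio.file.Paths;

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.parquet.ParquetIO;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.values.PCollection;

public class ReadSparkParquet {
  public static void main(String[] args) throws Exception {
    // Parse the Avro schema shown above from an .avsc file (path is a placeholder).
    Schema avroSchema = new Schema.Parser().parse(
        new String(Files.readAllBytes(Paths.get("/path/to/schema.avsc"))));

    Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

    // ParquetIO.read yields GenericRecords conforming to the given schema.
    PCollection<GenericRecord> records =
        pipeline.apply(ParquetIO.read(avroSchema).from("/path/to/input/*.parquet"));

    pipeline.run().waitUntilFinish();
  }
}
{code}

This read works fine against the Spark-written files.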

Then I simply used the same Avro schema to write Parquet files right away:

{code}
output.apply(FileIO.<GenericRecord>write()
    .withNumShards(10)
    .via(ParquetIO.sink(outputAvroSchema))
    .to(config.getOutputPath())
    .withSuffix(".parquet"));
{code}
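
As far as I can tell, ParquetIO.sink delegates to parquet-avro's AvroParquetWriter/AvroWriteSupport, so the list layout in the output files comes from parquet-avro defaults rather than from whatever layout Spark used. For completeness, the same write with the codec pinned (the Snappy line is an assumption on my part to match the Spark files; it is not in my original snippet):

{code}
// `output` is the PCollection<GenericRecord> produced by the read above.
// CompressionCodecName is org.apache.parquet.hadoop.metadata.CompressionCodecName.
output.apply(FileIO.<GenericRecord>write()
    .withNumShards(10)
    .via(ParquetIO.sink(outputAvroSchema)
        // Assumption: match Spark's default compression; not in the original code.
        .withCompressionCodec(CompressionCodecName.SNAPPY))
    .to(config.getOutputPath())
    .withSuffix(".parquet"));
{code}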

 

Then I used parquet-tools to inspect the Parquet files that were created:

{noformat}
$ parquet-tools meta ~/Downloads/output-00008-of-00010.parquet
file: file:/Users/taol/Downloads/output-00008-of-00010.parquet
creator: parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra: parquet.avro.schema = {"type":"record","name":"spark_schema","fields":[{"name":"numbers","type":["null",{"type":"array","items":{"type":"record","name":"list","fields":[{"name":"element","type":["null","int"],"default":null}]}}],"default":null}]}
extra: writer.model.name = avro

file schema: spark_schema
--------------------------------------------------------------------------------
numbers: OPTIONAL F:1
.array: REPEATED F:1
..element: OPTIONAL INT32 R:1 D:3

row group 1: RC:1 TS:66 OFFSET:4
{noformat}
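
(The footer can also be dumped programmatically, which is handy on a box without parquet-tools; a minimal sketch using the parquet-hadoop API, with the local path as a placeholder:)

{code}
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetFileReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

public class DumpFooter {
  public static void main(String[] args) throws Exception {
    try (ParquetFileReader reader = ParquetFileReader.open(
        HadoopInputFile.fromPath(
            new Path("/Users/taol/Downloads/output-00008-of-00010.parquet"),
            new Configuration()))) {
      // Prints the same message type parquet-tools shows under "file schema".
      System.out.println(reader.getFooter().getFileMetaData().getSchema());
    }
  }
}
{code}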

 

Then I used ParquetIO to read these files back and saw the error below:

{noformat}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: optional int32 element is not a group
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
    at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
    at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
    at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
    at com.zillow.pipeler.utils.ExecutionContext.runPipeline(ExecutionContext.java:25)
    at com.zillow.pipeler.orchestrator.BaseOrchestrator.run(BaseOrchestrator.java:65)
    at com.zillow.pipeler.orchestrator.transform.DatasetFlattenerOrchestrator.main(DatasetFlattenerOrchestrator.java:83)
Caused by: java.lang.ClassCastException: optional int32 element is not a group
    at shaded.org.apache.parquet.schema.Type.asGroupType(Type.java:207)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
    at shaded.org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
    at shaded.org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
    at shaded.org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
    at shaded.org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
    at shaded.org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
    at shaded.org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
    at shaded.org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
    at shaded.org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
    at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:645)
{noformat}
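
Note the meta output above: the Beam-written file has numbers > array > element (the legacy 2-level list encoding), while the Avro schema I read with wraps the element in a "list" record, which is, as far as I can tell, what parquet-avro generates when converting Spark's 3-level list layout. That mismatch would explain the "optional int32 element is not a group" cast. To illustrate the knob involved, here is a sketch of writing 3-level lists with plain parquet-avro outside of Beam (whether this round-trips cleanly with the synthetic list/element records is exactly the open question; the output path is a placeholder):

{code}
import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetWriter;
import org.apache.parquet.avro.AvroWriteSupport;
import org.apache.parquet.hadoop.ParquetWriter;

public class WriteThreeLevelLists {
  public static void write(Schema schema, Iterable<GenericRecord> records) throws Exception {
    Configuration conf = new Configuration();
    // parquet-avro defaults to the legacy 2-level list encoding; disabling it
    // makes the writer emit the standard 3-level <list>.<element> structure.
    conf.setBoolean(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE, false);

    try (ParquetWriter<GenericRecord> writer =
        AvroParquetWriter.<GenericRecord>builder(new Path("/tmp/three-level.parquet"))
            .withSchema(schema)
            .withConf(conf)
            .build()) {
      for (GenericRecord record : records) {
        writer.write(record);
      }
    }
  }
}
{code}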

 

> Cannot read array values with ParquetIO
> ---------------------------------------
>
>                 Key: BEAM-11721
>                 URL: https://issues.apache.org/jira/browse/BEAM-11721
>             Project: Beam
>          Issue Type: Improvement
>          Components: io-java-parquet
>    Affects Versions: 2.25.0
>            Reporter: Tao Li
>            Priority: P1
>         Attachments: from-spark.snappy.parquet, schema.avsc
>
>
> Hi Beam community,
>  I am seeing an error when reading an array field using ParquetIO on Beam 2.25. Both the direct runner and the Spark runner hit this issue. It is a blocker for my Beam adoption, so prompt help would be appreciated.
>  Below is the schema tree as a quick visualization. The array field is named "numbers" and the element type is int.
>  
> root
>  |-- numbers: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
>  
> The Beam code is very simple:
> pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
>  
> Below is the error when running that code:
>  
> {noformat}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
