[
https://issues.apache.org/jira/browse/BEAM-11721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17276550#comment-17276550
]
Tao Li commented on BEAM-11721:
-------------------------------
[~iemejia] sure, I would like to contribute to this work. We can sync up on the
details later.
It looks like I just ran into another blocking issue with ParquetIO. As I
mentioned, I was able to use the Avro schema and read call below to read
Spark-created Parquet files that contain a single field of array type.
{noformat}
{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": {
            "type": "record",
            "name": "list",
            "fields": [
              {
                "name": "element",
                "type": ["null", "int"],
                "default": null
              }
            ]
          }
        }
      ],
      "default": null
    }
  ]
}
{noformat}
{code}
pipeline.apply(ParquetIO.read(avroSchema).from(inputPath));
{code}
Then I immediately wrote Parquet files back out using the very same Avro schema:
{code}
output.apply(FileIO.<GenericRecord>write()
    .withNumShards(10)
    .via(ParquetIO.sink(outputAvroSchema))
    .to(config.getOutputPath())
    .withSuffix(".parquet"));
{code}
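For reference, a plain Avro encoding of the same logical field (a nullable array of nullable ints), without the Spark-style "list"/"element" wrapper record, would look like the sketch below. Whether this schema round-trips through ParquetIO.sink without hitting the error above is an untested assumption on my side, not something I have verified:
{noformat}
{
  "type": "record",
  "name": "spark_schema",
  "fields": [
    {
      "name": "numbers",
      "type": [
        "null",
        {
          "type": "array",
          "items": ["null", "int"]
        }
      ],
      "default": null
    }
  ]
}
{noformat}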
I then inspected the written files with parquet-tools:
{noformat}
$ parquet-tools meta ~/Downloads/output-00008-of-00010.parquet
file:        file:/Users/taol/Downloads/output-00008-of-00010.parquet
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       parquet.avro.schema = {"type":"record","name":"spark_schema","fields":[{"name":"numbers","type":["null",{"type":"array","items":{"type":"record","name":"list","fields":[{"name":"element","type":["null","int"],"default":null}]}}],"default":null}]}
extra:       writer.model.name = avro

file schema: spark_schema
--------------------------------------------------------------------------------
numbers:     OPTIONAL F:1
.array:      REPEATED F:1
..element:   OPTIONAL INT32 R:1 D:3

row group 1: RC:1 TS:66 OFFSET:4
{noformat}
Then I read these files back with ParquetIO and got the error below. Note the mismatch: the metadata above shows the written element as a primitive OPTIONAL INT32, while the Avro read schema models it as a field inside a record named "list", so on read the converter expects a group where the file has an int32:
{noformat}
Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: optional int32 element is not a group
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
	at com.zillow.pipeler.utils.ExecutionContext.runPipeline(ExecutionContext.java:25)
	at com.zillow.pipeler.orchestrator.BaseOrchestrator.run(BaseOrchestrator.java:65)
	at com.zillow.pipeler.orchestrator.transform.DatasetFlattenerOrchestrator.main(DatasetFlattenerOrchestrator.java:83)
Caused by: java.lang.ClassCastException: optional int32 element is not a group
	at shaded.org.apache.parquet.schema.Type.asGroupType(Type.java:207)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:279)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:232)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:78)
	at shaded.org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:536)
	at shaded.org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:486)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:289)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:141)
	at shaded.org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:95)
	at shaded.org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
	at shaded.org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:138)
	at shaded.org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:183)
	at shaded.org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:156)
	at shaded.org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:135)
	at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:645)
{noformat}
> Cannot read array values with ParquetIO
> ---------------------------------------
>
> Key: BEAM-11721
> URL: https://issues.apache.org/jira/browse/BEAM-11721
> Project: Beam
> Issue Type: Improvement
> Components: io-java-parquet
> Affects Versions: 2.25.0
> Reporter: Tao Li
> Priority: P1
> Attachments: from-spark.snappy.parquet, schema.avsc
>
>
> Hi Beam community,
> I am seeing an error when reading an array field with ParquetIO on Beam
> 2.25. The issue reproduces on both the direct runner and the Spark runner.
> It is blocking our Beam adoption, so prompt help would be appreciated.
> Below is the schema tree as a quick visualization. The array field name is
> "numbers" and the element type is int.
>
> root
>  |-- numbers: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
>
> The Beam code is very simple:
> {code}
> pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
> {code}
>
> Below is the error when running that code:
>
> {noformat}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
> 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
> 	at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
> 	at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
> 	at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
> 	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> 	at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
> 	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> 	at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
> 	at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
> 	at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
> 	at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
> 	at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
> 	at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
> 	at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
> 	at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
> 	at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
> 	at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
> 	at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
> 	at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
> 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
> 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
> 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
> 	at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
> 	at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
> {noformat}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)