[
https://issues.apache.org/jira/browse/BEAM-11721?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17275494#comment-17275494
]
Tao Li edited comment on BEAM-11721 at 1/30/21, 6:17 AM:
---------------------------------------------------------
I tried using parquet-tools to inspect the Spark-created parquet file. Here is the metadata:
{noformat}
creator:     parquet-mr version 1.10.1 (build 815bcfa4a4aacf66d207b3dc692150d16b5740b9)
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"numbers","type":{"type":"array","elementType":"integer","containsNull":true},"nullable":true,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
numbers:     OPTIONAL F:1
.list:       REPEATED F:1
..element:   OPTIONAL INT32 R:1 D:3
{noformat}
As you can see, the file schema is "spark_schema", and the array is encoded with a repeated ".list" group wrapping an optional ".element" value (the standard three-level list layout). In comparison, if we use Beam's ParquetIO to generate a parquet file by specifying an Avro schema, the metadata is:
{noformat}
creator:     parquet-mr version 1.10.0 (build 031a6654009e3b82020012a18434c582bd74c73a)
extra:       parquet.avro.schema = {"type":"record","name":"topLevelRecord","fields":[{"name":"numbers","type":["null",{"type":"array","items":"int"}],"doc":""}]}
extra:       writer.model.name = avro

file schema: topLevelRecord
--------------------------------------------------------------------------------
numbers:     OPTIONAL F:1
.array:      REPEATED INT32 R:1 D:2
{noformat}
As you can see, the file schema is "topLevelRecord" and the list is encoded as a single repeated ".array" of INT32 (the older two-level layout that parquet-avro writes by default). I am not an expert on parquet, but the ClassCastException below (GenericData$Record cannot be cast to java.lang.Number, thrown from GenericDatumWriter.writeArray) looks consistent with the reader materializing each three-level ".element" group as a record instead of a plain int. Just wondering if this difference in list encoding could be the cause of the failure when using ParquetIO to read Spark-created parquet files.
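A quick way to test that hypothesis outside of Beam is to read the Spark file with plain parquet-avro and let it infer the Avro schema from the parquet schema, then compare that against the schema passed to ParquetIO.read(). A minimal sketch (nothing Beam-specific; the file name refers to the attachment, and the class and variable names are just placeholders):
{noformat}
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;

public class InspectSparkParquet {
  public static void main(String[] args) throws Exception {
    // Let parquet-avro derive the Avro read schema from the file itself,
    // instead of forcing our own schema onto it.
    try (ParquetReader<GenericRecord> reader =
        AvroParquetReader.<GenericRecord>builder(new Path("from-spark.snappy.parquet"))
            .build()) {
      GenericRecord record = reader.read();
      // If "numbers" comes back as an array of {element: int} records rather
      // than an array of ints, that mismatch would explain the
      // "GenericData$Record cannot be cast to java.lang.Number" failure.
      System.out.println(record.getSchema().toString(true));
      System.out.println(record);
    }
  }
}
{noformat}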
> Cannot read array values with ParquetIO
> ---------------------------------------
>
> Key: BEAM-11721
> URL: https://issues.apache.org/jira/browse/BEAM-11721
> Project: Beam
> Issue Type: Bug
> Components: io-java-parquet
> Affects Versions: 2.25.0
> Reporter: Tao Li
> Priority: P0
> Attachments: from-spark.snappy.parquet, schema.avsc
>
>
> Hi Beam community,
> I am seeing an error when reading an array field using ParquetIO. I was using Beam 2.25, and both the direct runner and the Spark runner hit this issue. This is a blocker for our Beam adoption, so prompt help would be appreciated.
> Below is the schema tree as a quick visualization. The array field is named "numbers" and the element type is int.
>
> root
>  |-- numbers: array (nullable = true)
>  |    |-- element: integer (containsNull = true)
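> For illustration, an Avro read schema matching this tree could look like the sketch below; the actual schema.avsc is attached to this issue and may differ in details such as element nullability:
> {noformat}
> {
>   "type": "record",
>   "name": "topLevelRecord",
>   "fields": [
>     {
>       "name": "numbers",
>       "type": ["null", {"type": "array", "items": ["null", "int"]}],
>       "default": null
>     }
>   ]
> }
> {noformat}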
>
> The Beam code is very simple:
> {noformat}
> pipeline.apply(ParquetIO.read(avroSchema).from(parquetPath));
> {noformat}
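> Expanded into a minimal self-contained form for anyone trying to reproduce (the output prefix and class name are placeholders; schema.avsc and from-spark.snappy.parquet refer to the attachments):
> {noformat}
> import java.io.File;
> import org.apache.avro.Schema;
> import org.apache.avro.generic.GenericRecord;
> import org.apache.beam.sdk.Pipeline;
> import org.apache.beam.sdk.io.TextIO;
> import org.apache.beam.sdk.io.parquet.ParquetIO;
> import org.apache.beam.sdk.options.PipelineOptionsFactory;
> import org.apache.beam.sdk.transforms.MapElements;
> import org.apache.beam.sdk.values.TypeDescriptors;
>
> public class ParquetArrayRepro {
>   public static void main(String[] args) throws Exception {
>     Schema avroSchema = new Schema.Parser().parse(new File("schema.avsc"));
>     Pipeline pipeline = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());
>     pipeline
>         // Reading the Spark-written file with the supplied Avro schema
>         // triggers the ClassCastException below on the direct runner.
>         .apply(ParquetIO.read(avroSchema).from("from-spark.snappy.parquet"))
>         .apply(MapElements.into(TypeDescriptors.strings()).via(GenericRecord::toString))
>         .apply(TextIO.write().to("numbers-out"));
>     pipeline.run().waitUntilFinish();
>   }
> }
> {noformat}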
>
> Here is the error when running that code:
>
> {noformat}
> Exception in thread "main" org.apache.beam.sdk.Pipeline$PipelineExecutionException: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:353)
>     at org.apache.beam.runners.direct.DirectRunner$DirectPipelineResult.waitUntilFinish(DirectRunner.java:321)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:216)
>     at org.apache.beam.runners.direct.DirectRunner.run(DirectRunner.java:67)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:317)
>     at org.apache.beam.sdk.Pipeline.run(Pipeline.java:303)
> Caused by: java.lang.ClassCastException: org.apache.avro.generic.GenericData$Record cannot be cast to java.lang.Number
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:156)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeArray(GenericDatumWriter.java:234)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:136)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.writeField(GenericDatumWriter.java:206)
>     at org.apache.avro.generic.GenericDatumWriter.writeRecord(GenericDatumWriter.java:195)
>     at org.apache.avro.generic.GenericDatumWriter.writeWithoutConversion(GenericDatumWriter.java:130)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:82)
>     at org.apache.avro.generic.GenericDatumWriter.write(GenericDatumWriter.java:72)
>     at org.apache.beam.sdk.coders.AvroCoder.encode(AvroCoder.java:317)
>     at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:82)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:66)
>     at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:51)
>     at org.apache.beam.sdk.util.CoderUtils.clone(CoderUtils.java:141)
>     at org.apache.beam.sdk.util.MutationDetectors$CodedValueMutationDetector.<init>(MutationDetectors.java:115)
>     at org.apache.beam.sdk.util.MutationDetectors.forValueWithCoder(MutationDetectors.java:46)
>     at org.apache.beam.runners.direct.ImmutabilityCheckingBundleFactory$ImmutabilityEnforcingBundle.add(ImmutabilityCheckingBundleFactory.java:112)
>     at org.apache.beam.runners.direct.ParDoEvaluator$BundleOutputManager.output(ParDoEvaluator.java:301)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.outputWindowedValue(SimpleDoFnRunner.java:267)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner.access$900(SimpleDoFnRunner.java:79)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:413)
>     at org.apache.beam.repackaged.direct_java.runners.core.SimpleDoFnRunner$DoFnProcessContext.output(SimpleDoFnRunner.java:401)
>     at org.apache.beam.sdk.io.parquet.ParquetIO$ReadFiles$ReadFn.processElement(ParquetIO.java:646)
> {noformat}