[
https://issues.apache.org/jira/browse/PARQUET-1254?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17362013#comment-17362013
]
Andreas Hailu edited comment on PARQUET-1254 at 6/12/21, 3:06 AM:
------------------------------------------------------------------
Hi, I came across this ticket because I believe we are blocked by something
very similar in AvroIndexedRecordConverter - we're using Apache Flink to read
Parquet files given an Avro schema.
What I see in Bob's example is that the _elementWrapper_ field has its name
replaced with _array_ in AvroRecordConverter, so the converted Avro schema is
deemed incompatible, as there is no way to resolve the two schemata. The schema
compatibility check in {{AvroRecordConverter#isElementType}}
({{checkReaderWriterCompatibility}}) returns false, and an {{ElementConverter}}
is created instead of an {{AvroRecordConverter}}.
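For what it's worth, the name mismatch reproduces with Avro's own
compatibility API, independent of Parquet. A minimal sketch (the class and
field names here are mine, not from the ticket) showing that two structurally
identical records whose only difference is the record name are reported as
incompatible:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;
import org.apache.avro.SchemaCompatibility;

public class NameMismatchSketch {
    public static void main(String[] args) {
        // Reader side: the record as declared in the Avro schema.
        Schema reader = SchemaBuilder.record("elementWrapper").fields()
                .requiredInt("someField").endRecord();
        // Writer side: same structure, but renamed to "array", the way the
        // repeated group comes back from the Parquet file.
        Schema writer = SchemaBuilder.record("array").fields()
                .requiredInt("someField").endRecord();

        SchemaCompatibility.SchemaPairCompatibility result =
                SchemaCompatibility.checkReaderWriterCompatibility(reader, writer);
        // Prints INCOMPATIBLE: the record names differ and no alias resolves
        // them - the same failure isElementType runs into.
        System.out.println(result.getType());
    }
}
{code}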
The Parquet schema converted from the given Avro schema comes out as:
{code:java}
message record {
  required group elements (LIST) {
    repeated group array {
      required group array_element {
        required int32 someField;
      }
    }
  }
}{code}
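If it helps anyone reproduce this, the converted schema above can be printed
directly with parquet-avro's converter - a quick sketch, assuming the Avro
schema from the description is available as a JSON string in {{avroJson}}:
{code:java}
import org.apache.avro.Schema;
import org.apache.parquet.avro.AvroSchemaConverter;
import org.apache.parquet.schema.MessageType;

// avroJson is assumed to hold the Avro schema JSON from the description.
Schema avroSchema = new Schema.Parser().parse(avroJson);
MessageType parquetSchema = new AvroSchemaConverter().convert(avroSchema);
// The repeated group prints as "array" rather than "elementWrapper".
System.out.println(parquetSchema);
{code}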
What it likely should be to generate a compatible Avro schema for comparison:
{code:java}
message record {
  required group elements (LIST) {
    repeated group elementWrapper {
      required group array_element {
        required int32 someField;
      }
    }
  }
}{code}
I am able to read the file as expected if I force the
{{AvroRecordConverter#isElementType}} schema compatibility condition to return
{{true}}. It currently returns {{false}} because the compatibility check fails
once _elementWrapper_ has been replaced with _array_.
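One possible mitigation I have not verified: Avro's
{{checkReaderWriterCompatibility}} resolves name mismatches through aliases,
so adding an alias matching the generated name to the wrapper record might
let the check pass without patching parquet-avro. A sketch, under that
assumption only:
{code:java}
import org.apache.avro.Schema;
import org.apache.avro.SchemaBuilder;

// Untested sketch: alias the reader-side wrapper record to the name the
// Parquet schema uses for the repeated group ("array"). If the alias is
// honoured on this code path, the name mismatch should go away.
Schema element = SchemaBuilder.record("element").namespace("test").fields()
        .requiredInt("someField").endRecord();
Schema elementWrapper = SchemaBuilder.record("elementWrapper").fields()
        .name("array_element").type(element).noDefault()
        .endRecord();
elementWrapper.addAlias("array"); // hypothetical workaround, not verified
{code}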
> Unable to read deeply nested records from Parquet file with Avro interface.
> ---------------------------------------------------------------------------
>
> Key: PARQUET-1254
> URL: https://issues.apache.org/jira/browse/PARQUET-1254
> Project: Parquet
> Issue Type: Bug
> Components: parquet-avro, parquet-mr
> Affects Versions: 1.8.1
> Reporter: Bob smith
> Priority: Major
>
> I am attempting to read Parquet data whose schema contains a record nested
> in a wrapper record, which is in turn nested in an array, e.g.:
> {code:java}
> {
>   "type": "record",
>   "name": "record",
>   "fields": [
>     {
>       "name": "elements",
>       "type": {
>         "type": "array",
>         "items": {
>           "type": "record",
>           "name": "elementWrapper",
>           "fields": [
>             {
>               "name": "array_element",
>               "type": {
>                 "type": "record",
>                 "name": "element",
>                 "namespace": "test",
>                 "fields": [
>                   {
>                     "name": "someField",
>                     "type": "int"
>                   }
>                 ]
>               }
>             }
>           ]
>         }
>       }
>     }
>   ]
> }
> {code}
> When reading a Parquet file with the above schema using the
> {{ParquetFileReader}}, I can see the file has the following schema, which
> appears to be correct:
> {code:java}
> message record {
>   required group elements (LIST) {
>     repeated group array {
>       required group array_element {
>         required int32 someField;
>       }
>     }
>   }
> }
> {code}
> However, when attempting to read records from this file with the Avro
> interface (see below), I get an {{InvalidRecordException}}.
> {code:java}
> final ParquetReader<GenericRecord> parquetReader =
>         AvroParquetReader.<GenericRecord>builder(path).build();
> final GenericRecord read = parquetReader.read();
> {code}
> Stepping through the code, it looks like when the record is converted to
> Avro, the field "someField" isn't in scope. Only fields at the top level of
> the schema are in scope.
> Is it expected that Avro Parquet does not support this schema? Is this a bug
> in the AvroRecordConverter?
> Thanks, Iain
> Stacktrace:
> {code:java}
> org.apache.parquet.io.InvalidRecordException: Parquet/Avro schema mismatch: Avro field 'someField' not found
>     at org.apache.parquet.avro.AvroRecordConverter.getAvroField(AvroRecordConverter.java:220)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:125)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:274)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:227)
>     at org.apache.parquet.avro.AvroRecordConverter.access$100(AvroRecordConverter.java:73)
>     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter$ElementConverter.<init>(AvroRecordConverter.java:531)
>     at org.apache.parquet.avro.AvroRecordConverter$AvroCollectionConverter.<init>(AvroRecordConverter.java:481)
>     at org.apache.parquet.avro.AvroRecordConverter.newConverter(AvroRecordConverter.java:284)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:136)
>     at org.apache.parquet.avro.AvroRecordConverter.<init>(AvroRecordConverter.java:90)
>     at org.apache.parquet.avro.AvroRecordMaterializer.<init>(AvroRecordMaterializer.java:33)
>     at org.apache.parquet.avro.AvroReadSupport.prepareForRead(AvroReadSupport.java:132)
>     at org.apache.parquet.hadoop.InternalParquetRecordReader.initialize(InternalParquetRecordReader.java:175)
>     at org.apache.parquet.hadoop.ParquetReader.initReader(ParquetReader.java:149)
>     at org.apache.parquet.hadoop.ParquetReader.read(ParquetReader.java:125)
> {code}
> Below is the full code that creates a Parquet file with this schema, and then
> fails to read it:
> {code:java}
> @Test
> @SneakyThrows
> public void canReadWithNestedArray() {
>     final Path path = new Path("test-resources/" + UUID.randomUUID());
>     // Construct a record that defines the final nested value we can't read
>     final Schema element = Schema.createRecord("element", null, "test", false);
>     element.setFields(Arrays.asList(new Schema.Field("someField", Schema.create(Schema.Type.INT), null, null)));
>     // Create a wrapper for above nested record
>     final Schema elementWrapper = Schema.createRecord("elementWrapper", null, null, false);
>     elementWrapper.setFields(Arrays.asList(new Schema.Field("array_element", element, null, null)));
>     // Create top level field that contains array of wrapped records
>     final Schema.Field topLevelArrayOfWrappers =
>             new Schema.Field("elements", Schema.createArray(elementWrapper), null, null);
>     final Schema topLevelElement = Schema.createRecord("record", null, null, false);
>     topLevelElement.setFields(Arrays.asList(topLevelArrayOfWrappers));
>     final GenericRecord genericRecord = new GenericData.Record(topLevelElement);
>     // Create element
>     final GenericData.Record recordValue = new GenericData.Record(element);
>     recordValue.put("someField", 5);
>     // Create element of array, wrapper containing above element
>     final GenericData.Record wrapperValue = new GenericData.Record(elementWrapper);
>     wrapperValue.put("array_element", recordValue);
>     genericRecord.put(topLevelArrayOfWrappers.name(), Arrays.asList(wrapperValue));
>
>     AvroParquetWriter.Builder<GenericRecord> fileWriterBuilder =
>             AvroParquetWriter.<GenericRecord>builder(path).withSchema(topLevelElement);
>     final ParquetWriter<GenericRecord> fileWriter = fileWriterBuilder.build();
>     fileWriter.write(genericRecord);
>     fileWriter.close();
>
>     final ParquetFileReader parquetFileReader = ParquetFileReader.open(new Configuration(), path);
>     final FileMetaData fileMetaData = parquetFileReader.getFileMetaData();
>     System.out.println(fileMetaData.getSchema().toString());
>
>     final ParquetReader<GenericRecord> parquetReader =
>             AvroParquetReader.<GenericRecord>builder(path).build();
>     final GenericRecord read = parquetReader.read();
> }
> {code}