This is an automated email from the ASF dual-hosted git repository.
gangwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/parquet-mr.git
The following commit(s) were added to refs/heads/master by this push:
new 30fa1de0b PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300)
30fa1de0b is described below
commit 30fa1de0b43037ce451063ea75570663fc9c663c
Author: Claire McGinty <[email protected]>
AuthorDate: Mon Mar 18 21:57:51 2024 -0400
PARQUET-2450: Fix Avro projection for single-field repeated record types (#1300)
---
.../apache/parquet/avro/AvroRecordConverter.java | 1 +
.../parquet/avro/TestArrayCompatibility.java | 130 ++++++++++++++++++++-
.../apache/parquet/avro/TestSpecificReadWrite.java | 39 +++++++
3 files changed, 168 insertions(+), 2 deletions(-)
diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
index 87325a0af..62d1f89fd 100644
--- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
+++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java
@@ -930,6 +930,7 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter {
static boolean isElementType(Type repeatedType, Schema elementSchema) {
if (repeatedType.isPrimitive()
|| repeatedType.asGroupType().getFieldCount() > 1
+ || repeatedType.getName().equals("array")
|| repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) {
// The repeated type must be the element type because it is an invalid
// synthetic wrapper. Must be a group with one optional or required field
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
index b4b543340..fd4cf2011 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java
@@ -37,7 +37,9 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.DirectWriterTest;
import org.apache.parquet.hadoop.ParquetFileReader;
+import org.apache.parquet.io.InvalidRecordException;
import org.apache.parquet.schema.MessageType;
+import org.apache.parquet.schema.MessageTypeParser;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
@@ -603,7 +605,7 @@ public class TestArrayCompatibility extends DirectWriterTest {
public void testAvroCompatOptionalGroupInListWithSchema() throws Exception {
Path test = writeDirect(
"message AvroCompatOptionalGroupInListWithSchema {" + " optional
group locations (LIST) {"
- + " repeated group array {"
+ + " repeated group my_list {"
+ " optional group element {"
+ " required double latitude;"
+ " required double longitude;"
@@ -616,7 +618,7 @@ public class TestArrayCompatibility extends DirectWriterTest {
rc.startField("locations", 0);
rc.startGroup();
- rc.startField("array", 0); // start writing array contents
+ rc.startField("my_list", 0); // start writing array contents
// write a non-null element
rc.startGroup(); // array level
@@ -1103,6 +1105,130 @@ public class TestArrayCompatibility extends DirectWriterTest {
assertReaderContains(newBehaviorReader(test, newDoubleSchema),
newDoubleSchema, newDoubleRecord);
}
+ @Test
+ public void testIsElementTypeRequiredRepeatedRecord() {
+ // Test `_tuple` style naming
+ MessageType parquetSchema = MessageTypeParser.parseMessageType("message
SchemaWithList {\n"
+ + " required group list_field (LIST) {\n"
+ + " repeated group list_field_tuple (LIST) {\n"
+ + " repeated int32 int_field;\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+ Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+ Assert.assertTrue(AvroRecordConverter.isElementType(
+
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
+ avroSchema.getFields().get(0).schema()));
+
+ // Test `array` style naming
+ parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList
{\n"
+ + " required group list_field (LIST) {\n"
+ + " repeated group array {\n"
+ + " required int32 a;\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+ avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+ Assert.assertTrue(AvroRecordConverter.isElementType(
+ parquetSchema.getType("list_field"),
+ avroSchema.getFields().get(0).schema()));
+ }
+
+ @Test
+ public void testIsElementTypeOptionalRepeatedRecord() {
+ // Test `_tuple` style naming
+ MessageType parquetSchema = MessageTypeParser.parseMessageType("message
SchemaWithList {\n"
+ + " optional group list_field (LIST) {\n"
+ + " repeated group list_field_tuple (LIST) {\n"
+ + " repeated int32 int_field;\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+ Schema avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+ Assert.assertTrue(AvroRecordConverter.isElementType(
+
parquetSchema.getType("list_field").asGroupType().getType("list_field_tuple"),
+ avroSchema.getFields().get(0).schema()));
+
+ // Test `array` style naming
+ parquetSchema = MessageTypeParser.parseMessageType("message SchemaWithList
{\n"
+ + " optional group list_field (LIST) {\n"
+ + " repeated group array {\n"
+ + " required int32 a;\n"
+ + " }\n"
+ + " }\n"
+ + "}");
+ avroSchema = new AvroSchemaConverter().convert(parquetSchema);
+
+ Assert.assertTrue(AvroRecordConverter.isElementType(
+ parquetSchema.getType("list_field"),
+ avroSchema.getFields().get(0).schema()));
+ }
+
+ @Test
+ public void testIsElementTypeFailsInvalidSchema() throws Exception {
+ Path test = writeDirect(
+ "message MessageWithInvalidArraySchema {"
+ + " optional group locations (LIST) {"
+ + " repeated group array {"
+ + " optional group element {"
+ + " required double latitude;"
+ + " required double longitude;"
+ + " }"
+ + " }"
+ + " }"
+ + "}",
+ rc -> {
+ rc.startMessage();
+ rc.startField("locations", 0);
+
+ rc.startGroup();
+ rc.startField("array", 0); // start writing array contents
+
+ // write a non-null element
+ rc.startGroup(); // array level
+ rc.startField("element", 0);
+ rc.startGroup();
+ rc.startField("latitude", 0);
+ rc.addDouble(0.0);
+ rc.endField("latitude", 0);
+ rc.startField("longitude", 1);
+ rc.addDouble(180.0);
+ rc.endField("longitude", 1);
+ rc.endGroup();
+ rc.endField("element", 0);
+ rc.endGroup(); // array level
+
+ rc.endField("array", 0); // finished writing array contents
+ rc.endGroup();
+
+ rc.endField("locations", 0);
+ rc.endMessage();
+ });
+
+ Schema location = record(
+ "element",
+ field("latitude", primitive(Schema.Type.DOUBLE)),
+ field("longitude", primitive(Schema.Type.DOUBLE)));
+
+ Schema newSchema =
+ record("MessageWithInvalidArraySchema", optionalField("locations",
array(optional(location))));
+ GenericRecord newRecord = instance(
+ newSchema,
+ "locations",
+ Arrays.asList(
+ instance(location, "latitude", 0.0, "longitude", 180.0),
+ instance(location, "latitude", 0.0, "longitude", 0.0)));
+
+ Configuration oldConfWithSchema = new Configuration();
+ AvroReadSupport.setAvroReadSchema(oldConfWithSchema, newSchema);
+
+ AvroParquetReader<GenericRecord> reader = new
AvroParquetReader<>(oldConfWithSchema, test);
+ Assert.assertThrows(InvalidRecordException.class, reader::read);
+ }
+
public <T extends IndexedRecord> AvroParquetReader<T> oldBehaviorReader(Path
path) throws IOException {
return new AvroParquetReader<T>(OLD_BEHAVIOR_CONF, path);
}
diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
index c7874fe53..2355847f0 100644
--- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
+++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestSpecificReadWrite.java
@@ -38,6 +38,7 @@ import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.hadoop.ParquetReader;
@@ -232,6 +233,44 @@ public class TestSpecificReadWrite {
}
}
+ @Test
+ public void testRepeatedRecordProjection() throws IOException {
+ Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED,
false);
+ Configuration conf = new Configuration(testConf);
+ Schema schema = Car.getClassSchema();
+
+ // Project a single field from repeated record schema
+ final Schema projectedSchema = SchemaBuilder.builder(schema.getNamespace())
+ .record("Car")
+ .fields()
+ .name("serviceHistory")
+ .type(SchemaBuilder.unionOf()
+ .nullBuilder()
+ .endNull()
+ .and()
+ .array()
+ .items(SchemaBuilder.builder(schema.getNamespace())
+ .record("Service")
+ .fields()
+ .requiredString("mechanic")
+ .endRecord())
+ .endUnion())
+ .noDefault()
+ .endRecord();
+
+ AvroReadSupport.setRequestedProjection(conf, projectedSchema);
+
+ try (ParquetReader<Car> reader = new AvroParquetReader<>(conf, path)) {
+ for (Car car = reader.read(); car != null; car = reader.read()) {
+ assertNotNull(car.getServiceHistory());
+ car.getServiceHistory().forEach(service -> {
+ assertNotNull(service.getMechanic());
+ assertEquals(0L, service.getDate());
+ });
+ }
+ }
+ }
+
@Test
public void testAvroReadSchema() throws IOException {
Path path = writeCarsToParquetFile(1, CompressionCodecName.UNCOMPRESSED,
false);