Repository: parquet-mr Updated Branches: refs/heads/master 6b605a4ea -> 440882c65
PARQUET-364: Fix compatibility for Avro lists of lists. This fixes lists of lists that have been written with Avro's 2-level representation. The conversion setup logic missed the case where the inner field is repeated and cannot be the element in a 3-level list. This also fixes the schema conversion for cases where an unknown writer used a 2-level list of lists. This is based on @liancheng's #264 but fixes the problem in a slightly different way. Author: Ryan Blue <[email protected]> Closes #272 from rdblue/PARQUET-364-fix-avro-lists-of-lists and squashes the following commits: 41a70e0 [Ryan Blue] PARQUET-364: Fix compatibility for Avro lists of lists. Project: http://git-wip-us.apache.org/repos/asf/parquet-mr/repo Commit: http://git-wip-us.apache.org/repos/asf/parquet-mr/commit/440882c6 Tree: http://git-wip-us.apache.org/repos/asf/parquet-mr/tree/440882c6 Diff: http://git-wip-us.apache.org/repos/asf/parquet-mr/diff/440882c6 Branch: refs/heads/master Commit: 440882c659967572311402c7fe534cf13d501cf4 Parents: 6b605a4 Author: Ryan Blue <[email protected]> Authored: Tue Nov 17 15:09:50 2015 -0800 Committer: Ryan Blue <[email protected]> Committed: Tue Nov 17 15:09:50 2015 -0800 ---------------------------------------------------------------------- .../avro/AvroIndexedRecordConverter.java | 36 +---- .../parquet/avro/AvroRecordConverter.java | 8 +- .../parquet/avro/AvroSchemaConverter.java | 10 +- .../parquet/avro/TestArrayCompatibility.java | 148 ++++++++++++++++++- .../parquet/avro/TestAvroSchemaConverter.java | 92 ++++++++++++ 5 files changed, 248 insertions(+), 46 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java 
b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java index a5e4141..06c66d6 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroIndexedRecordConverter.java @@ -125,7 +125,8 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter } } if (avroField == null) { - throw new InvalidRecordException(String.format("Parquet/Avro schema mismatch. Avro field '%s' not found.", + throw new InvalidRecordException(String.format( + "Parquet/Avro schema mismatch. Avro field '%s' not found.", parquetFieldName)); } return avroField; @@ -313,7 +314,7 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter Type repeatedType = type.getType(0); // always determine whether the repeated type is the element type by // matching it against the element schema. - if (isElementType(repeatedType, elementSchema)) { + if (AvroRecordConverter.isElementType(repeatedType, elementSchema)) { // the element type is the repeated type (and required) converter = newConverter(elementSchema, repeatedType, model, new ParentValueContainer() { @Override @@ -344,37 +345,6 @@ class AvroIndexedRecordConverter<T extends IndexedRecord> extends GroupConverter } /** - * Returns whether the given type is the element type of a list or is a - * synthetic group with one field that is the element type. This is - * determined by checking whether the type can be a synthetic group and by - * checking whether a potential synthetic group matches the expected schema. - * <p> - * Unlike {@link AvroSchemaConverter#isElementType(Type, String)}, this - * method never guesses because the expected schema is known. 
- * - * @param repeatedType a type that may be the element type - * @param elementSchema the expected Schema for list elements - * @return {@code true} if the repeatedType is the element schema - */ - static boolean isElementType(Type repeatedType, Schema elementSchema) { - if (repeatedType.isPrimitive() || - repeatedType.asGroupType().getFieldCount() > 1) { - // The repeated type must be the element type because it is an invalid - // synthetic wrapper (must be a group with one field). - return true; - } else if (elementSchema != null && - elementSchema.getType() == Schema.Type.RECORD && - elementSchema.getFields().size() == 1 && - elementSchema.getFields().get(0).name().equals( - repeatedType.asGroupType().getFieldName(0))) { - // The repeated type must be the element type because it matches the - // structure of the Avro element's schema. - return true; - } - return false; - } - - /** * Converter for list elements. * * <pre> http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java index 57ad18a..61d7d8e 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroRecordConverter.java @@ -50,6 +50,7 @@ import org.apache.parquet.schema.GroupType; import org.apache.parquet.schema.MessageType; import org.apache.parquet.schema.Type; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; import static org.apache.parquet.schema.Type.Repetition.REQUIRED; /** @@ -744,11 +745,12 @@ class AvroRecordConverter<T> extends AvroConverters.AvroGroupConverter { * @param elementSchema the expected Schema for list elements * @return {@code true} if the repeatedType is 
the element schema */ - private static boolean isElementType(Type repeatedType, Schema elementSchema) { + static boolean isElementType(Type repeatedType, Schema elementSchema) { if (repeatedType.isPrimitive() || - repeatedType.asGroupType().getFieldCount() > 1) { + repeatedType.asGroupType().getFieldCount() > 1 || + repeatedType.asGroupType().getType(0).isRepetition(REPEATED)) { // The repeated type must be the element type because it is an invalid - // synthetic wrapper (must be a group with one field). + // synthetic wrapper. Must be a group with one optional or required field return true; } else if (elementSchema != null && elementSchema.getType() == Schema.Type.RECORD && http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java index 04fe3a7..6cfa8d1 100644 --- a/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java +++ b/parquet-avro/src/main/java/org/apache/parquet/avro/AvroSchemaConverter.java @@ -36,6 +36,7 @@ import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE; import static org.apache.parquet.avro.AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE_DEFAULT; import static org.apache.parquet.schema.OriginalType.*; import static org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName.*; +import static org.apache.parquet.schema.Type.Repetition.REPEATED; /** * <p> @@ -135,7 +136,7 @@ public class AvroSchemaConverter { } else if (type.equals(Schema.Type.ARRAY)) { if (writeOldListStructure) { return ConversionPatterns.listType(repetition, fieldName, - convertField("array", schema.getElementType(), Type.Repetition.REPEATED)); + convertField("array", schema.getElementType(), REPEATED)); } else { return 
ConversionPatterns.listOfElements(repetition, fieldName, convertField(AvroWriteSupport.LIST_ELEMENT_NAME, schema.getElementType())); @@ -213,7 +214,7 @@ public class AvroSchemaConverter { List<Schema.Field> fields = new ArrayList<Schema.Field>(); for (Type parquetType : parquetFields) { Schema fieldSchema = convertField(parquetType); - if (parquetType.isRepetition(Type.Repetition.REPEATED)) { + if (parquetType.isRepetition(REPEATED)) { throw new UnsupportedOperationException("REPEATED not supported outside LIST or MAP. Type: " + parquetType); } else if (parquetType.isRepetition(Type.Repetition.OPTIONAL)) { fields.add(new Schema.Field(parquetType.getName(), optional(fieldSchema), null, @@ -282,7 +283,7 @@ public class AvroSchemaConverter { throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); } Type repeatedType = parquetGroupType.getType(0); - if (!repeatedType.isRepetition(Type.Repetition.REPEATED)) { + if (!repeatedType.isRepetition(REPEATED)) { throw new UnsupportedOperationException("Invalid list type " + parquetGroupType); } if (isElementType(repeatedType, parquetGroupType.getName())) { @@ -302,7 +303,7 @@ public class AvroSchemaConverter { throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); } GroupType mapKeyValType = parquetGroupType.getType(0).asGroupType(); - if (!mapKeyValType.isRepetition(Type.Repetition.REPEATED) || + if (!mapKeyValType.isRepetition(REPEATED) || mapKeyValType.getFieldCount()!=2) { throw new UnsupportedOperationException("Invalid map type " + parquetGroupType); } @@ -348,6 +349,7 @@ public class AvroSchemaConverter { // can't be a synthetic layer because it would be invalid repeatedType.isPrimitive() || repeatedType.asGroupType().getFieldCount() > 1 || + repeatedType.asGroupType().getType(0).isRepetition(REPEATED) || // known patterns without the synthetic layer repeatedType.getName().equals("array") || repeatedType.getName().equals(parentName + "_tuple") || 
http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java index c4585a7..9c29e50 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestArrayCompatibility.java @@ -567,9 +567,9 @@ public class TestArrayCompatibility { } @Test - public void testAvroCompatRequiredGroupInList() throws Exception { + public void testAvroCompatOptionalGroupInList() throws Exception { Path test = writeDirect( - "message AvroCompatRequiredGroupInList {" + + "message AvroCompatOptionalGroupInList {" + " optional group locations (LIST) {" + " repeated group array {" + " optional group element {" + @@ -634,7 +634,7 @@ public class TestArrayCompatibility { // old behavior - assume that the repeated type is the element type Schema elementRecord = record("array", optionalField("element", location)); - Schema oldSchema = record("AvroCompatRequiredGroupInList", + Schema oldSchema = record("AvroCompatOptionalGroupInList", optionalField("locations", array(elementRecord))); GenericRecord oldRecord = instance(oldSchema, "locations", Arrays.asList( @@ -649,9 +649,9 @@ public class TestArrayCompatibility { } @Test - public void testAvroCompatRequiredGroupInListWithSchema() throws Exception { + public void testAvroCompatOptionalGroupInListWithSchema() throws Exception { Path test = writeDirect( - "message AvroCompatRequiredGroupInListWithSchema {" + + "message AvroCompatOptionalGroupInListWithSchema {" + " optional group locations (LIST) {" + " repeated group array {" + " optional group element {" + @@ -714,7 +714,7 @@ public class TestArrayCompatibility { field("latitude", 
primitive(Schema.Type.DOUBLE)), field("longitude", primitive(Schema.Type.DOUBLE))); - Schema newSchema = record("HiveCompatOptionalGroupInList", + Schema newSchema = record("AvroCompatOptionalGroupInListWithSchema", optionalField("locations", array(optional(location)))); GenericRecord newRecord = instance(newSchema, "locations", Arrays.asList( @@ -738,6 +738,142 @@ public class TestArrayCompatibility { } @Test + public void testAvroCompatListInList() throws Exception { + Path test = writeDirect( + "message AvroCompatListInList {" + + " optional group listOfLists (LIST) {" + + " repeated group array (LIST) {" + + " repeated int32 array;" + + " }" + + " }" + + "}", + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("array", 0); // start writing array contents + + rc.startGroup(); + rc.startField("array", 0); // start writing inner array contents + + // write [34, 35, 36] + rc.addInteger(34); + rc.addInteger(35); + rc.addInteger(36); + + rc.endField("array", 0); // finished writing inner array contents + rc.endGroup(); + + // write an empty list + rc.startGroup(); + rc.endGroup(); + + rc.startGroup(); + rc.startField("array", 0); // start writing inner array contents + + // write [32, 33, 34] + rc.addInteger(32); + rc.addInteger(33); + rc.addInteger(34); + + rc.endField("array", 0); // finished writing inner array contents + rc.endGroup(); + + rc.endField("array", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + Schema listOfLists = array(array(primitive(Schema.Type.INT))); + Schema oldSchema = record("AvroCompatListInList", + optionalField("listOfLists", listOfLists)); + + GenericRecord oldRecord = instance(oldSchema, + "listOfLists", Arrays.asList( + Arrays.asList(34, 35, 36), + Arrays.asList(), + Arrays.asList(32, 33, 34))); + + // both should detect the "array" name + 
assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord); + assertReaderContains(newBehaviorReader(test), oldSchema, oldRecord); + } + + @Test + public void testThriftCompatListInList() throws Exception { + Path test = writeDirect( + "message ThriftCompatListInList {" + + " optional group listOfLists (LIST) {" + + " repeated group listOfLists_tuple (LIST) {" + + " repeated int32 listOfLists_tuple_tuple;" + + " }" + + " }" + + "}", + new DirectWriter() { + @Override + public void write(RecordConsumer rc) { + rc.startMessage(); + rc.startField("locations", 0); + + rc.startGroup(); + rc.startField("listOfLists_tuple", 0); // start writing array contents + + rc.startGroup(); + rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents + + // write [34, 35, 36] + rc.addInteger(34); + rc.addInteger(35); + rc.addInteger(36); + + rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents + rc.endGroup(); + + // write an empty list + rc.startGroup(); + rc.endGroup(); + + rc.startGroup(); + rc.startField("listOfLists_tuple_tuple", 0); // start writing inner array contents + + // write [32, 33, 34] + rc.addInteger(32); + rc.addInteger(33); + rc.addInteger(34); + + rc.endField("listOfLists_tuple_tuple", 0); // finished writing inner array contents + rc.endGroup(); + + rc.endField("listOfLists_tuple", 0); // finished writing array contents + rc.endGroup(); + + rc.endField("locations", 0); + rc.endMessage(); + } + }); + + Schema listOfLists = array(array(primitive(Schema.Type.INT))); + Schema oldSchema = record("ThriftCompatListInList", + optionalField("listOfLists", listOfLists)); + + GenericRecord oldRecord = instance(oldSchema, + "listOfLists", Arrays.asList( + Arrays.asList(34, 35, 36), + Arrays.asList(), + Arrays.asList(32, 33, 34))); + + // both should detect the "_tuple" names + assertReaderContains(oldBehaviorReader(test), oldSchema, oldRecord); + assertReaderContains(newBehaviorReader(test), oldSchema, 
oldRecord); + } + + @Test public void testThriftCompatRequiredGroupInList() throws Exception { Path test = writeDirect( "message ThriftCompatRequiredGroupInList {" + http://git-wip-us.apache.org/repos/asf/parquet-mr/blob/440882c6/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java ---------------------------------------------------------------------- diff --git a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java index 6c802a6..b393615 100644 --- a/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java +++ b/parquet-avro/src/test/java/org/apache/parquet/avro/TestAvroSchemaConverter.java @@ -394,6 +394,98 @@ public class TestAvroSchemaConverter { } @Test + public void testOldAvroListOfLists() throws Exception { + Schema listOfLists = optional(Schema.createArray(Schema.createArray( + Schema.create(Schema.Type.INT)))); + Schema schema = Schema.createRecord("AvroCompatListInList", null, null, false); + schema.setFields(Lists.newArrayList( + new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) + )); + System.err.println("Avro schema: " + schema.toString(true)); + + testRoundTripConversion(schema, + "message AvroCompatListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group array (LIST) {\n" + + " repeated int32 array;\n" + + " }\n" + + " }\n" + + "}"); + // Cannot use round-trip assertion because 3-level representation is used + testParquetToAvroConversion(NEW_BEHAVIOR, schema, + "message AvroCompatListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group array (LIST) {\n" + + " repeated int32 array;\n" + + " }\n" + + " }\n" + + "}"); + } + + @Test + public void testOldThriftListOfLists() throws Exception { + Schema listOfLists = optional(Schema.createArray(Schema.createArray( + Schema.create(Schema.Type.INT)))); + Schema schema = 
Schema.createRecord("ThriftCompatListInList", null, null, false); + schema.setFields(Lists.newArrayList( + new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) + )); + System.err.println("Avro schema: " + schema.toString(true)); + + // Cannot use round-trip assertion because repeated group names differ + testParquetToAvroConversion(schema, + "message ThriftCompatListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group listOfLists_tuple (LIST) {\n" + + " repeated int32 listOfLists_tuple_tuple;\n" + + " }\n" + + " }\n" + + "}"); + // Cannot use round-trip assertion because 3-level representation is used + testParquetToAvroConversion(NEW_BEHAVIOR, schema, + "message ThriftCompatListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group listOfLists_tuple (LIST) {\n" + + " repeated int32 listOfLists_tuple_tuple;\n" + + " }\n" + + " }\n" + + "}"); + } + + @Test + public void testUnknownTwoLevelListOfLists() throws Exception { + // This tests the case where we don't detect a 2-level list by the repeated + // group's name, but it must be 2-level because the repeated group doesn't + // contain an optional or required element as required for 3-level lists + Schema listOfLists = optional(Schema.createArray(Schema.createArray( + Schema.create(Schema.Type.INT)))); + Schema schema = Schema.createRecord("UnknownTwoLevelListInList", null, null, false); + schema.setFields(Lists.newArrayList( + new Schema.Field("listOfLists", listOfLists, null, NullNode.getInstance()) + )); + System.err.println("Avro schema: " + schema.toString(true)); + + // Cannot use round-trip assertion because repeated group names differ + testParquetToAvroConversion(schema, + "message UnknownTwoLevelListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group mylist (LIST) {\n" + + " repeated int32 innerlist;\n" + + " }\n" + + " }\n" + + "}"); + // Cannot use round-trip assertion because 3-level representation is used + 
testParquetToAvroConversion(NEW_BEHAVIOR, schema, + "message UnknownTwoLevelListInList {\n" + + " optional group listOfLists (LIST) {\n" + + " repeated group mylist (LIST) {\n" + + " repeated int32 innerlist;\n" + + " }\n" + + " }\n" + + "}"); + } + + @Test public void testParquetMapWithoutMapKeyValueAnnotation() throws Exception { Schema schema = Schema.createRecord("myrecord", null, null, false); Schema map = Schema.createMap(Schema.create(Schema.Type.INT));
