the-other-tim-brown commented on code in PR #13654:
URL: https://github.com/apache/hudi/pull/13654#discussion_r2254484040
##########
hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java:
##########
@@ -419,4 +425,302 @@ public void testFindNestedFieldType() {
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema, "nested_record.bool"));
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema,
"non_present_field.also_not_present"));
}
+
+ private static Schema parse(String json) {
+ return new Schema.Parser().parse(json);
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentFieldCountInRecords() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]}");
+ Schema s2 = parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[]}");
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNestedRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"Outer1\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner1\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"Outer2\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner2\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentArraySchemas() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentElementTypeInArray() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.INT));
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentMapSchemas() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentMapValueTypes() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNullableSchemaComparison() {
+ Schema s1 =
AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.INT));
+ Schema s2 = Schema.create(Schema.Type.INT);
+ s2.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentUnsupportedUnionThrows() {
Review Comment:
Can you add a case where one schema has a list field with a string element
and the other schema has just a string (not in a list)? I want to ensure we
don't have any regressions in the future around these tricky cases where we
could handle the comparison incorrectly.
We should also have an equivalent case for Map.
##########
hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java:
##########
@@ -419,4 +425,302 @@ public void testFindNestedFieldType() {
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema, "nested_record.bool"));
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema,
"non_present_field.also_not_present"));
}
+
+ private static Schema parse(String json) {
+ return new Schema.Parser().parse(json);
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentFieldCountInRecords() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]}");
+ Schema s2 = parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[]}");
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNestedRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"Outer1\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner1\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"Outer2\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner2\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentArraySchemas() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentElementTypeInArray() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.INT));
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentMapSchemas() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentMapValueTypes() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNullableSchemaComparison() {
+ Schema s1 =
AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.INT));
+ Schema s2 = Schema.create(Schema.Type.INT);
+ s2.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentUnsupportedUnionThrows() {
+ Schema intSchema = Schema.create(Schema.Type.INT);
+ intSchema.addProp("prop1", "value1");
+ Schema union = Schema.createUnion(
+ Arrays.asList(intSchema, Schema.create(Schema.Type.STRING))
+ );
+ Schema union2 = Schema.createUnion(
+ Arrays.asList(Schema.create(Schema.Type.INT),
Schema.create(Schema.Type.STRING))
+ );
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> AvroSchemaUtils.areSchemasProjectionEquivalentInternal(union,
union2)
+ );
+ assertEquals("Union schemas are not supported besides nullable",
ex.getMessage());
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEqualFixedSchemas() {
+ Schema s1 = Schema.createFixed("F", null, null, 16);
+ Schema s2 = Schema.createFixed("F", null, null, 16);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentFixedSize() {
+ Schema s1 = Schema.createFixed("F", null, null, 8);
+ Schema s2 = Schema.createFixed("F", null, null, 4);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEnums() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B",
"C"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("C", "B",
"A"));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentEnumSymbols() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("X", "Y"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B"));
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEnumSymbolSubset() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B",
"C"));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s2,
s1));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEqualDecimalLogicalTypes() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s2);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentPrecision() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(13, 2).addToSchema(s2);
+
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentLogicalVsNoLogicalType() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(10, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentSameReferenceSchema() {
+ Schema s = Schema.create(Schema.Type.STRING);
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s, s));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNullSchemaComparison() {
+ Schema s = Schema.create(Schema.Type.STRING);
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(null, s));
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s, null));
+ }
+
+ @Test
+ void testPruneRecordFields() {
+ String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\",
\"fields\": ["
+ + "{ \"name\": \"name\", \"type\": \"string\" },"
+ + "{ \"name\": \"age\", \"type\": \"int\" },"
+ + "{ \"name\": \"email\", \"type\": [\"null\", \"string\"],
\"default\": null }"
+ + "]}";
+
+ String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\",
\"fields\": ["
+ + "{ \"name\": \"name\", \"type\": \"string\" }"
+ + "]}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+
+ assertEquals(1, pruned.getFields().size());
+ assertEquals("name", pruned.getFields().get(0).name());
+ }
+
+ @Test
+ void testPruningPreserveNullable() {
+ String dataSchemaStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"Container\","
+ + "\"fields\": ["
+ + " {"
+ + " \"name\": \"foo\","
+ + " \"type\": [\"null\", {"
+ + " \"type\": \"record\","
+ + " \"name\": \"Foo\","
+ + " \"fields\": ["
+ + " {\"name\": \"field1\", \"type\": \"string\"},"
+ + " {\"name\": \"field2\", \"type\": \"int\"}"
+ + " ]"
+ + " }],"
+ + " \"default\": null"
+ + " }"
+ + "]"
+ + "}";
+
+ String requiredFooStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"Foo\","
+ + "\"fields\": ["
+ + " {\"name\": \"field1\", \"type\": \"string\"}"
+ + "]"
+ + "}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredFooStr);
+
+ Schema fooFieldSchema = dataSchema.getField("foo").schema();
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(fooFieldSchema,
requiredSchema, Collections.emptySet());
+
+ assertEquals(Schema.Type.UNION, pruned.getType());
+
+ Schema prunedRecord = pruned.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .collect(Collectors.toList()).get(0);
+ assertNotNull(prunedRecord.getField("field1"));
+ assertNull(prunedRecord.getField("field2"));
+ }
+
+ @Test
+ void testArrayElementPruning() {
+ String dataSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\":
\"record\", \"name\": \"Item\", \"fields\": ["
+ + "{\"name\": \"a\", \"type\": \"int\"}, {\"name\": \"b\", \"type\":
\"string\"}"
+ + "]}}";
+
+ String requiredSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\":
\"record\", \"name\": \"Item\", \"fields\": ["
+ + "{\"name\": \"b\", \"type\": \"string\"}"
+ + "]}}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+ Schema itemSchema = pruned.getElementType();
+
+ assertEquals(1, itemSchema.getFields().size());
+ assertEquals("b", itemSchema.getFields().get(0).name());
+ }
+
+ @Test
+ void testMapValuePruning() {
+ String dataSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\":
\"record\", \"name\": \"Entry\", \"fields\": ["
+ + "{\"name\": \"x\", \"type\": \"int\"}, {\"name\": \"y\", \"type\":
\"string\"}"
+ + "]}}";
+
+ String requiredSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\":
\"record\", \"name\": \"Entry\", \"fields\": ["
+ + "{\"name\": \"y\", \"type\": \"string\"}"
+ + "]}}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+ Schema valueSchema = pruned.getValueType();
+
+ assertEquals(1, valueSchema.getFields().size());
+ assertEquals("y", valueSchema.getFields().get(0).name());
+ }
+
+ @Test
+ void testPruningExcludedFieldIsPreservedIfMissingInDataSchema() {
Review Comment:
Let's add a case with the mandatory fields and nested schemas. The docs
mention that the mandatory fields only apply to the top-level fields, so we can
add a case where a nested field's name matches the provided set and is not
included in the output because it is nested.
##########
hudi-common/src/test/java/org/apache/hudi/avro/TestAvroSchemaUtils.java:
##########
@@ -419,4 +425,302 @@ public void testFindNestedFieldType() {
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema, "nested_record.bool"));
assertThrows(HoodieAvroSchemaException.class, () ->
AvroSchemaUtils.findNestedFieldType(sourceSchema,
"non_present_field.also_not_present"));
}
+
+ private static Schema parse(String json) {
+ return new Schema.Parser().parse(json);
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[{\"name\":\"f1\",\"type\":\"int\"}]}");
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentFieldCountInRecords() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"R1\",\"fields\":[{\"name\":\"a\",\"type\":\"int\"}]}");
+ Schema s2 = parse("{\"type\":\"record\",\"name\":\"R2\",\"fields\":[]}");
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNestedRecordSchemas() {
+ Schema s1 =
parse("{\"type\":\"record\",\"name\":\"Outer1\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner1\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ Schema s2 =
parse("{\"type\":\"record\",\"name\":\"Outer2\",\"fields\":[{\"name\":\"inner\","
+ +
"\"type\":{\"type\":\"record\",\"name\":\"Inner2\",\"fields\":[{\"name\":\"x\",\"type\":\"string\"}]}}]}");
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentArraySchemas() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentElementTypeInArray() {
+ Schema s1 = Schema.createArray(Schema.create(Schema.Type.STRING));
+ Schema s2 = Schema.createArray(Schema.create(Schema.Type.INT));
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentMapSchemas() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentMapValueTypes() {
+ Schema s1 = Schema.createMap(Schema.create(Schema.Type.LONG));
+ Schema s2 = Schema.createMap(Schema.create(Schema.Type.STRING));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNullableSchemaComparison() {
+ Schema s1 =
AvroSchemaUtils.createNullableSchema(Schema.create(Schema.Type.INT));
+ Schema s2 = Schema.create(Schema.Type.INT);
+ s2.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s1, s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentUnsupportedUnionThrows() {
+ Schema intSchema = Schema.create(Schema.Type.INT);
+ intSchema.addProp("prop1", "value1");
+ Schema union = Schema.createUnion(
+ Arrays.asList(intSchema, Schema.create(Schema.Type.STRING))
+ );
+ Schema union2 = Schema.createUnion(
+ Arrays.asList(Schema.create(Schema.Type.INT),
Schema.create(Schema.Type.STRING))
+ );
+ IllegalArgumentException ex = assertThrows(
+ IllegalArgumentException.class,
+ () -> AvroSchemaUtils.areSchemasProjectionEquivalentInternal(union,
union2)
+ );
+ assertEquals("Union schemas are not supported besides nullable",
ex.getMessage());
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEqualFixedSchemas() {
+ Schema s1 = Schema.createFixed("F", null, null, 16);
+ Schema s2 = Schema.createFixed("F", null, null, 16);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentFixedSize() {
+ Schema s1 = Schema.createFixed("F", null, null, 8);
+ Schema s2 = Schema.createFixed("F", null, null, 4);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEnums() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B",
"C"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("C", "B",
"A"));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentEnumSymbols() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("X", "Y"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B"));
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEnumSymbolSubset() {
+ Schema s1 = Schema.createEnum("E", null, null, Arrays.asList("A", "B"));
+ Schema s2 = Schema.createEnum("E", null, null, Arrays.asList("A", "B",
"C"));
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s2,
s1));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentEqualDecimalLogicalTypes() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s2);
+ s1.addProp("prop1", "value1"); // prevent Objects.equals from returning
true
+
+ assertTrue(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentDifferentPrecision() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(12, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(13, 2).addToSchema(s2);
+
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentLogicalVsNoLogicalType() {
+ Schema s1 = Schema.create(Schema.Type.BYTES);
+ LogicalTypes.decimal(10, 2).addToSchema(s1);
+
+ Schema s2 = Schema.create(Schema.Type.BYTES);
+
+ assertFalse(AvroSchemaUtils.areSchemaPrimitivesProjectionEquivalent(s1,
s2));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentSameReferenceSchema() {
+ Schema s = Schema.create(Schema.Type.STRING);
+ assertTrue(AvroSchemaUtils.areSchemasProjectionEquivalent(s, s));
+ }
+
+ @Test
+ void testAreSchemasProjectionEquivalentNullSchemaComparison() {
+ Schema s = Schema.create(Schema.Type.STRING);
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(null, s));
+ assertFalse(AvroSchemaUtils.areSchemasProjectionEquivalent(s, null));
+ }
+
+ @Test
+ void testPruneRecordFields() {
+ String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\",
\"fields\": ["
+ + "{ \"name\": \"name\", \"type\": \"string\" },"
+ + "{ \"name\": \"age\", \"type\": \"int\" },"
+ + "{ \"name\": \"email\", \"type\": [\"null\", \"string\"],
\"default\": null }"
+ + "]}";
+
+ String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Person\",
\"fields\": ["
+ + "{ \"name\": \"name\", \"type\": \"string\" }"
+ + "]}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+
+ assertEquals(1, pruned.getFields().size());
+ assertEquals("name", pruned.getFields().get(0).name());
+ }
+
+ @Test
+ void testPruningPreserveNullable() {
+ String dataSchemaStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"Container\","
+ + "\"fields\": ["
+ + " {"
+ + " \"name\": \"foo\","
+ + " \"type\": [\"null\", {"
+ + " \"type\": \"record\","
+ + " \"name\": \"Foo\","
+ + " \"fields\": ["
+ + " {\"name\": \"field1\", \"type\": \"string\"},"
+ + " {\"name\": \"field2\", \"type\": \"int\"}"
+ + " ]"
+ + " }],"
+ + " \"default\": null"
+ + " }"
+ + "]"
+ + "}";
+
+ String requiredFooStr = "{"
+ + "\"type\": \"record\","
+ + "\"name\": \"Foo\","
+ + "\"fields\": ["
+ + " {\"name\": \"field1\", \"type\": \"string\"}"
+ + "]"
+ + "}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredFooStr);
+
+ Schema fooFieldSchema = dataSchema.getField("foo").schema();
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(fooFieldSchema,
requiredSchema, Collections.emptySet());
+
+ assertEquals(Schema.Type.UNION, pruned.getType());
+
+ Schema prunedRecord = pruned.getTypes().stream()
+ .filter(s -> s.getType() == Schema.Type.RECORD)
+ .collect(Collectors.toList()).get(0);
+ assertNotNull(prunedRecord.getField("field1"));
+ assertNull(prunedRecord.getField("field2"));
+ }
+
+ @Test
+ void testArrayElementPruning() {
+ String dataSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\":
\"record\", \"name\": \"Item\", \"fields\": ["
+ + "{\"name\": \"a\", \"type\": \"int\"}, {\"name\": \"b\", \"type\":
\"string\"}"
+ + "]}}";
+
+ String requiredSchemaStr = "{ \"type\": \"array\", \"items\": { \"type\":
\"record\", \"name\": \"Item\", \"fields\": ["
+ + "{\"name\": \"b\", \"type\": \"string\"}"
+ + "]}}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+ Schema itemSchema = pruned.getElementType();
+
+ assertEquals(1, itemSchema.getFields().size());
+ assertEquals("b", itemSchema.getFields().get(0).name());
+ }
+
+ @Test
+ void testMapValuePruning() {
+ String dataSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\":
\"record\", \"name\": \"Entry\", \"fields\": ["
+ + "{\"name\": \"x\", \"type\": \"int\"}, {\"name\": \"y\", \"type\":
\"string\"}"
+ + "]}}";
+
+ String requiredSchemaStr = "{ \"type\": \"map\", \"values\": { \"type\":
\"record\", \"name\": \"Entry\", \"fields\": ["
+ + "{\"name\": \"y\", \"type\": \"string\"}"
+ + "]}}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Schema pruned = AvroSchemaUtils.pruneDataSchema(dataSchema,
requiredSchema, Collections.emptySet());
+ Schema valueSchema = pruned.getValueType();
+
+ assertEquals(1, valueSchema.getFields().size());
+ assertEquals("y", valueSchema.getFields().get(0).name());
+ }
+
+ @Test
+ void testPruningExcludedFieldIsPreservedIfMissingInDataSchema() {
+ String dataSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\",
\"fields\": ["
+ + "{\"name\": \"existing\", \"type\": \"int\"}"
+ + "]}";
+
+ String requiredSchemaStr = "{ \"type\": \"record\", \"name\": \"Rec\",
\"fields\": ["
+ + "{\"name\": \"existing\", \"type\": \"int\"},"
+ + "{\"name\": \"missing\", \"type\": \"string\", \"default\":
\"default\"}"
+ + "]}";
+
+ Schema dataSchema = parse(dataSchemaStr);
+ Schema requiredSchema = parse(requiredSchemaStr);
+
+ Set<String> excludeFields = Collections.singleton("missing");
Review Comment:
Nitpick: Can you update the name here to `mandatoryFields` to match the args
to the method?
##########
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HiveHoodieReaderContext.java:
##########
@@ -121,6 +128,10 @@ public ClosableIterator<ArrayWritable>
getFileRecordIterator(
private ClosableIterator<ArrayWritable> getFileRecordIterator(StoragePath
filePath, String[] hosts, long start, long length, Schema dataSchema,
Schema
requiredSchema, HoodieStorage storage) throws IOException {
+ boolean isParquet =
filePath.getFileExtension().equals(HoodieFileFormat.PARQUET.getFileExtension());
+ Schema avroFileSchema = isParquet ? HoodieIOFactory.getIOFactory(storage)
+ .getFileFormatUtils(filePath).readAvroSchema(storage, filePath) :
dataSchema;
+ Schema actualRequiredSchema = isParquet ?
AvroSchemaUtils.pruneDataSchema(avroFileSchema, requiredSchema,
Collections.emptySet()) : requiredSchema;
Review Comment:
Can you add an inline comment explaining why the pruning is required for
parquet only?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]