This is an automated email from the ASF dual-hosted git repository.
shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 78bf163 [GOBBLIN-957] Add recursion eliminating code, converter for Avro
78bf163 is described below
commit 78bf1634814ee4cadd160a75e08421daa0a3ed9a
Author: Shirshanka Das <[email protected]>
AuthorDate: Fri Nov 15 17:27:46 2019 -0800
[GOBBLIN-957] Add recursion eliminating code, converter for Avro
Closes #2806 from shirshanka/recursive
---
.../avro/AvroRecursionEliminatingConverter.java | 67 ++++++++
.../AvroRecursionEliminatingConverterTest.java | 134 ++++++++++++++++
.../src/test/resources/converter/recursive.avsc | 23 +++
.../java/org/apache/gobblin/util/AvroUtils.java | 175 ++++++++++++++++++++-
.../org/apache/gobblin/util/AvroUtilsTest.java | 67 ++++++++
.../recursive_schemas/recursive_array.avsc | 32 ++++
.../recursive_array_solution.avsc | 24 +++
.../resources/recursive_schemas/recursive_map.avsc | 39 +++++
.../recursive_schemas/recursive_map_solution.avsc | 24 +++
.../recursive_schemas/recursive_multiple.avsc | 15 ++
.../recursive_multiple_solution.avsc | 13 ++
.../recursive_schemas/recursive_nested.avsc | 26 +++
.../recursive_nested_solution.avsc | 24 +++
.../recursive_schemas/recursive_norecursion.avsc | 24 +++
.../recursive_norecursion_solution.avsc | 24 +++
.../recursive_schemas/recursive_simple.avsc | 14 ++
.../recursive_simple_solution.avsc | 13 ++
.../recursive_schemas/recursive_union.avsc | 25 +++
.../recursive_union_solution.avsc | 24 +++
19 files changed, 785 insertions(+), 2 deletions(-)
diff --git
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
new file mode 100644
index 0000000..af5a08f
--- /dev/null
+++
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.math3.util.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.AvroToAvroConverterBase;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import org.apache.gobblin.util.AvroUtils;
+
+
+/**
+ * A converter that removes recursion from Avro Generic Records
+ */
+@Slf4j
+public class AvroRecursionEliminatingConverter extends AvroToAvroConverterBase
{
+
+ @Override
+ public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
+ throws SchemaConversionException {
+ Pair<Schema, List<AvroUtils.SchemaEntry>> results =
AvroUtils.dropRecursiveFields(inputSchema);
+ List<AvroUtils.SchemaEntry> recursiveFields = results.getSecond();
+ if (!recursiveFields.isEmpty()) {
+ log.warn("Schema {} is recursive. Will drop fields [{}]",
inputSchema.getFullName(),
+ recursiveFields.stream().map(entry ->
entry.getFieldName()).collect(Collectors.joining(",")));
+ log.debug("Projected Schema = {}", results.getFirst());
+ }
+ return results.getFirst();
+ }
+
+ @Override
+ public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema,
GenericRecord inputRecord,
+ WorkUnitState workUnit)
+ throws DataConversionException {
+ try {
+ return new
SingleRecordIterable(AvroUtils.convertRecordSchema(inputRecord, outputSchema));
+ } catch (IOException e) {
+ throw new DataConversionException("Failed to convert", e);
+ }
+ }
+}
diff --git
a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
new file mode 100644
index 0000000..a043876
--- /dev/null
+++
b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.util.AvroUtils;
+
+
+public class AvroRecursionEliminatingConverterTest {
+
+
+ public File generateRecord()
+ throws IOException {
+ Schema inputSchema = new
Schema.Parser().parse(getClass().getResourceAsStream("/converter/recursive.avsc"));
+ GenericDatumWriter<GenericRecord> datumWriter = new
GenericDatumWriter<GenericRecord>(inputSchema);
+ GenericRecord record = new GenericData.Record(inputSchema);
+ record.put("name", "John");
+ record.put("date_of_birth", 1234L);
+ record.put("last_modified", 4567L);
+ record.put("created", 6789L);
+ GenericRecord addressRecord = new
GenericData.Record(inputSchema.getField("address").schema());
+ addressRecord.put("city", "Los Angeles");
+ addressRecord.put("street_number", 1234);
+
+
+ GenericRecord innerAddressRecord = new
GenericData.Record(inputSchema.getField("address").schema());
+ innerAddressRecord.put("city", "San Francisco");
+ innerAddressRecord.put("street_number", 3456);
+
+ addressRecord.put("previous_address", innerAddressRecord);
+ record.put("address", addressRecord);
+
+ File recordFile =
File.createTempFile(this.getClass().getSimpleName(),"avsc");
+ DataFileWriter<GenericRecord> dataFileWriter = new
DataFileWriter<GenericRecord>(datumWriter);
+ dataFileWriter.create(inputSchema, recordFile);
+ dataFileWriter.append(record);
+ dataFileWriter.close();
+ recordFile.deleteOnExit();
+ return recordFile;
+ }
+
+ /**
+ * Test schema and record conversion using a recursive schema
+ */
+ @Test
+ public void testConversion()
+ throws IOException {
+
+ File inputFile = generateRecord();
+
+ WorkUnitState workUnitState = new WorkUnitState();
+
+ Schema inputSchema = new
Schema.Parser().parse(getClass().getResourceAsStream("/converter/recursive.avsc"));
+ GenericDatumReader<GenericRecord> datumReader = new
GenericDatumReader<GenericRecord>(inputSchema);
+
+ DataFileReader<GenericRecord> dataFileReader = new
DataFileReader<GenericRecord>(inputFile, datumReader);
+ GenericRecord inputRecord = dataFileReader.next();
+
+ AvroRecursionEliminatingConverter converter = new
AvroRecursionEliminatingConverter();
+ Schema outputSchema = null;
+ String recursiveFieldPath = "address.previous_address";
+ // test that the inner recursive field is present in input schema
+ Assert.assertTrue(AvroUtils.getFieldSchema(inputSchema,
recursiveFieldPath).isPresent());
+ try {
+ outputSchema = converter.convertSchema(inputSchema, workUnitState);
+ // test that the inner recursive field is no longer in the schema
+ Assert.assertTrue(!AvroUtils.getFieldSchema(outputSchema,
recursiveFieldPath).isPresent(),
+ "Inner recursive field " + recursiveFieldPath + " should not be in
output schema");
+ } catch (SchemaConversionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ GenericRecord outputRecord = null;
+ try {
+ outputRecord = converter.convertRecord(outputSchema, inputRecord,
workUnitState).iterator().next();
+ } catch (DataConversionException e) {
+ Assert.fail(e.getMessage());
+ }
+
+ checkEquality("address.street_number", inputRecord, 1234, "Different value
in input");
+ checkEquality("address.street_number", outputRecord, 1234, "Different
value in output");
+ checkEquality("name", inputRecord, new Utf8("John"), "Different value in
input");
+ checkEquality("name", outputRecord, new Utf8("John"), "Different value in
output");
+
+ // check that inner address record exists in input record
+ checkEquality("address.previous_address.city", inputRecord, new Utf8("San
Francisco"), "Different value in input");
+ checkEquality("address.previous_address", outputRecord, null, "Failed to
remove recursive field");
+
+ }
+
+ private void checkEquality(String fieldPath, GenericRecord inputRecord,
Object expected, String message) {
+ Optional inputValue = AvroUtils.getFieldValue(inputRecord, fieldPath);
+ if (expected != null) {
+ Assert.assertEquals(inputValue.get(), expected, message);
+ } else {
+ Assert.assertTrue(!inputValue.isPresent(), message);
+ }
+
+ }
+
+}
diff --git a/gobblin-core/src/test/resources/converter/recursive.avsc
b/gobblin-core/src/test/resources/converter/recursive.avsc
new file mode 100644
index 0000000..a3e28cb
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/recursive.avsc
@@ -0,0 +1,23 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]},
+ {"name": "previous_address", "type": ["null",
"dummy_address"]}
+ ]
+ }
+ }
+ ]
+}
diff --git
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 2ea7e10..fa1fa3b 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -53,6 +53,7 @@ import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
import org.apache.avro.util.Utf8;
import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
@@ -79,6 +80,9 @@ import com.google.common.collect.Sets;
import com.google.common.io.Closer;
import javax.annotation.Nonnull;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
/**
@@ -227,8 +231,10 @@ public class AvroUtils {
return AvroUtils.getFieldHelper(fieldSchema.getValueType(), pathList,
++field);
case RECORD:
return AvroUtils.getFieldHelper(fieldSchema, pathList, ++field);
+ case ARRAY:
+ return AvroUtils.getFieldHelper(fieldSchema.getElementType(),
pathList, ++field);
default:
- throw new AvroRuntimeException("Invalid type in schema : " + schema);
+ throw new AvroRuntimeException("Invalid type " + fieldSchema.getType()
+ " in schema : " + schema);
}
}
@@ -388,7 +394,7 @@ public class AvroUtils {
return reader.read(null, decoder);
} catch (IOException e) {
throw new IOException(
- String.format("Cannot convert avro record to new schema. Origianl
schema = %s, new schema = %s",
+ String.format("Cannot convert avro record to new schema. Original
schema = %s, new schema = %s",
record.getSchema(), newSchema),
e);
}
@@ -935,4 +941,169 @@ public class AvroUtils {
return newSchema;
}
+ @Builder
+ @ToString
+ public static class SchemaEntry {
+ @Getter
+ final String fieldName;
+ final Schema schema;
+ String fullyQualifiedType() {
+ return schema.getFullName();
+ }
+ }
+
+ /**
+ * Check if a schema has recursive fields inside it
+ * @param schema
+ * @param logger : Optional logger if you want the method to log why it
thinks the schema was recursive
+ * @return true / false
+ */
+ public static boolean isSchemaRecursive(Schema schema, Optional<Logger>
logger) {
+ List<SchemaEntry> recursiveFields = new ArrayList<>();
+ dropRecursive(new SchemaEntry(null, schema), Collections.EMPTY_LIST,
recursiveFields);
+ if (recursiveFields.isEmpty()) {
+ return false;
+ } else {
+ if (logger.isPresent()) {
+ logger.get().info("Found recursive fields [{}] in schema {}",
recursiveFields.stream().map(f -> f.fieldName).collect(Collectors.joining(",")),
+ schema.getFullName());
+ }
+ return true;
+ }
+ }
+
+
+ /**
+ * Drop recursive fields from a Schema. Recursive fields are fields that
refer to types that are part of the
+ * parent tree.
+ * e.g. consider this Schema for a User
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string",
+ * {"name": "friend", "type": "User"}
+ * ]
+ * }
+ * the friend field is a recursive field. After recursion has been
eliminated we expect the output Schema to look like
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string"}
+ * ]
+ * }
+ *
+ * @param schema
+ * @return a Pair of (The transformed schema with recursion eliminated, A
list of @link{SchemaEntry} objects which
+ * represent the fields that were removed from the original schema)
+ */
+ public static Pair<Schema, List<SchemaEntry>> dropRecursiveFields(Schema
schema) {
+ List<SchemaEntry> recursiveFields = new ArrayList<>();
+ return new Pair(dropRecursive(new SchemaEntry(null, schema),
Collections.EMPTY_LIST, recursiveFields),
+ recursiveFields);
+ }
+
+ /**
+ * Inner recursive method called by {@link #dropRecursiveFields(Schema)}
+ * @param schemaEntry
+ * @param parents
+ * @param fieldsWithRecursion
+ * @return the transformed Schema, null if schema is recursive w.r.t parent
schema traversed so far
+ */
+ private static Schema dropRecursive(SchemaEntry schemaEntry,
List<SchemaEntry> parents, List<SchemaEntry> fieldsWithRecursion) {
+ Schema schema = schemaEntry.schema;
+ // ignore primitive fields
+ switch (schema.getType()) {
+ case UNION:{
+ List<Schema> unionTypes = schema.getTypes();
+ List<Schema> copiedUnionTypes = new ArrayList<Schema>();
+ for (Schema unionSchema: unionTypes) {
+ SchemaEntry unionSchemaEntry = new SchemaEntry(
+ schemaEntry.fieldName, unionSchema);
+ copiedUnionTypes.add(dropRecursive(unionSchemaEntry, parents,
fieldsWithRecursion));
+ }
+ if (copiedUnionTypes.stream().anyMatch(x -> x == null)) {
+ // one or more types in the union are referring to a parent type
(directly recursive),
+ // entire union must be dropped
+ return null;
+ }
+ else {
+ Schema copySchema = Schema.createUnion(copiedUnionTypes);
+ copyProperties(schema, copySchema);
+ return copySchema;
+ }
+ }
+ case RECORD:{
+ // check if the type of this schema matches any in the parents list
+ if (parents.stream().anyMatch(parent ->
parent.fullyQualifiedType().equals(schemaEntry.fullyQualifiedType()))) {
+ fieldsWithRecursion.add(schemaEntry);
+ return null;
+ }
+ List<SchemaEntry> newParents = new ArrayList<>(parents);
+ newParents.add(schemaEntry);
+ List<Schema.Field> copiedSchemaFields = new ArrayList<>();
+ for (Schema.Field field: schema.getFields()) {
+ String fieldName = schemaEntry.fieldName != null ?
schemaEntry.fieldName + "." + field.name() : field.name();
+ SchemaEntry fieldSchemaEntry = new SchemaEntry(fieldName,
field.schema());
+ Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry,
newParents, fieldsWithRecursion);
+ if (copiedFieldSchema == null) {
+ } else {
+ Schema.Field copiedField =
+ new Schema.Field(field.name(), copiedFieldSchema, field.doc(),
field.defaultValue(), field.order());
+ copyFieldProperties(field, copiedField);
+ copiedSchemaFields.add(copiedField);
+ }
+ }
+ if (copiedSchemaFields.size() > 0) {
+ Schema copiedRecord = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(),
+ schema.isError());
+ copiedRecord.setFields(copiedSchemaFields);
+ copyProperties(schema, copiedRecord);
+ return copiedRecord;
+ } else {
+ return null;
+ }
+ }
+ case ARRAY: {
+ Schema itemSchema = schema.getElementType();
+ SchemaEntry itemSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
itemSchema);
+ Schema copiedItemSchema = dropRecursive(itemSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedItemSchema == null) {
+ return null;
+ } else {
+ Schema copiedArraySchema = Schema.createArray(copiedItemSchema);
+ copyProperties(schema, copiedArraySchema);
+ return copiedArraySchema;
+ }
+ }
+ case MAP: {
+ Schema valueSchema = schema.getValueType();
+ SchemaEntry valueSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
valueSchema);
+ Schema copiedValueSchema = dropRecursive(valueSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedValueSchema == null) {
+ return null;
+ } else {
+ Schema copiedMapSchema = Schema.createMap(copiedValueSchema);
+ copyProperties(schema, copiedMapSchema);
+ return copiedMapSchema;
+ }
+ }
+ default: {
+ return schema;
+ }
+ }
+ }
+
+ /**
+ * Annoyingly, Avro doesn't provide a field constructor where you can pass
in "unknown to Avro" properties
+ * to attach to the field object in the schema even though the Schema
language and object model supports it.
+ * This method allows for such copiers to explicitly copy the properties
from a source field to a destination field.
+ * @param sourceField
+ * @param copiedField
+ */
+ private static void copyFieldProperties(Schema.Field sourceField,
Schema.Field copiedField) {
+ sourceField.getProps().forEach((key, value) -> copiedField.addProp(key,
value));
+ }
+
}
diff --git
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 248f21f..a2e205b 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -23,8 +23,11 @@ import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.Schema;
@@ -42,17 +45,22 @@ import org.apache.avro.io.DecoderFactory;
import org.apache.avro.io.Encoder;
import org.apache.avro.io.EncoderFactory;
import org.apache.avro.mapred.FsInput;
+import org.apache.commons.math3.util.Pair;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.codehaus.jackson.JsonNode;
import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
import org.testng.Assert;
import org.testng.annotations.Test;
import com.google.common.base.Optional;
import com.google.common.collect.Maps;
+import lombok.extern.slf4j.Slf4j;
+
+@Slf4j
public class AvroUtilsTest {
private static final String AVRO_DIR =
"gobblin-utility/src/test/resources/avroDirParent/";
@@ -535,4 +543,63 @@ public class AvroUtilsTest {
Assert.assertEquals(newSchema.getNamespace(), outputNamespace);
}
+
+ @Test
+ public void testisSchemaRecursive()
+ throws IOException {
+ for (String scenario : new String[]{"norecursion", "simple", "union",
"multiple", "nested", "array", "map"}) {
+ System.out.println("Processing scenario for " + scenario);
+
+ Schema inputSchema = new Schema.Parser()
+
.parse(getClass().getClassLoader().getResourceAsStream("recursive_schemas/recursive_"
+ scenario + ".avsc"));
+
+ if (scenario.equals("norecursion")) {
+ Assert.assertFalse(AvroUtils.isSchemaRecursive(inputSchema,
Optional.of(log)),
+ "Schema for scenario " + scenario + " should not be recursive");
+ } else {
+ Assert.assertTrue(AvroUtils.isSchemaRecursive(inputSchema,
Optional.of(log)),
+ "Schema for scenario " + scenario + " should be recursive");
+ }
+ }
+ }
+
+
+ @Test
+ public void testDropRecursiveSchema()
+ throws IOException {
+
+ for (String scenario : new String[]{"norecursion", "simple", "union",
"multiple", "nested", "array", "map"}) {
+ System.out.println("Processing scenario for " + scenario);
+
+ Schema inputSchema = new
Schema.Parser().parse(getClass().getClassLoader()
+ .getResourceAsStream("recursive_schemas/recursive_" + scenario +
".avsc"));
+
+ Schema solutionSchema = new
Schema.Parser().parse(getClass().getClassLoader()
+ .getResourceAsStream("recursive_schemas/recursive_" + scenario +
"_solution.avsc"));
+
+ // get the answer from the input schema (test author needs to provide
this)
+ ArrayNode foo = (ArrayNode) inputSchema.getJsonProp("recursive_fields");
+ HashSet<String> answers = new HashSet<>();
+ for (JsonNode fieldsWithRecursion: foo) {
+ answers.add(fieldsWithRecursion.getTextValue());
+ }
+
+ Pair<Schema, List<AvroUtils.SchemaEntry>> results =
AvroUtils.dropRecursiveFields(inputSchema);
+ List<AvroUtils.SchemaEntry> fieldsWithRecursion = results.getSecond();
+ Schema transformedSchema = results.getFirst();
+
+ // Prove that fields with recursion are no longer present
+ for (String answer: answers) {
+ Assert.assertFalse(AvroUtils.getField(transformedSchema,
answer).isPresent());
+ }
+
+ // Additionally compare schema with solution schema
+ Assert.assertEquals(solutionSchema, transformedSchema,"Transformed
schema differs from solution schema for scenario " + scenario);
+
+ Set<String> recursiveFieldNames = fieldsWithRecursion.stream().map(se ->
se.fieldName).collect(Collectors.toSet());
+ Assert.assertEquals(recursiveFieldNames, answers,
+ "Found recursive fields differ from answers listed in the schema for
scenario " + scenario);
+
+ }
+ }
}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc
new file mode 100644
index 0000000..c983972
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc
@@ -0,0 +1,32 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "friends",
+ "type" : {
+ "type": "array",
+ "items": "User"
+ }
+ },
+ {"name": "places",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "Place",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "innerPlace", "type": "Place"}
+ ]
+ }
+ }},
+ {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["places.innerPlace","friends", "friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
new file mode 100644
index 0000000..6250de7
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "places",
+ "type": {
+ "type": "array",
+ "items": {
+ "type": "record",
+ "name": "Place",
+ "fields": [
+ {"name": "name", "type": "string"}
+ ]
+ }
+ }}
+ ],
+ "recursive_fields": ["places.innerPlace","friends", "friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc
new file mode 100644
index 0000000..35e82d2
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc
@@ -0,0 +1,39 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "friends",
+ "type" : {
+ "type": "map",
+ "values": "User"
+ }
+ },
+ {"name": "nullableFriends",
+ "type" : {
+ "type": "map",
+ "values": ["null", "User"]
+ }
+ },
+ {"name": "places",
+ "type": {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "Place",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "innerPlace", "type": "Place"},
+ {"name": "nullableInnerPlace", "type": ["null", "Place"]}
+ ]
+ }
+ }},
+ {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["places.innerPlace", "places.nullableInnerPlace",
"friends", "nullableFriends","friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
new file mode 100644
index 0000000..e92f568
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "places",
+ "type": {
+ "type": "map",
+ "values": {
+ "type": "record",
+ "name": "Place",
+ "fields": [
+ {"name": "name", "type": "string"}
+ ]
+ }
+ }}
+ ],
+ "recursive_fields": ["places.innerPlace", "places.nullableInnerPlace",
"friends", "nullableFriends","friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc
new file mode 100644
index 0000000..4dfc373
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc
@@ -0,0 +1,15 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "friend", "type": "User"},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "other_friend", "type": "User"}
+ ],
+ "recursive_fields": ["friend", "other_friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
new file mode 100644
index 0000000..c0c5373
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
@@ -0,0 +1,13 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"}
+ ],
+ "recursive_fields": ["friend", "other_friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc
new file mode 100644
index 0000000..7fe75fc
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc
@@ -0,0 +1,26 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]},
+ {"name": "previous_address", "type": ["null",
"dummy_address"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"},
+ {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields":
["home_address.previous_address","office_address.previous_address","friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
new file mode 100644
index 0000000..7440b2d
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields":
["home_address.previous_address","office_address.previous_address","friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
new file mode 100644
index 0000000..3a9edb3
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": []
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
new file mode 100644
index 0000000..3a9edb3
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": []
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc
new file mode 100644
index 0000000..809e677
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc
@@ -0,0 +1,14 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "friend", "type": "User"}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
new file mode 100644
index 0000000..6219834
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
@@ -0,0 +1,13 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc
new file mode 100644
index 0000000..dbfac42
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc
@@ -0,0 +1,25 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"},
+ {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
new file mode 100644
index 0000000..4f54ead
--- /dev/null
+++
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "favorite_number", "type": ["int", "null"]},
+ {"name": "favorite_color", "type": ["string", "null"]},
+ {"name": "date_of_birth", "type": "long"},
+ {"name": "last_modified", "type": "long"},
+ {"name": "created", "type": "long"},
+ {"name": "home_address",
+ "type" : {
+ "type": "record",
+ "name": "dummy_address",
+ "fields": [
+ {"name": "city", "type": "string"},
+ {"name": "street_number", "type": ["int", "null"]}
+ ]
+ }
+ },
+ {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": ["friend"]
+}