This is an automated email from the ASF dual-hosted git repository.

shirshanka pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 78bf163  [GOBBLIN-957] Add recursion eliminating code, converter for 
Avro
78bf163 is described below

commit 78bf1634814ee4cadd160a75e08421daa0a3ed9a
Author: Shirshanka Das <[email protected]>
AuthorDate: Fri Nov 15 17:27:46 2019 -0800

    [GOBBLIN-957] Add recursion eliminating code, converter for Avro
    
    Closes #2806 from shirshanka/recursive
---
 .../avro/AvroRecursionEliminatingConverter.java    |  67 ++++++++
 .../AvroRecursionEliminatingConverterTest.java     | 134 ++++++++++++++++
 .../src/test/resources/converter/recursive.avsc    |  23 +++
 .../java/org/apache/gobblin/util/AvroUtils.java    | 175 ++++++++++++++++++++-
 .../org/apache/gobblin/util/AvroUtilsTest.java     |  67 ++++++++
 .../recursive_schemas/recursive_array.avsc         |  32 ++++
 .../recursive_array_solution.avsc                  |  24 +++
 .../resources/recursive_schemas/recursive_map.avsc |  39 +++++
 .../recursive_schemas/recursive_map_solution.avsc  |  24 +++
 .../recursive_schemas/recursive_multiple.avsc      |  15 ++
 .../recursive_multiple_solution.avsc               |  13 ++
 .../recursive_schemas/recursive_nested.avsc        |  26 +++
 .../recursive_nested_solution.avsc                 |  24 +++
 .../recursive_schemas/recursive_norecursion.avsc   |  24 +++
 .../recursive_norecursion_solution.avsc            |  24 +++
 .../recursive_schemas/recursive_simple.avsc        |  14 ++
 .../recursive_simple_solution.avsc                 |  13 ++
 .../recursive_schemas/recursive_union.avsc         |  25 +++
 .../recursive_union_solution.avsc                  |  24 +++
 19 files changed, 785 insertions(+), 2 deletions(-)

diff --git 
a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
new file mode 100644
index 0000000..af5a08f
--- /dev/null
+++ 
b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverter.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import java.io.IOException;
+import java.util.List;
+import java.util.stream.Collectors;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.commons.math3.util.Pair;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.AvroToAvroConverterBase;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.converter.SingleRecordIterable;
+import org.apache.gobblin.util.AvroUtils;
+
+
+/**
+ * A converter that removes recursion from Avro Generic Records.
+ *
+ * <p>The output schema is computed with {@link AvroUtils#dropRecursiveFields(Schema)}, which drops every field whose
+ * type refers back to an enclosing record type. Each input record is then converted to that projected schema with
+ * {@link AvroUtils#convertRecordSchema(GenericRecord, Schema)}, so the dropped fields do not appear in the output.
+ */
+@Slf4j
+public class AvroRecursionEliminatingConverter extends AvroToAvroConverterBase {
+
+  /**
+   * Projects the input schema onto a schema without recursive fields.
+   *
+   * @param inputSchema the (possibly recursive) input schema
+   * @param workUnit the work unit state (not used)
+   * @return the input schema with all recursive fields removed
+   * @throws SchemaConversionException if no schema remains once the recursive fields are removed
+   */
+  @Override
+  public Schema convertSchema(Schema inputSchema, WorkUnitState workUnit)
+      throws SchemaConversionException {
+    Pair<Schema, List<AvroUtils.SchemaEntry>> results = AvroUtils.dropRecursiveFields(inputSchema);
+    Schema outputSchema = results.getFirst();
+    List<AvroUtils.SchemaEntry> recursiveFields = results.getSecond();
+    if (outputSchema == null) {
+      // dropRecursiveFields yields null when no field of the top-level record survives; fail here instead of
+      // handing a null schema to the rest of the pipeline.
+      throw new SchemaConversionException("Schema " + inputSchema.getFullName()
+          + " has no fields left after dropping recursive fields");
+    }
+    if (!recursiveFields.isEmpty()) {
+      log.warn("Schema {} is recursive. Will drop fields [{}]", inputSchema.getFullName(),
+          recursiveFields.stream().map(AvroUtils.SchemaEntry::getFieldName).collect(Collectors.joining(",")));
+      log.debug("Projected Schema = {}", outputSchema);
+    }
+    return outputSchema;
+  }
+
+  /**
+   * Converts a record to the projected (non-recursive) output schema.
+   *
+   * @param outputSchema the schema produced by {@link #convertSchema(Schema, WorkUnitState)}
+   * @param inputRecord the record to convert
+   * @param workUnit the work unit state (not used)
+   * @return a single-element iterable holding the converted record
+   * @throws DataConversionException if the record cannot be converted to {@code outputSchema}
+   */
+  @Override
+  public Iterable<GenericRecord> convertRecordImpl(Schema outputSchema, GenericRecord inputRecord,
+      WorkUnitState workUnit)
+      throws DataConversionException {
+    try {
+      return new SingleRecordIterable<>(AvroUtils.convertRecordSchema(inputRecord, outputSchema));
+    } catch (IOException e) {
+      throw new DataConversionException("Failed to convert record to schema " + outputSchema.getFullName(), e);
+    }
+  }
+}
diff --git 
a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
 
b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
new file mode 100644
index 0000000..a043876
--- /dev/null
+++ 
b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/AvroRecursionEliminatingConverterTest.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.converter.avro;
+
+import java.io.File;
+import java.io.IOException;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.util.Utf8;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+import com.google.common.base.Optional;
+
+import org.apache.gobblin.configuration.WorkUnitState;
+import org.apache.gobblin.converter.DataConversionException;
+import org.apache.gobblin.converter.SchemaConversionException;
+import org.apache.gobblin.util.AvroUtils;
+
+
+public class AvroRecursionEliminatingConverterTest {
+
+  /**
+   * Parses the recursive test schema {@code /converter/recursive.avsc} from the test classpath.
+   */
+  private Schema loadInputSchema()
+      throws IOException {
+    try (java.io.InputStream schemaStream = getClass().getResourceAsStream("/converter/recursive.avsc")) {
+      return new Schema.Parser().parse(schemaStream);
+    }
+  }
+
+  /**
+   * Writes one {@code User} record into a temporary Avro data file. The record's {@code address} has a nested
+   * {@code previous_address}, so the recursive field is populated.
+   *
+   * @return the temporary data file; it is deleted when the JVM exits
+   */
+  public File generateRecord()
+      throws IOException {
+    Schema inputSchema = loadInputSchema();
+    Schema addressSchema = inputSchema.getField("address").schema();
+
+    GenericRecord record = new GenericData.Record(inputSchema);
+    record.put("name", "John");
+    record.put("date_of_birth", 1234L);
+    record.put("last_modified", 4567L);
+    record.put("created", 6789L);
+
+    GenericRecord addressRecord = new GenericData.Record(addressSchema);
+    addressRecord.put("city", "Los Angeles");
+    addressRecord.put("street_number", 1234);
+
+    GenericRecord innerAddressRecord = new GenericData.Record(addressSchema);
+    innerAddressRecord.put("city", "San Francisco");
+    innerAddressRecord.put("street_number", 3456);
+
+    addressRecord.put("previous_address", innerAddressRecord);
+    record.put("address", addressRecord);
+
+    // The file holds Avro data (not a schema), hence ".avro". Register cleanup before writing so that a failed
+    // write does not leave the file behind.
+    File recordFile = File.createTempFile(this.getClass().getSimpleName(), ".avro");
+    recordFile.deleteOnExit();
+    try (DataFileWriter<GenericRecord> dataFileWriter =
+        new DataFileWriter<>(new GenericDatumWriter<GenericRecord>(inputSchema))) {
+      dataFileWriter.create(inputSchema, recordFile);
+      dataFileWriter.append(record);
+    }
+    return recordFile;
+  }
+
+  /**
+   * Test schema and record conversion using a recursive schema
+   */
+  @Test
+  public void testConversion()
+      throws IOException {
+    File inputFile = generateRecord();
+    WorkUnitState workUnitState = new WorkUnitState();
+    Schema inputSchema = loadInputSchema();
+
+    GenericRecord inputRecord;
+    try (DataFileReader<GenericRecord> dataFileReader =
+        new DataFileReader<>(inputFile, new GenericDatumReader<GenericRecord>(inputSchema))) {
+      inputRecord = dataFileReader.next();
+    }
+
+    AvroRecursionEliminatingConverter converter = new AvroRecursionEliminatingConverter();
+    String recursiveFieldPath = "address.previous_address";
+    // test that the inner recursive field is present in input schema
+    Assert.assertTrue(AvroUtils.getFieldSchema(inputSchema, recursiveFieldPath).isPresent());
+
+    Schema outputSchema = null;
+    try {
+      outputSchema = converter.convertSchema(inputSchema, workUnitState);
+      // test that the inner recursive field is no longer in the schema
+      Assert.assertFalse(AvroUtils.getFieldSchema(outputSchema, recursiveFieldPath).isPresent(),
+          "Inner recursive field " + recursiveFieldPath + " should not be in output schema");
+    } catch (SchemaConversionException e) {
+      Assert.fail(e.getMessage(), e);
+    }
+
+    GenericRecord outputRecord = null;
+    try {
+      outputRecord = converter.convertRecord(outputSchema, inputRecord, workUnitState).iterator().next();
+    } catch (DataConversionException e) {
+      Assert.fail(e.getMessage(), e);
+    }
+
+    checkEquality("address.street_number", inputRecord, 1234, "Different value in input");
+    checkEquality("address.street_number", outputRecord, 1234, "Different value in output");
+    checkEquality("name", inputRecord, new Utf8("John"), "Different value in input");
+    checkEquality("name", outputRecord, new Utf8("John"), "Different value in output");
+
+    // check that inner address record exists in input record
+    checkEquality("address.previous_address.city", inputRecord, new Utf8("San Francisco"), "Different value in input");
+    checkEquality("address.previous_address", outputRecord, null, "Failed to remove recursive field");
+  }
+
+  /**
+   * Asserts the value found at {@code fieldPath} in {@code record}.
+   *
+   * @param fieldPath dotted field path to look up
+   * @param record the record to inspect
+   * @param expected the expected value, or {@code null} to assert that no value is present at the path
+   * @param message assertion failure message
+   */
+  private void checkEquality(String fieldPath, GenericRecord record, Object expected, String message) {
+    Optional<Object> value = AvroUtils.getFieldValue(record, fieldPath);
+    if (expected != null) {
+      Assert.assertEquals(value.get(), expected, message);
+    } else {
+      Assert.assertFalse(value.isPresent(), message);
+    }
+  }
+
+}
diff --git a/gobblin-core/src/test/resources/converter/recursive.avsc 
b/gobblin-core/src/test/resources/converter/recursive.avsc
new file mode 100644
index 0000000..a3e28cb
--- /dev/null
+++ b/gobblin-core/src/test/resources/converter/recursive.avsc
@@ -0,0 +1,23 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]},
+                       {"name": "previous_address", "type": ["null", 
"dummy_address"]}
+                   ]
+               }
+     }
+ ]
+}
diff --git 
a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java 
b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
index 2ea7e10..fa1fa3b 100644
--- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
+++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
@@ -53,6 +53,7 @@ import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
 import org.apache.avro.util.Utf8;
 import org.apache.commons.lang.StringUtils;
+import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -79,6 +80,9 @@ import com.google.common.collect.Sets;
 import com.google.common.io.Closer;
 
 import javax.annotation.Nonnull;
+import lombok.Builder;
+import lombok.Getter;
+import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 
 /**
@@ -227,8 +231,10 @@ public class AvroUtils {
         return AvroUtils.getFieldHelper(fieldSchema.getValueType(), pathList, 
++field);
       case RECORD:
         return AvroUtils.getFieldHelper(fieldSchema, pathList, ++field);
+      case ARRAY:
+        return AvroUtils.getFieldHelper(fieldSchema.getElementType(), 
pathList, ++field);
       default:
-        throw new AvroRuntimeException("Invalid type in schema : " + schema);
+        throw new AvroRuntimeException("Invalid type " + fieldSchema.getType() 
+ " in schema : " + schema);
     }
   }
 
@@ -388,7 +394,7 @@ public class AvroUtils {
       return reader.read(null, decoder);
     } catch (IOException e) {
       throw new IOException(
-          String.format("Cannot convert avro record to new schema. Origianl 
schema = %s, new schema = %s",
+          String.format("Cannot convert avro record to new schema. Original 
schema = %s, new schema = %s",
               record.getSchema(), newSchema),
           e);
     }
@@ -935,4 +941,169 @@ public class AvroUtils {
     return newSchema;
   }
 
+  /**
+   * A node visited while walking a schema. It pairs the dotted path of the field that leads to the node
+   * ({@code null} for the root schema) with the schema found at that path.
+   */
+  @Builder
+  @ToString
+  public static class SchemaEntry {
+    /** Dotted field path, e.g. {@code address.previous_address}; {@code null} for the root schema. */
+    @Getter
+    final String fieldName;
+
+    /** The schema located at {@link #fieldName}. */
+    final Schema schema;
+
+    /** @return the namespace-qualified full name of this entry's schema */
+    String fullyQualifiedType() {
+      final Schema entrySchema = this.schema;
+      return entrySchema.getFullName();
+    }
+  }
+
+  /**
+   * Check if a schema has recursive fields inside it, i.e. fields whose type refers to a record type that
+   * encloses them.
+   *
+   * @param schema the schema to inspect
+   * @param logger optional logger; if present, the paths of the recursive fields found are logged at INFO level
+   * @return true if at least one recursive field was found, false otherwise
+   */
+  public static boolean isSchemaRecursive(Schema schema, Optional<Logger> logger) {
+    List<SchemaEntry> recursiveFields = new ArrayList<>();
+    dropRecursive(new SchemaEntry(null, schema), Collections.<SchemaEntry>emptyList(), recursiveFields);
+    if (recursiveFields.isEmpty()) {
+      return false;
+    }
+    if (logger.isPresent()) {
+      logger.get().info("Found recursive fields [{}] in schema {}",
+          recursiveFields.stream().map(SchemaEntry::getFieldName).collect(Collectors.joining(",")),
+          schema.getFullName());
+    }
+    return true;
+  }
+
+
+  /**
+   * Drop recursive fields from a Schema. Recursive fields are fields that 
refer to types that are part of the
+   * parent tree.
+   * e.g. consider this Schema for a User
+   * {
+   *   "type": "record",
+   *   "name": "User",
+   *   "fields": [
+   *     {"name": "name", "type": "string",
+   *     {"name": "friend", "type": "User"}
+   *   ]
+   * }
+   * the friend field is a recursive field. After recursion has been 
eliminated we expect the output Schema to look like
+   * {
+   *    "type": "record",
+   *    "name": "User",
+   *    "fields": [
+   *    {"name": "name", "type": "string"}
+   *    ]
+   * }
+   *
+   * @param schema
+   * @return a Pair of (The transformed schema with recursion eliminated, A 
list of @link{SchemaEntry} objects which
+   * represent the fields that were removed from the original schema)
+   */
+  public static Pair<Schema, List<SchemaEntry>> dropRecursiveFields(Schema 
schema) {
+    List<SchemaEntry> recursiveFields = new ArrayList<>();
+    return new Pair(dropRecursive(new SchemaEntry(null, schema), 
Collections.EMPTY_LIST, recursiveFields),
+        recursiveFields);
+  }
+
+  /**
+   * Inner recursive method called by {@link #dropRecursiveFields(Schema)}.
+   *
+   * <p>Walks the schema depth-first. A record whose full name matches a record already on the current path
+   * ({@code parents}) is a recursive reference: it is added to {@code fieldsWithRecursion} and removed.
+   * Removal propagates outward through containers:
+   * <ul>
+   *   <li>a union with any removed branch is removed as a whole;</li>
+   *   <li>an array or map whose element or value type was removed is removed as a whole;</li>
+   *   <li>a record field whose type was removed is left out of the copied record;</li>
+   *   <li>a record that loses all of its fields is removed.</li>
+   * </ul>
+   * Other types are returned unchanged.
+   *
+   * @param schemaEntry the schema to process, together with the dotted path of the field it belongs to
+   * @param parents the record entries enclosing {@code schemaEntry} on the current path (not modified)
+   * @param fieldsWithRecursion output list; receives an entry for each recursive reference found
+   * @return the transformed Schema, null if schema is recursive w.r.t parent schema traversed so far
+   */
+  private static Schema dropRecursive(SchemaEntry schemaEntry, List<SchemaEntry> parents,
+      List<SchemaEntry> fieldsWithRecursion) {
+    Schema schema = schemaEntry.schema;
+    switch (schema.getType()) {
+      case UNION: {
+        List<Schema> copiedUnionTypes = new ArrayList<>();
+        // visit every branch (even after one is removed) so that all recursive references are reported
+        for (Schema unionSchema : schema.getTypes()) {
+          SchemaEntry unionSchemaEntry = new SchemaEntry(schemaEntry.fieldName, unionSchema);
+          copiedUnionTypes.add(dropRecursive(unionSchemaEntry, parents, fieldsWithRecursion));
+        }
+        if (copiedUnionTypes.contains(null)) {
+          // one or more types in the union are referring to a parent type (directly recursive),
+          // entire union must be dropped
+          return null;
+        }
+        Schema copySchema = Schema.createUnion(copiedUnionTypes);
+        copyProperties(schema, copySchema);
+        return copySchema;
+      }
+      case RECORD: {
+        // check if the type of this schema matches any in the parents list
+        if (parents.stream().anyMatch(parent -> parent.fullyQualifiedType().equals(schemaEntry.fullyQualifiedType()))) {
+          fieldsWithRecursion.add(schemaEntry);
+          return null;
+        }
+        List<SchemaEntry> newParents = new ArrayList<>(parents);
+        newParents.add(schemaEntry);
+        List<Schema.Field> copiedSchemaFields = new ArrayList<>();
+        for (Schema.Field field : schema.getFields()) {
+          String fieldName = schemaEntry.fieldName != null ? schemaEntry.fieldName + "." + field.name() : field.name();
+          SchemaEntry fieldSchemaEntry = new SchemaEntry(fieldName, field.schema());
+          Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry, newParents, fieldsWithRecursion);
+          if (copiedFieldSchema == null) {
+            // the field's type was removed; leave the field out of the copy
+            continue;
+          }
+          Schema.Field copiedField =
+              new Schema.Field(field.name(), copiedFieldSchema, field.doc(), field.defaultValue(), field.order());
+          copyFieldProperties(field, copiedField);
+          // the Field constructor takes no aliases, so carry them over explicitly
+          for (String alias : field.aliases()) {
+            copiedField.addAlias(alias);
+          }
+          copiedSchemaFields.add(copiedField);
+        }
+        if (copiedSchemaFields.isEmpty() && !schema.getFields().isEmpty()) {
+          // every field of this record was removed, so the record itself is dropped.
+          // A record that was empty to begin with is kept.
+          return null;
+        }
+        Schema copiedRecord = Schema.createRecord(schema.getName(), schema.getDoc(), schema.getNamespace(),
+            schema.isError());
+        copiedRecord.setFields(copiedSchemaFields);
+        copyProperties(schema, copiedRecord);
+        // createRecord takes no aliases either
+        for (String alias : schema.getAliases()) {
+          copiedRecord.addAlias(alias);
+        }
+        return copiedRecord;
+      }
+      case ARRAY: {
+        SchemaEntry itemSchemaEntry = new SchemaEntry(schemaEntry.fieldName, schema.getElementType());
+        Schema copiedItemSchema = dropRecursive(itemSchemaEntry, parents, fieldsWithRecursion);
+        if (copiedItemSchema == null) {
+          return null;
+        }
+        Schema copiedArraySchema = Schema.createArray(copiedItemSchema);
+        copyProperties(schema, copiedArraySchema);
+        return copiedArraySchema;
+      }
+      case MAP: {
+        SchemaEntry valueSchemaEntry = new SchemaEntry(schemaEntry.fieldName, schema.getValueType());
+        Schema copiedValueSchema = dropRecursive(valueSchemaEntry, parents, fieldsWithRecursion);
+        if (copiedValueSchema == null) {
+          return null;
+        }
+        Schema copiedMapSchema = Schema.createMap(copiedValueSchema);
+        copyProperties(schema, copiedMapSchema);
+        return copiedMapSchema;
+      }
+      default: {
+        // primitive, enum and fixed types cannot contain recursion; reuse them as-is
+        return schema;
+      }
+    }
+  }
+
+  /**
+   * Copies every custom property of {@code sourceField} onto {@code copiedField}.
+   *
+   * <p>Avro's field constructors have no parameter for properties that Avro itself does not interpret, even though
+   * the schema language and object model allow such properties. Code that copies fields must therefore transfer
+   * them explicitly.
+   *
+   * @param sourceField the field whose properties are read
+   * @param copiedField the field that receives the properties
+   */
+  private static void copyFieldProperties(Schema.Field sourceField, Schema.Field copiedField) {
+    sourceField.getProps().forEach(copiedField::addProp);
+  }
+
 }
diff --git 
a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java 
b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
index 248f21f..a2e205b 100644
--- a/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
+++ b/gobblin-utility/src/test/java/org/apache/gobblin/util/AvroUtilsTest.java
@@ -23,8 +23,11 @@ import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import org.apache.avro.AvroRuntimeException;
 import org.apache.avro.Schema;
@@ -42,17 +45,22 @@ import org.apache.avro.io.DecoderFactory;
 import org.apache.avro.io.Encoder;
 import org.apache.avro.io.EncoderFactory;
 import org.apache.avro.mapred.FsInput;
+import org.apache.commons.math3.util.Pair;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.codehaus.jackson.JsonNode;
 import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.node.ArrayNode;
 import org.testng.Assert;
 import org.testng.annotations.Test;
 
 import com.google.common.base.Optional;
 import com.google.common.collect.Maps;
 
+import lombok.extern.slf4j.Slf4j;
 
+
+@Slf4j
 public class AvroUtilsTest {
   private static final String AVRO_DIR = 
"gobblin-utility/src/test/resources/avroDirParent/";
 
@@ -535,4 +543,63 @@ public class AvroUtilsTest {
     Assert.assertEquals(newSchema.getNamespace(), outputNamespace);
   }
 
+
+  @Test
+  public void testisSchemaRecursive()
+      throws IOException {
+    for (String scenario : new String[]{"norecursion", "simple", "union", 
"multiple", "nested", "array", "map"}) {
+      System.out.println("Processing scenario for " + scenario);
+
+      Schema inputSchema = new Schema.Parser()
+          
.parse(getClass().getClassLoader().getResourceAsStream("recursive_schemas/recursive_"
 + scenario + ".avsc"));
+
+      if (scenario.equals("norecursion")) {
+        Assert.assertFalse(AvroUtils.isSchemaRecursive(inputSchema, 
Optional.of(log)),
+            "Schema for scenario " + scenario + " should not be recursive");
+      } else {
+        Assert.assertTrue(AvroUtils.isSchemaRecursive(inputSchema, 
Optional.of(log)),
+            "Schema for scenario " + scenario + " should be recursive");
+      }
+    }
+  }
+
+
+  @Test
+  public void testDropRecursiveSchema()
+      throws IOException {
+
+    for (String scenario : new String[]{"norecursion", "simple", "union", 
"multiple", "nested", "array", "map"}) {
+      System.out.println("Processing scenario for " + scenario);
+
+      Schema inputSchema = new 
Schema.Parser().parse(getClass().getClassLoader()
+          .getResourceAsStream("recursive_schemas/recursive_" + scenario + 
".avsc"));
+
+      Schema solutionSchema = new 
Schema.Parser().parse(getClass().getClassLoader()
+          .getResourceAsStream("recursive_schemas/recursive_" + scenario + 
"_solution.avsc"));
+
+      // get the answer from the input schema (test author needs to provide 
this)
+      ArrayNode foo = (ArrayNode) inputSchema.getJsonProp("recursive_fields");
+      HashSet<String> answers = new HashSet<>();
+      for (JsonNode fieldsWithRecursion: foo) {
+        answers.add(fieldsWithRecursion.getTextValue());
+      }
+
+      Pair<Schema, List<AvroUtils.SchemaEntry>> results = 
AvroUtils.dropRecursiveFields(inputSchema);
+      List<AvroUtils.SchemaEntry> fieldsWithRecursion = results.getSecond();
+      Schema transformedSchema = results.getFirst();
+
+      // Prove that fields with recursion are no longer present
+      for (String answer: answers) {
+        Assert.assertFalse(AvroUtils.getField(transformedSchema, 
answer).isPresent());
+      }
+
+      // Additionally compare schema with solution schema
+      Assert.assertEquals(solutionSchema, transformedSchema,"Transformed 
schema differs from solution schema for scenario " + scenario);
+
+      Set<String> recursiveFieldNames = fieldsWithRecursion.stream().map(se -> 
se.fieldName).collect(Collectors.toSet());
+      Assert.assertEquals(recursiveFieldNames, answers,
+          "Found recursive fields differ from answers listed in the schema for 
scenario " + scenario);
+
+    }
+  }
 }
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc
new file mode 100644
index 0000000..c983972
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array.avsc
@@ -0,0 +1,32 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "friends",
+      "type" : {
+                   "type": "array",
+                   "items": "User"
+               }
+     },
+     {"name": "places",
+      "type": {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "Place",
+                  "fields": [
+                    {"name": "name", "type": "string"},
+                    {"name": "innerPlace", "type": "Place"}
+                  ]
+                }
+     }},
+     {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["places.innerPlace","friends", "friend"] 
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
new file mode 100644
index 0000000..6250de7
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_array_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "places",
+      "type": {
+                "type": "array",
+                "items": {
+                  "type": "record",
+                  "name": "Place",
+                  "fields": [
+                    {"name": "name", "type": "string"}
+                  ]
+                }
+     }}
+ ],
+ "recursive_fields": ["places.innerPlace","friends", "friend"] 
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc
new file mode 100644
index 0000000..35e82d2
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map.avsc
@@ -0,0 +1,39 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "friends",
+      "type" : {
+                   "type": "map",
+                   "values": "User"
+               }
+     },
+     {"name": "nullableFriends",
+      "type" : {
+                   "type": "map",
+                   "values": ["null", "User"]
+               }
+     },
+    {"name": "places",
+      "type": {
+                "type": "map",
+                "values": {
+                  "type": "record",
+                  "name": "Place",
+                  "fields": [
+                    {"name": "name", "type": "string"},
+                    {"name": "innerPlace", "type": "Place"},
+                    {"name": "nullableInnerPlace", "type": ["null", "Place"]}
+                  ]
+                }
+     }},
+     {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["places.innerPlace", "places.nullableInnerPlace", 
"friends", "nullableFriends","friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
new file mode 100644
index 0000000..e92f568
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_map_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "places",
+      "type": {
+                "type": "map",
+                "values": {
+                  "type": "record",
+                  "name": "Place",
+                  "fields": [
+                    {"name": "name", "type": "string"}
+                  ]
+                }
+     }}
+ ],
+ "recursive_fields": ["places.innerPlace", "places.nullableInnerPlace", 
"friends", "nullableFriends","friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc
new file mode 100644
index 0000000..4dfc373
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple.avsc
@@ -0,0 +1,15 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "friend", "type": "User"},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "other_friend", "type": "User"}
+ ],
+ "recursive_fields": ["friend", "other_friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
new file mode 100644
index 0000000..c0c5373
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_multiple_solution.avsc
@@ -0,0 +1,13 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"}
+ ],
+ "recursive_fields": ["friend", "other_friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc
new file mode 100644
index 0000000..7fe75fc
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested.avsc
@@ -0,0 +1,26 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]},
+                       {"name": "previous_address", "type": ["null", 
"dummy_address"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"},
+     {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": 
["home_address.previous_address","office_address.previous_address","friend"] 
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
new file mode 100644
index 0000000..7440b2d
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_nested_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": 
["home_address.previous_address","office_address.previous_address","friend"] 
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
new file mode 100644
index 0000000..3a9edb3
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": []
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
new file mode 100644
index 0000000..3a9edb3
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_norecursion_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": []
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc
new file mode 100644
index 0000000..809e677
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple.avsc
@@ -0,0 +1,14 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "friend", "type": "User"}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
new file mode 100644
index 0000000..6219834
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_simple_solution.avsc
@@ -0,0 +1,13 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc
new file mode 100644
index 0000000..dbfac42
--- /dev/null
+++ b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union.avsc
@@ -0,0 +1,25 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"},
+     {"name": "friend", "type": ["null", "User"]}
+ ],
+ "recursive_fields": ["friend"]
+}
diff --git 
a/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
new file mode 100644
index 0000000..4f54ead
--- /dev/null
+++ 
b/gobblin-utility/src/test/resources/recursive_schemas/recursive_union_solution.avsc
@@ -0,0 +1,24 @@
+{"namespace": "example.avro",
+ "type": "record",
+ "name": "User",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "favorite_number",  "type": ["int", "null"]},
+     {"name": "favorite_color", "type": ["string", "null"]},
+     {"name": "date_of_birth", "type": "long"},
+     {"name": "last_modified", "type": "long"},
+     {"name": "created", "type": "long"},
+     {"name": "home_address",
+      "type" : {
+                   "type": "record",
+                   "name": "dummy_address",
+                   "fields": [
+                       {"name": "city", "type": "string"},
+                       {"name": "street_number",  "type": ["int", "null"]}
+                   ]
+               }
+     },
+     {"name": "office_address", "type": "dummy_address"}
+ ],
+ "recursive_fields": ["friend"]
+}

Reply via email to