[
https://issues.apache.org/jira/browse/GOBBLIN-957?focusedWorklogId=343564&page=com.atlassian.jira.plugin.system.issuetabpanels:worklog-tabpanel#worklog-343564
]
ASF GitHub Bot logged work on GOBBLIN-957:
------------------------------------------
Author: ASF GitHub Bot
Created on: 14/Nov/19 17:39
Start Date: 14/Nov/19 17:39
Worklog Time Spent: 10m
Work Description: autumnust commented on pull request #2806: GOBBLIN-957:
Add recursion eliminating converter for Avro
URL: https://github.com/apache/incubator-gobblin/pull/2806#discussion_r346450139
##########
File path: gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java
##########
@@ -935,4 +941,141 @@ public static Schema overrideNameAndNamespace(Schema
input, String nameOverride,
return newSchema;
}
+ @Builder
+ @ToString
+ public static class SchemaEntry {
+ @Getter
+ final String fieldName;
+ final Schema schema;
+ String fullyQualifiedType() {
+ return schema.getFullName();
+ }
+ }
+
+ /**
+ * Drop recursive fields from a Schema. Recursive fields are fields that
refer to types that are part of the
+ * parent tree.
+ * e.g. consider this Schema for a User
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string"},
+ * {"name": "friend", "type": "User"}
+ * ]
+ * }
+ * the friend field is a recursive field. After recursion has been
eliminated we expect the output Schema to look like
+ * {
+ * "type": "record",
+ * "name": "User",
+ * "fields": [
+ * {"name": "name", "type": "string"}
+ * ]
+ * }
+ *
+ * @param schema
+ * @return a Pair of (The transformed schema with recursion eliminated, A
list of {@link SchemaEntry} objects which
+ * represent the fields that were removed from the original schema)
+ */
+ public static Pair<Schema, List<SchemaEntry>> dropRecursiveFields(Schema
schema) {
+ List<SchemaEntry> recursiveFields = new ArrayList<>();
+ return new Pair(dropRecursive(new SchemaEntry(null, schema),
Collections.EMPTY_LIST, recursiveFields),
+ recursiveFields);
+ }
+
+ /**
+ * Inner recursive method called by {@link #dropRecursiveFields(Schema)}
+ * @param schemaEntry
+ * @param parents
+ * @param fieldsWithRecursion
+ * @return the transformed Schema, null if schema is recursive w.r.t parent
schema traversed so far
+ */
+ private static Schema dropRecursive(SchemaEntry schemaEntry,
List<SchemaEntry> parents, List<SchemaEntry> fieldsWithRecursion) {
+ Schema schema = schemaEntry.schema;
+ // ignore primitive fields
+ switch (schema.getType()) {
+ case UNION:{
+ List<Schema> unionTypes = schema.getTypes();
+ List<Schema> copiedUnionTypes = new ArrayList<Schema>();
+ for (Schema unionSchema: unionTypes) {
+ SchemaEntry unionSchemaEntry = new SchemaEntry(
+ schemaEntry.fieldName, unionSchema);
+ copiedUnionTypes.add(dropRecursive(unionSchemaEntry, parents,
fieldsWithRecursion));
+ }
+ if (copiedUnionTypes.stream().anyMatch(x -> x == null)) {
+ // one or more types in the union are referring to a parent type
(directly recursive),
+ // entire union must be dropped
+ return null;
+ }
+ else {
+ Schema copySchema = Schema.createUnion(copiedUnionTypes);
+ copyProperties(schema, copySchema);
+ return copySchema;
+ }
+ }
+ case RECORD:{
+ // check if the type of this schema matches any in the parents list
+ if (parents.stream().anyMatch(parent ->
parent.fullyQualifiedType().equals(schemaEntry.fullyQualifiedType()))) {
+ fieldsWithRecursion.add(schemaEntry);
+ return null;
+ }
+ List<SchemaEntry> newParents = new ArrayList<>(parents);
+ newParents.add(schemaEntry);
+ List<Schema.Field> copiedSchemaFields = new ArrayList<>();
+ for (Schema.Field field: schema.getFields()) {
+ String fieldName = schemaEntry.fieldName != null ?
schemaEntry.fieldName + "." + field.name() : field.name();
+ SchemaEntry fieldSchemaEntry = new SchemaEntry(fieldName,
field.schema());
+ Schema copiedFieldSchema = dropRecursive(fieldSchemaEntry,
newParents, fieldsWithRecursion);
+ if (copiedFieldSchema == null) {
+ } else {
+ Schema.Field copiedField =
+ new Schema.Field(field.name(), copiedFieldSchema, field.doc(),
field.defaultValue(), field.order());
+ copyFieldProperties(field, copiedField);
+ copiedSchemaFields.add(copiedField);
+ }
+ }
+ if (copiedSchemaFields.size() > 0) {
+ Schema copiedRecord = Schema.createRecord(schema.getName(),
schema.getDoc(), schema.getNamespace(),
+ schema.isError());
+ copiedRecord.setFields(copiedSchemaFields);
+ copyProperties(schema, copiedRecord);
+ return copiedRecord;
+ } else {
+ return null;
+ }
+ }
+ case ARRAY: {
+ Schema itemSchema = schema.getElementType();
+ SchemaEntry itemSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
itemSchema);
+ Schema copiedItemSchema = dropRecursive(itemSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedItemSchema == null) {
+ return null;
+ } else {
+ Schema copiedArraySchema = Schema.createArray(copiedItemSchema);
+ copyProperties(schema, copiedArraySchema);
+ return copiedArraySchema;
+ }
+ }
+ case MAP: {
+ Schema valueSchema = schema.getValueType();
+ SchemaEntry valueSchemaEntry = new SchemaEntry(schemaEntry.fieldName,
valueSchema);
+ Schema copiedValueSchema = dropRecursive(valueSchemaEntry, parents,
fieldsWithRecursion);
+ if (copiedValueSchema == null) {
+ return null;
+ } else {
+ Schema copiedMapSchema = Schema.createMap(copiedValueSchema);
+ copyProperties(schema, copiedMapSchema);
+ return copiedMapSchema;
+ }
+ }
+ default: {
+ return schema;
+ }
+ }
+ }
+
+ private static void copyFieldProperties(Schema.Field field, Schema.Field
copiedField) {
Review comment:
What about other elements of the `Schema` object, like `doc`?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
Issue Time Tracking
-------------------
Worklog Id: (was: 343564)
Time Spent: 20m (was: 10m)
> Support automatic recursion removal from schemas
> ------------------------------------------------
>
> Key: GOBBLIN-957
> URL: https://issues.apache.org/jira/browse/GOBBLIN-957
> Project: Apache Gobblin
> Issue Type: Improvement
> Reporter: Shirshanka Das
> Assignee: Shirshanka Das
> Priority: Major
> Labels: avro
> Time Spent: 20m
> Remaining Estimate: 0h
>
> Analytics engines such as Hive cannot handle recursive schemas, i.e. schemas
> where inner fields can refer to the wrapping type.
> This Jira proposes that we provide support for automatic recursion removal in
> data during data ingestion.
> The simple proposal is to just drop the fields in the schema that introduce
> the recursion.
> e.g. (pseudo-schema)
> User {
> string name;
> User friend;
> }
> gets converted to :
> User {
> string name;
> }
>
> A more sophisticated solution would be to do one or two levels of
> "schema-unrolling" before dropping data.
> e.g.
> output schema with one-level unrolling would look like:
> User {
> string name;
> User1 friend;
> }
> User1 {
> string name;
> }
>
>
>
>
>
>
--
This message was sent by Atlassian Jira
(v8.3.4#803005)