Repository: incubator-gobblin
Updated Branches:
  refs/heads/master 1c03ea22f -> e386cf67e
[GOBBLIN-467] Fix json to avro conversion for records within arrays Closes #2339 from jack-moseley/jsontoavro-fix Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/e386cf67 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/e386cf67 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/e386cf67 Branch: refs/heads/master Commit: e386cf67e337931947152fbac526c6729ffc7d0f Parents: 1c03ea2 Author: Jack Moseley <[email protected]> Authored: Tue Apr 17 11:09:57 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Tue Apr 17 11:09:57 2018 -0700 ---------------------------------------------------------------------- ...nElementConversionWithAvroSchemaFactory.java | 59 ++++++++++++++++---- .../JsonRecordAvroSchemaToAvroConverter.java | 13 +++-- ...JsonRecordAvroSchemaToAvroConverterTest.java | 9 ++- .../resources/converter/jsonToAvroRecord.json | 13 ++++- .../resources/converter/jsonToAvroSchema.avsc | 52 +++++++++++++++++ 5 files changed, 127 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java index 1da8d31..cc51874 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java @@ -25,6 +25,7 @@ import java.util.Map; import org.apache.avro.Schema; 
import org.apache.avro.generic.GenericData; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; import com.google.gson.JsonArray; import com.google.gson.JsonElement; @@ -43,8 +44,7 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve */ public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode, - WorkUnitState state, boolean nullable) - throws UnsupportedDateTypeException { + WorkUnitState state, boolean nullable, List<String> ignoreFields) throws UnsupportedDateTypeException { Type type; try { @@ -56,16 +56,20 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve switch (type) { case ARRAY: return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), - schemaNode, state); + schemaNode, state, ignoreFields); case MAP: return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), - schemaNode, state); + schemaNode, state, ignoreFields); case ENUM: return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), schemaNode); + case RECORD: + return new JsonElementConversionWithAvroSchemaFactory.RecordConverter(fieldName, nullable, type.toString(), + schemaNode, state, ignoreFields); + default: return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable); } @@ -73,12 +77,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve public static class ArrayConverter extends ComplexConverter { - public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state) - throws UnsupportedDateTypeException { + public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state, + List<String> ignoreFields) throws 
UnsupportedDateTypeException { super(fieldName, nullable, sourceType); super.setElementConverter( getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state, - isNullable())); + isNullable(), ignoreFields)); } @Override @@ -107,12 +111,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve public static class MapConverter extends ComplexConverter { - public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state) - throws UnsupportedDateTypeException { + public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state, + List<String> ignoreFields) throws UnsupportedDateTypeException { super(fieldName, nullable, sourceType); super.setElementConverter( getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state, - isNullable())); + isNullable(), ignoreFields)); } @Override @@ -169,4 +173,39 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve return this.schema; } } + + public static class RecordConverter extends ComplexConverter { + + List<String> ignoreFields; + Schema schema; + WorkUnitState state; + + public RecordConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, + WorkUnitState state, List<String> ignoreFields) throws UnsupportedDateTypeException { + super(fieldName, nullable, sourceType); + this.schema = schemaNode; + this.state = state; + this.ignoreFields = ignoreFields; + } + + @Override + Object convertField(JsonElement value) { + try { + return JsonRecordAvroSchemaToAvroConverter.convertNestedRecord(this.schema, value.getAsJsonObject(), this.state, + this.ignoreFields); + } catch (DataConversionException e) { + throw new RuntimeException("Failed to convert nested record", e); + } + } + + @Override + public Schema.Type getTargetType() { + return Schema.Type.RECORD; + } + + 
@Override + public Schema schema() { + return this.schema; + } + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java index 8e25975..e3e2a0d 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java @@ -68,16 +68,17 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends ToAvroConverterBase @Override public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit) throws DataConversionException { - GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit); + GenericRecord avroRecord = convertNestedRecord(outputSchema, inputRecord, workUnit, this.ignoreFields); return new SingleRecordIterable<>(avroRecord); } - private GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit) throws DataConversionException { + public static GenericRecord convertNestedRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit, + List<String> ignoreFields) throws DataConversionException { GenericRecord avroRecord = new GenericData.Record(outputSchema); JsonElementConversionWithAvroSchemaFactory.JsonElementConverter converter; for (Schema.Field field : outputSchema.getFields()) { - if (this.ignoreFields.contains(field.name())) { + if (ignoreFields.contains(field.name())) { continue; } @@ -115,15 +116,15 @@ public class JsonRecordAvroSchemaToAvroConverter<SI> extends 
ToAvroConverterBase avroRecord.put(field.name(), null); } else { avroRecord.put(field.name(), - convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit)); + convertNestedRecord(schema, inputRecord.get(field.name()).getAsJsonObject(), workUnit, ignoreFields)); } } else { try { converter = JsonElementConversionWithAvroSchemaFactory.getConvertor(field.name(), type.getName(), schema, - workUnit, nullable); + workUnit, nullable, ignoreFields); avroRecord.put(field.name(), converter.convert(inputRecord.get(field.name()))); } catch (Exception e) { - throw new DataConversionException("Could not convert field " + field.name()); + throw new DataConversionException("Could not convert field " + field.name(), e); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java index 4cf6898..ed2dbb6 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverterTest.java @@ -76,7 +76,12 @@ public class JsonRecordAvroSchemaToAvroConverterTest { Assert.assertTrue(record.get("mapField") instanceof Map); - Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField").toString(), "test"); - Assert.assertEquals(((GenericRecord)record.get("nestedRecords")).get("nestedField2").toString(), "test2"); + Assert.assertEquals(((GenericRecord) record.get("nestedRecords")).get("nestedField").toString(), "test"); + Assert.assertEquals(((GenericRecord) 
record.get("nestedRecords")).get("nestedField2").toString(), "test2"); + + Assert.assertTrue(((GenericArray) record.get("emptyArrayOfRecords")).isEmpty()); + + GenericRecord recordInArray = (GenericRecord) (((GenericArray) record.get("arrayOfRecords")).get(0)); + Assert.assertEquals(recordInArray.get("field1").toString(), "test1"); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json index 65bfce6..7ab2c50 100644 --- a/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json +++ b/gobblin-core/src/test/resources/converter/jsonToAvroRecord.json @@ -9,5 +9,16 @@ "nestedRecords": { "nestedField": "test", "nestedField2": "test2" - } + }, + "emptyArrayOfRecords": [], + "arrayOfRecords": [ + { + "field1": "test1", + "field2": "test2" + }, + { + "field1": "test3", + "field2": "test4" + } + ] } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/e386cf67/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc index effc91c..0198eb8 100644 --- a/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc +++ b/gobblin-core/src/test/resources/converter/jsonToAvroSchema.avsc @@ -45,6 +45,58 @@ } ] } + }, + { + "name": "emptyArrayOfRecords", + "type": { + "type": "array", + "items": { + "type": "record", + "name": "recordInEmptyArray", + "fields": [ + { + "name": "field1", + "type": [ + "null", + "string" + ] + }, + { + "name": "field2", + "type": [ + "null", + "string" + ] + } + ] + } + } + }, + { + "name": "arrayOfRecords", + "type": { + 
"type": "array", + "items": { + "type": "record", + "name": "recordInArray", + "fields": [ + { + "name": "field1", + "type": [ + "null", + "string" + ] + }, + { + "name": "field2", + "type": [ + "null", + "string" + ] + } + ] + } + } } ] }
