[GOBBLIN-226] Nested schema support in JsonStringToJsonIntermediateConverter and JsonIntermediateToAvroConverter
Closes #2080 from tilakpatidar/nested_schema Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/6dd36a50 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/6dd36a50 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/6dd36a50 Branch: refs/heads/master Commit: 6dd36a506d574a261bf678aaf071d89e597044e8 Parents: f058211 Author: tilakpatidar <[email protected]> Authored: Wed Oct 18 11:03:52 2017 +0530 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Oct 18 11:03:52 2017 +0530 ---------------------------------------------------------------------- .../avro/JsonElementConversionFactory.java | 414 +++++++-- ...nElementConversionWithAvroSchemaFactory.java | 38 +- .../avro/JsonIntermediateToAvroConverter.java | 98 +- .../JsonRecordAvroSchemaToAvroConverter.java | 3 + .../gobblin/converter/json/JsonSchema.java | 297 ++++++ .../JsonStringToJsonIntermediateConverter.java | 211 ++++- .../avro/JsonElementConversionFactoryTest.java | 397 ++++++++ .../JsonIntermediateToAvroConverterTest.java | 73 +- ...onStringToJsonIntermediateConverterTest.java | 118 +++ .../JsonElementConversionFactoryTest.json | 856 +++++++++++++++++ .../JsonStringToJsonIntermediateConverter.json | 919 +++++++++++++++++++ .../src/test/resources/converter/complex1.json | 527 +++++++++++ .../src/test/resources/converter/complex2.json | 186 ++++ .../src/test/resources/converter/complex3.json | 548 +++++++++++ .../src/test/resources/converter/record.json | 23 - .../src/test/resources/converter/record3.json | 27 + .../src/test/resources/converter/schema.json | 724 ++++++++------- .../Configuration-Properties-Glossary.md | 33 +- 18 files changed, 4880 insertions(+), 612 deletions(-) ---------------------------------------------------------------------- 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java index aa015a1..07e1fc5 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionFactory.java @@ -28,19 +28,29 @@ import java.util.TimeZone; import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; import org.apache.avro.util.Utf8; +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.EmptyIterable; +import org.apache.gobblin.converter.json.JsonSchema; +import org.codehaus.jackson.node.JsonNodeFactory; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; - -import sun.util.calendar.ZoneInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonObject; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.WorkUnitState; +import lombok.extern.java.Log; +import sun.util.calendar.ZoneInfo; + +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.*; +import static org.apache.gobblin.converter.json.JsonSchema.*; /** @@ -67,81 +77,121 @@ public class JsonElementConversionFactory { BOOLEAN, ARRAY, MAP, - ENUM + ENUM, + RECORD, + NULL, + UNION; + + 
private static List<Type> primitiveTypes = + Arrays.asList(NULL, BOOLEAN, INT, LONG, FLOAT, DOUBLE, BYTES, STRING, ENUM, FIXED); + + public static boolean isPrimitive(Type type) { + return primitiveTypes.contains(type); + } } /** * Use to create a converter for a single field from a schema. - * - * @param fieldName - * @param fieldType - * @param nullable * @param schemaNode + * @param namespace * @param state - * @return + * @return {@link JsonElementConverter} * @throws UnsupportedDateTypeException */ - public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode, - WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException { + public static JsonElementConverter getConvertor(JsonSchema schemaNode, String namespace, WorkUnitState state) + throws UnsupportedDateTypeException { - Type type; - try { - type = Type.valueOf(fieldType.toUpperCase()); - } catch (IllegalArgumentException e) { - throw new UnsupportedDateTypeException(fieldType + " is unsupported"); - } + Type type = schemaNode.getType(); DateTimeZone timeZone = getTimeZone(state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "UTC")); switch (type) { case DATE: - return new DateConverter(fieldName, nullable, type.toString(), + return new DateConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_DATE_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state); case TIMESTAMP: - return new DateConverter(fieldName, nullable, type.toString(), + return new DateConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIMESTAMP_FORMAT, "yyyy-MM-dd HH:mm:ss"), timeZone, state); case TIME: - return new DateConverter(fieldName, nullable, type.toString(), - state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"), timeZone, state); + return new DateConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"), + timeZone, state); case FIXED: - throw new 
UnsupportedDateTypeException(fieldType + " is unsupported"); + throw new UnsupportedDateTypeException(type.toString() + " is unsupported"); case STRING: - return new StringConverter(fieldName, nullable, type.toString()); + return new StringConverter(schemaNode); case BYTES: - return new BinaryConverter(fieldName, nullable, type.toString(), - state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8")); + return new BinaryConverter(schemaNode, state.getProp(ConfigurationKeys.CONVERTER_AVRO_BINARY_CHARSET, "UTF8")); case INT: - return new IntConverter(fieldName, nullable, type.toString()); + return new IntConverter(schemaNode); case LONG: - return new LongConverter(fieldName, nullable, type.toString()); + return new LongConverter(schemaNode); case FLOAT: - return new FloatConverter(fieldName, nullable, type.toString()); + return new FloatConverter(schemaNode); case DOUBLE: - return new DoubleConverter(fieldName, nullable, type.toString()); + return new DoubleConverter(schemaNode); case BOOLEAN: - return new BooleanConverter(fieldName, nullable, type.toString()); + return new BooleanConverter(schemaNode); case ARRAY: - return new ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state); + return new ArrayConverter(schemaNode, state); case MAP: - return new MapConverter(fieldName, nullable, type.toString(), schemaNode, state); + return new MapConverter(schemaNode, state); case ENUM: - return new EnumConverter(fieldName, nullable, type.toString(), schemaNode); + return new EnumConverter(schemaNode, namespace); + + case RECORD: + return new RecordConverter(schemaNode, state, namespace); + + case NULL: + return new NullConverter(schemaNode); + + case UNION: + return new UnionConverter(schemaNode, state); default: - throw new UnsupportedDateTypeException(fieldType + " is unsupported"); + throw new UnsupportedDateTypeException(type.toString() + " is unsupported"); + } + } + + /** + * Backward Compatible form of {@link 
JsonElementConverter#getConvertor(JsonSchema, String, WorkUnitState)} + * @param fieldName + * @param fieldType + * @param schemaNode + * @param state + * @param nullable + * @return + * @throws UnsupportedDateTypeException + */ + public static JsonElementConverter getConvertor(String fieldName, String fieldType, JsonObject schemaNode, + WorkUnitState state, boolean nullable) + throws UnsupportedDateTypeException { + if (!schemaNode.has(COLUMN_NAME_KEY)) { + schemaNode.addProperty(COLUMN_NAME_KEY, fieldName); + } + if (!schemaNode.has(DATA_TYPE_KEY)) { + schemaNode.add(DATA_TYPE_KEY, new JsonObject()); + } + JsonObject dataType = schemaNode.get(DATA_TYPE_KEY).getAsJsonObject(); + if (!dataType.has(TYPE_KEY)) { + dataType.addProperty(TYPE_KEY, fieldType); + } + if (!schemaNode.has(IS_NULLABLE_KEY)) { + schemaNode.addProperty(IS_NULLABLE_KEY, nullable); } + JsonSchema schema = new JsonSchema(schemaNode); + return getConvertor(schema, null, state); } private static DateTimeZone getTimeZone(String id) { @@ -166,19 +216,17 @@ public class JsonElementConversionFactory { * */ public static abstract class JsonElementConverter { - private String name; - private boolean nullable; - private String sourceType; + private final JsonSchema jsonSchema; + + public JsonElementConverter(JsonSchema jsonSchema) { + this.jsonSchema = jsonSchema; + } - /** - * - * @param fieldName - * @param nullable - */ public JsonElementConverter(String fieldName, boolean nullable, String sourceType) { - this.name = fieldName; - this.nullable = nullable; - this.sourceType = sourceType; + JsonSchema jsonSchema = buildBaseSchema(Type.valueOf(sourceType.toUpperCase())); + jsonSchema.setColumnName(fieldName); + jsonSchema.setNullable(nullable); + this.jsonSchema = jsonSchema; } /** @@ -186,7 +234,7 @@ public class JsonElementConversionFactory { * @return */ public String getName() { - return this.name; + return this.jsonSchema.getColumnName(); } /** @@ -194,7 +242,7 @@ public class 
JsonElementConversionFactory { * @return */ public boolean isNullable() { - return this.nullable; + return this.jsonSchema.isNullable(); } /** @@ -202,7 +250,7 @@ public class JsonElementConversionFactory { * @return */ public Schema getSchema() { - if (this.nullable) { + if (isNullable()) { List<Schema> list = new ArrayList<>(); list.add(Schema.create(Schema.Type.NULL)); list.add(schema()); @@ -213,8 +261,8 @@ public class JsonElementConversionFactory { protected Schema schema() { Schema schema = Schema.create(getTargetType()); - schema.addProp("source.type", this.sourceType.toLowerCase()); - return schema; + schema.addProp("source.type", this.jsonSchema.getType().toString().toLowerCase()); + return buildUnionIfNullable(schema); } /** @@ -224,7 +272,7 @@ public class JsonElementConversionFactory { */ public Object convert(JsonElement value) { if (value.isJsonNull()) { - if (this.nullable) { + if (isNullable()) { return null; } throw new RuntimeException("Field: " + getName() + " is not nullable and contains a null value"); @@ -244,12 +292,29 @@ public class JsonElementConversionFactory { * @return */ public abstract Schema.Type getTargetType(); + + protected static String buildNamespace(String namespace, String name) { + if (namespace == null || namespace.isEmpty()) { + return null; + } + if (name == null || name.isEmpty()) { + return null; + } + return namespace.trim() + "." 
+ name.trim(); + } + + protected Schema buildUnionIfNullable(Schema schema) { + if (this.isNullable()) { + return Schema.createUnion(Schema.create(Schema.Type.NULL), schema); + } + return schema; + } } public static class StringConverter extends JsonElementConverter { - public StringConverter(String fieldName, boolean nullable, String sourceType) { - super(fieldName, nullable, sourceType); + public StringConverter(JsonSchema schema) { + super(schema); } @Override @@ -265,8 +330,8 @@ public class JsonElementConversionFactory { public static class IntConverter extends JsonElementConverter { - public IntConverter(String fieldName, boolean nullable, String sourceType) { - super(fieldName, nullable, sourceType); + public IntConverter(JsonSchema schema) { + super(schema); } @Override @@ -283,8 +348,8 @@ public class JsonElementConversionFactory { public static class LongConverter extends JsonElementConverter { - public LongConverter(String fieldName, boolean nullable, String sourceType) { - super(fieldName, nullable, sourceType); + public LongConverter(JsonSchema schema) { + super(schema); } @Override @@ -301,8 +366,8 @@ public class JsonElementConversionFactory { public static class DoubleConverter extends JsonElementConverter { - public DoubleConverter(String fieldName, boolean nullable, String sourceType) { - super(fieldName, nullable, sourceType); + public DoubleConverter(JsonSchema schema) { + super(schema); } @Override @@ -318,8 +383,8 @@ public class JsonElementConversionFactory { public static class FloatConverter extends JsonElementConverter { - public FloatConverter(String fieldName, boolean nullable, String sourceType) { - super(fieldName, nullable, sourceType); + public FloatConverter(JsonSchema schema) { + super(schema); } @Override @@ -335,8 +400,8 @@ public class JsonElementConversionFactory { public static class BooleanConverter extends JsonElementConverter { - public BooleanConverter(String fieldName, boolean nullable, String sourceType) { - 
super(fieldName, nullable, sourceType); + public BooleanConverter(JsonSchema schema) { + super(schema); } @Override @@ -356,9 +421,8 @@ public class JsonElementConversionFactory { private DateTimeZone timeZone; private WorkUnitState state; - public DateConverter(String fieldName, boolean nullable, String sourceType, String pattern, DateTimeZone zone, - WorkUnitState state) { - super(fieldName, nullable, sourceType); + public DateConverter(JsonSchema schema, String pattern, DateTimeZone zone, WorkUnitState state) { + super(schema); this.inputPatterns = pattern; this.timeZone = zone; this.state = state; @@ -398,8 +462,8 @@ public class JsonElementConversionFactory { public static class BinaryConverter extends JsonElementConverter { private String charSet; - public BinaryConverter(String fieldName, boolean nullable, String sourceType, String charSet) { - super(fieldName, nullable, sourceType); + public BinaryConverter(JsonSchema schema, String charSet) { + super(schema); this.charSet = charSet; } @@ -421,6 +485,10 @@ public class JsonElementConversionFactory { public static abstract class ComplexConverter extends JsonElementConverter { private JsonElementConverter elementConverter; + public ComplexConverter(JsonSchema schema) { + super(schema); + } + public ComplexConverter(String fieldName, boolean nullable, String sourceType) { super(fieldName, nullable, sourceType); } @@ -432,27 +500,46 @@ public class JsonElementConversionFactory { public JsonElementConverter getElementConverter() { return this.elementConverter; } + + protected void processNestedItems(JsonSchema schema, WorkUnitState state) + throws UnsupportedDateTypeException { + JsonSchema nestedItem = null; + if (schema.isType(ARRAY)) { + nestedItem = schema.getItemsWithinDataType(); + } + if (schema.isType(MAP)) { + nestedItem = schema.getValuesWithinDataType(); + } + this.setElementConverter(getConvertor(nestedItem, null, state)); + } } public static class ArrayConverter extends ComplexConverter { - public 
ArrayConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode, - WorkUnitState state) throws UnsupportedDateTypeException { - super(fieldName, nullable, sourceType); - super.setElementConverter( - getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("items").getAsString(), - schemaNode.get("dataType").getAsJsonObject(), state, isNullable())); + public ArrayConverter(JsonSchema schema, WorkUnitState state) + throws UnsupportedDateTypeException { + super(schema); + processNestedItems(schema, state); } @Override Object convertField(JsonElement value) { + if (this.isNullable() && value.isJsonNull()) { + return null; + } List<Object> list = new ArrayList<>(); for (JsonElement elem : (JsonArray) value) { list.add(getElementConverter().convertField(elem)); } - return new GenericData.Array<>(schema(), list); + return new GenericData.Array<>(arraySchema(), list); + } + + private Schema arraySchema() { + Schema schema = Schema.createArray(getElementConverter().schema()); + schema.addProp(SOURCE_TYPE, ARRAY.toString().toLowerCase()); + return schema; } @Override @@ -462,20 +549,16 @@ public class JsonElementConversionFactory { @Override public Schema schema() { - Schema schema = Schema.createArray(getElementConverter().schema()); - schema.addProp("source.type", "array"); - return schema; + return buildUnionIfNullable(arraySchema()); } } public static class MapConverter extends ComplexConverter { - public MapConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode, - WorkUnitState state) throws UnsupportedDateTypeException { - super(fieldName, nullable, sourceType); - super.setElementConverter( - getConvertor(fieldName, schemaNode.get("dataType").getAsJsonObject().get("values").getAsString(), - schemaNode.get("dataType").getAsJsonObject(), state, isNullable())); + public MapConverter(JsonSchema schema, WorkUnitState state) + throws UnsupportedDateTypeException { + super(schema); + 
processNestedItems(schema, state); } @Override @@ -497,23 +580,106 @@ public class JsonElementConversionFactory { @Override public Schema schema() { Schema schema = Schema.createMap(getElementConverter().schema()); - schema.addProp("source.type", "map"); - return schema; + schema.addProp(SOURCE_TYPE, MAP.toString().toLowerCase()); + return buildUnionIfNullable(schema); + } + } + + @Log + public static class RecordConverter extends ComplexConverter { + private static final Logger LOG = LoggerFactory.getLogger(RecordConverter.class); + private HashMap<String, JsonElementConverter> converters = new HashMap<>(); + private Schema _schema; + private long numFailedConversion = 0; + private State workUnit; + + public RecordConverter(JsonSchema schema, WorkUnitState state, String namespace) + throws UnsupportedDateTypeException { + super(schema); + workUnit = state; + String name = schema.isRoot() ? schema.getColumnName() : schema.getName(); + _schema = buildRecordSchema(schema.getValuesWithinDataType(), state, name, namespace); + } + + private Schema buildRecordSchema(JsonSchema schema, WorkUnitState workUnit, String name, String namespace) { + List<Schema.Field> fields = new ArrayList<>(); + for (int i = 0; i < schema.fieldsCount(); i++) { + JsonSchema map = schema.getFieldSchemaAt(i); + String childNamespace = buildNamespace(namespace, name); + JsonElementConverter converter; + String sourceType; + Schema fldSchema; + try { + sourceType = map.isType(UNION) ? UNION.toString().toLowerCase() : map.getType().toString().toLowerCase(); + converter = getConvertor(map, childNamespace, workUnit); + this.converters.put(map.getColumnName(), converter); + fldSchema = converter.schema(); + } catch (UnsupportedDateTypeException e) { + throw new UnsupportedOperationException(e); + } + + Schema.Field fld = new Schema.Field(map.getColumnName(), fldSchema, map.getComment(), + map.isNullable() ? 
JsonNodeFactory.instance.nullNode() : null); + fld.addProp(SOURCE_TYPE, sourceType); + fields.add(fld); + } + Schema avroSchema = Schema.createRecord(name.isEmpty() ? null : name, "", namespace, false); + avroSchema.setFields(fields); + + return avroSchema; + } + + @Override + Object convertField(JsonElement value) { + GenericRecord avroRecord = new GenericData.Record(_schema); + long maxFailedConversions = this.workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES, + ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES); + for (Map.Entry<String, JsonElement> entry : ((JsonObject) value).entrySet()) { + try { + avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue())); + } catch (Exception e) { + this.numFailedConversion++; + if (this.numFailedConversion < maxFailedConversions) { + LOG.error("Dropping record " + value + " because it cannot be converted to Avro", e); + return new EmptyIterable<>(); + } + throw new RuntimeException( + "Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue() + " for record: " + value, + e); + } + } + return avroRecord; + } + + @Override + public org.apache.avro.Schema.Type getTargetType() { + return Schema.Type.RECORD; + } + + @Override + public Schema schema() { + Schema schema = _schema; + schema.addProp(SOURCE_TYPE, RECORD.toString().toLowerCase()); + return buildUnionIfNullable(schema); } } public static class EnumConverter extends JsonElementConverter { String enumName; + String namespace; List<String> enumSet = new ArrayList<>(); Schema schema; - public EnumConverter(String fieldName, boolean nullable, String sourceType, JsonObject schemaNode) { - super(fieldName, nullable, sourceType); + public EnumConverter(JsonSchema schema, String namespace) { + super(schema); - for (JsonElement elem : schemaNode.get("dataType").getAsJsonObject().get("symbols").getAsJsonArray()) { + JsonObject dataType = schema.getDataType(); + for (JsonElement 
elem : dataType.get(ENUM_SYMBOLS_KEY).getAsJsonArray()) { this.enumSet.add(elem.getAsString()); } - this.enumName = schemaNode.get("dataType").getAsJsonObject().get("name").getAsString(); + String enumName = schema.getName(); + this.enumName = enumName.isEmpty() ? null : enumName; + this.namespace = namespace; } @Override @@ -528,9 +694,69 @@ public class JsonElementConversionFactory { @Override public Schema schema() { - this.schema = Schema.createEnum(this.enumName, "", "", this.enumSet); - this.schema.addProp("source.type", "enum"); - return this.schema; + this.schema = Schema.createEnum(this.enumName, "", namespace, this.enumSet); + this.schema.addProp(SOURCE_TYPE, ENUM.toString().toLowerCase()); + return buildUnionIfNullable(this.schema); + } + } + + public static class NullConverter extends JsonElementConverter { + + public NullConverter(JsonSchema schema) { + super(schema); + } + + @Override + Object convertField(JsonElement value) { + return value.getAsJsonNull(); + } + + @Override + public org.apache.avro.Schema.Type getTargetType() { + return Schema.Type.NULL; + } + } + + public static class UnionConverter extends JsonElementConverter { + private final Schema firstSchema; + private final Schema secondSchema; + private final JsonElementConverter firstConverter; + private final JsonElementConverter secondConverter; + + public UnionConverter(JsonSchema schemaNode, WorkUnitState state) { + super(schemaNode); + List<JsonSchema> types = schemaNode.getDataTypes(); + firstConverter = getConverter(types.get(0), state); + secondConverter = getConverter(types.get(1), state); + firstSchema = firstConverter.schema(); + secondSchema = secondConverter.schema(); + } + + private JsonElementConverter getConverter(JsonSchema schemaElement, WorkUnitState state) { + try { + return JsonElementConversionFactory.getConvertor(schemaElement, null, state); + } catch (UnsupportedDateTypeException e) { + throw new UnsupportedOperationException(e); + } + } + + @Override + Object 
convertField(JsonElement value) { + try { + return firstConverter.convert(value); + } catch (Exception e) { + return secondConverter.convert(value); + } + } + + @Override + public Schema.Type getTargetType() { + return Schema.Type.UNION; + } + + @Override + protected Schema schema() { + return Schema.createUnion(firstSchema, secondSchema); } } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java index ca5d88d..1da8d31 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonElementConversionWithAvroSchemaFactory.java @@ -17,17 +17,19 @@ package org.apache.gobblin.converter.avro; -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.gobblin.configuration.WorkUnitState; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + /** * Creates a converter for Json types to Avro types. 
Overrides {@link ArrayConverter}, {@link MapConverter}, @@ -41,7 +43,8 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve */ public static JsonElementConverter getConvertor(String fieldName, String fieldType, Schema schemaNode, - WorkUnitState state, boolean nullable) throws UnsupportedDateTypeException { + WorkUnitState state, boolean nullable) + throws UnsupportedDateTypeException { Type type; try { @@ -52,27 +55,30 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve switch (type) { case ARRAY: - return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), schemaNode, state); + return new JsonElementConversionWithAvroSchemaFactory.ArrayConverter(fieldName, nullable, type.toString(), + schemaNode, state); case MAP: - return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), schemaNode, state); + return new JsonElementConversionWithAvroSchemaFactory.MapConverter(fieldName, nullable, type.toString(), + schemaNode, state); case ENUM: - return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), schemaNode); + return new JsonElementConversionWithAvroSchemaFactory.EnumConverter(fieldName, nullable, type.toString(), + schemaNode); default: - return JsonElementConversionFactory.getConvertor(fieldName, fieldType, null, state, nullable); + return JsonElementConversionFactory.getConvertor(fieldName, fieldType, new JsonObject(), state, nullable); } } public static class ArrayConverter extends ComplexConverter { - public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, - WorkUnitState state) throws UnsupportedDateTypeException { + public ArrayConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state) + throws UnsupportedDateTypeException { super(fieldName, nullable, sourceType); 
super.setElementConverter( - getConvertor(fieldName, schemaNode.getElementType().getType().getName(), - schemaNode.getElementType(), state, isNullable())); + getConvertor(fieldName, schemaNode.getElementType().getType().getName(), schemaNode.getElementType(), state, + isNullable())); } @Override @@ -101,12 +107,12 @@ public class JsonElementConversionWithAvroSchemaFactory extends JsonElementConve public static class MapConverter extends ComplexConverter { - public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, - WorkUnitState state) throws UnsupportedDateTypeException { + public MapConverter(String fieldName, boolean nullable, String sourceType, Schema schemaNode, WorkUnitState state) + throws UnsupportedDateTypeException { super(fieldName, nullable, sourceType); super.setElementConverter( - getConvertor(fieldName, schemaNode.getValueType().getType().getName(), - schemaNode.getValueType(), state, isNullable())); + getConvertor(fieldName, schemaNode.getValueType().getType().getName(), schemaNode.getValueType(), state, + isNullable())); } @Override http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java index 0735c03..5b1810b 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverter.java @@ -18,34 +18,26 @@ package org.apache.gobblin.converter.avro; import java.io.IOException; -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; import 
org.apache.avro.Schema; -import org.apache.avro.Schema.Field; -import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.Path; -import org.codehaus.jackson.node.JsonNodeFactory; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.gson.JsonArray; -import com.google.gson.JsonElement; -import com.google.gson.JsonObject; - import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.DataConversionException; -import org.apache.gobblin.converter.EmptyIterable; import org.apache.gobblin.converter.SchemaConversionException; import org.apache.gobblin.converter.SingleRecordIterable; import org.apache.gobblin.converter.ToAvroConverterBase; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter; +import org.apache.gobblin.converter.json.JsonSchema; import org.apache.gobblin.util.AvroUtils; import org.apache.gobblin.util.WriterUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.Path; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; /** @@ -55,76 +47,37 @@ import org.apache.gobblin.util.WriterUtils; * */ public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArray, JsonObject> { - private Map<String, JsonElementConversionFactory.JsonElementConverter> converters = new HashMap<>(); private static final Logger LOG = LoggerFactory.getLogger(JsonIntermediateToAvroConverter.class); private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = "converter.avro.nullify.fields.enabled"; private static final boolean DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED = Boolean.FALSE; private static final String CONVERTER_AVRO_NULLIFY_FIELDS_ORIGINAL_SCHEMA_PATH = 
"converter.avro.nullify.fields.original.schema.path"; - private long numFailedConversion = 0; + private RecordConverter recordConverter; @Override - public Schema convertSchema(JsonArray schema, WorkUnitState workUnit) throws SchemaConversionException { - List<Schema.Field> fields = new ArrayList<>(); - - for (JsonElement elem : schema) { - JsonObject map = (JsonObject) elem; - - String columnName = map.get("columnName").getAsString(); - String comment = map.has("comment") ? map.get("comment").getAsString() : ""; - boolean nullable = map.has("isNullable") ? map.get("isNullable").getAsBoolean() : false; - Schema fldSchema; - - try { - JsonElementConversionFactory.JsonElementConverter converter = JsonElementConversionFactory.getConvertor( - columnName, map.get("dataType").getAsJsonObject().get("type").getAsString(), map, workUnit, nullable); - this.converters.put(columnName, converter); - fldSchema = converter.getSchema(); - } catch (UnsupportedDateTypeException e) { - throw new SchemaConversionException(e); - } - - Field fld = new Field(columnName, fldSchema, comment, nullable ? 
JsonNodeFactory.instance.nullNode() : null); - fld.addProp("source.type", map.get("dataType").getAsJsonObject().get("type").getAsString()); - fields.add(fld); + public Schema convertSchema(JsonArray schema, WorkUnitState workUnit) + throws SchemaConversionException { + try { + JsonSchema jsonSchema = new JsonSchema(schema); + jsonSchema.setColumnName(workUnit.getExtract().getTable()); + recordConverter = new RecordConverter(jsonSchema, workUnit, workUnit.getExtract().getNamespace()); + } catch (UnsupportedDateTypeException e) { + throw new SchemaConversionException(e); } - - Schema avroSchema = - Schema.createRecord(workUnit.getExtract().getTable(), "", workUnit.getExtract().getNamespace(), false); - avroSchema.setFields(fields); - - if (workUnit.getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED, - DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) { - return this.generateSchemaWithNullifiedField(workUnit, avroSchema); + Schema recordSchema = recordConverter.schema(); + if (workUnit + .getPropAsBoolean(CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED, DEFAULT_CONVERTER_AVRO_NULLIFY_FIELDS_ENABLED)) { + return this.generateSchemaWithNullifiedField(workUnit, recordSchema); } - - return avroSchema; + return recordSchema; } @Override public Iterable<GenericRecord> convertRecord(Schema outputSchema, JsonObject inputRecord, WorkUnitState workUnit) throws DataConversionException { - GenericRecord avroRecord = new GenericData.Record(outputSchema); - long maxFailedConversions = workUnit.getPropAsLong(ConfigurationKeys.CONVERTER_AVRO_MAX_CONVERSION_FAILURES, - ConfigurationKeys.DEFAULT_CONVERTER_AVRO_MAX_CONVERSION_FAILURES); - - for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) { - try { - avroRecord.put(entry.getKey(), this.converters.get(entry.getKey()).convert(entry.getValue())); - } catch (Exception e) { - this.numFailedConversion++; - if (this.numFailedConversion < maxFailedConversions) { - LOG.error("Dropping record " + inputRecord + " because it cannot be 
converted to Avro", e); - return new EmptyIterable<>(); - } - throw new DataConversionException("Unable to convert field:" + entry.getKey() + " for value:" + entry.getValue() - + " for record: " + inputRecord, e); - } - } - - return new SingleRecordIterable<>(avroRecord); + return new SingleRecordIterable<>((GenericRecord) recordConverter.convert(inputRecord)); } /** @@ -151,8 +104,7 @@ public class JsonIntermediateToAvroConverter extends ToAvroConverterBase<JsonArr + "is not specified. Trying to get the orignal schema from previous avro files."); originalSchemaPath = WriterUtils .getDataPublisherFinalDir(workUnitState, workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCHES_KEY, 1), - workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0)) - .getParent(); + workUnitState.getPropAsInt(ConfigurationKeys.FORK_BRANCH_ID_KEY, 0)).getParent(); } try { Schema prevSchema = AvroUtils.getDirectorySchema(originalSchemaPath, conf, false); http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java index c495bae..97f0121 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/avro/JsonRecordAvroSchemaToAvroConverter.java @@ -18,6 +18,7 @@ package org.apache.gobblin.converter.avro; import java.util.List; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; @@ -27,6 +28,8 @@ import org.apache.gobblin.converter.DataConversionException; import 
org.apache.gobblin.converter.SchemaConversionException; import org.apache.gobblin.converter.SingleRecordIterable; import org.apache.gobblin.converter.ToAvroConverterBase; + +import com.google.common.base.Preconditions; import com.google.gson.JsonObject; import com.google.common.base.Preconditions; import com.google.common.base.Splitter; http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java new file mode 100644 index 0000000..21bfeaf --- /dev/null +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonSchema.java @@ -0,0 +1,297 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.converter.json; + +import java.util.Arrays; +import java.util.Collections; +import java.util.List; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type; +import org.apache.gobblin.source.extractor.schema.Schema; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.ENUM; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.UNION; +import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.CHILD; +import static org.apache.gobblin.converter.json.JsonSchema.SchemaType.ROOT; + + +/** + * Represents a source schema declared in the configuration with {@link ConfigurationKeys#SOURCE_SCHEMA}. + * The source schema is represented by a {@link JsonArray}. 
+ * @author tilakpatidar + */ +public class JsonSchema extends Schema { + public static final String RECORD_FIELDS_KEY = "values"; + public static final String TYPE_KEY = "type"; + public static final String NAME_KEY = "name"; + public static final String SIZE_KEY = "size"; + public static final String ENUM_SYMBOLS_KEY = "symbols"; + public static final String COLUMN_NAME_KEY = "columnName"; + public static final String DATA_TYPE_KEY = "dataType"; + public static final String COMMENT_KEY = "comment"; + public static final String DEFAULT_VALUE_KEY = "defaultValue"; + public static final String IS_NULLABLE_KEY = "isNullable"; + public static final String DEFAULT_RECORD_COLUMN_NAME = "root"; + public static final String DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY = ""; + public static final String ARRAY_ITEMS_KEY = "items"; + public static final String MAP_ITEMS_KEY = "values"; + public static final String SOURCE_TYPE = "source.type"; + private final Type type; + private final JsonObject json; + private final SchemaType schemaNestedLevel; + private JsonSchema secondType; + private JsonSchema firstType; + private JsonArray jsonArray; + + public enum SchemaType { + ROOT, CHILD + } + + /** + * Build a {@link JsonSchema} using {@link JsonArray} + * This will create a {@link SchemaType} of {@link SchemaType#ROOT} + * @param jsonArray + */ + public JsonSchema(JsonArray jsonArray) { + JsonObject jsonObject = new JsonObject(); + JsonObject dataType = new JsonObject(); + jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + dataType.addProperty(TYPE_KEY, RECORD.toString()); + dataType.add(RECORD_FIELDS_KEY, jsonArray); + jsonObject.add(DATA_TYPE_KEY, dataType); + setJsonSchemaProperties(jsonObject); + this.type = RECORD; + this.json = jsonObject; + this.jsonArray = jsonArray; + this.schemaNestedLevel = ROOT; + } + + /** + * Build a {@link JsonSchema} using {@link JsonObject} + * This will create a {@link SchemaType} of {@link SchemaType#CHILD} + * @param jsonObject +
*/ + public JsonSchema(JsonObject jsonObject) { + JsonObject root = new JsonObject(); + if (!jsonObject.has(COLUMN_NAME_KEY) && !jsonObject.has(DATA_TYPE_KEY)) { + root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + root.add(DATA_TYPE_KEY, jsonObject); + jsonObject = root; + } + if (!jsonObject.has(COLUMN_NAME_KEY) && jsonObject.has(DATA_TYPE_KEY)) { + jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + } + setJsonSchemaProperties(jsonObject); + JsonElement typeElement = getDataType().get(TYPE_KEY); + if (typeElement.isJsonPrimitive()) { + this.type = Type.valueOf(typeElement.getAsString().toUpperCase()); + } else if (typeElement.isJsonArray()) { + JsonArray jsonArray = typeElement.getAsJsonArray(); + if (jsonArray.size() != 2) { + throw new RuntimeException("Invalid " + TYPE_KEY + "property in schema for union types"); + } + this.type = UNION; + JsonElement type1 = jsonArray.get(0); + JsonElement type2 = jsonArray.get(1); + if (type1.isJsonPrimitive()) { + this.firstType = buildBaseSchema(Type.valueOf(type1.getAsString().toUpperCase())); + } + if (type2.isJsonPrimitive()) { + this.secondType = buildBaseSchema(Type.valueOf(type2.getAsString().toUpperCase())); + } + if (type1.isJsonObject()) { + this.firstType = buildBaseSchema(type1.getAsJsonObject()); + } + if (type2.isJsonObject()) { + this.secondType = buildBaseSchema(type2.getAsJsonObject()); + } + } else { + throw new RuntimeException("Invalid " + TYPE_KEY + "property in schema"); + } + this.json = jsonObject; + JsonArray jsonArray = new JsonArray(); + jsonArray.add(jsonObject); + this.jsonArray = jsonArray; + this.schemaNestedLevel = CHILD; + } + + /** + * Get symbols for a {@link Type#ENUM} type. + * @return + */ + public JsonArray getSymbols() { + if (this.type.equals(ENUM)) { + return getDataType().get(ENUM_SYMBOLS_KEY).getAsJsonArray(); + } + return new JsonArray(); + } + + /** + * Get {@link Type} for this {@link JsonSchema}. 
+ * @return + */ + public Type getType() { + return type; + } + + /** + * Builds a {@link JsonSchema} object for a given {@link Type} object. + * @param type + * @return + */ + public static JsonSchema buildBaseSchema(Type type) { + JsonObject jsonObject = new JsonObject(); + JsonObject dataType = new JsonObject(); + jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + dataType.addProperty(TYPE_KEY, type.toString()); + jsonObject.add(DATA_TYPE_KEY, dataType); + return new JsonSchema(jsonObject); + } + + /** + * Builds a {@link JsonSchema} object from a given {@link JsonObject}, setting its column name to {@link #DEFAULT_RECORD_COLUMN_NAME}. + * @return + */ + public static JsonSchema buildBaseSchema(JsonObject root) { + root.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + return new JsonSchema(root); + } + + /** + * Get optional property from a {@link JsonObject} for a {@link String} key. + * If the key doesn't exist, returns {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY}. + * @param jsonObject + * @param key + * @return + */ + public static String getOptionalProperty(JsonObject jsonObject, String key) { + return jsonObject.has(key) ? 
jsonObject.get(key).getAsString() : DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY; + } + + /** + * Fetches dataType.values from the JsonObject + * @return + */ + public JsonSchema getValuesWithinDataType() { + JsonElement element = this.getDataType().get(MAP_ITEMS_KEY); + if (element.isJsonObject()) { + return new JsonSchema(element.getAsJsonObject()); + } + if (element.isJsonArray()) { + return new JsonSchema(element.getAsJsonArray()); + } + if (element.isJsonPrimitive()) { + return buildBaseSchema(Type.valueOf(element.getAsString().toUpperCase())); + } + throw new UnsupportedOperationException( + "Map values can only be defined using JsonObject, JsonArray or JsonPrimitive."); + } + + /** + * Gets size for fixed type viz dataType.size from the JsonObject + * @return + */ + public int getSizeOfFixedData() { + if (this.type.equals(FIXED)) { + return this.getDataType().get(SIZE_KEY).getAsInt(); + } + return 0; + } + + public boolean isType(Type type) { + return this.type.equals(type); + } + + /** + * Fetches the nested or primitive array items type from schema. 
+ * @return + * @throws DataConversionException + */ + public Type getTypeOfArrayItems() + throws DataConversionException { + JsonSchema arrayValues = getItemsWithinDataType(); + if (arrayValues == null) { + throw new DataConversionException("Array types only allow values as primitive, null or JsonObject"); + } + return arrayValues.getType(); + } + + public JsonSchema getItemsWithinDataType() { + JsonElement element = this.getDataType().get(ARRAY_ITEMS_KEY); + if (element.isJsonObject()) { + return new JsonSchema(element.getAsJsonObject()); + } + if (element.isJsonPrimitive()) { + return buildBaseSchema(Type.valueOf(element.getAsString().toUpperCase())); + } + throw new UnsupportedOperationException("Array items can only be defined using JsonObject or JsonPrimitive."); + } + + public JsonSchema getFirstTypeSchema() { + return this.firstType; + } + + public JsonSchema getSecondTypeSchema() { + return this.secondType; + } + + public int fieldsCount() { + return this.jsonArray.size(); + } + + public JsonSchema getFieldSchemaAt(int i) { + if (i >= this.jsonArray.size()) { + return new JsonSchema(this.json); + } + return new JsonSchema(this.jsonArray.get(i).getAsJsonObject()); + } + + public List<JsonSchema> getDataTypes() { + if (firstType != null && secondType != null) { + return Arrays.asList(firstType, secondType); + } + return Collections.singletonList(this); + } + + public boolean isRoot() { + return this.schemaNestedLevel.equals(ROOT); + } + + public String getName() { + return getOptionalProperty(this.getDataType(), NAME_KEY); + } + + /** + * Set properties for {@link JsonSchema} from a {@link JsonObject}. 
+ * @param jsonObject + */ + private void setJsonSchemaProperties(JsonObject jsonObject) { + setColumnName(jsonObject.get(COLUMN_NAME_KEY).getAsString()); + setDataType(jsonObject.get(DATA_TYPE_KEY).getAsJsonObject()); + setNullable(jsonObject.has(IS_NULLABLE_KEY) && jsonObject.get(IS_NULLABLE_KEY).getAsBoolean()); + setComment(getOptionalProperty(jsonObject, COMMENT_KEY)); + setDefaultValue(getOptionalProperty(jsonObject, DEFAULT_VALUE_KEY)); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java index b1cd467..4cb8d84 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverter.java @@ -17,25 +17,30 @@ package org.apache.gobblin.converter.json; +import java.io.IOException; +import java.util.Map.Entry; + import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.Converter; import org.apache.gobblin.converter.DataConversionException; import org.apache.gobblin.converter.SchemaConversionException; import org.apache.gobblin.converter.SingleRecordIterable; - -import java.io.IOException; -import java.util.Map; - +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; import com.google.gson.JsonArray; import com.google.gson.JsonElement; import com.google.gson.JsonNull; import com.google.gson.JsonObject; import 
com.google.gson.JsonParser; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.FIXED; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.MAP; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.RECORD; +import static org.apache.gobblin.converter.json.JsonSchema.DEFAULT_RECORD_COLUMN_NAME; + /** * Converts a json string to a {@link JsonObject}. @@ -44,7 +49,9 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso private final static Logger log = LoggerFactory.getLogger(JsonStringToJsonIntermediateConverter.class); - private static final String UNPACK_COMPLEX_SCHEMAS_KEY = "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas"; + private static final String UNPACK_COMPLEX_SCHEMAS_KEY = + "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas"; + public static final boolean DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY = Boolean.TRUE; private boolean unpackComplexSchemas; @@ -53,8 +60,10 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso * @return a JsonArray representation of the schema */ @Override - public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit) throws SchemaConversionException { - this.unpackComplexSchemas = workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, true); + public JsonArray convertSchema(String inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + this.unpackComplexSchemas = + workUnit.getPropAsBoolean(UNPACK_COMPLEX_SCHEMAS_KEY, DEFAULT_UNPACK_COMPLEX_SCHEMAS_KEY); JsonParser jsonParser = new JsonParser(); log.info("Schema: " + inputSchema); @@ -76,40 +85,172 @@ public class JsonStringToJsonIntermediateConverter extends Converter<String, Jso if (!this.unpackComplexSchemas) { return new SingleRecordIterable<>(inputRecord); } + JsonSchema 
schema = new JsonSchema(outputSchema); + JsonObject rec = parse(inputRecord, schema); + return new SingleRecordIterable(rec); + } - JsonObject outputRecord = new JsonObject(); - - for (int i = 0; i < outputSchema.size(); i++) { - String expectedColumnName = outputSchema.get(i).getAsJsonObject().get("columnName").getAsString(); + /** + * Parses a provided JsonElement input using the provided JsonSchema into + * a JsonElement. + * @param element + * @param schema + * @return + * @throws DataConversionException + */ + private JsonElement parse(JsonElement element, JsonSchema schema) + throws DataConversionException { + JsonObject root = new JsonObject(); + root.add(DEFAULT_RECORD_COLUMN_NAME, element); + JsonObject jsonObject = parse(root, schema); + return jsonObject.get(DEFAULT_RECORD_COLUMN_NAME); + } - if (inputRecord.has(expectedColumnName)) { - //As currently org.apache.gobblin.converter.avro.JsonIntermediateToAvroConverter is not able to handle complex schema's so storing it as string + /** + * Parses a provided JsonObject input using the provided JsonSchema into + * a JsonObject. 
+ * @param record + * @param schema + * @return + * @throws DataConversionException + */ + private JsonObject parse(JsonObject record, JsonSchema schema) + throws DataConversionException { + JsonObject output = new JsonObject(); + for (int i = 0; i < schema.fieldsCount(); i++) { + JsonSchema schemaElement = schema.getFieldSchemaAt(i); + String columnKey = schemaElement.getColumnName(); + JsonElement parsed; + if (!record.has(columnKey)) { + output.add(columnKey, JsonNull.INSTANCE); + continue; + } - if (inputRecord.get(expectedColumnName).isJsonArray()) { - outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString()); - } else if (inputRecord.get(expectedColumnName).isJsonObject()) { - //To check if internally in an JsonObject there is multiple hierarchy - boolean isMultiHierarchyInsideJsonObject = false; - for (Map.Entry<String, JsonElement> entry : ((JsonObject) inputRecord.get(expectedColumnName)).entrySet()) { - if (entry.getValue().isJsonArray() || entry.getValue().isJsonObject()) { - isMultiHierarchyInsideJsonObject = true; - break; + JsonElement columnValue = record.get(columnKey); + switch (schemaElement.getType()) { + case UNION: + parsed = parseUnionType(schemaElement, columnValue); + break; + case ENUM: + parsed = parseEnumType(schemaElement, columnValue); + break; + default: + if (columnValue.isJsonArray()) { + parsed = parseJsonArrayType(schemaElement, columnValue); + } else if (columnValue.isJsonObject()) { + parsed = parseJsonObjectType(schemaElement, columnValue); + } else { + parsed = parsePrimitiveType(schemaElement, columnValue); } - } - if (isMultiHierarchyInsideJsonObject) { - outputRecord.addProperty(expectedColumnName, inputRecord.get(expectedColumnName).toString()); - } else { - outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName)); - } - - } else { - outputRecord.add(expectedColumnName, inputRecord.get(expectedColumnName)); } - } else { - outputRecord.add(expectedColumnName, 
JsonNull.INSTANCE); + output.add(columnKey, parsed); + } + return output; + } + + private JsonElement parseUnionType(JsonSchema schemaElement, JsonElement columnValue) + throws DataConversionException { + try { + return parse(columnValue, schemaElement.getFirstTypeSchema()); + } catch (DataConversionException e) { + return parse(columnValue, schemaElement.getSecondTypeSchema()); + } + } + + /** + * Parses Enum type values + * @param schema + * @param value + * @return + * @throws DataConversionException + */ + private JsonElement parseEnumType(JsonSchema schema, JsonElement value) + throws DataConversionException { + if (schema.getSymbols().contains(value)) { + return value; + } + throw new DataConversionException( + "Invalid symbol: " + value.getAsString() + " allowed values: " + schema.getSymbols().toString()); + } + + /** + * Parses JsonArray type values + * @param schema + * @param value + * @return + * @throws DataConversionException + */ + private JsonElement parseJsonArrayType(JsonSchema schema, JsonElement value) + throws DataConversionException { + Type arrayType = schema.getTypeOfArrayItems(); + JsonArray tempArray = new JsonArray(); + if (Type.isPrimitive(arrayType)) { + return value; + } + JsonSchema nestedSchema = schema.getItemsWithinDataType(); + for (JsonElement v : value.getAsJsonArray()) { + tempArray.add(parse(v, nestedSchema)); + } + return tempArray; + } + + /** + * Parses JsonObject type values + * @param value + * @return + * @throws DataConversionException + */ + private JsonElement parseJsonObjectType(JsonSchema schema, JsonElement value) + throws DataConversionException { + JsonSchema valuesWithinDataType = schema.getValuesWithinDataType(); + if (schema.isType(MAP)) { + if (Type.isPrimitive(valuesWithinDataType.getType())) { + return value; } + JsonObject map = new JsonObject(); + for (Entry<String, JsonElement> mapEntry : value.getAsJsonObject().entrySet()) { + JsonElement mapValue = mapEntry.getValue(); + map.add(mapEntry.getKey(), 
parse(mapValue, valuesWithinDataType)); + } + return map; + } else if (schema.isType(RECORD)) { + JsonSchema schemaArray = valuesWithinDataType.getValuesWithinDataType(); + return parse((JsonObject) value, schemaArray); + } else { + return JsonNull.INSTANCE; + } + } + + /** + * Parses primitive types + * @param schema + * @param value + * @return + * @throws DataConversionException + */ + private JsonElement parsePrimitiveType(JsonSchema schema, JsonElement value) + throws DataConversionException { + + if ((schema.isType(NULL) || schema.isNullable()) && value.isJsonNull()) { + return JsonNull.INSTANCE; + } + + if ((schema.isType(NULL) && !value.isJsonNull()) || (!schema.isType(NULL) && value.isJsonNull())) { + throw new DataConversionException( + "Type mismatch for " + value.toString() + " of type " + schema.getDataTypes().toString()); + } + + if (schema.isType(FIXED)) { + int expectedSize = schema.getSizeOfFixedData(); + if (value.getAsString().length() == expectedSize) { + return value; + } else { + throw new DataConversionException( + "Fixed type value is not same as defined value expected fieldsCount: " + expectedSize); + } + } else { + return value; } - return new SingleRecordIterable<>(outputRecord); } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java new file mode 100644 index 0000000..7f39d30 --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonElementConversionFactoryTest.java @@ -0,0 +1,397 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter.avro; + +import java.io.InputStreamReader; +import java.lang.reflect.Type; + +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.converter.SchemaConversionException; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.EnumConverter; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.MapConverter; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.NullConverter; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.RecordConverter; +import org.apache.gobblin.converter.avro.JsonElementConversionFactory.StringConverter; +import org.apache.gobblin.converter.json.JsonSchema; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; +import org.testng.Assert; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +import com.google.gson.Gson; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import com.google.gson.JsonParser; +import com.google.gson.reflect.TypeToken; + +import static 
org.apache.gobblin.converter.avro.JsonElementConversionFactory.ArrayConverter; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.JsonElementConverter.buildNamespace; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.Type.NULL; +import static org.apache.gobblin.converter.avro.JsonElementConversionFactory.UnionConverter; + + +/** + * Unit test for {@link JsonElementConversionFactory} + * + * @author Tilak Patidar + */ +@Test(groups = {"gobblin.converter"}) +public class JsonElementConversionFactoryTest { + + private static WorkUnitState state; + private static JsonObject testData; + private static JsonParser jsonParser = new JsonParser(); + + @BeforeClass + public static void setUp() { + WorkUnit workUnit = new WorkUnit(new SourceState(), + new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table")); + state = new WorkUnitState(workUnit); + Type listType = new TypeToken<JsonObject>() { + }.getType(); + Gson gson = new Gson(); + testData = gson.fromJson(new InputStreamReader( + JsonElementConversionFactoryTest.class.getResourceAsStream("/converter/JsonElementConversionFactoryTest.json")), + listType); + } + + @Test + public void schemaWithArrayOfMaps() + throws Exception { + String testName = "schemaWithArrayOfMaps"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + JsonSchema jsonSchema = new JsonSchema(schema); + jsonSchema.setColumnName("dummy"); + + ArrayConverter converter = new ArrayConverter(jsonSchema, state); + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithArrayOfRecords() + throws Exception { + String testName = "schemaWithArrayOfRecords"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = 
getExpectedSchema(testName).getAsJsonObject(); + JsonSchema jsonSchema = new JsonSchema(schema); + jsonSchema.setColumnName("dummy1"); + + ArrayConverter converter = new ArrayConverter(jsonSchema, state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecord() + throws DataConversionException, SchemaConversionException, UnsupportedDateTypeException { + String testName = "schemaWithRecord"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + JsonSchema jsonSchema = new JsonSchema(schema); + jsonSchema.setColumnName("dummy1"); + + RecordConverter converter = + new RecordConverter(jsonSchema, state, buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithArrayOfInts() + throws Exception { + String testName = "schemaWithArrayOfInts"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithNullType() { + NullConverter nullConverter = new NullConverter(JsonSchema.buildBaseSchema(NULL)); + JsonObject expected = new JsonObject(); + expected.addProperty("type", "null"); + expected.addProperty("source.type", "null"); + + Assert.assertEquals(avroSchemaToJsonElement(nullConverter), expected); + } + + @Test + public void schemaWithArrayOfEnums() + throws Exception { + String testName = "schemaWithArrayOfEnums"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state); + + 
Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithMap() + throws Exception { + String testName = "schemaWithMap"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + MapConverter converter = new MapConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithMapOfRecords() + throws Exception { + String testName = "schemaWithMapOfRecords"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + MapConverter converter = new MapConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithMapOfArrays() + throws Exception { + String testName = "schemaWithMapOfArrays"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + MapConverter converter = new MapConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithMapOfEnum() + throws Exception { + String testName = "schemaWithMapOfEnum"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + MapConverter converter = new MapConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecordOfMap() + throws Exception { + String testName = "schemaWithRecordOfMap"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + 
buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecordOfArray() + throws Exception { + String testName = "schemaWithRecordOfArray"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecordOfEnum() + throws Exception { + String testName = "schemaWithRecordOfEnum"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test(expectedExceptions = IllegalStateException.class) + public void schemaWithMapValuesAsJsonArray() + throws Exception { + String testName = "schemaWithMapValuesAsJsonArray"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + + new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "something")); + } + + @Test(expectedExceptions = UnsupportedOperationException.class) + public void schemaWithMapValuesAsJsonNull() + throws Exception { + String testName = "schemaWithMapValuesAsJsonNull"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + + new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "something")); + } + + @Test + public void schemaWithRecordOfRecord() + throws Exception { + String testName = "schemaWithRecordOfRecord"; + JsonObject schema = 
getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecordOfRecordCheckNamespace() + throws Exception { + String testName = "schemaWithRecordOfRecordCheckNamespace"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = + new RecordConverter(new JsonSchema(schema), state, buildNamespace(state.getExtract().getNamespace(), "person")); + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + Assert.assertEquals(converter.schema().getField("someperson").schema().getNamespace(), "namespace.person.myrecord"); + Assert.assertEquals(converter.schema().getNamespace(), "namespace.person"); + } + + @Test + public void schemaWithRecordOfEnumCheckNamespace() + throws Exception { + String testName = "schemaWithRecordOfEnumCheckNamespace"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonObject expected = getExpectedSchema(testName).getAsJsonObject(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + Assert.assertEquals(converter.schema().getField("someperson").schema().getNamespace(), + "namespace.something.myrecord"); + Assert.assertEquals(converter.schema().getNamespace(), "namespace.something"); + } + + @Test + public void schemaWithUnion() + throws Exception { + String testName = "schemaWithUnion"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + 
UnionConverter converter = new UnionConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithComplexUnion() + throws Exception { + String testName = "schemaWithComplexUnion"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + UnionConverter converter = new UnionConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithIsNullable() + throws Exception { + String testName = "schemaWithIsNullable"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + StringConverter converter = new StringConverter(new JsonSchema(schema)); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithRecordIsNullable() + throws Exception { + String testName = "schemaWithRecordIsNullable"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + RecordConverter converter = new RecordConverter(new JsonSchema(schema), state, + buildNamespace(state.getExtract().getNamespace(), "something")); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithMapIsNullable() + throws Exception { + String testName = "schemaWithMapIsNullable"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + MapConverter converter = new MapConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithEnumIsNullable() + throws Exception { + String testName = "schemaWithEnumIsNullable"; + JsonObject schema = 
getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + EnumConverter converter = new EnumConverter(new JsonSchema(schema), "something"); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + @Test + public void schemaWithArrayIsNullable() + throws Exception { + String testName = "schemaWithArrayIsNullable"; + JsonObject schema = getSchemaData(testName).getAsJsonObject(); + JsonArray expected = getExpectedSchema(testName).getAsJsonArray(); + + ArrayConverter converter = new ArrayConverter(new JsonSchema(schema), state); + + Assert.assertEquals(avroSchemaToJsonElement(converter), expected); + } + + private JsonElement avroSchemaToJsonElement(JsonElementConverter converter) { + return jsonParser.parse(converter.schema().toString()); + } + + private JsonElement getExpectedSchema(String methodName) { + return testData.get(methodName).getAsJsonArray().get(1); + } + + private JsonElement getSchemaData(String methodName) { + return testData.get(methodName).getAsJsonArray().get(0); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java index 558c1f6..be2f417 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/avro/JsonIntermediateToAvroConverterTest.java @@ -26,22 +26,25 @@ import java.util.TimeZone; import org.apache.avro.Schema; import org.apache.avro.generic.GenericArray; import org.apache.avro.generic.GenericRecord; +import 
org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.SourceState; +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.converter.SchemaConversionException; +import org.apache.gobblin.source.workunit.Extract; +import org.apache.gobblin.source.workunit.WorkUnit; import org.joda.time.DateTime; import org.joda.time.DateTimeZone; import org.joda.time.format.DateTimeFormat; import org.joda.time.format.DateTimeFormatter; import org.testng.Assert; -import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; import com.google.gson.Gson; import com.google.gson.JsonArray; import com.google.gson.JsonObject; +import com.google.gson.JsonParser; import com.google.gson.reflect.TypeToken; -import org.apache.gobblin.configuration.ConfigurationKeys; -import org.apache.gobblin.configuration.SourceState; -import org.apache.gobblin.configuration.WorkUnitState; -import org.apache.gobblin.source.workunit.Extract.TableType; /** @@ -55,28 +58,48 @@ public class JsonIntermediateToAvroConverterTest { private JsonObject jsonRecord; private WorkUnitState state; - @BeforeClass - public void setUp() - throws Exception { - Type listType = new TypeToken<JsonArray>() { + /** + * To test schema and record using the path to their resource file. 
+ * @param resourceFilePath + * @throws SchemaConversionException + * @throws DataConversionException + */ + private void complexSchemaTest(String resourceFilePath) + throws SchemaConversionException, DataConversionException { + JsonObject testData = initResources(resourceFilePath); + + JsonIntermediateToAvroConverter converter = new JsonIntermediateToAvroConverter(); + + Schema avroSchema = converter.convertSchema(jsonSchema, state); + GenericRecord genericRecord = converter.convertRecord(avroSchema, jsonRecord, state).iterator().next(); + JsonParser parser = new JsonParser(); + Assert.assertEquals(parser.parse(avroSchema.toString()).getAsJsonObject(), + testData.get("expectedSchema").getAsJsonObject()); + Assert.assertEquals(parser.parse(genericRecord.toString()), testData.get("expectedRecord").getAsJsonObject()); + } + + private JsonObject initResources(String resourceFilePath) { + Type listType = new TypeToken<JsonObject>() { }.getType(); Gson gson = new Gson(); - jsonSchema = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/schema.json")), listType); + JsonObject testData = + gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream(resourceFilePath)), listType); - listType = new TypeToken<JsonObject>() { - }.getType(); - jsonRecord = gson.fromJson(new InputStreamReader(this.getClass().getResourceAsStream("/converter/record.json")), listType); + jsonRecord = testData.get("record").getAsJsonObject(); + jsonSchema = testData.get("schema").getAsJsonArray(); - SourceState source = new SourceState(); - state = new WorkUnitState( - source.createWorkUnit(source.createExtract(TableType.SNAPSHOT_ONLY, "test_table", "test_namespace"))); + WorkUnit workUnit = new WorkUnit(new SourceState(), + new Extract(new SourceState(), Extract.TableType.SNAPSHOT_ONLY, "namespace", "dummy_table")); + state = new WorkUnitState(workUnit); state.setProp(ConfigurationKeys.CONVERTER_AVRO_TIME_FORMAT, "HH:mm:ss"); 
state.setProp(ConfigurationKeys.CONVERTER_AVRO_DATE_TIMEZONE, "PST"); + return testData; } @Test public void testConverter() throws Exception { + initResources("/converter/schema.json"); JsonIntermediateToAvroConverter converter = new JsonIntermediateToAvroConverter(); Schema avroSchema = converter.convertSchema(jsonSchema, state); @@ -121,4 +144,22 @@ public class JsonIntermediateToAvroConverterTest { Assert.assertNotEquals(record.get("LastModifiedDate"), record2.get("LastModifiedDate")); } + + @Test + public void testComplexSchema1() + throws Exception { + complexSchemaTest("/converter/complex1.json"); + } + + @Test + public void testComplexSchema2() + throws Exception { + complexSchemaTest("/converter/complex2.json"); + } + + @Test + public void testComplexSchema3() + throws Exception { + complexSchemaTest("/converter/complex3.json"); + } } http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/6dd36a50/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java new file mode 100644 index 0000000..305113b --- /dev/null +++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/json/JsonStringToJsonIntermediateConverterTest.java @@ -0,0 +1,118 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at
 *
 *    http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.gobblin.converter.json;

import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
import java.io.Reader;
import java.nio.charset.StandardCharsets;
import java.util.Map;

import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.testng.Assert;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.gson.Gson;
import com.google.gson.JsonArray;
import com.google.gson.JsonElement;
import com.google.gson.JsonObject;
import com.google.gson.JsonParser;


/**
 * Unit test for {@link JsonStringToJsonIntermediateConverter}
 *
 * <p>The data-driven cases are read from a JSON resource in which every entry maps a test-case name to a
 * three-element array: {@code [inputRecord, schema, expectedRecord]}.
 *
 * @author Tilak Patidar
 */
@Test(groups = {"gobblin.converter"})
public class JsonStringToJsonIntermediateConverterTest {

  /** Property the converter is configured with before any record is converted. */
  private static final String UNPACK_COMPLEX_SCHEMAS_KEY =
      "gobblin.converter.jsonStringToJsonIntermediate.unpackComplexSchemas";

  /** Classpath location of the data-driven test cases. */
  private static final String TEST_DATA_RESOURCE = "/converter/JsonStringToJsonIntermediateConverter.json";

  private static JsonStringToJsonIntermediateConverter converter;
  private static JsonObject testJsonData;

  /**
   * Creates the converter under test, initialises it with an empty schema and loads the test cases.
   *
   * @throws SchemaConversionException if the converter rejects the (empty) input schema
   * @throws IOException if the test-data resource cannot be closed cleanly
   */
  @BeforeClass
  public static void setUp()
      throws SchemaConversionException, IOException {
    converter = new JsonStringToJsonIntermediateConverter();
    WorkUnitState workUnit = new WorkUnitState();
    // This used to be a getPropAsBoolean(...) call whose result was discarded, so the flag was never
    // actually written into the state. Set it explicitly so the configuration the test relies on is real.
    workUnit.setProp(UNPACK_COMPLEX_SCHEMAS_KEY, true);
    converter.convertSchema("[]", workUnit);

    InputStream resource = JsonStringToJsonIntermediateConverterTest.class.getResourceAsStream(TEST_DATA_RESOURCE);
    // Fail with a readable message instead of a NullPointerException inside the reader.
    Assert.assertNotNull(resource, "Missing test resource: " + TEST_DATA_RESOURCE);
    // Explicit charset: the fixture must decode identically regardless of the platform default.
    try (Reader reader = new InputStreamReader(resource, StandardCharsets.UTF_8)) {
      testJsonData = new Gson().fromJson(reader, JsonObject.class);
    }
  }

  /**
   * Runs a single record through the converter and returns the first converted record.
   *
   * @param json the input record; it is handed to the converter in its serialized string form
   * @param schema the schema passed to the converter as the output schema
   * @return the first element produced by the converter
   * @throws DataConversionException if the converter cannot convert the record
   */
  private JsonObject parseJsonObject(JsonObject json, JsonArray schema)
      throws DataConversionException {
    return converter.convertRecord(schema, json.toString(), new WorkUnitState()).iterator().next();
  }

  /**
   * Executes every case of the test-data resource and compares the converted record with the expected one.
   * Any exception raised by a case fails the test with the case name and the original exception as cause.
   */
  @Test
  public void testAllCases()
      throws DataConversionException {
    for (Map.Entry<String, JsonElement> testCase : testJsonData.entrySet()) {
      String failureMessage = "Test case failed : " + testCase.getKey();
      JsonArray testData = testCase.getValue().getAsJsonArray();
      JsonObject json = testData.get(0).getAsJsonObject();
      JsonArray schema = testData.get(1).getAsJsonArray();
      JsonObject expected = testData.get(2).getAsJsonObject();

      JsonObject result = null;
      try {
        result = parseJsonObject(json, schema);
      } catch (Exception e) {
        // Keep the cause attached so the report shows the real stack trace next to the case name.
        Assert.fail(failureMessage, e);
      }
      Assert.assertEquals(result, expected, failureMessage);
    }
  }

  /** A value that is not one of the declared enum symbols must be rejected. */
  @Test(expectedExceptions = DataConversionException.class, expectedExceptionsMessageRegExp = "Invalid symbol.*")
  public void jsonWithInvalidEnumEntry()
      throws DataConversionException {
    String jsonStr = "{\"a\":\"somename\", \"b\":\"TROLL\"}";
    String schemaStr =
        " [{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"enum\", \"symbols\":[\"HELL\",\"BELLS\"]}}]";

    parseJsonObject(buildJsonObject(jsonStr), buildJsonArray(schemaStr));
  }

  /** An array schema whose {@code items} is a JSON array (rather than an object or primitive) must be rejected. */
  @Test(expectedExceptions = UnsupportedOperationException.class, expectedExceptionsMessageRegExp = "Array items can only be defined using JsonObject or JsonPrimitive.")
  public void jsonWithArrayOfMapContainingRecordWithWrongSchema()
      throws DataConversionException {
    String jsonStr = "{\"a\":\"somename\", \"b\":[{\"d\":{\"age\":\"10\"}},{\"d\":{\"age\":\"1\"}}]}";
    String schemaStr =
        "[{\"columnName\":\"a\", \"dataType\":{\"type\":\"string\"}},{\"columnName\":\"b\", \"dataType\":{\"type\":\"array\", \"items\":[{\"dataType\":{\"type\":\"map\", \"values\":{\"dataType\":{\"type\":\"record\",\"values\":[{\"columnName\":\"age\", \"dataType\":{\"type\":\"int\"}}]}}}}]}}]";

    parseJsonObject(buildJsonObject(jsonStr), buildJsonArray(schemaStr));
  }

  /** Parses {@code s} and returns it as a {@link JsonObject}. */
  private JsonObject buildJsonObject(String s) {
    JsonParser parser = new JsonParser();
    return (JsonObject) parser.parse(s);
  }

  /** Parses {@code schemaStr} and returns it as a {@link JsonArray}. */
  private JsonArray buildJsonArray(String schemaStr) {
    JsonParser parser = new JsonParser();
    return parser.parse(schemaStr).getAsJsonArray();
  }
}
