Repository: incubator-gobblin Updated Branches: refs/heads/master 626d312a2 -> f0582115b
[GOBBLIN-248] Converter for Json to Parquet Closes #2101 from tilakpatidar/parquet Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/f0582115 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/f0582115 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/f0582115 Branch: refs/heads/master Commit: f0582115ba3d655febec9af9e395ed858efcd335 Parents: 626d312 Author: tilakpatidar <[email protected]> Authored: Wed Oct 18 11:02:17 2017 +0530 Committer: Abhishek Tiwari <[email protected]> Committed: Wed Oct 18 11:02:17 2017 +0530 ---------------------------------------------------------------------- gobblin-modules/gobblin-parquet/build.gradle | 1 + .../parquet/JsonElementConversionFactory.java | 485 +++++++++++++++++++ ...JsonIntermediateToParquetGroupConverter.java | 57 +++ .../gobblin/converter/parquet/JsonSchema.java | 162 +++++++ .../gobblin/converter/parquet/ParquetGroup.java | 233 +++++++++ ...IntermediateToParquetGroupConverterTest.java | 128 +++++ .../JsonIntermediateToParquetConverter.json | 223 +++++++++ 7 files changed, 1289 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/build.gradle ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/build.gradle b/gobblin-modules/gobblin-parquet/build.gradle index e43f543..75530b9 100644 --- a/gobblin-modules/gobblin-parquet/build.gradle +++ b/gobblin-modules/gobblin-parquet/build.gradle @@ -20,6 +20,7 @@ apply plugin: 'java' dependencies { compile project(":gobblin-core") + compile externalDependency.gson compile externalDependency.parquet testCompile externalDependency.testng 
http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java new file mode 100644 index 0000000..1d3636a --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonElementConversionFactory.java @@ -0,0 +1,485 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.converter.parquet; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; + +import org.apache.gobblin.converter.parquet.JsonSchema.*; + +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; + +import parquet.example.data.Group; +import parquet.example.data.simple.BinaryValue; +import parquet.example.data.simple.BooleanValue; +import parquet.example.data.simple.DoubleValue; +import parquet.example.data.simple.FloatValue; +import parquet.example.data.simple.IntegerValue; +import parquet.example.data.simple.LongValue; +import parquet.io.api.Binary; +import parquet.schema.GroupType; +import parquet.schema.MessageType; +import parquet.schema.PrimitiveType; +import parquet.schema.PrimitiveType.PrimitiveTypeName; +import parquet.schema.Type; +import parquet.schema.Types; + +import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.CHILD; +import static org.apache.gobblin.converter.parquet.JsonSchema.*; +import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.STRING; +import static parquet.schema.OriginalType.UTF8; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.BINARY; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT32; +import static parquet.schema.PrimitiveType.PrimitiveTypeName.INT64; +import static parquet.schema.Type.Repetition.OPTIONAL; +import static parquet.schema.Type.Repetition.REPEATED; + + +/** + * <p> + * Creates a JsonElement to Parquet converter for all supported data types. + * </p> + * + * @author tilakpatidar + * + */ +public class JsonElementConversionFactory { + + /** + * Use to create a converter for a single field from a parquetSchema. 
+ * + * @param schema + * @param repeated - Is the {@link Type} repeated in the parent {@link Group} + * @return + */ + public static JsonElementConverter getConverter(JsonSchema schema, boolean repeated) { + + InputType fieldType = schema.getInputType(); + switch (fieldType) { + case INT: + return new IntConverter(schema, repeated); + + case LONG: + return new LongConverter(schema, repeated); + + case FLOAT: + return new FloatConverter(schema, repeated); + + case DOUBLE: + return new DoubleConverter(schema, repeated); + + case BOOLEAN: + return new BooleanConverter(schema, repeated); + + case STRING: + return new StringConverter(schema, repeated); + + case ARRAY: + return new ArrayConverter(schema); + + case ENUM: + return new EnumConverter(schema); + + case RECORD: + return new RecordConverter(schema); + + case MAP: + return new MapConverter(schema); + + default: + throw new UnsupportedOperationException(fieldType + " is unsupported"); + } + } + + /** + * Converts a JsonElement into a supported ParquetType + * @author tilakpatidar + * + */ + public static abstract class JsonElementConverter { + protected final JsonSchema jsonSchema; + + protected JsonElementConverter(JsonSchema schema) { + this.jsonSchema = schema; + } + + /** + * Convert value to a parquet type and perform null check. 
+ * @param value + * @return Parquet safe type + */ + public Object convert(JsonElement value) { + if (value.isJsonNull()) { + if (this.jsonSchema.isNullable()) { + return null; + } + throw new RuntimeException( + "Field: " + this.jsonSchema.getColumnName() + " is not nullable and contains a null value"); + } + return convertField(value); + } + + /** + * Returns a {@link Type} parquet schema + * @return + */ + abstract public Type schema(); + + /** + * Convert JsonElement to Parquet type + * @param value + * @return + */ + abstract Object convertField(JsonElement value); + } + + /** + * Converts a {@link JsonSchema} to a {@link PrimitiveType} + */ + public static abstract class PrimitiveConverter extends JsonElementConverter { + protected final boolean repeated; + private PrimitiveTypeName outputType; + protected Type schema; + + /** + * @param jsonSchema + * @param repeated + * @param outputType + */ + public PrimitiveConverter(JsonSchema jsonSchema, boolean repeated, PrimitiveTypeName outputType) { + super(jsonSchema); + this.repeated = repeated; + this.outputType = outputType; + this.schema = buildSchema(); + } + + protected Type buildSchema() { + return new PrimitiveType(this.repeated ? REPEATED : this.jsonSchema.optionalOrRequired(), this.outputType, + this.jsonSchema.getColumnName()); + } + + @Override + public Type schema() { + return this.schema; + } + } + + /** + * Converts {@link JsonSchema} having collection of elements of {@link InputType} into a {@link GroupType}. 
+ */ + public static abstract class CollectionConverter extends JsonElementConverter { + protected InputType elementType; + protected JsonElementConverter elementConverter; + protected Type schema; + + public CollectionConverter(JsonSchema collectionSchema, InputType elementType, boolean repeated) { + super(collectionSchema); + this.elementType = elementType; + this.elementConverter = getConverter(getElementSchema(), repeated); + this.schema = buildSchema(); + } + + @Override + public Type schema() { + return this.schema; + } + + /** + * Prepare a {@link JsonSchema} for the elements in a collection. + * @return + */ + abstract JsonSchema getElementSchema(); + + abstract Type buildSchema(); + } + + public static class IntConverter extends PrimitiveConverter { + + public IntConverter(JsonSchema schema, boolean repeated) { + super(schema, repeated, INT32); + } + + @Override + IntegerValue convertField(JsonElement value) { + return new IntegerValue(value.getAsInt()); + } + } + + public static class LongConverter extends PrimitiveConverter { + + public LongConverter(JsonSchema schema, boolean repeated) { + super(schema, repeated, INT64); + } + + @Override + LongValue convertField(JsonElement value) { + return new LongValue(value.getAsLong()); + } + } + + public static class FloatConverter extends PrimitiveConverter { + + public FloatConverter(JsonSchema schema, boolean repeated) { + super(schema, repeated, PrimitiveTypeName.FLOAT); + } + + @Override + FloatValue convertField(JsonElement value) { + return new FloatValue(value.getAsFloat()); + } + } + + public static class DoubleConverter extends PrimitiveConverter { + + public DoubleConverter(JsonSchema schema, boolean repeated) { + super(schema, repeated, PrimitiveTypeName.DOUBLE); + } + + @Override + DoubleValue convertField(JsonElement value) { + return new DoubleValue(value.getAsDouble()); + } + } + + public static class BooleanConverter extends PrimitiveConverter { + + public BooleanConverter(JsonSchema schema, 
boolean repeated) { + super(schema, repeated, PrimitiveTypeName.BOOLEAN); + } + + @Override + BooleanValue convertField(JsonElement value) { + return new BooleanValue(value.getAsBoolean()); + } + } + + public static class StringConverter extends PrimitiveConverter { + + public StringConverter(JsonSchema schema, boolean repeated) { + super(schema, repeated, BINARY); + this.schema = buildSchema(); + } + + @Override + BinaryValue convertField(JsonElement value) { + return new BinaryValue(Binary.fromString(value.getAsString())); + } + + @Override + protected Type buildSchema() { + String columnName = this.jsonSchema.getColumnName(); + if (this.repeated) { + return Types.repeated(BINARY).as(UTF8).named(columnName); + } + switch (this.jsonSchema.optionalOrRequired()) { + case OPTIONAL: + return Types.optional(BINARY).as(UTF8).named(columnName); + case REQUIRED: + return Types.required(BINARY).as(UTF8).named(columnName); + default: + throw new RuntimeException("Unsupported Repetition type"); + } + } + } + + public static class ArrayConverter extends CollectionConverter { + + public ArrayConverter(JsonSchema arraySchema) { + super(arraySchema, arraySchema.getElementTypeUsingKey(ARRAY_ITEMS_KEY), true); + } + + @Override + Object convertField(JsonElement value) { + ParquetGroup array = new ParquetGroup((GroupType) schema()); + JsonElementConverter converter = this.elementConverter; + for (JsonElement elem : (JsonArray) value) { + array.add(ARRAY_KEY, converter.convert(elem)); + } + return array; + } + + @Override + protected Type buildSchema() { + List<Type> fields = new ArrayList<>(); + fields.add(0, this.elementConverter.schema()); + return new GroupType(this.jsonSchema.optionalOrRequired(), this.jsonSchema.getColumnName(), fields); + } + + @Override + JsonSchema getElementSchema() { + JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType); + jsonSchema.setColumnName(ARRAY_KEY); + return jsonSchema; + } + } + + public static class EnumConverter extends 
CollectionConverter { + private final HashSet<String> symbols = new HashSet<>(); + + public EnumConverter(JsonSchema enumSchema) { + super(enumSchema, STRING, false); + JsonArray symbolsArray = enumSchema.getSymbols(); + symbolsArray.forEach(e -> symbols.add(e.getAsString())); + } + + @Override + Object convertField(JsonElement value) { + if (symbols.contains(value.getAsString()) || this.jsonSchema.isNullable()) { + return this.elementConverter.convert(value); + } + throw new RuntimeException("Symbol " + value.getAsString() + " does not belong to set " + symbols.toString()); + } + + @Override + protected Type buildSchema() { + return this.elementConverter.schema(); + } + + @Override + JsonSchema getElementSchema() { + JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING); + jsonSchema.setColumnName(this.jsonSchema.getColumnName()); + return jsonSchema; + } + } + + public static class RecordConverter extends JsonElementConverter { + + private final HashMap<String, JsonElementConverter> converters; + private final RecordType recordType; + private final Type schema; + + public enum RecordType { + ROOT, CHILD + } + + public RecordConverter(JsonSchema recordSchema) { + this(recordSchema, CHILD); + } + + public RecordConverter(JsonSchema recordSchema, RecordType recordType) { + super(recordSchema); + this.converters = new HashMap<>(); + this.recordType = recordType; + this.schema = buildSchema(); + } + + @Override + Object convertField(JsonElement value) { + ParquetGroup r1 = new ParquetGroup((GroupType) schema()); + JsonObject inputRecord = value.getAsJsonObject(); + for (Map.Entry<String, JsonElement> entry : inputRecord.entrySet()) { + String key = entry.getKey(); + JsonElementConverter converter = this.converters.get(key); + Object convertedValue = converter.convert(entry.getValue()); + boolean valueIsNull = convertedValue == null; + Type.Repetition repetition = converter.jsonSchema.optionalOrRequired(); + if (valueIsNull && repetition.equals(OPTIONAL)) { + 
continue; + } + r1.add(key, convertedValue); + } + return r1; + } + + private Type buildSchema() { + JsonArray inputSchema = this.jsonSchema.getDataTypeValues(); + List<Type> parquetTypes = new ArrayList<>(); + for (JsonElement element : inputSchema) { + JsonObject map = (JsonObject) element; + JsonSchema elementSchema = new JsonSchema(map); + String columnName = elementSchema.getColumnName(); + JsonElementConverter converter = JsonElementConversionFactory.getConverter(elementSchema, false); + Type schemaType = converter.schema(); + this.converters.put(columnName, converter); + parquetTypes.add(schemaType); + } + String docName = this.jsonSchema.getColumnName(); + switch (recordType) { + case ROOT: + return new MessageType(docName, parquetTypes); + case CHILD: + return new GroupType(this.jsonSchema.optionalOrRequired(), docName, parquetTypes); + default: + throw new RuntimeException("Unsupported Record type"); + } + } + + @Override + public Type schema() { + return this.schema; + } + } + + public static class MapConverter extends CollectionConverter { + + public MapConverter(JsonSchema mapSchema) { + super(mapSchema, mapSchema.getElementTypeUsingKey(MAP_ITEMS_KEY), false); + } + + @Override + Object convertField(JsonElement value) { + ParquetGroup mapGroup = new ParquetGroup((GroupType) schema()); + JsonElementConverter converter = this.elementConverter; + JsonObject map = (JsonObject) value; + + for (Map.Entry<String, JsonElement> entry : map.entrySet()) { + ParquetGroup entrySet = (ParquetGroup) mapGroup.addGroup(MAP_KEY); + entrySet.add(MAP_KEY_COLUMN_NAME, entry.getKey()); + entrySet.add(MAP_VALUE_COLUMN_NAME, converter.convert(entry.getValue())); + } + + return mapGroup; + } + + @Override + protected Type buildSchema() { + JsonElementConverter elementConverter = this.elementConverter; + JsonElementConverter keyConverter = getKeyConverter(); + GroupType mapGroup = + Types.repeatedGroup().addFields(keyConverter.schema(), elementConverter.schema()).named(MAP_KEY) 
+ .asGroupType(); + String columnName = this.jsonSchema.getColumnName(); + switch (this.jsonSchema.optionalOrRequired()) { + case OPTIONAL: + return Types.optionalGroup().addFields(mapGroup).named(columnName).asGroupType(); + case REQUIRED: + return Types.requiredGroup().addFields(mapGroup).named(columnName).asGroupType(); + default: + return null; + } + } + + @Override + JsonSchema getElementSchema() { + JsonSchema jsonSchema = JsonSchema.buildBaseSchema(this.elementType); + jsonSchema.setColumnName(MAP_VALUE_COLUMN_NAME); + return jsonSchema; + } + + public JsonElementConverter getKeyConverter() { + JsonSchema jsonSchema = JsonSchema.buildBaseSchema(STRING); + jsonSchema.setColumnName(MAP_KEY_COLUMN_NAME); + return getConverter(jsonSchema, false); + } + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java new file mode 100644 index 0000000..b04dcf8 --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverter.java @@ -0,0 +1,57 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter.parquet; + +import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.Converter; +import org.apache.gobblin.converter.DataConversionException; +import org.apache.gobblin.converter.SchemaConversionException; +import org.apache.gobblin.converter.SingleRecordIterable; +import org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import parquet.example.data.Group; +import parquet.schema.MessageType; + +import static org.apache.gobblin.converter.parquet.JsonElementConversionFactory.RecordConverter.RecordType.ROOT; + + +/** + * A converter to Convert JsonIntermediate to Parquet + * @author tilakpatidar + */ +public class JsonIntermediateToParquetGroupConverter extends Converter<JsonArray, MessageType, JsonObject, Group> { + private RecordConverter recordConverter; + + @Override + public MessageType convertSchema(JsonArray inputSchema, WorkUnitState workUnit) + throws SchemaConversionException { + String fieldName = workUnit.getExtract().getTable(); + JsonSchema jsonSchema = new JsonSchema(inputSchema); + jsonSchema.setColumnName(fieldName); + recordConverter = new RecordConverter(jsonSchema, ROOT); + return (MessageType) recordConverter.schema(); + } + + @Override + public Iterable<Group> convertRecord(MessageType outputSchema, JsonObject inputRecord, WorkUnitState workUnit) + throws DataConversionException { + return new SingleRecordIterable<>((Group) 
recordConverter.convert(inputRecord)); + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java new file mode 100644 index 0000000..b7e001b --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/JsonSchema.java @@ -0,0 +1,162 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.converter.parquet; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.source.extractor.schema.Schema; + +import com.google.gson.JsonArray; +import com.google.gson.JsonObject; + +import parquet.schema.Type.Repetition; + +import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.ENUM; +import static org.apache.gobblin.converter.parquet.JsonSchema.InputType.RECORD; +import static parquet.schema.Type.Repetition.OPTIONAL; +import static parquet.schema.Type.Repetition.REQUIRED; + + +/** + * Represents a source schema declared in the configuration with {@link ConfigurationKeys#SOURCE_SCHEMA}. + * The source schema is represented by a {@link JsonArray}. + * @author tilakpatidar + */ +public class JsonSchema extends Schema { + public static final String RECORD_FIELDS_KEY = "values"; + public static final String TYPE_KEY = "type"; + public static final String ENUM_SYMBOLS_KEY = "symbols"; + public static final String COLUMN_NAME_KEY = "columnName"; + public static final String DATA_TYPE_KEY = "dataType"; + public static final String COMMENT_KEY = "comment"; + public static final String DEFAULT_VALUE_KEY = "defaultValue"; + public static final String IS_NULLABLE_KEY = "isNullable"; + public static final String DEFAULT_RECORD_COLUMN_NAME = "temp"; + public static final String DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY = ""; + public static final String ARRAY_KEY = "item"; + public static final String ARRAY_ITEMS_KEY = "items"; + public static final String MAP_ITEMS_KEY = "values"; + public static final String MAP_KEY = "map"; + public static final String MAP_KEY_COLUMN_NAME = "key"; + public static final String MAP_VALUE_COLUMN_NAME = "value"; + private final InputType type; + + public enum InputType { + STRING, INT, LONG, FLOAT, DOUBLE, BOOLEAN, ARRAY, ENUM, RECORD, MAP + } + + public JsonSchema(JsonArray jsonArray) { + JsonObject jsonObject = new JsonObject(); + JsonObject dataType = new 
JsonObject(); + jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + dataType.addProperty(TYPE_KEY, RECORD.toString()); + dataType.add(RECORD_FIELDS_KEY, jsonArray); + jsonObject.add(DATA_TYPE_KEY, dataType); + setJsonSchemaProperties(jsonObject); + this.type = RECORD; + } + + public JsonSchema(JsonObject jsonobject) { + setJsonSchemaProperties(jsonobject); + this.type = InputType.valueOf(getDataType().get(TYPE_KEY).getAsString().toUpperCase()); + } + + /** + * Get source.schema within a {@link InputType#RECORD} type. + * The source.schema is represented by a {@link JsonArray} + * @return + */ + public JsonArray getDataTypeValues() { + if (this.type.equals(RECORD)) { + return getDataType().get(RECORD_FIELDS_KEY).getAsJsonArray(); + } + return new JsonArray(); + } + + /** + * Get symbols for a {@link InputType#ENUM} type. + * @return + */ + public JsonArray getSymbols() { + if (this.type.equals(ENUM)) { + return getDataType().get(ENUM_SYMBOLS_KEY).getAsJsonArray(); + } + return new JsonArray(); + } + + /** + * Get {@link InputType} for this {@link JsonSchema}. + * @return + */ + public InputType getInputType() { + return type; + } + + /** + * Builds a {@link JsonSchema} object for a given {@link InputType} object. + * @param type + * @return + */ + public static JsonSchema buildBaseSchema(InputType type) { + JsonObject jsonObject = new JsonObject(); + JsonObject dataType = new JsonObject(); + jsonObject.addProperty(COLUMN_NAME_KEY, DEFAULT_RECORD_COLUMN_NAME); + dataType.addProperty(TYPE_KEY, type.toString()); + jsonObject.add(DATA_TYPE_KEY, dataType); + return new JsonSchema(jsonObject); + } + + /** + * {@link InputType} of the elements composed within complex type. + * @param itemKey + * @return + */ + public InputType getElementTypeUsingKey(String itemKey) { + String type = this.getDataType().get(itemKey).getAsString().toUpperCase(); + return InputType.valueOf(type); + } + + /** + * Parquet {@link Repetition} for this {@link JsonSchema}. 
+ * @return + */ + public Repetition optionalOrRequired() { + return this.isNullable() ? OPTIONAL : REQUIRED; + } + + /** + * Set properties for {@link JsonSchema} from a {@link JsonObject}. + * @param jsonObject + */ + private void setJsonSchemaProperties(JsonObject jsonObject) { + setColumnName(jsonObject.get(COLUMN_NAME_KEY).getAsString()); + setDataType(jsonObject.get(DATA_TYPE_KEY).getAsJsonObject()); + setNullable(jsonObject.has(IS_NULLABLE_KEY) && jsonObject.get(IS_NULLABLE_KEY).getAsBoolean()); + setComment(getOptionalProperty(jsonObject, COMMENT_KEY)); + setDefaultValue(getOptionalProperty(jsonObject, DEFAULT_VALUE_KEY)); + } + + /** + * Get optional property from a {@link JsonObject} for a {@link String} key. + * If key does'nt exists returns {@link #DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY}. + * @param jsonObject + * @param key + * @return + */ + private String getOptionalProperty(JsonObject jsonObject, String key) { + return jsonObject.has(key) ? jsonObject.get(key).getAsString() : DEFAULT_VALUE_FOR_OPTIONAL_PROPERTY; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java new file mode 100644 index 0000000..783d845 --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/main/java/org/apache/gobblin/converter/parquet/ParquetGroup.java @@ -0,0 +1,233 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. 
+ * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.gobblin.converter.parquet; + +import java.util.ArrayList; +import java.util.List; + +import parquet.example.data.Group; +import parquet.example.data.simple.BinaryValue; +import parquet.example.data.simple.BooleanValue; +import parquet.example.data.simple.DoubleValue; +import parquet.example.data.simple.FloatValue; +import parquet.example.data.simple.Int96Value; +import parquet.example.data.simple.IntegerValue; +import parquet.example.data.simple.LongValue; +import parquet.example.data.simple.NanoTime; +import parquet.example.data.simple.Primitive; +import parquet.io.api.Binary; +import parquet.io.api.RecordConsumer; +import parquet.schema.GroupType; +import parquet.schema.PrimitiveType; +import parquet.schema.Type; + +import static parquet.schema.Type.Repetition.REPEATED; + + +/** + * Custom Implementation of {@link Group} to support adding {@link Object} of type {@link Primitive} or {@link Group}. + * Also provides methods to add {@link Primitive} and {@link Group} with {@link String} key if index is not known. 
+ * @author tilakpatidar + */ +public class ParquetGroup extends Group { + + private final GroupType schema; + //each item represents data of a field, which is indexed by the fieldIndex of the schema + private final List<Object>[] data; + + public ParquetGroup(GroupType schema) { + this.schema = schema; + this.data = new List[schema.getFields().size()]; + + for (int i = 0; i < schema.getFieldCount(); ++i) { + this.data[i] = new ArrayList(); + } + } + + public String toString() { + return this.toString(""); + } + + public String toString(String indent) { + StringBuilder result = new StringBuilder(); + int i = 0; + for (Type field : this.schema.getFields()) { + String name = field.getName(); + List<Object> values = this.data[i]; + for (Object value : values) { + result.append(indent).append(name); + if (value == null) { + result.append(": NULL\n"); + } else if (value instanceof Group) { + result.append("\n").append(((ParquetGroup) value).toString(indent + " ")); + } else { + result.append(": ").append(value.toString()).append("\n"); + } + } + i++; + } + return result.toString(); + } + + public Group addGroup(int fieldIndex) { + ParquetGroup g = new ParquetGroup(this.schema.getType(fieldIndex).asGroupType()); + this.data[fieldIndex].add(g); + return g; + } + + public Group getGroup(int fieldIndex, int index) { + return (Group) this.getValue(fieldIndex, index); + } + + private Object getValue(int fieldIndex, int index) { + List<Object> list; + try { + list = this.data[fieldIndex]; + } catch (IndexOutOfBoundsException var6) { + throw new RuntimeException( + "not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") in group:\n" + this); + } + + try { + return list.get(index); + } catch (IndexOutOfBoundsException var5) { + throw new RuntimeException( + "not found " + fieldIndex + "(" + this.schema.getFieldName(fieldIndex) + ") element number " + index + + " in group:\n" + this); + } + } + + public void add(int fieldIndex, Primitive value) { + Type type = 
this.schema.getType(fieldIndex); + List<Object> list = this.data[fieldIndex]; + if (!type.isRepetition(REPEATED) && !list.isEmpty()) { + throw new IllegalStateException( + "field " + fieldIndex + " (" + type.getName() + ") can not have more than one value: " + list); + } else { + list.add(value); + } + } + + public int getFieldRepetitionCount(int fieldIndex) { + List<Object> list = this.data[fieldIndex]; + return list == null ? 0 : list.size(); + } + + public String getValueToString(int fieldIndex, int index) { + return String.valueOf(this.getValue(fieldIndex, index)); + } + + public String getString(int fieldIndex, int index) { + return ((BinaryValue) this.getValue(fieldIndex, index)).getString(); + } + + public int getInteger(int fieldIndex, int index) { + return ((IntegerValue) this.getValue(fieldIndex, index)).getInteger(); + } + + public boolean getBoolean(int fieldIndex, int index) { + return ((BooleanValue) this.getValue(fieldIndex, index)).getBoolean(); + } + + public Binary getBinary(int fieldIndex, int index) { + return ((BinaryValue) this.getValue(fieldIndex, index)).getBinary(); + } + + public Binary getInt96(int fieldIndex, int index) { + return ((Int96Value) this.getValue(fieldIndex, index)).getInt96(); + } + + public void add(int fieldIndex, int value) { + this.add(fieldIndex, new IntegerValue(value)); + } + + public void add(int fieldIndex, long value) { + this.add(fieldIndex, new LongValue(value)); + } + + public void add(int fieldIndex, String value) { + this.add(fieldIndex, new BinaryValue(Binary.fromString(value))); + } + + public void add(int fieldIndex, NanoTime value) { + this.add(fieldIndex, value.toInt96()); + } + + public void add(int fieldIndex, boolean value) { + this.add(fieldIndex, new BooleanValue(value)); + } + + public void add(int fieldIndex, Binary value) { + switch (this.getType().getType(fieldIndex).asPrimitiveType().getPrimitiveTypeName()) { + case BINARY: + this.add(fieldIndex, new BinaryValue(value)); + break; + case INT96: + 
this.add(fieldIndex, new Int96Value(value)); + break; + default: + throw new UnsupportedOperationException( + this.getType().asPrimitiveType().getName() + " not supported for Binary"); + } + } + + public void add(int fieldIndex, float value) { + this.add(fieldIndex, new FloatValue(value)); + } + + public void add(int fieldIndex, double value) { + this.add(fieldIndex, new DoubleValue(value)); + } + + public GroupType getType() { + return this.schema; + } + + public void writeValue(int field, int index, RecordConsumer recordConsumer) { + ((Primitive) this.getValue(field, index)).writeValue(recordConsumer); + } + + /** + * Add any object of {@link PrimitiveType} or {@link Group} type with a String key. + * @param key + * @param object + */ + public void add(String key, Object object) { + int fieldIndex = getIndex(key); + if (object.getClass() == ParquetGroup.class) { + this.addGroup(key, (Group) object); + } else { + this.add(fieldIndex, (Primitive) object); + } + } + + private int getIndex(String key) { + return getType().getFieldIndex(key); + } + + /** + * Add a {@link Group} given a String key. 
+ * @param key + * @param object + */ + private void addGroup(String key, Group object) { + int fieldIndex = getIndex(key); + this.schema.getType(fieldIndex).asGroupType(); + this.data[fieldIndex].add(object); + } +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/f0582115/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java new file mode 100644 index 0000000..c92834b --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/test/java/org/apache/gobblin/converter/parquet/JsonIntermediateToParquetGroupConverterTest.java @@ -0,0 +1,128 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
package org.apache.gobblin.converter.parquet;

import java.io.InputStreamReader;
import java.lang.reflect.Type;

import org.apache.gobblin.configuration.SourceState;
import org.apache.gobblin.configuration.WorkUnitState;
import org.apache.gobblin.converter.DataConversionException;
import org.apache.gobblin.converter.SchemaConversionException;
import org.apache.gobblin.source.workunit.Extract;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;

import com.google.gson.Gson;
import com.google.gson.JsonObject;
import com.google.gson.reflect.TypeToken;

import parquet.example.data.Group;
import parquet.schema.MessageType;

import static org.testng.Assert.assertEquals;


/**
 * Data-driven tests for {@link JsonIntermediateToParquetGroupConverter}.
 * Each case in {@code JsonIntermediateToParquetConverter.json} supplies an input schema,
 * an input record, and the expected Parquet schema/record rendered as strings
 * (compared with all whitespace stripped).
 */
@Test(groups = {"gobblin.converter"})
public class JsonIntermediateToParquetGroupConverterTest {
  private static final String RESOURCE_PATH = "/converter/JsonIntermediateToParquetConverter.json";
  private static JsonObject testCases;
  private static WorkUnitState workUnit;
  private static JsonIntermediateToParquetGroupConverter parquetConverter;

  @BeforeClass
  public static void setUp() {
    // The resource is a JSON object keyed by test-case name (not a list);
    // the original local was misleadingly named "listType".
    Type jsonObjectType = new TypeToken<JsonObject>() {
    }.getType();
    // The redundant testData.getAsJsonObject() self-call from the original is dropped:
    // fromJson already yields a JsonObject here.
    testCases = new Gson().fromJson(
        new InputStreamReader(
            JsonIntermediateToParquetGroupConverter.class.getResourceAsStream(RESOURCE_PATH)),
        jsonObjectType);

    SourceState source = new SourceState();
    workUnit = new WorkUnitState(
        source.createWorkUnit(source.createExtract(Extract.TableType.SNAPSHOT_ONLY, "test_namespace", "test_table")));
  }

  /**
   * Runs one named case from the fixture: converts schema and record, then compares
   * their string forms against the expectations, ignoring whitespace.
   */
  private void testCase(String testCaseName)
      throws SchemaConversionException, DataConversionException {
    JsonObject test = testCases.get(testCaseName).getAsJsonObject();
    parquetConverter = new JsonIntermediateToParquetGroupConverter();

    MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
    Group record =
        parquetConverter.convertRecord(schema, test.get("record").getAsJsonObject(), workUnit).iterator().next();

    assertEqualsIgnoreSpaces(schema.toString(), test.get("expectedSchema").getAsString());
    assertEqualsIgnoreSpaces(record.toString(), test.get("expectedRecord").getAsString());
  }

  /** A value outside the declared enum symbol set must be rejected at conversion time. */
  @Test(expectedExceptions = RuntimeException.class, expectedExceptionsMessageRegExp = "Symbol .* does not belong to set \\[.*?\\]")
  public void testEnumTypeBelongsToEnumSet()
      throws Exception {
    JsonObject test = testCases.get("enum").getAsJsonObject();
    parquetConverter = new JsonIntermediateToParquetGroupConverter();

    MessageType schema = parquetConverter.convertSchema(test.get("schema").getAsJsonArray(), workUnit);
    JsonObject jsonRecord = test.get("record").getAsJsonObject();
    // "HELL" is deliberately not in the symbol set ["HELLO", "WORLD"].
    jsonRecord.addProperty("some_enum", "HELL");

    parquetConverter.convertRecord(schema, jsonRecord, workUnit).iterator().next();
  }

  @Test
  public void testPrimitiveTypes()
      throws Exception {
    testCase("simplePrimitiveTypes");
  }

  @Test
  public void testArrayType()
      throws Exception {
    testCase("array");
  }

  @Test
  public void testEnumType()
      throws Exception {
    testCase("enum");
  }

  @Test
  public void testRecordType()
      throws Exception {
    testCase("record");
  }

  @Test
  public void testMapType()
      throws Exception {
    testCase("map");
  }

  @Test
  public void testNullValueInOptionalField()
      throws Exception {
    testCase("nullValueInOptionalField");
  }

  /**
   * Asserts equality after normalizing newlines to ';' and stripping all remaining
   * whitespace, so fixture strings need not match the exact formatting.
   * (The original's extra "|\\t" alternative was redundant: \s already matches tabs.)
   */
  private void assertEqualsIgnoreSpaces(String actual, String expected) {
    assertEquals(actual.replaceAll("\\n", ";").replaceAll("\\s", ""),
        expected.replaceAll("\\n", ";").replaceAll("\\s", ""));
  }
}
a/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json new file mode 100644 index 0000000..e12325d --- /dev/null +++ b/gobblin-modules/gobblin-parquet/src/test/resources/converter/JsonIntermediateToParquetConverter.json @@ -0,0 +1,223 @@ +{ + "simplePrimitiveTypes": { + "record": { + "a": 5, + "b": 5.0, + "c": 8.0, + "d": true, + "e": "somestring" + }, + "schema": [ + { + "columnName": "a", + "dataType": { + "type": "int" + } + }, + { + "columnName": "b", + "dataType": { + "type": "float" + } + }, + { + "columnName": "c", + "dataType": { + "type": "double" + } + }, + { + "columnName": "d", + "dataType": { + "type": "boolean" + } + }, + { + "columnName": "e", + "dataType": { + "type": "string" + } + } + ], + "expectedRecord": "a: 5 ; b: 5.0 ; c: 8.0 ; d: true ; e: somestring ; ", + "expectedSchema": "message test_table{ ; required int32 a ; ; required float b ; ; required double c ; ; required boolean d ; ; required binary e (UTF8) ; ; } ; " + }, + "array": { + "record": { + "somearray": [ + 1, + 2, + 3 + ], + "somearray1": [ + 1, + 2, + 3 + ], + "somearray2": [ + 1.0, + 2.0, + 3.0 + ], + "somearray3": [ + 1.0, + 2.0, + 3.0 + ], + "somearray4": [ + true, + false, + true + ], + "somearray5": [ + "hello", + "world" + ] + }, + "schema": [ + { + "columnName": "somearray", + "dataType": { + "type": "array", + "items": "int" + } + }, + { + "columnName": "somearray1", + "dataType": { + "type": "array", + "items": "long" + } + }, + { + "columnName": "somearray2", + "dataType": { + "type": "array", + "items": "float" + } + }, + { + "columnName": "somearray3", + "dataType": { + "type": "array", + "items": "double" + } + }, + { + "columnName": "somearray4", + "dataType": { + "type": "array", + "items": "boolean" + } + }, + { + "columnName": "somearray5", + "dataType": { + "type": "array", + "items": "string" + } + } + ], + 
"expectedRecord": "somearray ; item:1 ; item:2 ; item:3 ; somearray1 ; item:1 ; item:2 ; item:3 ; somearray2 ; item:1.0 ; item:2.0 ; item:3.0 ; somearray3 ; item:1.0 ; item:2.0 ; item:3.0 ; somearray4 ; item:true ; item:false ; item:true ; somearray5 ; item:hello ; item:world ; ", + "expectedSchema": "message test_table { ; required group somearray { ; repeated int32 item ; ; } ; required groupsomearray1 { ; repeated int64 item ; ; } ; required groupsomearray2 { ; repeated float item ; ; } ; required groupsomearray3 { ; repeated double item ; ; } ; required groupsomearray4 { ; repeated boolean item ; ; } ; required groupsomearray5 { ; repeated binary item(UTF8) ; ; } ; } ; " + }, + "enum": { + "record": { + "some_enum": "HELLO" + }, + "schema": [ + { + "columnName": "some_enum", + "dataType": { + "type": "enum", + "symbols": [ + "HELLO", + "WORLD" + ] + } + } + ], + "expectedRecord": "some_enum : HELLO ;", + "expectedSchema": "message test_table { ; required binary some_enum (UTF8) ;; } ;" + }, + "record": { + "record": { + "some_record": { + "name": "me", + "age": 22, + "some_array": [ + 3, + 4, + 5 + ] + } + }, + "schema": [ + { + "columnName": "some_record", + "dataType": { + "type": "record", + "values": [ + { + "columnName": "name", + "dataType": { + "type": "string" + } + }, + { + "columnName": "age", + "dataType": { + "type": "long" + } + }, + { + "columnName": "some_array", + "dataType": { + "type": "array", + "items": "int" + } + } + ] + } + } + ], + "expectedRecord": "some_record ; name:me ; age:22 ; some_array ; item:3 ; item:4 ; item:5 ;", + "expectedSchema": "message test_table { ; required group some_record { ; required binary name (UTF8) ; ; required int64 age ; ; required group some_array { ; repeated int32 item ; ; } ; } ; } ; " + }, + "map": { + "schema": [ + { + "columnName": "cityToCountry", + "dataType": { + "type": "map", + "values": "string" + } + } + ], + "record": { + "cityToCountry": { + "ny": "US", + "london": "UK", + "delhi": "India" + } 
+ }, + "expectedRecord": "cityToCountry; map; key:ny;value:US; map; key:london;value:UK; map; key:delhi;value:India;", + "expectedSchema": "message test_table { ; required groupcityToCountry { ; repeated group map { ; required binary key (UTF8) ; ; required binary value (UTF8) ; ; } ; } ; } ;" + }, + "nullValueInOptionalField": { + "record": { + "a": null + }, + "schema": [ + { + "columnName": "a", + "isNullable": true, + "dataType": { + "type": "int" + } + } + ], + "expectedRecord": "", + "expectedSchema": "message test_table {; optional int32 a ;; };" + } +} \ No newline at end of file
