Repository: incubator-gobblin Updated Branches: refs/heads/0.12.0 2951fd267 -> b7f123f77
[GOBBLIN-361] Support Nested nullable Record type for JDBCWriter Closes #2233 from jinhyukchang/master Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/c6b3824a Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/c6b3824a Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/c6b3824a Branch: refs/heads/0.12.0 Commit: c6b3824aac01a61cb492a5cd5b672fc2fea1b09f Parents: ec85298 Author: Jin Hyuk Chang <[email protected]> Authored: Tue Jan 23 10:17:10 2018 -0800 Committer: Abhishek Tiwari <[email protected]> Committed: Tue Jan 23 10:17:10 2018 -0800 ---------------------------------------------------------------------- .../filter/AvroFieldsPickConverter.java | 55 +++++- .../filter/AvroFieldsPickConverterTest.java | 37 +++- .../converted_pickfields_nested_with_union.avro | Bin 0 -> 678 bytes .../converted_pickfields_nested_with_union.avsc | 47 +++++ .../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes .../converter/pickfields_nested_with_union.avsc | 33 ++++ .../jdbc/AvroToJdbcEntryConverter.java | 150 ++++++++------ .../jdbc/AvroToJdbcEntryConverterTest.java | 121 ++++++++++++ .../converter/pickfields_nested_with_union.avro | Bin 0 -> 1264 bytes .../converter/pickfields_nested_with_union.avsc | 33 ++++ .../converter/pickfields_nested_with_union.json | 194 +++++++++++++++++++ 11 files changed, 608 insertions(+), 62 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java index 
c7e2db5..74ed3f3 100644 --- a/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java +++ b/gobblin-core/src/main/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverter.java @@ -21,8 +21,10 @@ import java.io.IOException; import java.util.Iterator; import java.util.List; import java.util.Map; + import org.apache.avro.Schema; import org.apache.avro.Schema.Field; +import org.apache.avro.Schema.Type; import org.apache.avro.generic.GenericRecord; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +33,7 @@ import com.google.common.base.Preconditions; import com.google.common.base.Splitter; import com.google.common.collect.Lists; import com.google.common.collect.Maps; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.AvroToAvroConverterBase; @@ -133,29 +136,71 @@ public class AvroFieldsPickConverter extends AvroToAvroConverterBase { return createSchemaHelper(schema, root); } - private static Schema createSchemaHelper(Schema inputSchema, TrieNode node) { - Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(), - inputSchema.isError()); + private static Schema createSchemaHelper(final Schema inputSchema, TrieNode node) { List<Field> newFields = Lists.newArrayList(); for (TrieNode child : node.children.values()) { - Field innerSrcField = inputSchema.getField(child.val); - Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + inputSchema); + Schema recordSchema = getActualRecord(inputSchema); + Field innerSrcField = recordSchema.getField(child.val); + Preconditions.checkNotNull(innerSrcField, child.val + " does not exist under " + recordSchema); if (child.children.isEmpty()) { //Leaf newFields.add( new Field(innerSrcField.name(), innerSrcField.schema(), innerSrcField.doc(), innerSrcField.defaultValue())); } else { Schema 
innerSrcSchema = innerSrcField.schema(); + Schema innerDestSchema = createSchemaHelper(innerSrcSchema, child); //Recurse of schema Field innerDestField = new Field(innerSrcField.name(), innerDestSchema, innerSrcField.doc(), innerSrcField.defaultValue()); newFields.add(innerDestField); } } + + if (Type.UNION.equals(inputSchema.getType())) { + Preconditions.checkArgument(inputSchema.getTypes().size() <= 2, + "For union type in nested record, it should only have NULL and Record type"); + + Schema recordSchema = getActualRecord(inputSchema); + Schema newRecord = Schema.createRecord(recordSchema.getName(), recordSchema.getDoc(), recordSchema.getNamespace(), + recordSchema.isError()); + newRecord.setFields(newFields); + if (inputSchema.getTypes().size() == 1) { + return Schema.createUnion(newRecord); + } + return Schema.createUnion(Lists.newArrayList(Schema.create(Type.NULL), newRecord)); + } + + Schema newRecord = Schema.createRecord(inputSchema.getName(), inputSchema.getDoc(), inputSchema.getNamespace(), + inputSchema.isError()); newRecord.setFields(newFields); return newRecord; } + /** + * For the schema that is a UNION type with NULL and Record type, it provides Records type. 
+ * @param inputSchema + * @return + */ + private static Schema getActualRecord(Schema inputSchema) { + if (Type.RECORD.equals(inputSchema.getType())) { + return inputSchema; + } + + Preconditions.checkArgument(Type.UNION.equals(inputSchema.getType()), "Nested schema is only support with either record or union type of null with record"); + Preconditions.checkArgument(inputSchema.getTypes().size() <= 2, + "For union type in nested record, it should only have NULL and Record type"); + + for (Schema inner : inputSchema.getTypes()) { + if (Type.NULL.equals(inner.getType())) { + continue; + } + Preconditions.checkArgument(Type.RECORD.equals(inner.getType()), "For union type in nested record, it should only have NULL and Record type"); + return inner; + + } + throw new IllegalArgumentException(inputSchema + " is not supported."); + } + private static TrieNode buildTrie(List<String> fqns) { TrieNode root = new TrieNode(null); for (String fqn : fqns) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java index 009bcc7..a2f71f5 100644 --- a/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java +++ b/gobblin-core/src/test/java/org/apache/gobblin/converter/filter/AvroFieldsPickConverterTest.java @@ -17,12 +17,18 @@ package org.apache.gobblin.converter.filter; +import java.io.File; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.WorkUnitState; import org.apache.gobblin.converter.SchemaConversionException; - import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import 
org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.commons.io.FileUtils; import org.skyscreamer.jsonassert.JSONAssert; +import org.testng.Assert; import org.testng.annotations.Test; @Test(groups = { "gobblin.converter.filter" }) @@ -59,4 +65,33 @@ public class AvroFieldsPickConverterTest { JSONAssert.assertEquals(expected.toString(), converted.toString(), false); } } + + @Test + public void testFieldsPickWithNestedRecord() throws Exception { + Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc")); + + WorkUnitState workUnitState = new WorkUnitState(); + workUnitState.setProp(ConfigurationKeys.CONVERTER_AVRO_FIELD_PICK_FIELDS, "name,favorite_number,nested1.nested1_string,nested1.nested2_union.nested2_string"); + + try (AvroFieldsPickConverter converter = new AvroFieldsPickConverter()) { + Schema convertedSchema = converter.convertSchema(inputSchema, workUnitState); + Schema expectedSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/converted_pickfields_nested_with_union.avsc")); + JSONAssert.assertEquals(expectedSchema.toString(), convertedSchema.toString(), false); + + try (DataFileReader<GenericRecord> srcDataFileReader = new DataFileReader<GenericRecord>( + new File(getClass().getResource("/converter/pickfields_nested_with_union.avro").toURI()), + new GenericDatumReader<GenericRecord>(inputSchema)); + DataFileReader<GenericRecord> expectedDataFileReader = new DataFileReader<GenericRecord>( + new File(getClass().getResource("/converter/converted_pickfields_nested_with_union.avro").toURI()), + new GenericDatumReader<GenericRecord>(expectedSchema));) { + + while (expectedDataFileReader.hasNext()) { + GenericRecord expected = expectedDataFileReader.next(); + GenericRecord actual = converter.convertRecord(convertedSchema, srcDataFileReader.next(), workUnitState).iterator().next(); + Assert.assertEquals(actual, 
expected); + } + Assert.assertTrue(!srcDataFileReader.hasNext()); + } + } + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro new file mode 100644 index 0000000..5d63a1a Binary files /dev/null and b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avro differ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc new file mode 100644 index 0000000..cdfc283 --- /dev/null +++ b/gobblin-core/src/test/resources/converter/converted_pickfields_nested_with_union.avsc @@ -0,0 +1,47 @@ +{ + "type": "record", + "name": "User", + "namespace": "example.avro", + "fields": [ + { + "name": "name", + "type": "string" + }, + { + "name": "favorite_number", + "type": [ + "int", + "null" + ] + }, + { + "name": "nested1", + "type": { + "type": "record", + "name": "dummy_nested1", + "fields": [ + { + "name": "nested1_string", + "type": "string" + }, + { + "name": "nested2_union", + "type": [ + "null", + { + "type": "record", + "name": "dummy_nested2", + "fields": [ + { + "name": "nested2_string", + "type": "string" + } + ] + } + ] + } + ] + } + } + ] +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro 
---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro new file mode 100644 index 0000000..b6e8d63 Binary files /dev/null and b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avro differ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc ---------------------------------------------------------------------- diff --git a/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc new file mode 100644 index 0000000..bbe402d --- /dev/null +++ b/gobblin-core/src/test/resources/converter/pickfields_nested_with_union.avsc @@ -0,0 +1,33 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "date_of_birth", "type": "long"}, + {"name": "last_modified", "type": "long"}, + {"name": "created", "type": "long"}, + {"name": "nested1", + "type" : { + "type": "record", + "name": "dummy_nested1", + "fields": [ + {"name": "nested1_string", "type": "string"}, + {"name": "nested1_int", "type": ["int", "null"]}, + {"name": "nested2_union", "type": ["null", { + "type" : "record", + "name" : "dummy_nested2", + "fields": [ + {"name": "nested2_string", "type": "string"}, + {"name": "nested2_int", "type": ["int", "null"]} + ] + } + ] + } + ] + } + } + ] +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java ---------------------------------------------------------------------- diff 
--git a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java index b787de6..5b7e89d 100644 --- a/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java +++ b/gobblin-modules/gobblin-sql/src/main/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverter.java @@ -20,9 +20,12 @@ package org.apache.gobblin.converter.jdbc; import java.sql.Date; import java.sql.Time; import java.sql.Timestamp; +import java.util.HashMap; +import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Set; import org.apache.avro.Schema; @@ -36,6 +39,7 @@ import org.slf4j.LoggerFactory; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import com.google.common.base.Splitter; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Lists; @@ -68,6 +72,11 @@ import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory; public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, GenericRecord, JdbcEntryData> { public static final String CONVERTER_AVRO_JDBC_DATE_FIELDS = "converter.avro.jdbc.date_fields"; + private static final String AVRO_NESTED_COLUMN_DELIMITER = "."; + private static final String JDBC_FLATTENED_COLUMN_DELIMITER = "_"; + private static final String AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE = "\\."; + private static final Splitter AVRO_RECORD_LEVEL_SPLITTER = Splitter.on(AVRO_NESTED_COLUMN_DELIMITER).omitEmptyStrings(); + private static final Logger LOG = LoggerFactory.getLogger(AvroToJdbcEntryConverter.class); private static final Map<Type, JdbcType> AVRO_TYPE_JDBC_TYPE_MAPPING = @@ 
-83,6 +92,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, ImmutableSet.<Type> builder() .addAll(AVRO_TYPE_JDBC_TYPE_MAPPING.keySet()) .add(Type.UNION) + .add(Type.RECORD) .build(); private static final Set<JdbcType> JDBC_SUPPORTED_TYPES = ImmutableSet.<JdbcType> builder() @@ -93,7 +103,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, .build(); private Optional<Map<String, String>> avroToJdbcColPairs = Optional.absent(); - private Optional<Map<String, String>> jdbcToAvroColPairs = Optional.absent(); + private Map<String, String> jdbcToAvroColPairs = new HashMap<>(); public AvroToJdbcEntryConverter() { super(); @@ -128,7 +138,6 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, jdbcToAvroBuilder.put(entry.getValue().getAsString(), entry.getKey()); } this.avroToJdbcColPairs = Optional.of((Map<String, String>) avroToJdbcBuilder.build()); - this.jdbcToAvroColPairs = Optional.of((Map<String, String>) jdbcToAvroBuilder.build()); } } return this; @@ -139,7 +148,7 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, * * Few precondition to the Avro schema * 1. Avro schema should have one entry type record at first depth. - * 2. Avro schema can recurse by having record inside record. As RDBMS structure is not recursive, this is not allowed. + * 2. Avro schema can recurse by having record inside record. * 3. Supported Avro primitive types and conversion * boolean --> java.lang.Boolean * int --> java.lang.Integer @@ -150,9 +159,9 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, * string --> java.lang.String * null: only allowed if it's within union (see complex types for more details) * 4. Supported Avro complex types - * Records: Only first level depth can have Records type. Basically converter will peel out Records type and start with 2nd level. + * Records: Supports nested record type as well. 
* Enum --> java.lang.String - * Unions --> Only allowed if it have one primitive type in it or null type with one primitive type where null will be ignored. + * Unions --> Only allowed if it have one primitive type in it, along with Record type, or null type with one primitive type where null will be ignored. * Once Union is narrowed down to one primitive type, it will follow conversion of primitive type above. * {@inheritDoc} * @@ -167,6 +176,10 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, @Override public JdbcEntrySchema convertSchema(Schema inputSchema, WorkUnitState workUnit) throws SchemaConversionException { LOG.info("Converting schema " + inputSchema); + Preconditions.checkArgument(Type.RECORD.equals(inputSchema.getType()), + "%s is expected for the first level element in Avro schema %s", + Type.RECORD, inputSchema); + Map<String, Type> avroColumnType = flatten(inputSchema); String jsonStr = Preconditions.checkNotNull(workUnit.getProp(CONVERTER_AVRO_JDBC_DATE_FIELDS)); java.lang.reflect.Type typeOfMap = new TypeToken<Map<String, JdbcType>>() {}.getType(); @@ -175,7 +188,8 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, List<JdbcEntryMetaDatum> jdbcEntryMetaData = Lists.newArrayList(); for (Map.Entry<String, Type> avroEntry : avroColumnType.entrySet()) { - String colName = tryConvertColumn(avroEntry.getKey(), this.avroToJdbcColPairs); + String colName = tryConvertAvroColNameToJdbcColName(avroEntry.getKey()); + JdbcType JdbcType = dateColumnMapping.get(colName); if (JdbcType == null) { JdbcType = AVRO_TYPE_JDBC_TYPE_MAPPING.get(avroEntry.getValue()); @@ -190,15 +204,36 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, return converted; } - private static String tryConvertColumn(String key, Optional<Map<String, String>> mapping) { - if (!mapping.isPresent()) { - return key; + /** + * Convert Avro column name to JDBC column name. 
If name mapping is defined, follow it. Otherwise, just return avro column name, + * while replacing nested column delimiter, dot, to underscore. + * This method also updates, mapping from JDBC column name to Avro column name for reverse look up. + * @param avroColName + * @return + */ + private String tryConvertAvroColNameToJdbcColName(String avroColName) { + if (!avroToJdbcColPairs.isPresent()) { + String converted = avroColName.replaceAll(AVRO_NESTED_COLUMN_DELIMITER_REGEX_COMPATIBLE, JDBC_FLATTENED_COLUMN_DELIMITER); + jdbcToAvroColPairs.put(converted, avroColName); + return converted; } - String converted = mapping.get().get(key); - return converted != null ? converted : key; + String converted = avroToJdbcColPairs.get().get(avroColName); + converted = converted != null ? converted : avroColName; + jdbcToAvroColPairs.put(converted, avroColName); + return converted; + } + + /** + * Provides JDBC column name based on Avro column name. It's a one liner method but contains knowledge on where the mapping is. + * @param colName + * @return + */ + private String convertJdbcColNameToAvroColName(String colName) { + return Preconditions.checkNotNull(jdbcToAvroColPairs.get(colName)); } + /** * Flattens Avro's (possibly recursive) structure and provides field name and type. * It assumes that the leaf level field name has unique name. 
@@ -208,41 +243,44 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, */ private static Map<String, Type> flatten(Schema schema) throws SchemaConversionException { Map<String, Type> flattened = new LinkedHashMap<>(); - if (!Type.RECORD.equals(schema.getType())) { - throw new SchemaConversionException( - Type.RECORD + " is expected for the first level element in Avro schema " + schema); - } + Schema recordSchema = determineType(schema); - for (Field f : schema.getFields()) { - produceFlattenedHelper(f.schema(), f, flattened); + Preconditions.checkArgument(Type.RECORD.equals(recordSchema.getType()), "%s is expected. Schema: %s", + Type.RECORD, recordSchema); + + for (Field f : recordSchema.getFields()) { + produceFlattenedHelper(f, flattened); } return flattened; } - private static void produceFlattenedHelper(Schema schema, Field field, Map<String, Type> flattened) + private static void produceFlattenedHelper(Field field, Map<String, Type> flattened) throws SchemaConversionException { - if (Type.RECORD.equals(schema.getType())) { - throw new SchemaConversionException(Type.RECORD + " is only allowed for first level."); + Schema actualSchema = determineType(field.schema()); + if (Type.RECORD.equals(actualSchema.getType())) { + Map<String, Type> map = flatten(actualSchema); + for (Entry<String, Type> entry : map.entrySet()) { + String key = String.format("%s" + AVRO_NESTED_COLUMN_DELIMITER + "%s", field.name(), entry.getKey()); + Type existing = flattened.put(key, entry.getValue()); + Preconditions.checkArgument(existing == null, "Duplicate name detected in Avro schema. 
Field: " + key); + } + return; } - Type t = determineType(schema); - if (field == null) { - throw new IllegalArgumentException("Invalid Avro schema, no name has been assigned to " + schema); - } - Type existing = flattened.put(field.name(), t); + Type existing = flattened.put(field.name(), actualSchema.getType()); if (existing != null) { //No duplicate name allowed when flattening (not considering name space we don't have any assumption between namespace and actual database field name) throw new SchemaConversionException("Duplicate name detected in Avro schema. " + field.name()); } } - private static Type determineType(Schema schema) throws SchemaConversionException { + private static Schema determineType(Schema schema) throws SchemaConversionException { if (!AVRO_SUPPORTED_TYPES.contains(schema.getType())) { throw new SchemaConversionException(schema.getType() + " is not supported"); } if (!Type.UNION.equals(schema.getType())) { - return schema.getType(); + return schema; } //For UNION, only supported avro type with NULL is allowed. 
@@ -251,20 +289,13 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, throw new SchemaConversionException("More than two types are not supported " + schemas); } - Type t = null; for (Schema s : schemas) { if (Type.NULL.equals(s.getType())) { continue; } - if (t == null) { - t = s.getType(); - } else { - throw new SchemaConversionException("Union type of " + schemas + " is not supported."); - } - } - if (t != null) { - return t; + return s; } + throw new SchemaConversionException("Cannot determine type of " + schema); } @@ -276,12 +307,14 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, } List<JdbcEntryDatum> jdbcEntryData = Lists.newArrayList(); for (JdbcEntryMetaDatum entry : outputSchema) { - final String colName = entry.getColumnName(); + final String jdbcColName = entry.getColumnName(); final JdbcType jdbcType = entry.getJdbcType(); - final Object val = record.get(tryConvertColumn(colName, this.jdbcToAvroColPairs)); + + String avroColName = convertJdbcColNameToAvroColName(jdbcColName); + final Object val = avroRecordValueGet(record, AVRO_RECORD_LEVEL_SPLITTER.split(avroColName).iterator()); if (val == null) { - jdbcEntryData.add(new JdbcEntryDatum(colName, null)); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, null)); continue; } @@ -291,35 +324,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, switch (jdbcType) { case VARCHAR: - jdbcEntryData.add(new JdbcEntryDatum(colName, val.toString())); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val.toString())); continue; case INTEGER: case BOOLEAN: case BIGINT: case FLOAT: case DOUBLE: - jdbcEntryData.add(new JdbcEntryDatum(colName, val)); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, val)); continue; - // case BOOLEAN: - // jdbcEntryData.add(new JdbcEntryDatum(colName, Boolean.valueOf((boolean) val))); - // continue; - // case BIGINT: - // jdbcEntryData.add(new JdbcEntryDatum(colName, 
Long.valueOf((long) val))); - // continue; - // case FLOAT: - // jdbcEntryData.add(new JdbcEntryDatum(colName, Float.valueOf((float) val))); - // continue; - // case DOUBLE: - // jdbcEntryData.add(new JdbcEntryDatum(colName, Double.valueOf((double) val))); - // continue; case DATE: - jdbcEntryData.add(new JdbcEntryDatum(colName, new Date((long) val))); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Date((long) val))); continue; case TIME: - jdbcEntryData.add(new JdbcEntryDatum(colName, new Time((long) val))); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Time((long) val))); continue; case TIMESTAMP: - jdbcEntryData.add(new JdbcEntryDatum(colName, new Timestamp((long) val))); + jdbcEntryData.add(new JdbcEntryDatum(jdbcColName, new Timestamp((long) val))); continue; default: throw new DataConversionException(jdbcType + " is not supported"); @@ -332,6 +353,23 @@ public class AvroToJdbcEntryConverter extends Converter<Schema, JdbcEntrySchema, return new SingleRecordIterable<>(converted); } + private Object avroRecordValueGet(GenericRecord record, Iterator<String> recordNameIterator) { + String name = recordNameIterator.next(); + Object val = record.get(name); + if (val == null) { + //Either leaf value is null or nested Record (represented as UNION) is null + return null; + } + if (!recordNameIterator.hasNext()) { + //Leaf + return val; + } + + //Recurse + return avroRecordValueGet((GenericRecord) val, recordNameIterator); + } + + @Override public ConverterInitializer getInitializer(State state, WorkUnitStream workUnits, int branches, int branchId) { JdbcWriterCommandsFactory factory = new JdbcWriterCommandsFactory(); if (workUnits.isSafeToMaterialize()) { http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java index de1f0a3..a835d89 100644 --- a/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java +++ b/gobblin-modules/gobblin-sql/src/test/java/org/apache/gobblin/converter/jdbc/AvroToJdbcEntryConverterTest.java @@ -18,29 +18,50 @@ package org.apache.gobblin.converter.jdbc; import static org.mockito.Mockito.*; + import org.apache.gobblin.configuration.ConfigurationKeys; import org.apache.gobblin.configuration.State; import org.apache.gobblin.configuration.WorkUnitState; +import org.apache.gobblin.converter.DataConversionException; import org.apache.gobblin.converter.SchemaConversionException; import org.apache.gobblin.publisher.JdbcPublisher; import org.apache.gobblin.writer.Destination.DestinationType; import org.apache.gobblin.writer.commands.JdbcWriterCommands; import org.apache.gobblin.writer.commands.JdbcWriterCommandsFactory; +import java.io.File; import java.io.IOException; +import java.io.InputStreamReader; +import java.lang.reflect.Type; +import java.net.URISyntaxException; import java.sql.Connection; +import java.sql.Date; import java.sql.SQLException; +import java.sql.Time; +import java.sql.Timestamp; import java.util.ArrayList; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Objects; import org.apache.avro.Schema; +import org.apache.avro.file.DataFileReader; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; import org.testng.Assert; import org.testng.annotations.Test; import com.google.common.collect.Maps; import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import com.google.gson.JsonArray; +import com.google.gson.JsonElement; +import com.google.gson.JsonObject; +import 
com.google.gson.JsonParser; +import com.google.gson.JsonPrimitive; +import com.google.gson.JsonSerializationContext; +import com.google.gson.JsonSerializer; @Test(groups = {"gobblin.converter"}) public class AvroToJdbcEntryConverterTest { @@ -130,4 +151,104 @@ public class AvroToJdbcEntryConverterTest { Assert.assertEquals(expected, actual); } + + /** Checks that AvroToJdbcEntryConverter flattens the nested record and the nullable-union record of pickfields_nested_with_union.avsc into underscore-joined column names, and that the records converted from the matching .avro file equal the pickfields_nested_with_union.json fixture once serialized to JSON. */ @Test + public void testFlattening() throws IOException, SchemaConversionException, SQLException, URISyntaxException, DataConversionException { + final String db = "db"; + final String table = "users"; + Map<String, JdbcType> dateColums = new HashMap<>(); + dateColums.put("date_of_birth", JdbcType.DATE); + dateColums.put("last_modified", JdbcType.TIME); + dateColums.put("created", JdbcType.TIMESTAMP); + + /* NOTE(review): neither mock below is passed to the converter within this method; confirm they are still needed. */ JdbcWriterCommands mockWriterCommands = mock(JdbcWriterCommands.class); + when(mockWriterCommands.retrieveDateColumns(db, table)).thenReturn(dateColums); + + JdbcWriterCommandsFactory factory = mock(JdbcWriterCommandsFactory.class); + when(factory.newInstance(any(State.class), any(Connection.class))).thenReturn(mockWriterCommands); + + /* Expected flattened schema: one column per leaf field of the input schema. */ List<JdbcEntryMetaDatum> jdbcEntryMetaData = new ArrayList<>(); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("name", JdbcType.VARCHAR)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_number", JdbcType.VARCHAR)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("favorite_color", JdbcType.VARCHAR)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("date_of_birth", JdbcType.DATE)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("last_modified", JdbcType.TIME)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("created", JdbcType.TIMESTAMP)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_string", JdbcType.VARCHAR)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested1_int", JdbcType.INTEGER)); + jdbcEntryMetaData.add(new JdbcEntryMetaDatum("nested1_nested2_union_nested2_string", JdbcType.VARCHAR)); + jdbcEntryMetaData.add(new 
JdbcEntryMetaDatum("nested1_nested2_union_nested2_int", JdbcType.INTEGER)); + JdbcEntrySchema expected = new JdbcEntrySchema(jdbcEntryMetaData); + + Schema inputSchema = new Schema.Parser().parse(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.avsc")); + WorkUnitState workUnitState = new WorkUnitState(); + workUnitState.appendToListProp(JdbcPublisher.JDBC_PUBLISHER_FINAL_TABLE_NAME, table); + AvroToJdbcEntryConverter converter = new AvroToJdbcEntryConverter(workUnitState); + + /* The date column mapping is stored in the work unit state as a JSON string, after the converter has been constructed. */ Map<String, JdbcType> dateColumnMapping = Maps.newHashMap(); + dateColumnMapping.put("date_of_birth", JdbcType.DATE); + dateColumnMapping.put("last_modified", JdbcType.TIME); + dateColumnMapping.put("created", JdbcType.TIMESTAMP); + workUnitState.appendToListProp(AvroToJdbcEntryConverter.CONVERTER_AVRO_JDBC_DATE_FIELDS, + new Gson().toJson(dateColumnMapping)); + + JdbcEntrySchema actualSchema = converter.convertSchema(inputSchema, workUnitState); + Assert.assertEquals(expected, actualSchema); + + try ( + DataFileReader<GenericRecord> srcDataFileReader = + new DataFileReader<GenericRecord>(new File(getClass().getResource( + "/converter/pickfields_nested_with_union.avro").toURI()), new GenericDatumReader<GenericRecord>( + inputSchema))) { + + /* Only the first element returned by convertRecord is kept for each input record. */ List<JdbcEntryData> entries = new ArrayList<>(); + while (srcDataFileReader.hasNext()) { + JdbcEntryData actualData = converter.convertRecord(actualSchema, srcDataFileReader.next(), workUnitState).iterator().next(); + entries.add(actualData); + } + + /* Renders one datum as a single-key JSON object keyed by column name: a null value is kept as JSON null, Date, Timestamp and Time values are written as getTime() longs, and any other value is written as its toString() text. */ final JsonSerializer<JdbcEntryDatum> datumSer = new JsonSerializer<JdbcEntryDatum>() { + @Override + public JsonElement serialize(JdbcEntryDatum datum, Type typeOfSrc, JsonSerializationContext context) { + JsonObject jso = new JsonObject(); + if (datum.getVal() == null) { + jso.add(datum.getColumnName(), null); + return jso; + } + + if (datum.getVal() instanceof Date) { + jso.addProperty(datum.getColumnName(), ((Date) datum.getVal()).getTime()); + } else if (datum.getVal() instanceof 
Timestamp) { + jso.addProperty(datum.getColumnName(), ((Timestamp) datum.getVal()).getTime()); + } else if (datum.getVal() instanceof Time) { + jso.addProperty(datum.getColumnName(), ((Time) datum.getVal()).getTime()); + } else { + jso.addProperty(datum.getColumnName(), datum.getVal().toString()); + } + return jso; + } + }; + + /* Renders one entry as a JSON array holding one object per datum, in the iteration order of JdbcEntryData. */ JsonSerializer<JdbcEntryData> serializer = new JsonSerializer<JdbcEntryData>() { + @Override + public JsonElement serialize(JdbcEntryData src, Type typeOfSrc, JsonSerializationContext context) { + JsonArray arr = new JsonArray(); + for (JdbcEntryDatum datum : src) { + arr.add(datumSer.serialize(datum, datum.getClass(), context)); + } + return arr; + } + }; + + Gson gson = new GsonBuilder().registerTypeAdapter(JdbcEntryData.class, serializer).serializeNulls().create(); + + JsonElement actualSerialized = gson.toJsonTree(entries); + /* NOTE(review): the InputStreamReader below is built without a charset, so the platform default is used to decode the fixture. */ JsonElement expectedSerialized = + new JsonParser().parse(new InputStreamReader(getClass().getResourceAsStream("/converter/pickfields_nested_with_union.json"))); + + Assert.assertEquals(actualSerialized, expectedSerialized); + } + + /* NOTE(review): this close is skipped if anything above throws, since it is not in a finally block. */ converter.close(); + } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro new file mode 100644 index 0000000..b6e8d63 Binary files /dev/null and b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avro differ http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc ---------------------------------------------------------------------- diff --git 
a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc new file mode 100644 index 0000000..bbe402d --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.avsc @@ -0,0 +1,33 @@ +{"namespace": "example.avro", + "type": "record", + "name": "User", + "fields": [ + {"name": "name", "type": "string"}, + {"name": "favorite_number", "type": ["int", "null"]}, + {"name": "favorite_color", "type": ["string", "null"]}, + {"name": "date_of_birth", "type": "long"}, + {"name": "last_modified", "type": "long"}, + {"name": "created", "type": "long"}, + {"name": "nested1", + "type" : { + "type": "record", + "name": "dummy_nested1", + "fields": [ + {"name": "nested1_string", "type": "string"}, + {"name": "nested1_int", "type": ["int", "null"]}, + {"name": "nested2_union", "type": ["null", { + "type" : "record", + "name" : "dummy_nested2", + "fields": [ + {"name": "nested2_string", "type": "string"}, + {"name": "nested2_int", "type": ["int", "null"]} + ] + } + ] + } + ] + } + } + ] +} + http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/c6b3824a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json ---------------------------------------------------------------------- diff --git a/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json new file mode 100644 index 0000000..42db8d6 --- /dev/null +++ b/gobblin-modules/gobblin-sql/src/test/resources/converter/pickfields_nested_with_union.json @@ -0,0 +1,194 @@ +[ + [ + { + "created":-8205952289873597770 + }, + { + "date_of_birth":4924540963523509391 + }, + { + "favorite_color":"�hxqpwvxrf" + }, + { + "favorite_number":"826032918" + }, + { + "last_modified":5532738755424028468 + }, + { + 
"name":"btnzlrfptk" + }, + { + "nested1_nested1_int":"-36788251" + }, + { + "nested1_nested1_string":"gji\u001a\u0000sujkj" + }, + { + "nested1_nested2_union_nested2_int":"1026040670" + }, + { + "nested1_nested2_union_nested2_string":"yobzdadkgk" + } + ], + [ + { + "created":-1393624224378683129 + }, + { + "date_of_birth":-1175814878817216371 + }, + { + "favorite_color":"x\n\tjhrpgyd" + }, + { + "favorite_number":"1816171539" + }, + { + "last_modified":8404219756951923781 + }, + { + "name":"\u000f\rpkgrlgio" + }, + { + "nested1_nested1_int":"-50507635" + }, + { + "nested1_nested1_string":"q\u000f?uspbscf" + }, + { + "nested1_nested2_union_nested2_int":null + }, + { + "nested1_nested2_union_nested2_string":null + } + ], + [ + { + "created":-7739579554682470032 + }, + { + "date_of_birth":-607816151590576707 + }, + { + "favorite_color":"lyuuuympyg" + }, + { + "favorite_number":"1866476467" + }, + { + "last_modified":-941580389512399179 + }, + { + "name":"g?pbkpjrxh" + }, + { + "nested1_nested1_int":"1327904823" + }, + { + "nested1_nested1_string":"kxqmrenntu" + }, + { + "nested1_nested2_union_nested2_int":null + }, + { + "nested1_nested2_union_nested2_string":null + } + ], + [ + { + "created":-6056615843637407776 + }, + { + "date_of_birth":-1429852167829293715 + }, + { + "favorite_color":"�dgasutgtx" + }, + { + "favorite_number":"-1553608691" + }, + { + "last_modified":450932180461066816 + }, + { + "name":"gqmcmimbhp" + }, + { + "nested1_nested1_int":"-351781782" + }, + { + "nested1_nested1_string":"o\u001ac\u0000bmefwh" + }, + { + "nested1_nested2_union_nested2_int":"-1596923241" + }, + { + "nested1_nested2_union_nested2_string":"dbcczapozw" + } + ], + [ + { + "created":-4666560421015124885 + }, + { + "date_of_birth":-8070729844977385755 + }, + { + "favorite_color":"pfzharskmy" + }, + { + "favorite_number":"-170051651" + }, + { + "last_modified":-7703151747036814734 + }, + { + "name":"f\u000fwszbxhzm" + }, + { + "nested1_nested1_int":"-1126087353" + }, + { + 
"nested1_nested1_string":"mjwmnevxer" + }, + { + "nested1_nested2_union_nested2_int":"-1722304492" + }, + { + "nested1_nested2_union_nested2_string":"h\rwdawizsu" + } + ], + [ + { + "created":6548727010966246856 + }, + { + "date_of_birth":8554093846897734514 + }, + { + "favorite_color":"\u000fcgsqjdabu" + }, + { + "favorite_number":"-2132346518" + }, + { + "last_modified":3298280474340398482 + }, + { + "name":"k\n\tngmvhpe" + }, + { + "nested1_nested1_int":"-1330607161" + }, + { + "nested1_nested1_string":"ubbhpssdeh" + }, + { + "nested1_nested2_union_nested2_int":"992907867" + }, + { + "nested1_nested2_union_nested2_string":"jk?jknvxkw" + } + ] +] \ No newline at end of file
