Repository: incubator-gobblin Updated Branches: refs/heads/master a14c08e28 -> 55bf7a42b
[GOBBLIN-485] AvroSchemaManager does not support using schema generated from Hive columns. Closes #2355 from erwa/avroschemamanager-support-schema-from-cols Project: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/commit/55bf7a42 Tree: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/tree/55bf7a42 Diff: http://git-wip-us.apache.org/repos/asf/incubator-gobblin/diff/55bf7a42 Branch: refs/heads/master Commit: 55bf7a42bce374445e6a78a102ea25c6efa7883e Parents: a14c08e Author: Anthony Hsu <[email protected]> Authored: Thu May 3 14:25:09 2018 -0700 Committer: Hung Tran <[email protected]> Committed: Thu May 3 14:25:09 2018 -0700 ---------------------------------------------------------------------- .gitignore | 1 + .../conversion/hive/avro/AvroSchemaManager.java | 23 +++++-- .../hive/avro/AvroSchemaManagerTest.java | 70 ++++++++++++++++++++ .../avroSchemaManagerTest/expectedSchema.avsc | 1 + .../java/org/apache/gobblin/util/AvroUtils.java | 2 +- 5 files changed, 92 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55bf7a42/.gitignore ---------------------------------------------------------------------- diff --git a/.gitignore b/.gitignore index b23aa66..4982e54 100644 --- a/.gitignore +++ b/.gitignore @@ -21,6 +21,7 @@ **/build/ .gradle **/.gradle +gradle.properties.release test-output **/test-output dist http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55bf7a42/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManager.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManager.java 
b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManager.java index 4d99825..bf00df3 100644 --- a/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManager.java +++ b/gobblin-data-management/src/main/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManager.java @@ -20,7 +20,9 @@ import java.io.IOException; import java.net.URI; import java.net.URISyntaxException; import java.nio.charset.StandardCharsets; +import java.util.List; import java.util.Map; +import java.util.stream.Collectors; import lombok.extern.slf4j.Slf4j; import org.apache.avro.Schema; @@ -28,9 +30,13 @@ import org.apache.commons.io.IOUtils; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.StorageDescriptor; import org.apache.hadoop.hive.ql.metadata.Partition; import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde2.avro.TypeInfoToSchema; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import com.google.common.base.Preconditions; import com.google.common.collect.Maps; @@ -127,8 +133,9 @@ public class AvroSchemaManager { String schemaString = StringUtils.EMPTY; try { // Try to fetch from SCHEMA URL - if (sd.getSerdeInfo().getParameters().containsKey(HiveAvroSerDeManager.SCHEMA_URL)) { - String schemaUrl = sd.getSerdeInfo().getParameters().get(HiveAvroSerDeManager.SCHEMA_URL); + Map<String,String> serdeParameters = sd.getSerdeInfo().getParameters(); + if (serdeParameters != null && serdeParameters.containsKey(HiveAvroSerDeManager.SCHEMA_URL)) { + String schemaUrl = serdeParameters.get(HiveAvroSerDeManager.SCHEMA_URL); if (schemaUrl.startsWith("http")) { // Fetch schema literal via HTTP GET if scheme is 
http(s) schemaString = IOUtils.toString(new URI(schemaUrl), StandardCharsets.UTF_8); @@ -142,12 +149,20 @@ public class AvroSchemaManager { } } // Try to fetch from SCHEMA LITERAL - else if (sd.getSerdeInfo().getParameters().containsKey(HiveAvroSerDeManager.SCHEMA_LITERAL)) { - schemaString = sd.getSerdeInfo().getParameters().get(HiveAvroSerDeManager.SCHEMA_LITERAL); + else if (serdeParameters != null && serdeParameters.containsKey(HiveAvroSerDeManager.SCHEMA_LITERAL)) { + schemaString = serdeParameters.get(HiveAvroSerDeManager.SCHEMA_LITERAL); log.debug("Schema string is: " + schemaString); Schema schema = HiveAvroORCQueryGenerator.readSchemaFromString(schemaString); return getOrGenerateSchemaFile(schema); + } else { // Generate schema form Hive schema + List<FieldSchema> fields = sd.getCols(); + List<String> colNames = fields.stream().map(fs -> fs.getName()).collect(Collectors.toList()); + List<TypeInfo> typeInfos = fields.stream().map(fs -> TypeInfoUtils.getTypeInfoFromTypeString(fs.getType())) + .collect(Collectors.toList()); + List<String> comments = fields.stream().map(fs -> fs.getComment()).collect(Collectors.toList()); + Schema schema = new TypeInfoToSchema().convert(colNames, typeInfos, comments, null, null, null); + return getOrGenerateSchemaFile(schema); } } catch (URISyntaxException e) { log.error(String.format("Failed to parse schema from schema string. 
Falling back to HDFS schema: %s", http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55bf7a42/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManagerTest.java ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManagerTest.java b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManagerTest.java new file mode 100644 index 0000000..c28a3d3 --- /dev/null +++ b/gobblin-data-management/src/test/java/org/apache/gobblin/data/management/conversion/hive/avro/AvroSchemaManagerTest.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.gobblin.data.management.conversion.hive.avro; + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Paths; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.metastore.api.FieldSchema; +import org.apache.hadoop.hive.metastore.api.SerDeInfo; +import org.apache.hadoop.hive.metastore.api.StorageDescriptor; +import org.apache.hadoop.hive.ql.metadata.HiveException; +import org.apache.hadoop.hive.ql.metadata.Partition; +import org.apache.hadoop.hive.ql.metadata.Table; +import org.apache.hadoop.hive.serde2.avro.AvroSerDe; +import org.testng.Assert; +import org.testng.annotations.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Lists; + +import org.apache.gobblin.configuration.ConfigurationKeys; +import org.apache.gobblin.configuration.State; + + +public class AvroSchemaManagerTest { + @Test + public void testGetSchemaFromUrlUsingHiveSchema() throws IOException, HiveException { + FileSystem fs = FileSystem.getLocal(new Configuration()); + + String jobId = "123"; + State state = new State(); + state.setProp(ConfigurationKeys.JOB_ID_KEY, jobId); + + AvroSchemaManager asm = new AvroSchemaManager(fs, state); + Partition partition = getTestPartition(new Table("testDb", "testTable")); + Path schemaPath = asm.getSchemaUrl(partition); + + String actualSchema = fs.open(schemaPath).readUTF(); + String expectedSchema = new String(Files.readAllBytes( + Paths.get(getClass().getClassLoader().getResource("avroSchemaManagerTest/expectedSchema.avsc").getFile()))); + Assert.assertEquals(actualSchema, expectedSchema); + } + + private Partition getTestPartition(Table table) throws HiveException { + Partition partition = new Partition(table, ImmutableMap.of("partition_key", "1"), null); + StorageDescriptor sd = new StorageDescriptor(); + sd.setSerdeInfo(new SerDeInfo("avro", 
AvroSerDe.class.getName(), null)); + sd.setCols(Lists.newArrayList(new FieldSchema("foo", "int", null))); + partition.getTPartition().setSd(sd); + return partition; + } +} http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55bf7a42/gobblin-data-management/src/test/resources/avroSchemaManagerTest/expectedSchema.avsc ---------------------------------------------------------------------- diff --git a/gobblin-data-management/src/test/resources/avroSchemaManagerTest/expectedSchema.avsc b/gobblin-data-management/src/test/resources/avroSchemaManagerTest/expectedSchema.avsc new file mode 100644 index 0000000..976ef7d --- /dev/null +++ b/gobblin-data-management/src/test/resources/avroSchemaManagerTest/expectedSchema.avsc @@ -0,0 +1 @@ +{"type":"record","name":"baseRecord","fields":[{"name":"foo","type":["null","int"],"default":null}]} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/incubator-gobblin/blob/55bf7a42/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java ---------------------------------------------------------------------- diff --git a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java index d09a6d9..4226497 100644 --- a/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java +++ b/gobblin-utility/src/main/java/org/apache/gobblin/util/AvroUtils.java @@ -395,7 +395,7 @@ public class AvroUtils { } try (DataOutputStream dos = fs.create(filePath)) { - dos.writeChars(schema.toString()); + dos.writeUTF(schema.toString()); } fs.setPermission(filePath, perm); }
