This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new c944818 [GOBBLIN-704] Add serde attributes for orc
c944818 is described below
commit c94481828381a52b0ec7ca06c0d95f2b19c88297
Author: autumnust <[email protected]>
AuthorDate: Wed Mar 20 15:01:09 2019 -0700
[GOBBLIN-704] Add serde attributes for orc
Closes #2574 from
autumnust/addSerDeAttributeForORc
---
.../CompactionCompleteFileOperationAction.java | 2 +-
.../gobblin/hive/orc/HiveOrcSerDeManager.java | 35 +++++++++++++-
.../hive/metastore/HiveMetaStoreUtilsTest.java | 54 ++++++++++++++++++++++
.../gobblin/hive/orc/HiveOrcSerDeManagerTest.java | 12 ++++-
4 files changed, 99 insertions(+), 4 deletions(-)
diff --git
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index eef8474..e4fb747 100644
---
a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++
b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -69,7 +69,7 @@ public class CompactionCompleteFileOperationAction implements
CompactionComplete
}
/**
- * Replace or append the destination folder with new avro files from
map-reduce job
+ * Replace or append the destination folder with new files from map-reduce
job
* Create a record count file containing the number of records that have
been processed .
*/
public void onCompactionJobComplete (FileSystemDataset dataset) throws
IOException {
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index 981beb6..436420c 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
import java.util.Collections;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -31,11 +32,14 @@ import org.apache.hadoop.fs.PathFilter;
import org.apache.hadoop.hive.ql.io.orc.OrcFile;
import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import com.codahale.metrics.Timer;
import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableList;
@@ -236,9 +240,36 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
+ /**
+ * Populates the schema-related SerDe properties of {@code hiveUnit} from the data under {@code path},
+ * timing the schema read with the {@code HIVE_SPEC_SCHEMA_READING_TIMER} metric.
+ *
+ * @param path location the schema is read from; must be a directory
+ * @param hiveUnit registration unit whose SerDe properties are populated
+ * @throws IOException if the file-system status check or the schema read fails
+ * @throws IllegalArgumentException if {@code path} is not a directory
+ */
private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit)
throws IOException {
Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(),
path + " is not a directory.");
-
try (Timer.Context context =
metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
- hiveUnit.setSerDeProp(SCHEMA_LITERAL, getSchemaFromLatestFile(path,
this.fs).toString());
+ // Delegate to the protected helper so subclasses can change where the schema comes from;
+ // try-with-resources closes the timer context whether the helper returns or throws.
+ addSchemaPropertiesHelper(path, hiveUnit);
+ }
+ }
+
+ /**
+ * Reads the schema via {@code getSchemaFromLatestFile(path, this.fs)} and records it on
+ * {@code hiveUnit} as SerDe properties.
+ *
+ * This is a protected extension point: a subclass may override it to obtain the schema from a
+ * different source of truth instead of reading files through {@code this.fs}.
+ *
+ * Properties written:
+ * <ul>
+ * <li>{@link #SCHEMA_LITERAL}: the whole struct schema. Nice to have, but not needed by the SerDe.</li>
+ * <li>{@code serdeConstants#LIST_COLUMNS}: the top-level field names, joined with ",".</li>
+ * <li>{@code serdeConstants#LIST_COLUMN_TYPES}: the top-level field type names, joined with ",",
+ * in the same order as the names.</li>
+ * </ul>
+ * The last two are what initializing a {@link org.apache.hadoop.hive.ql.io.orc.OrcSerde} requires.
+ *
+ * NOTE(review): nested type names such as {@code map<string,string>} contain commas themselves, so
+ * the LIST_COLUMN_TYPES value cannot be split naively on ","; readers need a type-aware parser.
+ *
+ * @param path location handed to {@code getSchemaFromLatestFile}; addSchemaProperties checks it is a
+ * directory before calling this method
+ * @param hiveUnit registration unit whose SerDe properties are populated
+ * @throws IOException propagated from reading the schema
+ * @throws IllegalStateException if the schema's root type is not a struct
+ */
+ protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit
hiveUnit) throws IOException {
+ TypeInfo schema = getSchemaFromLatestFile(path, this.fs);
+ if (schema instanceof StructTypeInfo) {
+ StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
+
+ // NOTE(review): the TypeInfo object itself is passed here, whereas the previous code stored
+ // schema.toString(); presumably setSerDeProp stringifies the value - confirm.
+ hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema);
+ hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,
+ Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
+ hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES,
+ Joiner.on(",").join(
+ structTypeInfo.getAllStructFieldTypeInfos().stream().map(x ->
x.getTypeName())
+ .collect(Collectors.toList())));
+ } else {
+ // Hive always uses a struct, with one field per top-level column, as the root object type,
+ // so ORC files to be registered are assumed to follow this pattern.
+ throw new IllegalStateException("A valid ORC schema should be an
instance of struct");
}
}
}
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
index 9e3cdd6..38d5c65 100644
---
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
import org.testng.Assert;
import org.testng.annotations.Test;
@@ -72,6 +75,57 @@ public class HiveMetaStoreUtilsTest {
}
+ /**
+ * Verifies that {@link HiveMetaStoreUtils#getTable} converts an ORC-backed {@link HiveTable} into a
+ * metastore {@link Table}. The db/table names, ORC input/output formats and {@link OrcSerde} class
+ * are carried over. The storage descriptor's columns are built, in order, from the column
+ * name/type properties set on the table.
+ */
@Test
+ public void testGetTableOrc() {
+ final String databaseName = "db";
+ final String tableName = "tbl";
+ HiveTable.Builder builder = new HiveTable.Builder();
+ builder.withDbName(databaseName).withTableName(tableName);
+
+ HiveTable hiveTable = builder.build();
+
+ // Column names and Hive types, keyed as "columns" / "columns.types" (the keys Hive's
+ // serdeConstants.LIST_COLUMNS / LIST_COLUMN_TYPES are expected to resolve to). Despite the
+ // variable name, they are added to the table-level props below, not the SerDe props.
+ // The map type contains a comma itself, so the type list must not be split naively on ",".
+ State serdeProps = new State();
+ serdeProps.setProp("columns", "timestamp,namespace,name,metadata");
+ serdeProps.setProp("columns.types",
"bigint,string,string,map<string,string>");
+
+ hiveTable.getProps().addAll(serdeProps);
+
+ hiveTable.setInputFormat(OrcInputFormat.class.getName());
+ hiveTable.setOutputFormat(OrcOutputFormat.class.getName());
+ hiveTable.setSerDeType(OrcSerde.class.getName());
+
+ Table table = HiveMetaStoreUtils.getTable(hiveTable);
+ Assert.assertEquals(table.getDbName(), databaseName);
+ Assert.assertEquals(table.getTableName(), tableName);
+
+ StorageDescriptor sd = table.getSd();
+ Assert.assertEquals(sd.getInputFormat(), OrcInputFormat.class.getName());
+ Assert.assertEquals(sd.getOutputFormat(), OrcOutputFormat.class.getName());
+ Assert.assertNotNull(sd.getSerdeInfo());
+ Assert.assertEquals(sd.getSerdeInfo().getSerializationLib(),
OrcSerde.class.getName());
+
+ // Verify the derived columns: exactly four, with names and types in declaration order.
+ List<FieldSchema> fields = sd.getCols();
+ Assert.assertTrue(fields != null && fields.size() == 4);
+ FieldSchema fieldA = fields.get(0);
+ Assert.assertEquals(fieldA.getName(), "timestamp");
+ Assert.assertEquals(fieldA.getType(), "bigint");
+
+ FieldSchema fieldB = fields.get(1);
+ Assert.assertEquals(fieldB.getName(), "namespace");
+ Assert.assertEquals(fieldB.getType(), "string");
+
+ FieldSchema fieldC = fields.get(2);
+ Assert.assertEquals(fieldC.getName(), "name");
+ Assert.assertEquals(fieldC.getType(), "string");
+
+
+ FieldSchema fieldD = fields.get(3);
+ Assert.assertEquals(fieldD.getName(), "metadata");
+ Assert.assertEquals(fieldD.getType(), "map<string,string>");
+ }
+
+ @Test
public void testGetTableAvroInvalidSchema() {
final String databaseName = "testdb";
final String tableName = "testtable";
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
index 032239f..4ffa738 100644
---
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
@@ -20,10 +20,13 @@ package org.apache.gobblin.hive.orc;
import java.io.FileNotFoundException;
import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
@@ -66,7 +69,7 @@ public class HiveOrcSerDeManagerTest {
}
/**
- * Test that the schema is written to the schema literal
+ * Test that the schema is written to the schema literal and attributes
required for initializing orc serde object.
*/
@Test
public void testOrcSchemaLiteral() throws IOException {
@@ -78,6 +81,13 @@ public class HiveOrcSerDeManagerTest {
Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveOrcSerDeManager.SCHEMA_LITERAL).contains(
"name:string,timestamp:bigint"));
+
+ List<String> columns =
Arrays.asList(registrationUnit.getSerDeProps().getProp(serdeConstants.LIST_COLUMNS).split(","));
+ Assert.assertTrue(columns.get(0).equals("name"));
+ Assert.assertTrue(columns.get(1).equals("timestamp"));
+ List<String> columnTypes =
Arrays.asList(registrationUnit.getSerDeProps().getProp(serdeConstants.LIST_COLUMN_TYPES).split(","));
+ Assert.assertTrue(columnTypes.get(0).equals("string"));
+ Assert.assertTrue(columnTypes.get(1).equals("bigint"));
}
/**