This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new c944818  [GOBBLIN-704] Add serde attributes for orc
c944818 is described below

commit c94481828381a52b0ec7ca06c0d95f2b19c88297
Author: autumnust <[email protected]>
AuthorDate: Wed Mar 20 15:01:09 2019 -0700

    [GOBBLIN-704] Add serde attributes for orc
    
    Closes #2574 from
    autumnust/addSerDeAttributeForORc
---
 .../CompactionCompleteFileOperationAction.java     |  2 +-
 .../gobblin/hive/orc/HiveOrcSerDeManager.java      | 35 +++++++++++++-
 .../hive/metastore/HiveMetaStoreUtilsTest.java     | 54 ++++++++++++++++++++++
 .../gobblin/hive/orc/HiveOrcSerDeManagerTest.java  | 12 ++++-
 4 files changed, 99 insertions(+), 4 deletions(-)

diff --git a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
index eef8474..e4fb747 100644
--- a/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
+++ b/gobblin-compaction/src/main/java/org/apache/gobblin/compaction/action/CompactionCompleteFileOperationAction.java
@@ -69,7 +69,7 @@ public class CompactionCompleteFileOperationAction implements CompactionComplete
   }
 
   /**
-   * Replace or append the destination folder with new avro files from map-reduce job
+   * Replace or append the destination folder with new files from map-reduce job
    * Create a record count file containing the number of records that have been processed .
    */
   public void onCompactionJobComplete (FileSystemDataset dataset) throws IOException {
diff --git a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
index 981beb6..436420c 100644
--- a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
+++ b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -23,6 +23,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 
+import java.util.stream.Collectors;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
@@ -31,11 +32,14 @@ import org.apache.hadoop.fs.PathFilter;
 import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde.serdeConstants;
+import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 
 import com.codahale.metrics.Timer;
 import com.google.common.base.Charsets;
+import com.google.common.base.Joiner;
 import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
@@ -236,9 +240,36 @@ public class HiveOrcSerDeManager extends HiveSerDeManager {
   private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit)
       throws IOException {
     Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
-
     try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
-      hiveUnit.setSerDeProp(SCHEMA_LITERAL, getSchemaFromLatestFile(path, this.fs).toString());
+      addSchemaPropertiesHelper(path, hiveUnit);
+    }
+  }
+
+  /**
+   * Extensible if there's other source-of-truth for fetching schema instead of interacting with HDFS.
+   *
+   * For purpose of initializing {@link org.apache.hadoop.hive.ql.io.orc.OrcSerde} object, it will require:
+   * org.apache.hadoop.hive.serde.serdeConstants#LIST_COLUMNS and
+   * org.apache.hadoop.hive.serde.serdeConstants#LIST_COLUMN_TYPES
+   *
+   * Keeping {@link #SCHEMA_LITERAL} will be a nice-to-have thing but not actually necessary in terms of functionality.
+   */
+  protected void addSchemaPropertiesHelper(Path path, HiveRegistrationUnit hiveUnit) throws IOException {
+    TypeInfo schema = getSchemaFromLatestFile(path, this.fs);
+    if (schema instanceof StructTypeInfo) {
+      StructTypeInfo structTypeInfo = (StructTypeInfo) schema;
+
+      hiveUnit.setSerDeProp(SCHEMA_LITERAL, schema);
+      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMNS,
+          Joiner.on(",").join(structTypeInfo.getAllStructFieldNames()));
+      hiveUnit.setSerDeProp(serdeConstants.LIST_COLUMN_TYPES,
+          Joiner.on(",").join(
+              structTypeInfo.getAllStructFieldTypeInfos().stream().map(x -> x.getTypeName())
+                  .collect(Collectors.toList())));
+    } else {
+      // Hive always uses a struct with a field for each of the top-level columns as the root object type.
+      // So for here we assume to-be-registered ORC files follow this pattern.
+      throw new IllegalStateException("A valid ORC schema should be an instance of struct");
     }
   }
 }
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
index 9e3cdd6..38d5c65 100644
--- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/metastore/HiveMetaStoreUtilsTest.java
@@ -26,6 +26,9 @@ import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerInputFormat;
 import org.apache.hadoop.hive.ql.io.avro.AvroContainerOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
 import org.apache.hadoop.hive.serde2.avro.AvroSerDe;
 import org.testng.Assert;
 import org.testng.annotations.Test;
@@ -72,6 +75,57 @@ public class HiveMetaStoreUtilsTest {
   }
 
   @Test
+  public void testGetTableOrc() {
+    final String databaseName = "db";
+    final String tableName = "tbl";
+    HiveTable.Builder builder = new HiveTable.Builder();
+    builder.withDbName(databaseName).withTableName(tableName);
+
+    HiveTable hiveTable = builder.build();
+
+    // SerDe props are
+    State serdeProps = new State();
+    serdeProps.setProp("columns", "timestamp,namespace,name,metadata");
+    serdeProps.setProp("columns.types", "bigint,string,string,map<string,string>");
+
+    hiveTable.getProps().addAll(serdeProps);
+
+    hiveTable.setInputFormat(OrcInputFormat.class.getName());
+    hiveTable.setOutputFormat(OrcOutputFormat.class.getName());
+    hiveTable.setSerDeType(OrcSerde.class.getName());
+
+    Table table = HiveMetaStoreUtils.getTable(hiveTable);
+    Assert.assertEquals(table.getDbName(), databaseName);
+    Assert.assertEquals(table.getTableName(), tableName);
+
+    StorageDescriptor sd = table.getSd();
+    Assert.assertEquals(sd.getInputFormat(), OrcInputFormat.class.getName());
+    Assert.assertEquals(sd.getOutputFormat(), OrcOutputFormat.class.getName());
+    Assert.assertNotNull(sd.getSerdeInfo());
+    Assert.assertEquals(sd.getSerdeInfo().getSerializationLib(), OrcSerde.class.getName());
+
+    // verify column name
+    List<FieldSchema> fields = sd.getCols();
+    Assert.assertTrue(fields != null && fields.size() == 4);
+    FieldSchema fieldA = fields.get(0);
+    Assert.assertEquals(fieldA.getName(), "timestamp");
+    Assert.assertEquals(fieldA.getType(), "bigint");
+
+    FieldSchema fieldB = fields.get(1);
+    Assert.assertEquals(fieldB.getName(), "namespace");
+    Assert.assertEquals(fieldB.getType(), "string");
+
+    FieldSchema fieldC = fields.get(2);
+    Assert.assertEquals(fieldC.getName(), "name");
+    Assert.assertEquals(fieldC.getType(), "string");
+
+
+    FieldSchema fieldD = fields.get(3);
+    Assert.assertEquals(fieldD.getName(), "metadata");
+    Assert.assertEquals(fieldD.getType(), "map<string,string>");
+  }
+
+  @Test
   public void testGetTableAvroInvalidSchema() {
     final String databaseName = "testdb";
     final String tableName = "testtable";
diff --git a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
index 032239f..4ffa738 100644
--- a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
+++ b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
@@ -20,10 +20,13 @@ package org.apache.gobblin.hive.orc;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 
+import java.util.Arrays;
+import java.util.List;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde.serdeConstants;
 import org.apache.hadoop.hive.serde2.SerDeException;
 import org.testng.Assert;
 import org.testng.annotations.AfterClass;
@@ -66,7 +69,7 @@ public class HiveOrcSerDeManagerTest {
   }
 
   /**
-   * Test that the schema is written to the schema literal
+   * Test that the schema is written to the schema literal and attributes required for initializing orc serde object.
    */
   @Test
   public void testOrcSchemaLiteral() throws IOException {
@@ -78,6 +81,13 @@ public class HiveOrcSerDeManagerTest {
 
     
     Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveOrcSerDeManager.SCHEMA_LITERAL).contains(
         "name:string,timestamp:bigint"));
+
+    List<String> columns = Arrays.asList(registrationUnit.getSerDeProps().getProp(serdeConstants.LIST_COLUMNS).split(","));
+    Assert.assertTrue(columns.get(0).equals("name"));
+    Assert.assertTrue(columns.get(1).equals("timestamp"));
+    List<String> columnTypes = Arrays.asList(registrationUnit.getSerDeProps().getProp(serdeConstants.LIST_COLUMN_TYPES).split(","));
+    Assert.assertTrue(columnTypes.get(0).equals("string"));
+    Assert.assertTrue(columnTypes.get(1).equals("bigint"));
   }
 
   /**

Reply via email to