This is an automated email from the ASF dual-hosted git repository.

hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git


The following commit(s) were added to refs/heads/master by this push:
     new 81ea169  [GOBBLIN-693] Add ORC hive serde manager
81ea169 is described below

commit 81ea16963f63e3fcda77221f655accb1f5406d34
Author: Hung Tran <[email protected]>
AuthorDate: Tue Mar 12 17:05:13 2019 -0700

    [GOBBLIN-693] Add ORC hive serde manager
    
    Closes #2565 from htran1/orc_hive_reg
---
 .../gobblin/binary_creation/OrcTestTools.java      |  12 +-
 gobblin-hive-registration/build.gradle             |   1 +
 .../org/apache/gobblin/hive/HiveSerDeManager.java  |   9 +-
 .../gobblin/hive/orc/HiveOrcSerDeManager.java      | 244 +++++++++++++++++++++
 .../gobblin/hive/orc/HiveOrcSerDeManagerTest.java  | 153 +++++++++++++
 .../src/test/resources/avro_input/input.json       |   5 +
 .../src/test/resources/avro_input/schema.avsc      |  10 +
 7 files changed, 425 insertions(+), 9 deletions(-)

diff --git 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
index 07c46ab..465fb89 100644
--- 
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
+++ 
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -184,7 +184,7 @@ public class OrcTestTools extends 
DataTestTools<OrcTestTools.OrcRowIterator, Typ
   @Override
   public TypeInfo writeJsonResourceRecordsAsBinary(String baseResource, 
@Nullable FileSystem fs, Path targetPath,
       @Nullable TypeInfo schema) throws IOException {
-    TreeMap<String, OrcRowIterator> recordMap = 
readAllRecordsInJsonResource(baseResource, null);
+    TreeMap<String, OrcRowIterator> recordMap = 
readAllRecordsInJsonResource(baseResource, schema);
 
     TypeInfo outputSchema = recordMap.lastEntry().getValue().getTypeInfo();
     for (Map.Entry<String, OrcRowIterator> entry : recordMap.entrySet()) {
@@ -229,9 +229,13 @@ public class OrcTestTools extends 
DataTestTools<OrcTestTools.OrcRowIterator, Typ
       @Nullable TypeInfo schema) throws IOException {
     TypeInfo orcSchema;
     try {
-      File schemaFile = new File(baseResource, "schema.avsc");
-      String schemaResource = schemaFile.toString();
-      orcSchema = convertAvroSchemaToOrcSchema(readAvscSchema(schemaResource, 
OrcTestTools.class));
+      if (schema == null) {
+        File schemaFile = new File(baseResource, "schema.avsc");
+        String schemaResource = schemaFile.toString();
+        orcSchema = 
convertAvroSchemaToOrcSchema(readAvscSchema(schemaResource, 
OrcTestTools.class));
+      } else {
+        orcSchema = schema;
+      }
     } catch (SerDeException se) {
       throw new RuntimeException("Provided Avro Schema cannot be transformed 
to ORC schema", se);
     }
diff --git a/gobblin-hive-registration/build.gradle 
b/gobblin-hive-registration/build.gradle
index 5e26bf2..472f359 100644
--- a/gobblin-hive-registration/build.gradle
+++ b/gobblin-hive-registration/build.gradle
@@ -43,6 +43,7 @@ dependencies {
   compile externalDependency.avroMapredH2
 
   testCompile externalDependency.testng
+  testCompile project(":gobblin-binary-management")
 }
 
 ext.classification="library"
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
index 9ed696c..71583cd 100644
--- 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
@@ -28,6 +28,7 @@ import com.google.common.base.Optional;
 import org.apache.gobblin.annotation.Alpha;
 import org.apache.gobblin.configuration.State;
 import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
+import org.apache.gobblin.hive.orc.HiveOrcSerDeManager;
 
 
 /**
@@ -79,7 +80,8 @@ public abstract class HiveSerDeManager {
   public abstract boolean haveSameSchema(HiveRegistrationUnit unit1, 
HiveRegistrationUnit unit2) throws IOException;
 
   public enum Implementation {
-    AVRO(HiveAvroSerDeManager.class.getName());
+    AVRO(HiveAvroSerDeManager.class.getName()),
+    ORC(HiveOrcSerDeManager.class.getName());
 
     private final String schemaManagerClassName;
 
@@ -96,11 +98,8 @@ public abstract class HiveSerDeManager {
   /**
    * Get an instance of {@link HiveSerDeManager}.
    *
-   * @param type The {@link HiveSerDeManager} type. It should be either AVRO, 
or the name of a class that implements
-   * {@link HiveSerDeManager}. The specified {@link HiveSerDeManager} type 
must have a constructor that takes a
-   * {@link State} object.
    * @param props A {@link State} object. To get a specific implementation of 
{@link HiveSerDeManager}, specify either
-   * one of the values in {@link Implementation} (e.g., AVRO) or the name of a 
class that implements
+   * one of the values in {@link Implementation} (e.g., AVRO or ORC) or the 
name of a class that implements
    * {@link HiveSerDeManager} in property {@link #HIVE_ROW_FORMAT}. The {@link 
State} object is also used to
    * instantiate the {@link HiveSerDeManager}.
    */
diff --git 
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
new file mode 100644
index 0000000..981beb6
--- /dev/null
+++ 
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.hive.orc;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveSerDeManager;
+import org.apache.gobblin.hive.HiveSerDeWrapper;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
+
+/**
+ * A {@link HiveSerDeManager} for ORC data.
+ *
+ * <p>It sets the SerDe type, input format and output format of a {@link HiveRegistrationUnit} (configurable via
+ * {@link #SERDE_TYPE_KEY}, {@link #INPUT_FORMAT_CLASS_KEY} and {@link #OUTPUT_FORMAT_CLASS_KEY}) and stores the
+ * schema of the most recently modified ORC file of the registered directory in the unit's SerDe properties under
+ * {@link #SCHEMA_LITERAL}.
+ */
+@Slf4j
+public class HiveOrcSerDeManager extends HiveSerDeManager {
+  // SerDe property holding the schema, stored as the string form of a Hive TypeInfo
+  public static final String SCHEMA_LITERAL = "orc.schema.literal";
+
+  // Extensions of files containing ORC data; an empty list matches every file name
+  public static final String FILE_EXTENSIONS_KEY = "hiveOrcSerdeManager.fileExtensions";
+  public static final String DEFAULT_FILE_EXTENSIONS = ".orc";
+
+  // Files with these prefixes are ignored when finding the latest schema
+  public static final String IGNORED_FILE_PREFIXES_KEY = "hiveOrcSerdeManager.ignoredPrefixes";
+  public static final String DEFAULT_IGNORED_FILE_PREFIXES = "_,.";
+
+  // The serde type and the input/output format classes set on registered units
+  public static final String SERDE_TYPE_KEY = "hiveOrcSerdeManager.serdeType";
+  public static final String DEFAULT_SERDE_TYPE = "ORC";
+  public static final String INPUT_FORMAT_CLASS_KEY = "hiveOrcSerdeManager.inputFormatClass";
+  public static final String DEFAULT_INPUT_FORMAT_CLASS = OrcInputFormat.class.getName();
+
+  public static final String OUTPUT_FORMAT_CLASS_KEY = "hiveOrcSerdeManager.outputFormatClass";
+  public static final String DEFAULT_OUTPUT_FORMAT_CLASS = OrcOutputFormat.class.getName();
+
+  // Name of the timer that measures schema retrieval in addSchemaProperties
+  public static final String HIVE_SPEC_SCHEMA_READING_TIMER = "hiveOrcSerdeManager.schemaReadTimer";
+
+  // Maximum number of trailing bytes read when checking whether a file is ORC
+  private static final int EXPECTED_FOOTER_SIZE = 16 * 1024;
+  private static final String ORC_FORMAT = "ORC";
+  // Shared buffer: only remaining() and equals() are called on it, so its position never changes
+  private static final ByteBuffer MAGIC_BUFFER = ByteBuffer.wrap(ORC_FORMAT.getBytes(Charsets.UTF_8));
+
+  private final FileSystem fs;
+  private final HiveSerDeWrapper serDeWrapper;
+  private final List<String> fileExtensions;
+  private final List<String> ignoredFilePrefixes;
+  private final MetricContext metricContext;
+
+  public HiveOrcSerDeManager(State props)
+      throws IOException {
+    super(props);
+    this.fs = FileSystem.get(HadoopUtils.getConfFromState(props));
+
+    List<String> extensions = props.getPropAsList(FILE_EXTENSIONS_KEY, DEFAULT_FILE_EXTENSIONS);
+    // An empty extension list means "any extension": "" is a suffix of every file name
+    this.fileExtensions = extensions.isEmpty() ? ImmutableList.of("") : extensions;
+
+    this.ignoredFilePrefixes = props.getPropAsList(IGNORED_FILE_PREFIXES_KEY, DEFAULT_IGNORED_FILE_PREFIXES);
+    this.metricContext = Instrumented.getMetricContext(props, HiveOrcSerDeManager.class);
+    this.serDeWrapper = HiveSerDeWrapper.get(props.getProp(SERDE_TYPE_KEY, DEFAULT_SERDE_TYPE),
+        Optional.of(props.getProp(INPUT_FORMAT_CLASS_KEY, DEFAULT_INPUT_FORMAT_CLASS)),
+        Optional.of(props.getProp(OUTPUT_FORMAT_CLASS_KEY, DEFAULT_OUTPUT_FORMAT_CLASS)));
+  }
+
+  /**
+   * Two units have the same schema only if both carry a {@link #SCHEMA_LITERAL} and the literals are equal.
+   * A unit without a schema literal is always treated as having a different schema.
+   */
+  @Override
+  public boolean haveSameSchema(HiveRegistrationUnit unit1, HiveRegistrationUnit unit2)
+      throws IOException {
+    if (unit1.getSerDeProps().contains(SCHEMA_LITERAL) && unit2.getSerDeProps().contains(SCHEMA_LITERAL)) {
+      return unit1.getSerDeProps().getProp(SCHEMA_LITERAL).equals(unit2.getSerDeProps().getProp(SCHEMA_LITERAL));
+    }
+    return false;
+  }
+
+  /**
+   * Add ORC SerDe attributes (SerDe type, input/output formats and schema literal) into the given unit.
+   *
+   * @param path directory containing the ORC data files; must be a directory
+   * @param hiveUnit unit to update
+   * @throws IOException if the file system cannot be accessed, or no matching ORC file is found
+   *     ({@link FileNotFoundException})
+   */
+  @Override
+  public void addSerDeProperties(Path path, HiveRegistrationUnit hiveUnit)
+      throws IOException {
+    hiveUnit.setSerDeType(this.serDeWrapper.getSerDe().getClass().getName());
+    hiveUnit.setInputFormat(this.serDeWrapper.getInputFormatClassName());
+    hiveUnit.setOutputFormat(this.serDeWrapper.getOutputFormatClassName());
+
+    addSchemaProperties(path, hiveUnit);
+  }
+
+  /**
+   * Copy the SerDe type, input/output formats and schema literal that are present on {@code source} to
+   * {@code target}; attributes absent on {@code source} are left untouched on {@code target}.
+   */
+  @Override
+  public void addSerDeProperties(HiveRegistrationUnit source, HiveRegistrationUnit target)
+      throws IOException {
+    if (source.getSerDeType().isPresent()) {
+      target.setSerDeType(source.getSerDeType().get());
+    }
+    if (source.getInputFormat().isPresent()) {
+      target.setInputFormat(source.getInputFormat().get());
+    }
+    if (source.getOutputFormat().isPresent()) {
+      target.setOutputFormat(source.getOutputFormat().get());
+    }
+    if (source.getSerDeProps().contains(SCHEMA_LITERAL)) {
+      target.setSerDeProp(SCHEMA_LITERAL, source.getSerDeProps().getProp(SCHEMA_LITERAL));
+    }
+  }
+
+  /**
+   * Replace the schema literal of {@code existingUnit} with the one of {@code newUnit}.
+   *
+   * @throws IllegalArgumentException if {@code newUnit} has no {@link #SCHEMA_LITERAL}
+   */
+  @Override
+  public void updateSchema(HiveRegistrationUnit existingUnit, HiveRegistrationUnit newUnit)
+      throws IOException {
+    Preconditions.checkArgument(newUnit.getSerDeProps().contains(SCHEMA_LITERAL),
+        "New registration unit has no %s SerDe property", SCHEMA_LITERAL);
+
+    existingUnit.setSerDeProp(SCHEMA_LITERAL, newUnit.getSerDeProps().getProp(SCHEMA_LITERAL));
+  }
+
+  /**
+   * Get the schema as a TypeInfo object.
+   *
+   * <p>If {@code path} is a directory, the schema is read from the most recently modified regular file in it
+   * that has a configured extension, has no ignored prefix, and is recognized as ORC. Otherwise {@code path}
+   * itself is read as an ORC file.
+   *
+   * @param path path that contains the ORC files, or a single ORC file
+   * @param fs {@link FileSystem}
+   * @return {@link TypeInfo} with the schema information
+   * @throws FileNotFoundException if {@code path} is a directory without any matching ORC file
+   * @throws IOException if the file system or the selected ORC file cannot be read
+   */
+  public TypeInfo getSchemaFromLatestFile(Path path, FileSystem fs)
+      throws IOException {
+    if (!fs.isDirectory(path)) {
+      return readOrcSchema(path, fs);
+    }
+
+    PathFilter schemaCandidates = candidate -> isSchemaCandidate(candidate, fs);
+    List<FileStatus> files = Arrays.asList(fs.listStatus(path, schemaCandidates));
+    if (files.isEmpty()) {
+      throw new FileNotFoundException("No files in Dataset:" + path + " found for schema retrieval");
+    }
+
+    Collections.sort(files, FileListUtils.LATEST_MOD_TIME_ORDER);
+    // Candidates are regular files (isORC rejects directories), so read the newest one directly
+    return readOrcSchema(files.get(0).getPath(), fs);
+  }
+
+  /** Read the schema stored in the footer of a single ORC file. */
+  private static TypeInfo readOrcSchema(Path file, FileSystem fs)
+      throws IOException {
+    return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, file).getObjectInspector());
+  }
+
+  /** Whether a directory entry is eligible for schema retrieval: name filters first, then the ORC format check. */
+  private boolean isSchemaCandidate(Path candidate, FileSystem fs) {
+    String name = candidate.getName();
+    if (this.ignoredFilePrefixes.stream().anyMatch(name::startsWith)) {
+      return false;
+    }
+    if (this.fileExtensions.stream().noneMatch(name::endsWith)) {
+      return false;
+    }
+    return isORC(candidate, fs);
+  }
+
+  /**
+   * Determine if a file is ORC format by looking for the "ORC" magic before the trailing PostScript-length
+   * byte, or, for old (0.11) files, at the head of the file.
+   * Adapted from Presto's OrcReader (Apache License 2.0).
+   *
+   * @return false for directories, files too small to hold the magic, and files without the magic
+   * @throws RuntimeException wrapping the underlying {@link IOException} if the file cannot be read
+   */
+  private static boolean isORC(Path file, FileSystem fs) {
+    try {
+      FileStatus status = fs.getFileStatus(file);
+      if (status.isDirectory()) {
+        return false;
+      }
+
+      long size = status.getLen();
+      int magicLen = MAGIC_BUFFER.remaining();
+      // Also guards empty files, which would otherwise index buffer[-1] below
+      if (size <= magicLen) {
+        return false;
+      }
+
+      try (FSDataInputStream inputStream = fs.open(file)) {
+        byte[] buffer = new byte[Math.toIntExact(Math.min(size, EXPECTED_FOOTER_SIZE))];
+        inputStream.readFully(size - buffer.length, buffer);
+
+        // get length of PostScript - last byte of the file
+        int postScriptSize = buffer[buffer.length - 1] & 0xff;
+        if (postScriptSize < magicLen + 1 || postScriptSize >= buffer.length) {
+          return false;
+        }
+
+        if (MAGIC_BUFFER.equals(ByteBuffer.wrap(buffer, buffer.length - 1 - magicLen, magicLen))) {
+          return true;
+        }
+
+        // Old versions of ORC (0.11) wrote the magic to the head of the file;
+        // if it isn't there either, this isn't an ORC file
+        byte[] headerMagic = new byte[magicLen];
+        inputStream.readFully(0, headerMagic);
+        return MAGIC_BUFFER.equals(ByteBuffer.wrap(headerMagic));
+      }
+    } catch (IOException e) {
+      throw new RuntimeException("Error occured when checking the type of file:" + file, e);
+    }
+  }
+
+  /** Time the schema lookup for {@code path} and store the result under {@link #SCHEMA_LITERAL}. */
+  private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit)
+      throws IOException {
+    Preconditions.checkArgument(this.fs.getFileStatus(path).isDirectory(), path + " is not a directory.");
+
+    try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
+      hiveUnit.setSerDeProp(SCHEMA_LITERAL, getSchemaFromLatestFile(path, this.fs).toString());
+    }
+  }
+}
diff --git 
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
new file mode 100644
index 0000000..032239f
--- /dev/null
+++ 
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.orc;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.binary_creation.AvroTestTools;
+import org.apache.gobblin.binary_creation.OrcTestTools;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * Tests for {@link HiveOrcSerDeManager}. The fixture directory {@code orctestdir/register} holds one ORC file
+ * named with a {@code prefix-} prefix and one Avro file with the {@code .notOrc} extension.
+ */
+@Test(singleThreaded = true)
+public class HiveOrcSerDeManagerTest {
+  private static final String TEST_DB = "testDB";
+  private static final String TEST_TABLE = "testTable";
+  // Fragment of the ORC schema produced from avro_input/schema.avsc
+  private static final String EXPECTED_SCHEMA_FRAGMENT = "name:string,timestamp:bigint";
+
+  private Path testBasePath;
+  private Path testRegisterPath;
+
+  @BeforeClass
+  public void setUp() throws IOException, SerDeException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    this.testBasePath = new Path("orctestdir");
+    this.testRegisterPath = new Path(this.testBasePath, "register");
+    fs.delete(this.testBasePath, true);
+    fs.mkdirs(this.testRegisterPath);
+
+    OrcTestTools orcTestTools = new OrcTestTools();
+    orcTestTools.writeJsonResourceRecordsAsBinary("avro_input", fs, this.testBasePath, null);
+    HadoopUtils.copyFile(fs, new Path(this.testBasePath, "input"), fs,
+        new Path(this.testRegisterPath, "prefix-hive-test.orc"), true, new Configuration());
+
+    AvroTestTools avroTestTools = new AvroTestTools();
+    avroTestTools.writeJsonResourceRecordsAsBinary("avro_input", fs, this.testBasePath, null);
+    HadoopUtils.copyFile(fs, new Path(this.testBasePath, "input.avro"), fs,
+        new Path(this.testRegisterPath, "hive-test.notOrc"), true, new Configuration());
+  }
+
+  /**
+   * Test that the schema is written to the schema literal
+   */
+  @Test
+  public void testOrcSchemaLiteral() throws IOException {
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(new State());
+    HiveRegistrationUnit registrationUnit = newRegistrationUnit();
+
+    manager.addSerDeProperties(this.testRegisterPath, registrationUnit);
+
+    assertSchemaLiteral(registrationUnit);
+  }
+
+  /**
+   * Test empty extension: every file name is accepted and the non-ORC file is skipped by the format check
+   */
+  @Test
+  public void testEmptyExtension() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.FILE_EXTENSIONS_KEY, ",");
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = newRegistrationUnit();
+
+    manager.addSerDeProperties(this.testRegisterPath, registrationUnit);
+
+    assertSchemaLiteral(registrationUnit);
+  }
+
+  /**
+   * Test custom serde config
+   */
+  @Test
+  public void testCustomSerdeConfig() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.SERDE_TYPE_KEY, OrcSerde.class.getName());
+    state.setProp(HiveOrcSerDeManager.INPUT_FORMAT_CLASS_KEY, "customInputFormat");
+    state.setProp(HiveOrcSerDeManager.OUTPUT_FORMAT_CLASS_KEY, "customOutputFormat");
+
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(state);
+    HiveRegistrationUnit registrationUnit = newRegistrationUnit();
+
+    manager.addSerDeProperties(this.testRegisterPath, registrationUnit);
+
+    assertSchemaLiteral(registrationUnit);
+    Assert.assertEquals(registrationUnit.getSerDeType().get(), OrcSerde.class.getName());
+    Assert.assertEquals(registrationUnit.getInputFormat().get(), "customInputFormat");
+    Assert.assertEquals(registrationUnit.getOutputFormat().get(), "customOutputFormat");
+  }
+
+  /**
+   * Test that error is raised if no orc files found during schema retrieval
+   */
+  @Test(expectedExceptions = FileNotFoundException.class,
+      expectedExceptionsMessageRegExp = "No files in Dataset:orctestdir/register found for schema retrieval")
+  public void testNoOrcFiles() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.FILE_EXTENSIONS_KEY, ".notOrc");
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(state);
+
+    manager.addSerDeProperties(this.testRegisterPath, newRegistrationUnit());
+  }
+
+  /**
+   * Test prefix filter: the only ORC file carries the ignored prefix, so nothing is found
+   */
+  @Test(expectedExceptions = FileNotFoundException.class,
+      expectedExceptionsMessageRegExp = "No files in Dataset:orctestdir/register found for schema retrieval")
+  public void testPrefixFilter() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.IGNORED_FILE_PREFIXES_KEY, "prefix-");
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(state);
+
+    manager.addSerDeProperties(this.testRegisterPath, newRegistrationUnit());
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.delete(this.testBasePath, true);
+  }
+
+  /** A fresh, empty registration unit for the test table. */
+  private static HiveRegistrationUnit newRegistrationUnit() {
+    return (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();
+  }
+
+  /** Assert that the unit's schema literal contains the columns of the test schema. */
+  private static void assertSchemaLiteral(HiveRegistrationUnit registrationUnit) {
+    Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveOrcSerDeManager.SCHEMA_LITERAL)
+        .contains(EXPECTED_SCHEMA_FRAGMENT));
+  }
+}
diff --git a/gobblin-hive-registration/src/test/resources/avro_input/input.json 
b/gobblin-hive-registration/src/test/resources/avro_input/input.json
new file mode 100644
index 0000000..919581b
--- /dev/null
+++ b/gobblin-hive-registration/src/test/resources/avro_input/input.json
@@ -0,0 +1,5 @@
+{"name":"name_0","timestamp":200000,"favorite_number":0}
+{"name":"name_1","timestamp":200001,"favorite_number":1}
+{"name":"name_2","timestamp":200002,"favorite_number":2}
+{"name":"name_3","timestamp":200003,"favorite_number":3}
+{"name":"name_4","timestamp":200004,"favorite_number":4}
diff --git 
a/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc 
b/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc
new file mode 100644
index 0000000..18cd665
--- /dev/null
+++ b/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc
@@ -0,0 +1,10 @@
+{
+ "namespace": "DB",
+ "type": "record",
+ "name": "TABLE",
+ "fields": [
+     {"name": "name", "type": "string"},
+     {"name": "timestamp",  "type": "long"},
+     {"name": "favorite_number",  "type": "int"}
+ ]
+}
\ No newline at end of file

Reply via email to