This is an automated email from the ASF dual-hosted git repository.
hutran pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-gobblin.git
The following commit(s) were added to refs/heads/master by this push:
new 81ea169 [GOBBLIN-693] Add ORC hive serde manager
81ea169 is described below
commit 81ea16963f63e3fcda77221f655accb1f5406d34
Author: Hung Tran <[email protected]>
AuthorDate: Tue Mar 12 17:05:13 2019 -0700
[GOBBLIN-693] Add ORC hive serde manager
Closes #2565 from htran1/orc_hive_reg
---
.../gobblin/binary_creation/OrcTestTools.java | 12 +-
gobblin-hive-registration/build.gradle | 1 +
.../org/apache/gobblin/hive/HiveSerDeManager.java | 9 +-
.../gobblin/hive/orc/HiveOrcSerDeManager.java | 244 +++++++++++++++++++++
.../gobblin/hive/orc/HiveOrcSerDeManagerTest.java | 153 +++++++++++++
.../src/test/resources/avro_input/input.json | 5 +
.../src/test/resources/avro_input/schema.avsc | 10 +
7 files changed, 425 insertions(+), 9 deletions(-)
diff --git
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
index 07c46ab..465fb89 100644
---
a/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
+++
b/gobblin-binary-management/src/main/java/org/apache/gobblin/binary_creation/OrcTestTools.java
@@ -184,7 +184,7 @@ public class OrcTestTools extends
DataTestTools<OrcTestTools.OrcRowIterator, Typ
@Override
public TypeInfo writeJsonResourceRecordsAsBinary(String baseResource,
@Nullable FileSystem fs, Path targetPath,
@Nullable TypeInfo schema) throws IOException {
- TreeMap<String, OrcRowIterator> recordMap =
readAllRecordsInJsonResource(baseResource, null);
+ TreeMap<String, OrcRowIterator> recordMap =
readAllRecordsInJsonResource(baseResource, schema);
TypeInfo outputSchema = recordMap.lastEntry().getValue().getTypeInfo();
for (Map.Entry<String, OrcRowIterator> entry : recordMap.entrySet()) {
@@ -229,9 +229,13 @@ public class OrcTestTools extends
DataTestTools<OrcTestTools.OrcRowIterator, Typ
@Nullable TypeInfo schema) throws IOException {
TypeInfo orcSchema;
try {
- File schemaFile = new File(baseResource, "schema.avsc");
- String schemaResource = schemaFile.toString();
- orcSchema = convertAvroSchemaToOrcSchema(readAvscSchema(schemaResource,
OrcTestTools.class));
+ if (schema == null) {
+ File schemaFile = new File(baseResource, "schema.avsc");
+ String schemaResource = schemaFile.toString();
+ orcSchema =
convertAvroSchemaToOrcSchema(readAvscSchema(schemaResource,
OrcTestTools.class));
+ } else {
+ orcSchema = schema;
+ }
} catch (SerDeException se) {
throw new RuntimeException("Provided Avro Schema cannot be transformed
to ORC schema", se);
}
diff --git a/gobblin-hive-registration/build.gradle
b/gobblin-hive-registration/build.gradle
index 5e26bf2..472f359 100644
--- a/gobblin-hive-registration/build.gradle
+++ b/gobblin-hive-registration/build.gradle
@@ -43,6 +43,7 @@ dependencies {
compile externalDependency.avroMapredH2
testCompile externalDependency.testng
+ testCompile project(":gobblin-binary-management")
}
ext.classification="library"
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
index 9ed696c..71583cd 100644
---
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/HiveSerDeManager.java
@@ -28,6 +28,7 @@ import com.google.common.base.Optional;
import org.apache.gobblin.annotation.Alpha;
import org.apache.gobblin.configuration.State;
import org.apache.gobblin.hive.avro.HiveAvroSerDeManager;
+import org.apache.gobblin.hive.orc.HiveOrcSerDeManager;
/**
@@ -79,7 +80,8 @@ public abstract class HiveSerDeManager {
public abstract boolean haveSameSchema(HiveRegistrationUnit unit1,
HiveRegistrationUnit unit2) throws IOException;
public enum Implementation {
- AVRO(HiveAvroSerDeManager.class.getName());
+ AVRO(HiveAvroSerDeManager.class.getName()),
+ ORC(HiveOrcSerDeManager.class.getName());
private final String schemaManagerClassName;
@@ -96,11 +98,8 @@ public abstract class HiveSerDeManager {
/**
* Get an instance of {@link HiveSerDeManager}.
*
- * @param type The {@link HiveSerDeManager} type. It should be either AVRO,
or the name of a class that implements
- * {@link HiveSerDeManager}. The specified {@link HiveSerDeManager} type
must have a constructor that takes a
- * {@link State} object.
* @param props A {@link State} object. To get a specific implementation of
{@link HiveSerDeManager}, specify either
- * one of the values in {@link Implementation} (e.g., AVRO) or the name of a
class that implements
+ * one of the values in {@link Implementation} (e.g., AVRO or ORC) or the
name of a class that implements
* {@link HiveSerDeManager} in property {@link #HIVE_ROW_FORMAT}. The {@link
State} object is also used to
* instantiate the {@link HiveSerDeManager}.
*/
diff --git
a/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
new file mode 100644
index 0000000..981beb6
--- /dev/null
+++
b/gobblin-hive-registration/src/main/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManager.java
@@ -0,0 +1,244 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.gobblin.hive.orc;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.hive.ql.io.orc.OrcFile;
+import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
+import org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
+
+import com.codahale.metrics.Timer;
+import com.google.common.base.Charsets;
+import com.google.common.base.Optional;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+
+import lombok.extern.slf4j.Slf4j;
+
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveSerDeManager;
+import org.apache.gobblin.hive.HiveSerDeWrapper;
+import org.apache.gobblin.instrumented.Instrumented;
+import org.apache.gobblin.metrics.MetricContext;
+import org.apache.gobblin.util.FileListUtils;
+import org.apache.gobblin.util.HadoopUtils;
+
+/**
+ * A derived class of {@link org.apache.gobblin.hive.HiveSerDeManager} that is
mainly responsible for adding schema
+ * information into {@link HiveRegistrationUnit#serDeProps}, based on the
format of the data.
+ */
+@Slf4j
+public class HiveOrcSerDeManager extends HiveSerDeManager {
+ // Serde property key for the schema; this class stores the toString() of the TypeInfo read from the newest ORC file
+ public static final String SCHEMA_LITERAL = "orc.schema.literal";
+
+ // List-valued property of file-name suffixes; a file must end with one of them to be considered for schema lookup
+ public static final String FILE_EXTENSIONS_KEY =
"hiveOrcSerdeManager.fileExtensions";
+ public static final String DEFAULT_FILE_EXTENSIONS = ".orc";
+
+ // List-valued property of file-name prefixes; files starting with any of them are skipped when finding the latest schema
+ public static final String IGNORED_FILE_PREFIXES_KEY =
"hiveOrcSerdeManager.ignoredPrefixes";
+ public static final String DEFAULT_IGNORED_FILE_PREFIXES = "_,.";
+
+ // The serde type plus input/output format class names that the constructor hands to HiveSerDeWrapper.get
+ public static final String SERDE_TYPE_KEY = "hiveOrcSerdeManager.serdeType";
+ public static final String DEFAULT_SERDE_TYPE = "ORC";
+ public static final String INPUT_FORMAT_CLASS_KEY =
"hiveOrcSerdeManager.inputFormatClass";
+ public static final String DEFAULT_INPUT_FORMAT_CLASS =
OrcInputFormat.class.getName();
+
+ public static final String OUTPUT_FORMAT_CLASS_KEY =
"hiveOrcSerdeManager.outputFormatClass";
+ public static final String DEFAULT_OUTPUT_FORMAT_CLASS =
OrcOutputFormat.class.getName();
+
+ public static final String HIVE_SPEC_SCHEMA_READING_TIMER =
"hiveOrcSerdeManager.schemaReadTimer"; // name of the timer wrapped around the schema read in addSchemaProperties
+
+ private static final int EXPECTED_FOOTER_SIZE = 16 * 1024; // upper bound on the number of trailing bytes isORC reads
+ private static final String ORC_FORMAT = "ORC"; // magic text isORC looks for at the tail (or head) of a file
+ private static final ByteBuffer MAGIC_BUFFER =
ByteBuffer.wrap(ORC_FORMAT.getBytes(Charsets.UTF_8));
+
+ private final FileSystem fs; // obtained in the constructor from the Hadoop configuration derived from the State
+ private final HiveSerDeWrapper serDeWrapper;
+ private final List<String> fileExtensions; // holds a single "" entry when no extension is configured
+ private final List<String> ignoredFilePrefixes;
+ private final MetricContext metricContext;
+
+  /**
+   * Builds the manager from job/state properties.
+   *
+   * @param props supplies the Hadoop configuration used to obtain the {@link FileSystem}, the file
+   *        extension and ignored-prefix lists, and the serde type / input format / output format
+   *        (each falling back to the corresponding {@code DEFAULT_*} constant)
+   * @throws IOException declared for the file system and serde wrapper lookups
+   */
+  public HiveOrcSerDeManager(State props)
+      throws IOException {
+    super(props);
+    this.fs = FileSystem.get(HadoopUtils.getConfFromState(props));
+
+    // An empty extension list is replaced by a single empty suffix, which every file name ends with.
+    List<String> configuredExtensions = props.getPropAsList(FILE_EXTENSIONS_KEY, DEFAULT_FILE_EXTENSIONS);
+    if (configuredExtensions.isEmpty()) {
+      this.fileExtensions = ImmutableList.of("");
+    } else {
+      this.fileExtensions = configuredExtensions;
+    }
+
+    this.ignoredFilePrefixes = props.getPropAsList(IGNORED_FILE_PREFIXES_KEY, DEFAULT_IGNORED_FILE_PREFIXES);
+    this.metricContext = Instrumented.getMetricContext(props, HiveOrcSerDeManager.class);
+
+    String serdeType = props.getProp(SERDE_TYPE_KEY, DEFAULT_SERDE_TYPE);
+    Optional<String> inputFormatClass =
+        Optional.of(props.getProp(INPUT_FORMAT_CLASS_KEY, DEFAULT_INPUT_FORMAT_CLASS));
+    Optional<String> outputFormatClass =
+        Optional.of(props.getProp(OUTPUT_FORMAT_CLASS_KEY, DEFAULT_OUTPUT_FORMAT_CLASS));
+    this.serDeWrapper = HiveSerDeWrapper.get(serdeType, inputFormatClass, outputFormatClass);
+  }
+
+ @Override
+ public boolean haveSameSchema(HiveRegistrationUnit unit1,
HiveRegistrationUnit unit2)
+ throws IOException {
+ if (unit1.getSerDeProps().contains(SCHEMA_LITERAL) &&
unit2.getSerDeProps().contains(SCHEMA_LITERAL)) {
+ return
unit1.getSerDeProps().getProp(SCHEMA_LITERAL).equals(unit2.getSerDeProps().getProp(SCHEMA_LITERAL));
+ } else {
+ return false;
+ }
+ }
+
+  /**
+   * Populates a registration unit with the ORC serde settings and the schema literal.
+   *
+   * @param path location handed to the schema lookup; it must be a directory
+   * @param hiveUnit unit that receives the serde class name, the input/output format class names
+   *        and the {@link #SCHEMA_LITERAL} property
+   * @throws IOException if resolving the serde or reading the schema fails
+   */
+  @Override
+  public void addSerDeProperties(Path path, HiveRegistrationUnit hiveUnit)
+      throws IOException {
+    // The serde is registered by the class name of the instance the wrapper provides.
+    String serDeClassName = this.serDeWrapper.getSerDe().getClass().getName();
+    hiveUnit.setSerDeType(serDeClassName);
+
+    String inputFormatClassName = this.serDeWrapper.getInputFormatClassName();
+    hiveUnit.setInputFormat(inputFormatClassName);
+
+    String outputFormatClassName = this.serDeWrapper.getOutputFormatClassName();
+    hiveUnit.setOutputFormat(outputFormatClassName);
+
+    addSchemaProperties(path, hiveUnit);
+  }
+
+  /**
+   * Copies the serde type, input format, output format and {@link #SCHEMA_LITERAL} property from
+   * one registration unit to another. Values absent on {@code source} leave {@code target} untouched.
+   *
+   * @param source unit to read from
+   * @param target unit to write to
+   */
+  @Override
+  public void addSerDeProperties(HiveRegistrationUnit source, HiveRegistrationUnit target)
+      throws IOException {
+    Optional<String> serDeType = source.getSerDeType();
+    if (serDeType.isPresent()) {
+      target.setSerDeType(serDeType.get());
+    }
+
+    Optional<String> inputFormat = source.getInputFormat();
+    if (inputFormat.isPresent()) {
+      target.setInputFormat(inputFormat.get());
+    }
+
+    Optional<String> outputFormat = source.getOutputFormat();
+    if (outputFormat.isPresent()) {
+      target.setOutputFormat(outputFormat.get());
+    }
+
+    State sourceProps = source.getSerDeProps();
+    if (sourceProps.contains(SCHEMA_LITERAL)) {
+      target.setSerDeProp(SCHEMA_LITERAL, sourceProps.getProp(SCHEMA_LITERAL));
+    }
+  }
+
+ @Override
+ public void updateSchema(HiveRegistrationUnit existingUnit,
HiveRegistrationUnit newUnit)
+ throws IOException {
+ Preconditions.checkArgument(
+ newUnit.getSerDeProps().contains(SCHEMA_LITERAL));
+
+ existingUnit.setSerDeProp(SCHEMA_LITERAL,
newUnit.getSerDeProps().getProp(SCHEMA_LITERAL));
+ }
+
+  /**
+   * Reads the ORC schema for a path.
+   *
+   * <p>For a plain file the schema comes straight from that file's ORC reader. For a directory,
+   * the direct children are filtered (ignored prefixes, configured extensions, ORC magic check),
+   * ordered with {@link FileListUtils#LATEST_MOD_TIME_ORDER}, and the first entry is used.
+   *
+   * @param path an ORC file, or a directory containing ORC files
+   * @param fs {@link FileSystem} used for listing and reading
+   * @return {@link TypeInfo} describing the schema
+   * @throws FileNotFoundException if a directory contains no file that passes the filter
+   * @throws IOException if listing or reading fails
+   */
+  public TypeInfo getSchemaFromLatestFile(Path path, FileSystem fs)
+      throws IOException {
+    // Base case: a single file is inspected directly.
+    if (!fs.isDirectory(path)) {
+      return TypeInfoUtils.getTypeInfoFromObjectInspector(OrcFile.createReader(fs, path).getObjectInspector());
+    }
+
+    PathFilter orcCandidateFilter = candidate -> {
+      try {
+        return ignoredFilePrefixes.stream().noneMatch(prefix -> candidate.getName().startsWith(prefix))
+            && fileExtensions.stream().anyMatch(suffix -> candidate.getName().endsWith(suffix))
+            && isORC(candidate, fs);
+      } catch (IOException e) {
+        log.error("Error checking file for schema retrieval", e);
+        return false;
+      }
+    };
+
+    List<FileStatus> candidates = Arrays.asList(fs.listStatus(path, orcCandidateFilter));
+    if (candidates.isEmpty()) {
+      throw new FileNotFoundException("No files in Dataset:" + path + " found for schema retrieval");
+    }
+
+    Collections.sort(candidates, FileListUtils.LATEST_MOD_TIME_ORDER);
+    return getSchemaFromLatestFile(candidates.get(0).getPath(), fs);
+  }
+
+  /**
+   * Determine if a file is in ORC format by looking for the "ORC" magic bytes.
+   * Ideas and code borrowed from presto/OrcReader under Apache License 2.0.
+   *
+   * <p>The last byte of the file is read as the PostScript length and the magic is expected
+   * immediately before it; if it is not there, the head of the file is checked as well because
+   * old ORC versions (0.11) wrote the magic there.
+   *
+   * @param file file to inspect
+   * @param fs {@link FileSystem} that holds {@code file}
+   * @return {@code true} if the magic is found; {@code false} for an empty file, an implausible
+   *         PostScript length, or a missing magic
+   * @throws RuntimeException wrapping any failure while opening or reading the file (the
+   *         original exception is kept as the cause)
+   */
+  private static boolean isORC(Path file, FileSystem fs)
+      throws IOException {
+    // try-with-resources guarantees the stream is closed on every return path and on failure.
+    try (FSDataInputStream inputStream = fs.open(file)) {
+      long size = fs.getFileStatus(file).getLen();
+      int tailLength = Math.toIntExact(Math.min(size, EXPECTED_FOOTER_SIZE));
+      if (tailLength == 0) {
+        // An empty file has no PostScript byte to read, so it cannot be ORC.
+        return false;
+      }
+
+      byte[] buffer = new byte[tailLength];
+      inputStream.readFully(size - tailLength, buffer);
+
+      // get length of PostScript - last byte of the file
+      int postScriptSize = buffer[buffer.length - 1] & 0xff;
+      int magicLen = MAGIC_BUFFER.remaining();
+
+      if (postScriptSize < magicLen + 1 || postScriptSize >= buffer.length) {
+        return false;
+      }
+
+      if (MAGIC_BUFFER.equals(ByteBuffer.wrap(buffer, buffer.length - 1 - magicLen, magicLen))) {
+        return true;
+      }
+
+      // Old versions of ORC (0.11) wrote the magic to the head of the file;
+      // if it isn't there either, this isn't an ORC file
+      byte[] headerMagic = new byte[magicLen];
+      inputStream.readFully(0, headerMagic);
+      return MAGIC_BUFFER.equals(ByteBuffer.wrap(headerMagic));
+    } catch (Exception e) {
+      // Same exception type as before, but the cause is preserved for diagnosis.
+      throw new RuntimeException("Error occured when checking the type of file:" + file, e);
+    }
+  }
+
+  /**
+   * Reads the schema under {@code path} and stores its string form in the unit's
+   * {@link #SCHEMA_LITERAL} serde property. The read is timed under
+   * {@link #HIVE_SPEC_SCHEMA_READING_TIMER}.
+   *
+   * @param path directory to read the schema from
+   * @param hiveUnit unit that receives the schema literal
+   * @throws IllegalArgumentException if {@code path} is not a directory
+   * @throws IOException if the file status or the schema cannot be read
+   */
+  private void addSchemaProperties(Path path, HiveRegistrationUnit hiveUnit)
+      throws IOException {
+    boolean pathIsDirectory = this.fs.getFileStatus(path).isDirectory();
+    Preconditions.checkArgument(pathIsDirectory, path + " is not a directory.");
+
+    try (Timer.Context context = metricContext.timer(HIVE_SPEC_SCHEMA_READING_TIMER).time()) {
+      String schemaLiteral = getSchemaFromLatestFile(path, this.fs).toString();
+      hiveUnit.setSerDeProp(SCHEMA_LITERAL, schemaLiteral);
+    }
+  }
+}
diff --git
a/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
new file mode 100644
index 0000000..032239f
--- /dev/null
+++
b/gobblin-hive-registration/src/test/java/org/apache/gobblin/hive/orc/HiveOrcSerDeManagerTest.java
@@ -0,0 +1,153 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.gobblin.hive.orc;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.ql.io.orc.OrcSerde;
+import org.apache.hadoop.hive.serde2.SerDeException;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+import org.apache.gobblin.binary_creation.AvroTestTools;
+import org.apache.gobblin.binary_creation.OrcTestTools;
+import org.apache.gobblin.configuration.State;
+import org.apache.gobblin.hive.HiveRegistrationUnit;
+import org.apache.gobblin.hive.HiveTable;
+import org.apache.gobblin.util.HadoopUtils;
+
+
+/**
+ * Tests for {@link HiveOrcSerDeManager}.
+ *
+ * <p>{@link #setUp()} creates {@code orctestdir/register} on the local file system containing one
+ * ORC file ({@code prefix-hive-test.orc}) and one Avro file ({@code hive-test.notOrc}), both
+ * generated from the {@code avro_input} test resources.
+ */
+@Test(singleThreaded = true)
+public class HiveOrcSerDeManagerTest {
+  private static final String TEST_DB = "testDB";
+  private static final String TEST_TABLE = "testTable";
+  // Fragment expected inside the schema literal produced from avro_input/schema.avsc
+  private static final String EXPECTED_SCHEMA_FRAGMENT = "name:string,timestamp:bigint";
+  // Message raised when no file in the register directory passes the manager's filters
+  private static final String NO_FILES_MESSAGE =
+      "No files in Dataset:orctestdir/register found for schema retrieval";
+
+  private Path testBasePath;
+  private Path testRegisterPath;
+
+  @BeforeClass
+  public void setUp() throws IOException, SerDeException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    this.testBasePath = new Path("orctestdir");
+    this.testRegisterPath = new Path(this.testBasePath, "register");
+    fs.delete(this.testBasePath, true);
+    fs.mkdirs(this.testRegisterPath);
+
+    OrcTestTools orcTestTools = new OrcTestTools();
+
+    orcTestTools.writeJsonResourceRecordsAsBinary("avro_input", fs, this.testBasePath, null);
+    HadoopUtils.copyFile(fs, new Path(this.testBasePath, "input"), fs,
+        new Path(this.testRegisterPath, "prefix-hive-test.orc"), true, new Configuration());
+
+    AvroTestTools avroTestTools = new AvroTestTools();
+
+    avroTestTools.writeJsonResourceRecordsAsBinary("avro_input", fs, this.testBasePath, null);
+    HadoopUtils.copyFile(fs, new Path(this.testBasePath, "input.avro"), fs,
+        new Path(this.testRegisterPath, "hive-test.notOrc"), true, new Configuration());
+  }
+
+  /**
+   * Test that the schema is written to the schema literal
+   */
+  @Test
+  public void testOrcSchemaLiteral() throws IOException {
+    HiveRegistrationUnit registrationUnit = registerWith(new State());
+
+    assertSchemaLiteral(registrationUnit);
+  }
+
+  /**
+   * Test empty extension
+   */
+  @Test
+  public void testEmptyExtension() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.FILE_EXTENSIONS_KEY, ",");
+
+    HiveRegistrationUnit registrationUnit = registerWith(state);
+
+    assertSchemaLiteral(registrationUnit);
+  }
+
+  /**
+   * Test custom serde config
+   */
+  @Test
+  public void testCustomSerdeConfig() throws IOException {
+    State state = new State();
+    state.setProp(HiveOrcSerDeManager.SERDE_TYPE_KEY, OrcSerde.class.getName());
+    state.setProp(HiveOrcSerDeManager.INPUT_FORMAT_CLASS_KEY, "customInputFormat");
+    state.setProp(HiveOrcSerDeManager.OUTPUT_FORMAT_CLASS_KEY, "customOutputFormat");
+
+    HiveRegistrationUnit registrationUnit = registerWith(state);
+
+    assertSchemaLiteral(registrationUnit);
+    Assert.assertEquals(registrationUnit.getSerDeType().get(), OrcSerde.class.getName());
+    Assert.assertEquals(registrationUnit.getInputFormat().get(), "customInputFormat");
+    Assert.assertEquals(registrationUnit.getOutputFormat().get(), "customOutputFormat");
+  }
+
+  /**
+   * Test that error is raised if no orc files found during schema retrieval
+   */
+  @Test(expectedExceptions = FileNotFoundException.class, expectedExceptionsMessageRegExp = NO_FILES_MESSAGE)
+  public void testNoOrcFiles() throws IOException {
+    State state = new State();
+    // Only the Avro copy carries this extension, so nothing should pass the ORC check.
+    state.setProp(HiveOrcSerDeManager.FILE_EXTENSIONS_KEY, ".notOrc");
+
+    registerWith(state);
+  }
+
+  /**
+   * Test prefix filter
+   */
+  @Test(expectedExceptions = FileNotFoundException.class, expectedExceptionsMessageRegExp = NO_FILES_MESSAGE)
+  public void testPrefixFilter() throws IOException {
+    State state = new State();
+    // The only ORC file in the register directory is named with this prefix.
+    state.setProp(HiveOrcSerDeManager.IGNORED_FILE_PREFIXES_KEY, "prefix-");
+
+    registerWith(state);
+  }
+
+  @AfterClass
+  public void tearDown() throws IOException {
+    FileSystem fs = FileSystem.getLocal(new Configuration());
+    fs.delete(this.testBasePath, true);
+  }
+
+  /**
+   * Creates a manager from {@code state}, runs it against the register directory and returns
+   * the registration unit it populated.
+   */
+  private HiveRegistrationUnit registerWith(State state) throws IOException {
+    HiveOrcSerDeManager manager = new HiveOrcSerDeManager(state);
+    HiveRegistrationUnit registrationUnit =
+        (new HiveTable.Builder()).withDbName(TEST_DB).withTableName(TEST_TABLE).build();
+
+    manager.addSerDeProperties(this.testRegisterPath, registrationUnit);
+    return registrationUnit;
+  }
+
+  /** Asserts that the unit's schema literal contains the expected column fragment. */
+  private static void assertSchemaLiteral(HiveRegistrationUnit registrationUnit) {
+    Assert.assertTrue(registrationUnit.getSerDeProps().getProp(HiveOrcSerDeManager.SCHEMA_LITERAL).contains(
+        EXPECTED_SCHEMA_FRAGMENT));
+  }
+
+}
diff --git a/gobblin-hive-registration/src/test/resources/avro_input/input.json
b/gobblin-hive-registration/src/test/resources/avro_input/input.json
new file mode 100644
index 0000000..919581b
--- /dev/null
+++ b/gobblin-hive-registration/src/test/resources/avro_input/input.json
@@ -0,0 +1,5 @@
+{"name":"name_0","timestamp":200000,"favorite_number":0}
+{"name":"name_1","timestamp":200001,"favorite_number":1}
+{"name":"name_2","timestamp":200002,"favorite_number":2}
+{"name":"name_3","timestamp":200003,"favorite_number":3}
+{"name":"name_4","timestamp":200004,"favorite_number":4}
diff --git
a/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc
b/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc
new file mode 100644
index 0000000..18cd665
--- /dev/null
+++ b/gobblin-hive-registration/src/test/resources/avro_input/schema.avsc
@@ -0,0 +1,10 @@
+{
+ "namespace": "DB",
+ "type": "record",
+ "name": "TABLE",
+ "fields": [
+ {"name": "name", "type": "string"},
+ {"name": "timestamp", "type": "long"},
+ {"name": "favorite_number", "type": "int"}
+ ]
+}
\ No newline at end of file