Repository: drill Updated Branches: refs/heads/master b9960f89e -> 84ce21c91
DRILL-3688: Drill should honor "skip.header.line.count" and "skip.footer.line.count" attribute of Hive table 1. Functionality to skip header and footer lines while reading Hive data. 2. Unit tests. Project: http://git-wip-us.apache.org/repos/asf/drill/repo Commit: http://git-wip-us.apache.org/repos/asf/drill/commit/84ce21c9 Tree: http://git-wip-us.apache.org/repos/asf/drill/tree/84ce21c9 Diff: http://git-wip-us.apache.org/repos/asf/drill/diff/84ce21c9 Branch: refs/heads/master Commit: 84ce21c9147f646bce55071573b1b6f72fd2a45e Parents: b9960f8 Author: Arina Ielchiieva <[email protected]> Authored: Thu Feb 11 17:16:30 2016 +0000 Committer: Parth Chandra <[email protected]> Committed: Mon Feb 29 16:35:13 2016 -0800 ---------------------------------------------------------------------- .../drill/exec/store/hive/HiveRecordReader.java | 178 +++++++++++++++++-- .../apache/drill/exec/hive/TestHiveStorage.java | 91 ++++++++++ .../exec/hive/TestInfoSchemaOnHiveStorage.java | 14 ++ .../exec/store/hive/HiveTestDataGenerator.java | 51 +++++- 4 files changed, 319 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/drill/blob/84ce21c9/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java index 79ca65f..1634187 100644 --- a/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java +++ b/contrib/storage-hive/core/src/main/java/org/apache/drill/exec/store/hive/HiveRecordReader.java @@ -18,9 +18,13 @@ package org.apache.drill.exec.store.hive; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashSet; 
import java.util.List; -import java.util.Map; import java.util.Properties; +import java.util.Queue; +import java.util.Set; import java.util.concurrent.Callable; import java.util.concurrent.ExecutionException; @@ -46,6 +50,8 @@ import org.apache.hadoop.hive.metastore.MetaStoreUtils; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.apache.hadoop.hive.metastore.api.Partition; import org.apache.hadoop.hive.metastore.api.Table; +import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDe; import org.apache.hadoop.hive.serde2.SerDeException; @@ -57,9 +63,9 @@ import org.apache.hadoop.hive.serde2.typeinfo.StructTypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapred.InputFormat; import org.apache.hadoop.mapred.InputSplit; import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; import org.apache.hadoop.mapred.Reporter; import com.google.common.collect.Lists; @@ -95,8 +101,8 @@ public class HiveRecordReader extends AbstractRecordReader { // Converter which converts data from partition schema to table schema. 
private Converter partTblObjectInspectorConverter; - protected Object key, value; - protected org.apache.hadoop.mapred.RecordReader reader; + protected Object key; + protected RecordReader reader; protected List<ValueVector> vectors = Lists.newArrayList(); protected List<ValueVector> pVectors = Lists.newArrayList(); protected boolean empty; @@ -104,6 +110,7 @@ public class HiveRecordReader extends AbstractRecordReader { private FragmentContext fragmentContext; private String defaultPartitionValue; private final UserGroupInformation proxyUgi; + private SkipRecordsInspector skipRecordsInspector; protected static final int TARGET_RECORD_COUNT = 4000; @@ -127,8 +134,9 @@ public class HiveRecordReader extends AbstractRecordReader { // Get the configured default val defaultPartitionValue = hiveConf.get(ConfVars.DEFAULTPARTITIONNAME.varname); + Properties tableProperties; try { - final Properties tableProperties = MetaStoreUtils.getTableMetadata(table); + tableProperties = MetaStoreUtils.getTableMetadata(table); final Properties partitionProperties = (partition == null) ? tableProperties : HiveUtilities.getPartitionMetadata(partition, table); @@ -220,7 +228,7 @@ public class HiveRecordReader extends AbstractRecordReader { throw new ExecutionSetupException("Failed to get o.a.hadoop.mapred.RecordReader from Hive InputFormat", e); } key = reader.createKey(); - value = reader.createValue(); + skipRecordsInspector = new SkipRecordsInspector(tableProperties, reader); } } @@ -286,6 +294,16 @@ public class HiveRecordReader extends AbstractRecordReader { } } + /** + * To take into account Hive "skip.header.line.count" property first N values from file are skipped. + * Since file can be read in batches (depends on TARGET_RECORD_COUNT), additional checks are made + * to determine if it's new file or continuance. 
+ * + * To take into account Hive "skip.footer.line.count" property values are buffered in queue + * until queue size exceeds number of footer lines to skip, then first value in queue is retrieved. + * Buffer of value objects is used to re-use value objects in order to reduce number of created value objects. + * For each new file queue is cleared to drop footer lines from previous file. + */ @Override public int next() { for (ValueVector vv : vectors) { AllocationHelper.allocateNew(vv, TARGET_RECORD_COUNT); } if (empty) { setValueCountAndPopulatePartitionVectors(0); return 0; } try { + skipRecordsInspector.reset(); int recordCount = 0; - while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value)) { - Object deSerializedValue = partitionSerDe.deserialize((Writable) value); - if (partTblObjectInspectorConverter != null) { - deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue); + Object value; + while (recordCount < TARGET_RECORD_COUNT && reader.next(key, value = skipRecordsInspector.getNextValue())) { + if (skipRecordsInspector.doSkipHeader(recordCount++)) { + continue; } - readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, recordCount); - recordCount++; + Object bufferedValue = skipRecordsInspector.bufferAdd(value); + if (bufferedValue != null) { + Object deSerializedValue = partitionSerDe.deserialize((Writable) bufferedValue); + if (partTblObjectInspectorConverter != null) { + deSerializedValue = partTblObjectInspectorConverter.convert(deSerializedValue); + } + readHiveRecordAndInsertIntoRecordBatch(deSerializedValue, skipRecordsInspector.getActualCount()); + skipRecordsInspector.incrementActualCount(); + } + skipRecordsInspector.incrementTempCount(); } - setValueCountAndPopulatePartitionVectors(recordCount); - return recordCount; + setValueCountAndPopulatePartitionVectors(skipRecordsInspector.getActualCount()); + skipRecordsInspector.updateContinuance(); + return skipRecordsInspector.getActualCount(); } catch (IOException | SerDeException e) { throw new 
DrillRuntimeException(e); } @@ -362,4 +390,126 @@ public class HiveRecordReader extends AbstractRecordReader { vector.getMutator().setValueCount(recordCount); } } + + /** + * SkipRecordsInspector encapsulates logic to skip header and footer from file. + * Logic is applicable only for predefined in constructor file formats. + */ + private class SkipRecordsInspector { + + private final Set<Object> fileFormats; + private int headerCount; + private int footerCount; + private Queue<Object> footerBuffer; + // indicates if we continue reading the same file + private boolean continuance; + private int holderIndex; + private List<Object> valueHolder; + private int actualCount; + // actualCount without headerCount, used to determine holderIndex + private int tempCount; + + private SkipRecordsInspector(Properties tableProperties, RecordReader reader) { + this.fileFormats = new HashSet<Object>(Arrays.asList(org.apache.hadoop.mapred.TextInputFormat.class.getName())); + this.headerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.HEADER_COUNT, 0); + this.footerCount = retrievePositiveIntProperty(tableProperties, serdeConstants.FOOTER_COUNT, 0); + this.footerBuffer = Lists.newLinkedList(); + this.continuance = false; + this.holderIndex = -1; + this.valueHolder = initializeValueHolder(reader, footerCount); + this.actualCount = 0; + this.tempCount = 0; + } + + private boolean doSkipHeader(int recordCount) { + return !continuance && recordCount < headerCount; + } + + private void reset() { + tempCount = holderIndex + 1; + actualCount = 0; + if (!continuance) { + footerBuffer.clear(); + } + } + + private Object bufferAdd(Object value) throws SerDeException { + footerBuffer.add(value); + if (footerBuffer.size() <= footerCount) { + return null; + } + return footerBuffer.poll(); + } + + private Object getNextValue() { + holderIndex = tempCount % getHolderSize(); + return valueHolder.get(holderIndex); + } + + private int getHolderSize() { + return valueHolder.size(); + 
} + + private void updateContinuance() { + this.continuance = actualCount != 0; + } + + private int incrementTempCount() { + return ++tempCount; + } + + private int getActualCount() { + return actualCount; + } + + private int incrementActualCount() { + return ++actualCount; + } + + /** + * Retrieves positive numeric property from Properties object by name. + * Return default value if + * 1. file format is absent in predefined file formats list + * 2. property doesn't exist in table properties + * 3. property value is negative + * otherwise casts value to int. + * + * @param tableProperties property holder + * @param propertyName name of the property + * @param defaultValue default value + * @return property numeric value + * @throws NumberFormatException if property value is non-numeric + */ + private int retrievePositiveIntProperty(Properties tableProperties, String propertyName, int defaultValue) { + int propertyIntValue = defaultValue; + if (!fileFormats.contains(tableProperties.get(hive_metastoreConstants.FILE_INPUT_FORMAT))) { + return propertyIntValue; + } + Object propertyObject = tableProperties.get(propertyName); + if (propertyObject != null) { + try { + propertyIntValue = Integer.valueOf((String) propertyObject); + } catch (NumberFormatException e) { + throw new NumberFormatException(String.format("Hive table property %s value '%s' is non-numeric", propertyName, propertyObject.toString())); + } + } + return propertyIntValue < 0 ? defaultValue : propertyIntValue; + } + + /** + * Creates buffer of objects to be used as values, so these values can be re-used. + * Objects number depends on number of lines to skip in the end of the file plus one object. 
+ * + * @param reader RecordReader to return value object + * @param skipFooterLines number of lines to skip at the end of the file + * @return list of objects to be used as values + */ + private List<Object> initializeValueHolder(RecordReader reader, int skipFooterLines) { + List<Object> valueHolder = new ArrayList<>(skipFooterLines + 1); + for (int i = 0; i <= skipFooterLines; i++) { + valueHolder.add(reader.createValue()); + } + return valueHolder; + } + } } http://git-wip-us.apache.org/repos/asf/drill/blob/84ce21c9/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java index 55de2d7..078c375 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestHiveStorage.java @@ -18,6 +18,7 @@ package org.apache.drill.exec.hive; import com.google.common.collect.ImmutableMap; +import org.apache.drill.common.exceptions.UserRemoteException; import org.apache.drill.exec.ExecConstants; import org.apache.drill.exec.planner.physical.PlannerSettings; import org.apache.hadoop.fs.FileSystem; @@ -29,8 +30,11 @@ import org.junit.Test; import java.math.BigDecimal; import java.sql.Date; import java.sql.Timestamp; +import java.util.Map; +import static org.hamcrest.CoreMatchers.containsString; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; public class TestHiveStorage extends HiveTestBase { @BeforeClass @@ -409,6 +413,93 @@ public class TestHiveStorage extends HiveTestBase { } } + @Test // DRILL-3688 + public void readingFromSmallTableWithSkipHeaderAndFooter() throws Exception { + testBuilder() + .sqlQuery("select key, `value` from 
hive.skipper.kv_text_small order by key asc") + .ordered() + .baselineColumns("key", "value") + .baselineValues(1, "key_1") + .baselineValues(2, "key_2") + .baselineValues(3, "key_3") + .baselineValues(4, "key_4") + .baselineValues(5, "key_5") + .go(); + + testBuilder() + .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_small") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(5L) + .go(); + } + + @Test // DRILL-3688 + public void readingFromLargeTableWithSkipHeaderAndFooter() throws Exception { + testBuilder() + .sqlQuery("select sum(key) as sum_keys from hive.skipper.kv_text_large") + .unOrdered() + .baselineColumns("sum_keys") + .baselineValues((long)(5000*(5000 + 1)/2)) + .go(); + + testBuilder() + .sqlQuery("select count(1) as cnt from hive.skipper.kv_text_large") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(5000L) + .go(); + } + + @Test // DRILL-3688 + public void testIncorrectHeaderFooterProperty() throws Exception { + Map<String, String> testData = ImmutableMap.<String, String>builder() + .put("hive.skipper.kv_incorrect_skip_header","skip.header.line.count") + .put("hive.skipper.kv_incorrect_skip_footer", "skip.footer.line.count") + .build(); + + String query = "select * from %s"; + String exceptionMessage = "Hive table property %s value 'A' is non-numeric"; + + for (Map.Entry<String, String> entry : testData.entrySet()) { + try { + test(String.format(query, entry.getKey())); + } catch (UserRemoteException e) { + assertThat(e.getMessage(), containsString(String.format(exceptionMessage, entry.getValue()))); + } + } + } + + @Test // DRILL-3688 + public void testIgnoreSkipHeaderFooterForRcfile() throws Exception { + testBuilder() + .sqlQuery("select count(1) as cnt from hive.skipper.kv_rcfile_large") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(5000L) + .go(); + } + + @Test // DRILL-3688 + public void testIgnoreSkipHeaderFooterForParquet() throws Exception { + testBuilder() + .sqlQuery("select count(1) as cnt 
from hive.skipper.kv_parquet_large") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(5000L) + .go(); + } + + @Test // DRILL-3688 + public void testIgnoreSkipHeaderFooterForSequencefile() throws Exception { + testBuilder() + .sqlQuery("select count(1) as cnt from hive.skipper.kv_sequencefile_large") + .unOrdered() + .baselineColumns("cnt") + .baselineValues(5000L) + .go(); + } + @AfterClass public static void shutdownOptions() throws Exception { test(String.format("alter session set `%s` = false", PlannerSettings.ENABLE_DECIMAL_DATA_TYPE_KEY)); http://git-wip-us.apache.org/repos/asf/drill/blob/84ce21c9/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java index f9fc0ac..8144dc1 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/hive/TestInfoSchemaOnHiveStorage.java @@ -51,6 +51,19 @@ public class TestInfoSchemaOnHiveStorage extends HiveTestBase { .baselineValues("hive.db1", "kv_db1") .baselineValues("hive.db1", "avro") .go(); + + testBuilder() + .sqlQuery("SHOW TABLES IN hive.skipper") + .unOrdered() + .baselineColumns("TABLE_SCHEMA", "TABLE_NAME") + .baselineValues("hive.skipper", "kv_text_small") + .baselineValues("hive.skipper", "kv_text_large") + .baselineValues("hive.skipper", "kv_incorrect_skip_header") + .baselineValues("hive.skipper", "kv_incorrect_skip_footer") + .baselineValues("hive.skipper", "kv_rcfile_large") + .baselineValues("hive.skipper", "kv_parquet_large") + .baselineValues("hive.skipper", "kv_sequencefile_large") + .go(); } @Test @@ -61,6 +74,7 @@ public class 
TestInfoSchemaOnHiveStorage extends HiveTestBase { .baselineColumns("SCHEMA_NAME") .baselineValues("hive.default") .baselineValues("hive.db1") + .baselineValues("hive.skipper") .baselineValues("dfs.default") .baselineValues("dfs.root") .baselineValues("dfs.tmp") http://git-wip-us.apache.org/repos/asf/drill/blob/84ce21c9/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java ---------------------------------------------------------------------- diff --git a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java index b38290b..56c768f 100644 --- a/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java +++ b/contrib/storage-hive/core/src/test/java/org/apache/drill/exec/store/hive/HiveTestDataGenerator.java @@ -25,6 +25,7 @@ import java.sql.Timestamp; import java.util.Map; import org.apache.commons.io.FileUtils; +import org.apache.commons.lang3.StringUtils; import org.apache.drill.BaseTestQuery; import org.apache.drill.common.exceptions.DrillException; import org.apache.drill.exec.store.StoragePluginRegistry; @@ -35,6 +36,7 @@ import org.apache.hadoop.hive.ql.Driver; import org.apache.hadoop.hive.ql.session.SessionState; import com.google.common.collect.Maps; +import org.apache.hadoop.hive.serde.serdeConstants; import static org.apache.drill.BaseTestQuery.getTempDir; import static org.apache.drill.exec.hive.HiveTestUtilities.executeQuery; @@ -464,6 +466,32 @@ public class HiveTestDataGenerator { FileUtils.deleteQuietly(new File(whDir, "kv_sh")); //executeQuery(hiveDriver, "INSERT OVERWRITE TABLE kv_sh SELECT * FROM kv"); + // Create text tables with skip header and footer table property + executeQuery(hiveDriver, "create database if not exists skipper"); + executeQuery(hiveDriver, 
createTableWithHeaderFooterProperties("skipper.kv_text_small", "textfile", "1", "1")); + executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_small", 5, 1, 1)); + + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_text_large", "textfile", "2", "2")); + executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_text_large", 5000, 2, 2)); + + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_header", "textfile", "A", "1")); + executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_header", 5, 1, 1)); + + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_incorrect_skip_footer", "textfile", "1", "A")); + executeQuery(hiveDriver, generateTestDataWithHeadersAndFooters("skipper.kv_incorrect_skip_footer", 5, 1, 1)); + + // Create rcfile table with skip header and footer table property + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_rcfile_large", "rcfile", "1", "1")); + executeQuery(hiveDriver, "insert into table skipper.kv_rcfile_large select * from skipper.kv_text_large"); + + // Create parquet table with skip header and footer table property + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_parquet_large", "parquet", "1", "1")); + executeQuery(hiveDriver, "insert into table skipper.kv_parquet_large select * from skipper.kv_text_large"); + + // Create sequencefile table with skip header and footer table property + executeQuery(hiveDriver, createTableWithHeaderFooterProperties("skipper.kv_sequencefile_large", "sequencefile", "1", "1")); + executeQuery(hiveDriver, "insert into table skipper.kv_sequencefile_large select * from skipper.kv_text_large"); + ss.close(); } @@ -520,4 +548,25 @@ public class HiveTestDataGenerator { return file.getPath(); } -} + + private String createTableWithHeaderFooterProperties(String tableName, String format, String 
headerValue, String footerValue) { + return String.format("create table %s (key int, value string) stored as %s tblproperties('%s'='%s', '%s'='%s')", + tableName, format, serdeConstants.HEADER_COUNT, headerValue, serdeConstants.FOOTER_COUNT, footerValue); + } + + private String generateTestDataWithHeadersAndFooters(String tableName, int rowCount, int headerLines, int footerLines) { + StringBuilder sb = new StringBuilder(); + sb.append("insert into table ").append(tableName).append(" (key, value) values "); + int length = sb.length(); + sb.append(StringUtils.repeat("('key_header', 'value_header')", ",", headerLines)); + for (int i = 1; i <= rowCount; i++) { + sb.append(",(").append(i).append(",").append("'key_").append(i).append("')"); + } + if (headerLines <= 0) { + sb.deleteCharAt(length); + } + sb.append(StringUtils.repeat(",('key_footer', 'value_footer')", footerLines)); + + return sb.toString(); + } +} \ No newline at end of file
