Repository: hive
Updated Branches:
  refs/heads/master 8b7043626 -> b054174bb

HIVE-20694: Additional unit tests for VectorizedOrcAcidRowBatchReader min max key evaluation (Saurabh Seth via Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/b054174b
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/b054174b
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/b054174b

Branch: refs/heads/master
Commit: b054174bb0eb8b692cafbb30194236fc75486e60
Parents: 8b70436
Author: Saurabh Seth <[email protected]>
Authored: Tue Oct 9 16:02:25 2018 -0700
Committer: Eugene Koifman <[email protected]>
Committed: Tue Oct 9 16:02:25 2018 -0700

----------------------------------------------------------------------
 .../TestVectorizedOrcAcidRowBatchReader.java    | 380 +++++++++++++++++++
 1 file changed, 380 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/b054174b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
index 0a499b1..0b26879 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedOrcAcidRowBatchReader.java
@@ -18,9 +18,11 @@ package org.apache.hadoop.hive.ql.io.orc;
 
 import java.io.File;
+import java.util.ArrayList;
 import java.util.List;
 import java.util.Properties;
 
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -43,9 +45,12 @@ import org.apache.hadoop.hive.ql.io.sarg.SearchArgument;
 import org.apache.hadoop.hive.ql.plan.MapWork;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
+import org.apache.hadoop.io.BytesWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.mapred.JobConf;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.orc.OrcConf;
+import org.apache.orc.StripeInformation;
 import org.apache.orc.TypeDescription;
 import org.junit.Before;
 import org.junit.Test;
@@ -67,6 +72,8 @@ public class TestVectorizedOrcAcidRowBatchReader {
   private Path root;
   private ObjectInspector inspector;
   private ObjectInspector originalInspector;
+  private ObjectInspector bigRowInspector;
+  private ObjectInspector bigOriginalRowInspector;
 
   public static class DummyRow {
     LongWritable field;
@@ -110,6 +117,49 @@ public class TestVectorizedOrcAcidRowBatchReader {
     }
   }
 
+  /**
+   * A larger Dummy row that can be used to write multiple stripes.
+   */
+  public static class BigRow {
+    BytesWritable field;
+    RecordIdentifier rowId;
+
+    BigRow(byte[] val) {
+      field = new BytesWritable(val);
+    }
+
+    BigRow(byte[] val, long rowId, long origTxn, int bucket) {
+      field = new BytesWritable(val);
+      bucket = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(bucket));
+      this.rowId = new RecordIdentifier(origTxn, bucket, rowId);
+    }
+
+    static String getColumnNamesProperty() {
+      return "field";
+    }
+    static String getColumnTypesProperty() {
+      return "binary";
+    }
+  }
+
+  /**
+   * A larger Dummy row for original files that can be used to write multiple stripes.
+   */
+  public static class BigOriginalRow {
+    BytesWritable field;
+
+    BigOriginalRow(byte[] val) {
+      field = new BytesWritable(val);
+    }
+
+    static String getColumnNamesProperty() {
+      return "field";
+    }
+    static String getColumnTypesProperty() {
+      return "binary";
+    }
+  }
+
   @Before
   public void setup() throws Exception {
     conf = new JobConf();
@@ -122,6 +172,7 @@ public class TestVectorizedOrcAcidRowBatchReader {
     conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, DummyRow.getColumnTypesProperty());
     conf.setBoolean(HiveConf.ConfVars.HIVE_VECTORIZATION_ENABLED.varname, true);
     conf.set(HiveConf.ConfVars.HIVE_ORC_SPLIT_STRATEGY.varname, "BI");
+    OrcConf.ROWS_BETWEEN_CHECKS.setLong(conf, 1);
 
     Path workDir = new Path(System.getProperty("test.tmp.dir",
         "target" + File.separator + "test" + File.separator + "tmp"));
@@ -135,6 +186,11 @@ public class TestVectorizedOrcAcidRowBatchReader {
       originalInspector = ObjectInspectorFactory.getReflectionObjectInspector(DummyOriginalRow.class,
           ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+
+      bigRowInspector = ObjectInspectorFactory.getReflectionObjectInspector(BigRow.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
+      bigOriginalRowInspector = ObjectInspectorFactory.getReflectionObjectInspector(BigOriginalRow.class,
+          ObjectInspectorFactory.ObjectInspectorOptions.JAVA);
     }
   }
 
   @Test
@@ -397,6 +453,182 @@ public class TestVectorizedOrcAcidRowBatchReader {
   }
 
   @Test
+  public void testDeleteEventFilteringOff3() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+    testDeleteEventFiltering3();
+  }
+
+  @Test
+  public void testDeleteEventFilteringOn3() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    testDeleteEventFiltering3();
+  }
+
+  @Test
+  public void testWithoutStatsDeleteEventFilteringOn3() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    OrcConf.ROW_INDEX_STRIDE.setLong(conf, 0);
+    testDeleteEventFiltering3();
+  }
+
+  private void testDeleteEventFiltering3() throws Exception {
+    boolean filterOn =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+    boolean columnStatsPresent = OrcConf.ROW_INDEX_STRIDE.getLong(conf) != 0;
+
+    // To create small stripes
+    OrcConf.STRIPE_SIZE.setLong(conf, 1);
+    // Need to use a bigger row than DummyRow for the writer to flush the stripes
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
+
+    // Use OrcRecordUpdater.OrcOptions to set the batch size.
+    OrcRecordUpdater.OrcOptions orcOptions = new OrcRecordUpdater.OrcOptions(conf);
+    orcOptions.orcOptions(OrcFile.writerOptions(conf).batchSize(1));
+
+    int bucket = 1;
+
+    AcidOutputFormat.Options options = orcOptions.filesystem(fs)
+        .bucket(bucket)
+        .writingBase(true)
+        .minimumWriteId(10000002)
+        .maximumWriteId(10000002)
+        .inspector(bigRowInspector)
+        .reporter(Reporter.NULL)
+        .recordIdColumn(1)
+        .finalDestination(root);
+
+    int bucketProperty = BucketCodec.V1.encode(options);
+
+    // Create 3 stripes with 1 row each
+    byte[] data = new byte[1000];
+    RecordUpdater updater = new OrcRecordUpdater(root, options);
+    updater.insert(10000002, new BigRow(data, 0, 0, bucket));
+    updater.insert(10000002, new BigRow(data, 1, 0, bucket));
+    updater.insert(10000002, new BigRow(data, 2, 0, bucket));
+    updater.close(false);
+
+    String acidFile = "base_10000002/bucket_00001";
+    Path acidFilePath = new Path(root, acidFile);
+
+    Reader reader = OrcFile.createReader(acidFilePath, OrcFile.readerOptions(conf));
+
+    List<StripeInformation> stripes = reader.getStripes();
+
+    // Make sure 3 stripes are created
+    assertEquals(3, stripes.size());
+
+    long fileLength = fs.getFileStatus(acidFilePath).getLen();
+
+    // 1. Splits within a stripe
+    // A split that's completely within the 2nd stripe
+    StripeInformation stripe = stripes.get(1);
+    OrcSplit split = new OrcSplit(acidFilePath, null,
+        stripe.getOffset() + 50,
+        stripe.getLength() - 100,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    validateKeyInterval(split, new RecordIdentifier(1, 1, 1),
+        new RecordIdentifier(0, 0, 0), filterOn);
+
+    // A split that's completely within the last stripe
+    stripe = stripes.get(2);
+    split = new OrcSplit(acidFilePath, null,
+        stripe.getOffset() + 50,
+        stripe.getLength() - 100,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    validateKeyInterval(split, new RecordIdentifier(1, 1, 1),
+        new RecordIdentifier(0, 0, 0), filterOn);
+
+    // 2. Splits starting at a stripe boundary
+    // A split that starts where the 1st stripe starts and ends before the 1st stripe ends
+    stripe = stripes.get(0);
+    split = new OrcSplit(acidFilePath, null,
+        stripe.getOffset(),
+        stripe.getLength() - 50,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    // The key interval for the 1st stripe
+    if (columnStatsPresent) {
+      validateKeyInterval(split, new RecordIdentifier(10000002, bucketProperty, 0),
+          new RecordIdentifier(10000002, bucketProperty, 0), filterOn);
+    } else {
+      validateKeyInterval(split, null, new RecordIdentifier(10000002, bucketProperty, 0), filterOn);
+    }
+
+    // A split that starts where the 2nd stripe starts and ends after the 2nd stripe ends
+    stripe = stripes.get(1);
+    split = new OrcSplit(acidFilePath, null,
+        stripe.getOffset(),
+        stripe.getLength() + 50,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    // The key interval for the last 2 stripes
+    validateKeyInterval(split, new RecordIdentifier(10000002, bucketProperty, 1),
+        new RecordIdentifier(10000002, bucketProperty, 2), filterOn);
+
+    // 3. Splits ending at a stripe boundary
+    // A split that starts before the last stripe starts and ends at the last stripe boundary
+    stripe = stripes.get(2);
+    split = new OrcSplit(acidFilePath, null,
+        stripe.getOffset() - 50,
+        stripe.getLength() + 50,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    // The key interval for the last stripe
+    validateKeyInterval(split, new RecordIdentifier(10000002, bucketProperty, 2),
+        new RecordIdentifier(10000002, bucketProperty, 2), filterOn);
+
+    // A split that starts after the 1st stripe starts and ends where the last stripe ends
+    split = new OrcSplit(acidFilePath, null,
+        stripes.get(0).getOffset() + 50,
+        reader.getContentLength() - 50,
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    // The key interval for the last 2 stripes
+    validateKeyInterval(split, new RecordIdentifier(10000002, bucketProperty, 1),
+        new RecordIdentifier(10000002, bucketProperty, 2), filterOn);
+
+    // A split that starts where the 1st stripe starts and ends where the last stripe ends
+    split = new OrcSplit(acidFilePath, null,
+        stripes.get(0).getOffset(),
+        reader.getContentLength(),
+        new String[] {"localhost"}, null, false, true, new ArrayList<>(),
+        fileLength, fileLength, root, null);
+
+    // The key interval for all 3 stripes
+    if (columnStatsPresent) {
+      validateKeyInterval(split, new RecordIdentifier(10000002, bucketProperty, 0),
+          new RecordIdentifier(10000002, bucketProperty, 2), filterOn);
+    } else {
+      validateKeyInterval(split, null, new RecordIdentifier(10000002, bucketProperty, 2), filterOn);
+    }
+  }
+
+  private void validateKeyInterval(OrcSplit split, RecordIdentifier lowKey, RecordIdentifier highKey, boolean filterOn)
+      throws Exception {
+    VectorizedOrcAcidRowBatchReader vectorizedReader =
+        new VectorizedOrcAcidRowBatchReader(split, conf, Reporter.NULL, new VectorizedRowBatchCtx());
+
+    OrcRawRecordMerger.KeyInterval keyInterval =
+        vectorizedReader.getKeyInterval();
+    SearchArgument sarg = vectorizedReader.getDeleteEventSarg();
+    if(filterOn) {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(lowKey, highKey), keyInterval);
+    } else {
+      assertEquals(new OrcRawRecordMerger.KeyInterval(null, null), keyInterval);
+      assertNull(sarg);
+    }
+  }
+
+  @Test
   public void testDeleteEventOriginalFilteringOn() throws Exception {
     HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
     testDeleteEventOriginalFiltering();
@@ -547,6 +779,154 @@ public class TestVectorizedOrcAcidRowBatchReader {
     }
   }
 
+  @Test
+  public void testDeleteEventOriginalFilteringOff2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, false);
+    testDeleteEventOriginalFiltering2();
+  }
+
+  @Test
+  public void testDeleteEventOriginalFilteringOn2() throws Exception {
+    HiveConf.setBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS, true);
+    testDeleteEventOriginalFiltering2();
+  }
+
+  private void testDeleteEventOriginalFiltering2() throws Exception {
+    boolean filterOn =
+        HiveConf.getBoolVar(conf, HiveConf.ConfVars.FILTER_DELETE_EVENTS);
+
+    conf.setBoolean(hive_metastoreConstants.TABLE_IS_TRANSACTIONAL, false);
+
+    // Need to use a bigger row than DummyRow for the writer to flush the stripes
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS, BigRow.getColumnNamesProperty());
+    conf.set(IOConstants.SCHEMA_EVOLUTION_COLUMNS_TYPES, BigRow.getColumnTypesProperty());
+
+    Properties properties = new Properties();
+
+    OrcFile.WriterOptions writerOptions = OrcFile.writerOptions(properties, conf);
+    writerOptions.inspector(bigOriginalRowInspector)
+        .stripeSize(1)
+        .batchSize(1);
+
+    String originalFile = "000000_0";
+    Path originalFilePath = new Path(root, originalFile);
+
+    byte[] data = new byte[1000];
+    Writer writer = OrcFile.createWriter(originalFilePath, writerOptions);
+    writer.addRow(new BigOriginalRow(data));
+    writer.addRow(new BigOriginalRow(data));
+    writer.addRow(new BigOriginalRow(data));
+    writer.close();
+
+    Reader reader = OrcFile.createReader(originalFilePath, OrcFile.readerOptions(conf));
+
+    List<StripeInformation> stripes = reader.getStripes();
+
+    // Make sure 3 stripes are created
+    assertEquals(3, stripes.size());
+
+    FileStatus fileStatus = fs.getFileStatus(originalFilePath);
+    long fileLength = fileStatus.getLen();
+
+    // Set vector mode to true in the map work so that we can generate the syntheticProps
+    MapWork mapWork = new MapWork();
+    mapWork.setVectorMode(true);
+    VectorizedRowBatchCtx vrbContext = new VectorizedRowBatchCtx();
+    mapWork.setVectorizedRowBatchCtx(vrbContext);
+    HiveConf.setVar(conf, HiveConf.ConfVars.PLAN, "//tmp");
+    Utilities.setMapWork(conf, mapWork);
+
+    OrcSplit.OffsetAndBucketProperty syntheticProps = VectorizedOrcAcidRowBatchReader.computeOffsetAndBucket(
+        fileStatus, root, true, true, conf);
+
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
+        .bucket(0);
+    int bucketProperty = BucketCodec.V1.encode(options);
+
+    // 1. Splits within a stripe
+    // A split that's completely within the 2nd stripe
+    StripeInformation stripe = stripes.get(1);
+    OrcSplit split = new OrcSplit(originalFilePath, null,
+        stripe.getOffset() + 50,
+        stripe.getLength() - 100,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 2),
+        new RecordIdentifier(0, bucketProperty, 1), filterOn);
+
+    // A split that's completely within the last stripe
+    stripe = stripes.get(2);
+    split = new OrcSplit(originalFilePath, null,
+        stripe.getOffset() + 50,
+        stripe.getLength() - 100,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 3),
+        new RecordIdentifier(0, bucketProperty, 2), filterOn);
+
+    // 2. Splits starting at a stripe boundary
+    // A split that starts where the 1st stripe starts and ends before the 1st stripe ends
+    stripe = stripes.get(0);
+    split = new OrcSplit(originalFilePath, null,
+        stripe.getOffset(),
+        stripe.getLength() - 50,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    // The key interval for the 1st stripe
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 0),
+        new RecordIdentifier(0, bucketProperty, 0), filterOn);
+
+    // A split that starts where the 2nd stripe starts and ends after the 2nd stripe ends
+    stripe = stripes.get(1);
+    split = new OrcSplit(originalFilePath, null,
+        stripe.getOffset(),
+        stripe.getLength() + 50,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    // The key interval for the last 2 stripes
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 1),
+        new RecordIdentifier(0, bucketProperty, 2), filterOn);
+
+    // 3. Splits ending at a stripe boundary
+    // A split that starts before the last stripe starts and ends at the last stripe boundary
+    stripe = stripes.get(2);
+    split = new OrcSplit(originalFilePath, null,
+        stripe.getOffset() - 50,
+        stripe.getLength() + 50,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    // The key interval for the last stripe
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 2),
+        new RecordIdentifier(0, bucketProperty, 2), filterOn);
+
+    // A split that starts after the 1st stripe starts and ends where the last stripe ends
+    split = new OrcSplit(originalFilePath, null,
+        stripes.get(0).getOffset() + 50,
+        reader.getContentLength() - 50,
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    // The key interval for the last 2 stripes
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 1),
+        new RecordIdentifier(0, bucketProperty, 2), filterOn);
+
+    // A split that starts where the 1st stripe starts and ends where the last stripe ends
+    split = new OrcSplit(originalFilePath, null,
+        stripes.get(0).getOffset(),
+        reader.getContentLength(),
+        new String[] {"localhost"}, null, true, true, new ArrayList<>(),
+        fileLength, fileLength, root, syntheticProps);
+
+    // The key interval for all 3 stripes
+    validateKeyInterval(split, new RecordIdentifier(0, bucketProperty, 0),
+        new RecordIdentifier(0, bucketProperty, 2), filterOn);
+  }
+
   @Test
   public void testVectorizedOrcAcidRowBatchReader() throws Exception {
     conf.set("bucket_count", "1");
