http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
index 23e44f0..d3dc30d 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorSerDeRow.java
@@ -71,6 +71,7 @@ import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.serde2.typeinfo.CharTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo;
+import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
 import org.apache.hadoop.hive.serde2.typeinfo.VarcharTypeInfo;
 import org.apache.hadoop.hive.serde2.fast.SerializeWrite;
 import org.apache.hadoop.io.BooleanWritable;
@@ -331,13 +332,13 @@ public class TestVectorSerDeRow extends TestCase {
   void testVectorSerializeRow(int caseNum, Random r, SerializationType serializationType)
       throws HiveException, IOException, SerDeException {
 
-    Map<Integer, String> emptyScratchMap = new HashMap<Integer, String>();
+    String[] emptyScratchTypeNames = new String[0];
 
     RandomRowObjectSource source = new RandomRowObjectSource();
     source.init(r);
 
     VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(emptyScratchMap, source.rowStructObjectInspector());
+    batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames);
     VectorizedRowBatch batch = batchContext.createVectorizedRowBatch();
 
     VectorAssignRowSameBatch vectorAssignRow = new VectorAssignRowSameBatch();
@@ -563,13 +564,13 @@ public class TestVectorSerDeRow extends TestCase {
   void testVectorDeserializeRow(int caseNum, Random r, SerializationType serializationType)
       throws HiveException, IOException, SerDeException {
 
-    Map<Integer, String> emptyScratchMap = new HashMap<Integer, String>();
+    String[] emptyScratchTypeNames = new String[0];
 
     RandomRowObjectSource source = new RandomRowObjectSource();
     source.init(r);
 
     VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
-    batchContext.init(emptyScratchMap, source.rowStructObjectInspector());
+    batchContext.init(source.rowStructObjectInspector(), emptyScratchTypeNames);
     VectorizedRowBatch batch = batchContext.createVectorizedRowBatch();
 
     int fieldCount = source.typeNames().size();
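
The hunks above, and the TestInputOutputFormat and TestOrcRawRecordMerger changes further down, follow one pattern: VectorizedRowBatchCtx is now initialized from the row StructObjectInspector plus an array of scratch-column type names, and the ORC record readers expect the table schema under the "columns" / "columns.types" job properties. The following minimal sketch of that setup is not part of the patch; the helper name, column names, and column types are illustrative only, assuming a StructObjectInspector for the row schema is already at hand.

// Sketch only (not from this patch). Hypothetical helper mirroring the test changes.
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch;
import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
import org.apache.hadoop.mapred.JobConf;

public class BatchCtxSetupSketch {
  static VectorizedRowBatch setUp(StructObjectInspector rowObjectInspector, JobConf conf)
      throws HiveException {  // HiveException matches the test methods in this patch
    // Readers in these tests now need the table schema in the configuration
    // (example names/types; the tests derive them from MyRow/BigRow helpers).
    conf.set("columns", "x,y");
    conf.set("columns.types", "int:int");

    // New init() argument order: row object inspector first, then scratch column type names.
    VectorizedRowBatchCtx batchContext = new VectorizedRowBatchCtx();
    batchContext.init(rowObjectInspector, new String[0]);
    return batchContext.createVectorizedRowBatch();
  }
}
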
http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java deleted file mode 100644 index 473ebac..0000000 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/vector/TestVectorizedRowBatchCtx.java +++ /dev/null @@ -1,355 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hive.ql.exec.vector; - -import java.io.File; -import java.io.IOException; -import java.sql.Timestamp; -import java.util.Arrays; -import java.util.Calendar; -import java.util.List; -import java.util.Properties; - -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.FileSystem; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.hive.ql.io.RCFile; -import org.apache.hadoop.hive.ql.io.RCFileOutputFormat; -import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.serde.serdeConstants; -import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.SerDeUtils; -import org.apache.hadoop.hive.serde2.columnar.BytesRefArrayWritable; -import org.apache.hadoop.hive.serde2.columnar.BytesRefWritable; -import org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe; -import org.apache.hadoop.hive.serde2.io.ByteWritable; -import org.apache.hadoop.hive.serde2.io.DoubleWritable; -import org.apache.hadoop.hive.serde2.io.ShortWritable; -import org.apache.hadoop.hive.serde2.io.TimestampWritable; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.StructField; -import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; -import org.apache.hadoop.io.BooleanWritable; -import org.apache.hadoop.io.BytesWritable; -import org.apache.hadoop.io.DataOutputBuffer; -import org.apache.hadoop.io.FloatWritable; -import org.apache.hadoop.io.IntWritable; -import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.ObjectWritable; -import org.apache.hadoop.io.Text; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.compress.DefaultCodec; -import org.junit.Assert; -import org.junit.Before; -import org.junit.Test; - -/** - * Class that tests the functionality of VectorizedRowBatchCtx. 
- */ -public class TestVectorizedRowBatchCtx { - - private Configuration conf; - private FileSystem fs; - private Path testFilePath; - private int colCount; - private ColumnarSerDe serDe; - private Properties tbl; - - @Before - public void openFileSystem() throws Exception { - conf = new Configuration(); - fs = FileSystem.getLocal(conf); - Path workDir = new Path(System.getProperty("test.tmp.dir", - "target" + File.separator + "test" + File.separator + "tmp")); - fs.setWorkingDirectory(workDir); - testFilePath = new Path("TestVectorizedRowBatchCtx.testDump.rc"); - fs.delete(testFilePath, false); - } - - private void initSerde() { - tbl = new Properties(); - - // Set the configuration parameters - tbl.setProperty(serdeConstants.SERIALIZATION_FORMAT, "6"); - tbl.setProperty("columns", - "ashort,aint,along,adouble,afloat,astring,abyte,aboolean,atimestamp"); - tbl.setProperty("columns.types", - "smallint:int:bigint:double:float:string:tinyint:boolean:timestamp"); - colCount = 9; - tbl.setProperty(serdeConstants.SERIALIZATION_NULL_FORMAT, "NULL"); - - try { - serDe = new ColumnarSerDe(); - SerDeUtils.initializeSerDe(serDe, conf, tbl, null); - } catch (SerDeException e) { - throw new RuntimeException(e); - } - } - - private void WriteRCFile(FileSystem fs, Path file, Configuration conf) - throws IOException, SerDeException { - fs.delete(file, true); - - RCFileOutputFormat.setColumnNumber(conf, colCount); - RCFile.Writer writer = - new RCFile.Writer(fs, conf, file, null, null, - new DefaultCodec()); - - for (int i = 0; i < 10; ++i) { - BytesRefArrayWritable bytes = new BytesRefArrayWritable(colCount); - BytesRefWritable cu; - - if (i % 3 != 0) { - //if (i < 100) { - cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length); - bytes.set(0, cu); - - cu = new BytesRefWritable((i + 100 + "").getBytes("UTF-8"), 0, - (i + 100 + "").getBytes("UTF-8").length); - bytes.set(1, cu); - - cu = new BytesRefWritable((i + 200 + "").getBytes("UTF-8"), 0, - (i + 200 + "").getBytes("UTF-8").length); - bytes.set(2, cu); - - cu = new BytesRefWritable((i + 1.23 + "").getBytes("UTF-8"), 0, - (i + 1.23 + "").getBytes("UTF-8").length); - bytes.set(3, cu); - - cu = new BytesRefWritable((i + 2.23 + "").getBytes("UTF-8"), 0, - (i + 2.23 + "").getBytes("UTF-8").length); - bytes.set(4, cu); - - cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0, - ("Test string").getBytes("UTF-8").length); - bytes.set(5, cu); - - cu = new BytesRefWritable((1 + "").getBytes("UTF-8"), 0, - (1 + "").getBytes("UTF-8").length); - bytes.set(6, cu); - - cu = new BytesRefWritable(("true").getBytes("UTF-8"), 0, - ("true").getBytes("UTF-8").length); - bytes.set(7, cu); - - Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime()); - cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0, - t.toString().getBytes("UTF-8").length); - bytes.set(8, cu); - - } else { - cu = new BytesRefWritable((i + "").getBytes("UTF-8"), 0, (i + "").getBytes("UTF-8").length); - bytes.set(0, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(1, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(2, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(3, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(4, cu); - - cu = new BytesRefWritable(("Test string").getBytes("UTF-8"), 0, - ("Test string").getBytes("UTF-8").length); - bytes.set(5, cu); - - cu = new BytesRefWritable(new byte[0], 0, 0); - bytes.set(6, cu); - - cu = new BytesRefWritable(new byte[0], 
0, 0); - bytes.set(7, cu); - -// cu = new BytesRefWritable(new byte[0], 0, 0); -// bytes.set(8, cu); - Timestamp t = new Timestamp(Calendar.getInstance().getTime().getTime()); - cu = new BytesRefWritable(t.toString().getBytes("UTF-8"), 0, - t.toString().getBytes("UTF-8").length); - bytes.set(8, cu); - } - writer.append(bytes); - } - writer.close(); - } - - private VectorizedRowBatch GetRowBatch() throws SerDeException, HiveException, IOException { - - RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf); - DataOutputBuffer buffer = new DataOutputBuffer(); - - // Get object inspector - StructObjectInspector oi = (StructObjectInspector) serDe - .getObjectInspector(); - List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs(); - - Assert.assertEquals("Field size should be 9", colCount, fieldRefs.size()); - - // Create the context - VectorizedRowBatchCtx ctx = new VectorizedRowBatchCtx(oi, oi, serDe, null, null); - VectorizedRowBatch batch = ctx.createVectorizedRowBatch(); - VectorizedBatchUtil.setNoNullFields(batch); - - // Iterate thru the rows and populate the batch - LongWritable rowID = new LongWritable(); - for (int i = 0; i < 10; i++) { - reader.next(rowID); - BytesRefArrayWritable cols = new BytesRefArrayWritable(); - reader.getCurrentRow(cols); - cols.resetValid(colCount); - ctx.addRowToBatch(i, cols, batch, buffer); - } - reader.close(); - batch.size = 10; - return batch; - } - - void ValidateRowBatch(VectorizedRowBatch batch) throws IOException, SerDeException { - - LongWritable rowID = new LongWritable(); - RCFile.Reader reader = new RCFile.Reader(fs, this.testFilePath, conf); - for (int i = 0; i < batch.size; i++) { - reader.next(rowID); - BytesRefArrayWritable cols = new BytesRefArrayWritable(); - reader.getCurrentRow(cols); - cols.resetValid(colCount); - Object row = serDe.deserialize(cols); - - StructObjectInspector oi = (StructObjectInspector) serDe - .getObjectInspector(); - List<? extends StructField> fieldRefs = oi.getAllStructFieldRefs(); - - for (int j = 0; j < fieldRefs.size(); j++) { - Object fieldData = oi.getStructFieldData(row, fieldRefs.get(j)); - ObjectInspector foi = fieldRefs.get(j).getFieldObjectInspector(); - - // Vectorization only supports PRIMITIVE data types. Assert the same - Assert.assertEquals(true, foi.getCategory() == Category.PRIMITIVE); - - PrimitiveObjectInspector poi = (PrimitiveObjectInspector) foi; - Object writableCol = poi.getPrimitiveWritableObject(fieldData); - if (writableCol != null) { - switch (poi.getPrimitiveCategory()) { - case BOOLEAN: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == (((BooleanWritable) writableCol).get() ? 
1 - : 0)); - } - break; - case BYTE: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == (long) ((ByteWritable) writableCol).get()); - } - break; - case SHORT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((ShortWritable) writableCol).get()); - } - break; - case INT: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((IntWritable) writableCol).get()); - } - break; - case LONG: { - LongColumnVector lcv = (LongColumnVector) batch.cols[j]; - Assert.assertEquals(true, lcv.vector[i] == ((LongWritable) writableCol).get()); - } - break; - case FLOAT: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j]; - Assert.assertEquals(true, dcv.vector[i] == ((FloatWritable) writableCol).get()); - } - break; - case DOUBLE: { - DoubleColumnVector dcv = (DoubleColumnVector) batch.cols[j]; - Assert.assertEquals(true, dcv.vector[i] == ((DoubleWritable) writableCol).get()); - } - break; - case BINARY: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[j]; - BytesWritable colBinary = (BytesWritable) writableCol; - BytesWritable batchBinary = (BytesWritable) bcv.getWritableObject(i); - byte[] a = colBinary.getBytes(); - byte[] b = batchBinary.getBytes(); - Assert.assertEquals(true, Arrays.equals(a, b)); - } - break; - case STRING: { - BytesColumnVector bcv = (BytesColumnVector) batch.cols[j]; - Text colText = (Text) writableCol; - Text batchText = (Text) bcv.getWritableObject(i); - String a = colText.toString(); - String b = batchText.toString(); - Assert.assertEquals(true, a.equals(b)); - } - break; - case TIMESTAMP: { - LongColumnVector tcv = (LongColumnVector) batch.cols[j]; - Timestamp t = ((TimestampWritable) writableCol).getTimestamp(); - long timeInNanoSec = (t.getTime() * 1000000) + (t.getNanos() % 1000000); - Assert.assertEquals(true, tcv.vector[i] == timeInNanoSec); - } - break; - default: - Assert.assertTrue("Unknown type", false); - } - } else { - Assert.assertEquals(true, batch.cols[j].isNull[i]); - } - } - - // Check repeating - Assert.assertEquals(false, batch.cols[0].isRepeating); - Assert.assertEquals(false, batch.cols[1].isRepeating); - Assert.assertEquals(false, batch.cols[2].isRepeating); - Assert.assertEquals(false, batch.cols[3].isRepeating); - Assert.assertEquals(false, batch.cols[4].isRepeating); - - // Check non null - Assert.assertEquals(true, batch.cols[0].noNulls); - Assert.assertEquals(false, batch.cols[1].noNulls); - Assert.assertEquals(false, batch.cols[2].noNulls); - Assert.assertEquals(false, batch.cols[3].noNulls); - Assert.assertEquals(false, batch.cols[4].noNulls); - } - reader.close(); - } - - @Test - public void TestCtx() throws Exception { - initSerde(); - WriteRCFile(this.fs, this.testFilePath, this.conf); - VectorizedRowBatch batch = GetRowBatch(); - ValidateRowBatch(batch); - - // Test VectorizedColumnarSerDe - VectorizedColumnarSerDe vcs = new VectorizedColumnarSerDe(); - SerDeUtils.initializeSerDe(vcs, this.conf, tbl, null); - Writable w = vcs.serializeVector(batch, (StructObjectInspector) serDe - .getObjectInspector()); - BytesRefArrayWritable[] refArray = (BytesRefArrayWritable[]) ((ObjectWritable) w).get(); - vcs.deserializeVector(refArray, 10, batch); - ValidateRowBatch(batch); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java index b9d6c27..b9eec92 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestInputOutputFormat.java @@ -59,6 +59,7 @@ import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatchCtx; import org.apache.hadoop.hive.ql.io.AcidInputFormat; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.CombineHiveInputFormat; @@ -66,9 +67,11 @@ import org.apache.hadoop.hive.ql.io.HiveInputFormat; import org.apache.hadoop.hive.ql.io.HiveOutputFormat; import org.apache.hadoop.hive.ql.io.InputFormatChecker; import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat.SplitStrategy; +import org.apache.hadoop.hive.ql.io.orc.TestOrcRawRecordMerger.MyRow; import org.apache.hadoop.hive.ql.io.sarg.PredicateLeaf; import org.apache.hadoop.hive.ql.io.sarg.SearchArgument; import org.apache.hadoop.hive.ql.io.sarg.SearchArgumentFactory; +import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.ql.plan.MapWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.ql.plan.TableDesc; @@ -189,6 +192,7 @@ public class TestInputOutputFormat { builder.append("}"); return builder.toString(); } + } public static class BigRowField implements StructField { @@ -331,6 +335,7 @@ public class TestInputOutputFormat { public Category getCategory() { return Category.STRUCT; } + } public static class MyRow implements Writable { @@ -350,6 +355,15 @@ public class TestInputOutputFormat { public void readFields(DataInput dataInput) throws IOException { throw new UnsupportedOperationException("no read"); } + + + static String getColumnNamesProperty() { + return "x,y"; + } + static String getColumnTypesProperty() { + return "int:int"; + } + } @Rule @@ -1130,6 +1144,8 @@ public class TestInputOutputFormat { // read the whole file + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); @@ -1250,6 +1266,8 @@ public class TestInputOutputFormat { InputSplit[] splits = in.getSplits(conf, 1); assertEquals(1, splits.length); ColumnProjectionUtils.appendReadColumns(conf, Collections.singletonList(1)); + conf.set("columns", "z,r"); + conf.set("columns.types", "int:struct<x:int,y:int>"); org.apache.hadoop.mapred.RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); @@ -1330,6 +1348,14 @@ public class TestInputOutputFormat { public void readFields(DataInput dataInput) throws IOException { throw new UnsupportedOperationException("no read"); } + + static String getColumnNamesProperty() { + return "str,str2"; + } + static String getColumnTypesProperty() { + return "string:string"; + } + } @Test @@ -1365,6 +1391,8 @@ public class TestInputOutputFormat { assertEquals(1, splits.length); // read the whole file + conf.set("columns", 
StringRow.getColumnNamesProperty()); + conf.set("columns.types", StringRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); Object key = reader.createKey(); @@ -1405,6 +1433,7 @@ public class TestInputOutputFormat { * @param isVectorized should run vectorized * @return a JobConf that contains the necessary information * @throws IOException + * @throws HiveException */ JobConf createMockExecutionEnvironment(Path workDir, Path warehouseDir, @@ -1412,9 +1441,9 @@ public class TestInputOutputFormat { ObjectInspector objectInspector, boolean isVectorized, int partitions - ) throws IOException { - Utilities.clearWorkMap(); + ) throws IOException, HiveException { JobConf conf = new JobConf(); + Utilities.clearWorkMap(); conf.set("hive.exec.plan", workDir.toString()); conf.set("mapred.job.tracker", "local"); conf.set("hive.vectorized.execution.enabled", Boolean.toString(isVectorized)); @@ -1467,6 +1496,11 @@ public class TestInputOutputFormat { MapWork mapWork = new MapWork(); mapWork.setVectorMode(isVectorized); + if (isVectorized) { + VectorizedRowBatchCtx vectorizedRowBatchCtx = new VectorizedRowBatchCtx(); + vectorizedRowBatchCtx.init(structOI, new String[0]); + mapWork.setVectorizedRowBatchCtx(vectorizedRowBatchCtx); + } mapWork.setUseBucketizedHiveInputFormat(false); LinkedHashMap<String, ArrayList<String>> aliasMap = new LinkedHashMap<String, ArrayList<String>>(); @@ -1529,6 +1563,8 @@ public class TestInputOutputFormat { InputSplit[] splits = inputFormat.getSplits(conf, 10); assertEquals(1, splits.length); + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); NullWritable key = reader.createKey(); @@ -1578,6 +1614,8 @@ public class TestInputOutputFormat { InputSplit[] splits = inputFormat.getSplits(conf, 10); assertEquals(1, splits.length); + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader<NullWritable, VectorizedRowBatch> reader = inputFormat.getRecordReader(splits[0], conf, Reporter.NULL); NullWritable key = reader.createKey(); @@ -1646,8 +1684,11 @@ public class TestInputOutputFormat { assertEquals("checking long " + i, i, longColumn.vector[i]); assertEquals("checking float " + i, i, floatColumn.vector[i], 0.0001); assertEquals("checking double " + i, i, doubleCoulmn.vector[i], 0.0001); + Text strValue = new Text(); + strValue.set(stringColumn.vector[i], stringColumn.start[i], + stringColumn.length[i]); assertEquals("checking string " + i, new Text(Long.toHexString(i)), - stringColumn.getWritableObject(i)); + strValue); assertEquals("checking decimal " + i, HiveDecimal.create(i), decimalColumn.vector[i].getHiveDecimal()); assertEquals("checking date " + i, i, dateColumn.vector[i]); @@ -1718,6 +1759,8 @@ public class TestInputOutputFormat { assertTrue(3 >= split.getLocations().length); // read split + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); org.apache.hadoop.mapred.RecordReader<CombineHiveKey, OrcStruct> reader = inputFormat.getRecordReader(split, conf, Reporter.NULL); CombineHiveKey key = reader.createKey(); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java 
---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java index 39f71f1..bfdc83f 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java @@ -27,13 +27,16 @@ import org.apache.hadoop.hive.common.ValidTxnList; import org.apache.hadoop.hive.common.ValidReadTxnList; import org.apache.hadoop.hive.ql.io.AcidOutputFormat; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.IOConstants; import org.apache.hadoop.hive.ql.io.RecordIdentifier; import org.apache.hadoop.hive.ql.io.RecordUpdater; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey; import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair; +import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.io.IntWritable; @@ -48,6 +51,8 @@ import org.junit.Test; import org.mockito.MockSettings; import org.mockito.Mockito; +import com.google.common.collect.Lists; + import java.io.File; import java.io.IOException; import java.nio.ByteBuffer; @@ -354,6 +359,8 @@ public class TestOrcRawRecordMerger { Configuration conf = new Configuration(); conf.set("columns", "col1"); conf.set("columns.types", "string"); + conf.set(serdeConstants.LIST_COLUMNS, "col1"); + conf.set(serdeConstants.LIST_COLUMN_TYPES, "string"); Reader reader = Mockito.mock(Reader.class, settings); RecordReader recordReader = Mockito.mock(RecordReader.class, settings); @@ -362,6 +369,8 @@ public class TestOrcRawRecordMerger { typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1) .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5) .addSubtypes(6); + typeBuilder.addAllFieldNames(Lists.newArrayList("operation", "originalTransaction", "bucket", + "rowId", "currentTransaction", "row")); types.add(typeBuilder.build()); types.add(null); types.add(null); @@ -370,6 +379,10 @@ public class TestOrcRawRecordMerger { types.add(null); typeBuilder.clearSubtypes(); typeBuilder.addSubtypes(7); + typeBuilder.addAllFieldNames(Lists.newArrayList("col1")); + types.add(typeBuilder.build()); + typeBuilder.clear(); + typeBuilder.setKind(OrcProto.Type.Kind.STRING); types.add(typeBuilder.build()); Mockito.when(reader.getTypes()).thenReturn(types); @@ -466,6 +479,14 @@ public class TestOrcRawRecordMerger { col1 = new Text(val); ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } + + static String getColumnNamesProperty() { + return "col1,ROW__ID"; + } + static String getColumnTypesProperty() { + return "string:struct<transactionId:bigint,bucketId:int,rowId:bigint>"; + } + } static String getValue(OrcStruct event) { @@ -499,6 +520,8 @@ public class TestOrcRawRecordMerger { BUCKET); Reader baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); OrcRawRecordMerger merger 
= new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, createMaximalTxnList(), new Reader.Options(), @@ -567,6 +590,10 @@ public class TestOrcRawRecordMerger { Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), BUCKET); + + conf.set("columns", MyRow.getColumnNamesProperty()); + conf.set("columns.types", MyRow.getColumnTypesProperty()); + Reader baseReader = OrcFile.createReader(basePath, OrcFile.readerOptions(conf)); OrcRawRecordMerger merger = @@ -790,6 +817,13 @@ public class TestOrcRawRecordMerger { BigRow(long rowId, long origTxn, int bucket) { ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); } + + static String getColumnNamesProperty() { + return "myint,mylong,mytext,myfloat,mydouble,ROW__ID"; + } + static String getColumnTypesProperty() { + return "int:bigint:string:float:double:struct<transactionId:bigint,bucketId:int,rowId:bigint>"; + } } /** @@ -863,6 +897,8 @@ public class TestOrcRawRecordMerger { InputFormat inf = new OrcInputFormat(); JobConf job = new JobConf(); + job.set("columns", BigRow.getColumnNamesProperty()); + job.set("columns.types", BigRow.getColumnTypesProperty()); job.set("mapred.min.split.size", "1"); job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); @@ -967,6 +1003,8 @@ public class TestOrcRawRecordMerger { job.set("mapred.min.split.size", "1"); job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); + job.set("columns", BigRow.getColumnNamesProperty()); + job.set("columns.types", BigRow.getColumnTypesProperty()); InputSplit[] splits = inf.getSplits(job, 5); assertEquals(5, splits.length); org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr; @@ -1037,6 +1075,8 @@ public class TestOrcRawRecordMerger { job.set("mapred.max.split.size", "2"); job.set("mapred.input.dir", root.toString()); job.set("bucket_count", "1"); + job.set("columns", MyRow.getColumnNamesProperty()); + job.set("columns.types", MyRow.getColumnTypesProperty()); InputSplit[] splits = inf.getSplits(job, 5); assertEquals(1, splits.length); org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr; @@ -1104,6 +1144,8 @@ public class TestOrcRawRecordMerger { JobConf job = new JobConf(); job.set("mapred.input.dir", root.toString()); job.set("bucket_count", "2"); + job.set("columns", MyRow.getColumnNamesProperty()); + job.set("columns.types", MyRow.getColumnTypesProperty()); // read the keys before the delta is flushed InputSplit[] splits = inf.getSplits(job, 1); http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig new file mode 100644 index 0000000..15ee24c --- /dev/null +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestOrcRawRecordMerger.java.orig @@ -0,0 +1,1150 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hive.ql.io.orc; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidTxnList; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.ql.io.AcidOutputFormat; +import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.io.RecordIdentifier; +import org.apache.hadoop.hive.ql.io.RecordUpdater; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.OriginalReaderPair; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderKey; +import org.apache.hadoop.hive.ql.io.orc.OrcRawRecordMerger.ReaderPair; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; +import org.apache.hadoop.hive.serde2.objectinspector.StructField; +import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.NullWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.InputFormat; +import org.apache.hadoop.mapred.InputSplit; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.Reporter; +import org.junit.Test; +import org.mockito.MockSettings; +import org.mockito.Mockito; + +import java.io.File; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNull; + +public class TestOrcRawRecordMerger { + + private static final Logger LOG = LoggerFactory.getLogger(TestOrcRawRecordMerger.class); +//todo: why is statementId -1? 
+ @Test + public void testOrdering() throws Exception { + ReaderKey left = new ReaderKey(100, 200, 1200, 300); + ReaderKey right = new ReaderKey(); + right.setValues(100, 200, 1000, 200,1); + assertTrue(right.compareTo(left) < 0); + assertTrue(left.compareTo(right) > 0); + assertEquals(false, left.equals(right)); + left.set(right); + assertTrue(right.compareTo(left) == 0); + assertEquals(true, right.equals(left)); + right.setRowId(2000); + assertTrue(right.compareTo(left) > 0); + left.setValues(1, 2, 3, 4,-1); + right.setValues(100, 2, 3, 4,-1); + assertTrue(left.compareTo(right) < 0); + assertTrue(right.compareTo(left) > 0); + left.setValues(1, 2, 3, 4,-1); + right.setValues(1, 100, 3, 4,-1); + assertTrue(left.compareTo(right) < 0); + assertTrue(right.compareTo(left) > 0); + left.setValues(1, 2, 3, 100,-1); + right.setValues(1, 2, 3, 4,-1); + assertTrue(left.compareTo(right) < 0); + assertTrue(right.compareTo(left) > 0); + + // ensure that we are consistent when comparing to the base class + RecordIdentifier ri = new RecordIdentifier(1, 2, 3); + assertEquals(1, ri.compareTo(left)); + assertEquals(-1, left.compareTo(ri)); + assertEquals(false, ri.equals(left)); + assertEquals(false, left.equals(ri)); + } + + private static void setRow(OrcStruct event, + int operation, + long originalTransaction, + int bucket, + long rowId, + long currentTransaction, + String value) { + event.setFieldValue(OrcRecordUpdater.OPERATION, new IntWritable(operation)); + event.setFieldValue(OrcRecordUpdater.ORIGINAL_TRANSACTION, + new LongWritable(originalTransaction)); + event.setFieldValue(OrcRecordUpdater.BUCKET, new IntWritable(bucket)); + event.setFieldValue(OrcRecordUpdater.ROW_ID, new LongWritable(rowId)); + event.setFieldValue(OrcRecordUpdater.CURRENT_TRANSACTION, + new LongWritable(currentTransaction)); + OrcStruct row = new OrcStruct(1); + row.setFieldValue(0, new Text(value)); + event.setFieldValue(OrcRecordUpdater.ROW, row); + } + + private static String value(OrcStruct event) { + return OrcRecordUpdater.getRow(event).getFieldValue(0).toString(); + } + + private List<StripeInformation> createStripes(long... 
rowCounts) { + long offset = 0; + List<StripeInformation> result = + new ArrayList<StripeInformation>(rowCounts.length); + for(long count: rowCounts) { + OrcProto.StripeInformation.Builder stripe = + OrcProto.StripeInformation.newBuilder(); + stripe.setDataLength(800).setIndexLength(100).setFooterLength(100) + .setNumberOfRows(count).setOffset(offset); + offset += 1000; + result.add(new ReaderImpl.StripeInformationImpl(stripe.build())); + } + return result; + } + + // can add .verboseLogging() to cause Mockito to log invocations + private final MockSettings settings = Mockito.withSettings(); + private final Path tmpDir = new Path(System.getProperty("test.tmp.dir", + "target" + File.separator + "test" + File.separator + "tmp")); + + private Reader createMockReader() throws IOException { + Reader reader = Mockito.mock(Reader.class, settings); + RecordReader recordReader = Mockito.mock(RecordReader.class, settings); + OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first"); + OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second"); + OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third"); + OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth"); + OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth"); + Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class))) + .thenReturn(recordReader); + + Mockito.when(recordReader.hasNext()). + thenReturn(true, true, true, true, true, false); + + Mockito.when(recordReader.getProgress()).thenReturn(1.0f); + + Mockito.when(recordReader.next(null)).thenReturn(row1); + Mockito.when(recordReader.next(row1)).thenReturn(row2); + Mockito.when(recordReader.next(row2)).thenReturn(row3); + Mockito.when(recordReader.next(row3)).thenReturn(row4); + Mockito.when(recordReader.next(row4)).thenReturn(row5); + + return reader; + } + + @Test + public void testReaderPair() throws Exception { + ReaderKey key = new ReaderKey(); + Reader reader = createMockReader(); + RecordIdentifier minKey = new RecordIdentifier(10, 20, 30); + RecordIdentifier maxKey = new RecordIdentifier(40, 50, 60); + ReaderPair pair = new ReaderPair(key, reader, 20, minKey, maxKey, + new Reader.Options(), 0); + RecordReader recordReader = pair.recordReader; + assertEquals(10, key.getTransactionId()); + assertEquals(20, key.getBucketId()); + assertEquals(40, key.getRowId()); + assertEquals(120, key.getCurrentTransactionId()); + assertEquals("third", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(40, key.getTransactionId()); + assertEquals(50, key.getBucketId()); + assertEquals(60, key.getRowId()); + assertEquals(130, key.getCurrentTransactionId()); + assertEquals("fourth", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(null, pair.nextRecord); + Mockito.verify(recordReader).close(); + } + + @Test + public void testReaderPairNoMin() throws Exception { + ReaderKey key = new ReaderKey(); + Reader reader = createMockReader(); + + ReaderPair pair = new ReaderPair(key, reader, 20, null, null, + new Reader.Options(), 0); + RecordReader recordReader = pair.recordReader; + assertEquals(10, key.getTransactionId()); + assertEquals(20, key.getBucketId()); + assertEquals(20, 
key.getRowId()); + assertEquals(100, key.getCurrentTransactionId()); + assertEquals("first", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(10, key.getTransactionId()); + assertEquals(20, key.getBucketId()); + assertEquals(30, key.getRowId()); + assertEquals(110, key.getCurrentTransactionId()); + assertEquals("second", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(10, key.getTransactionId()); + assertEquals(20, key.getBucketId()); + assertEquals(40, key.getRowId()); + assertEquals(120, key.getCurrentTransactionId()); + assertEquals("third", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(40, key.getTransactionId()); + assertEquals(50, key.getBucketId()); + assertEquals(60, key.getRowId()); + assertEquals(130, key.getCurrentTransactionId()); + assertEquals("fourth", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(40, key.getTransactionId()); + assertEquals(50, key.getBucketId()); + assertEquals(61, key.getRowId()); + assertEquals(140, key.getCurrentTransactionId()); + assertEquals("fifth", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(null, pair.nextRecord); + Mockito.verify(recordReader).close(); + } + + private static OrcStruct createOriginalRow(String value) { + OrcStruct result = new OrcStruct(1); + result.setFieldValue(0, new Text(value)); + return result; + } + + private Reader createMockOriginalReader() throws IOException { + Reader reader = Mockito.mock(Reader.class, settings); + RecordReader recordReader = Mockito.mock(RecordReader.class, settings); + OrcStruct row1 = createOriginalRow("first"); + OrcStruct row2 = createOriginalRow("second"); + OrcStruct row3 = createOriginalRow("third"); + OrcStruct row4 = createOriginalRow("fourth"); + OrcStruct row5 = createOriginalRow("fifth"); + + Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class))) + .thenReturn(recordReader); + Mockito.when(recordReader.hasNext()). 
+ thenReturn(true, true, true, true, true, false); + Mockito.when(recordReader.getRowNumber()).thenReturn(0L, 1L, 2L, 3L, 4L); + Mockito.when(recordReader.next(null)).thenReturn(row1); + Mockito.when(recordReader.next(row1)).thenReturn(row2); + Mockito.when(recordReader.next(row2)).thenReturn(row3); + Mockito.when(recordReader.next(row3)).thenReturn(row4); + Mockito.when(recordReader.next(row4)).thenReturn(row5); + return reader; + } + + @Test + public void testOriginalReaderPair() throws Exception { + ReaderKey key = new ReaderKey(); + Reader reader = createMockOriginalReader(); + RecordIdentifier minKey = new RecordIdentifier(0, 10, 1); + RecordIdentifier maxKey = new RecordIdentifier(0, 10, 3); + boolean[] includes = new boolean[]{true, true}; + ReaderPair pair = new OriginalReaderPair(key, reader, 10, minKey, maxKey, + new Reader.Options().include(includes)); + RecordReader recordReader = pair.recordReader; + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(2, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + assertEquals("third", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(3, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + assertEquals("fourth", value(pair.nextRecord)); + + pair.next(pair.nextRecord); + assertEquals(null, pair.nextRecord); + Mockito.verify(recordReader).close(); + } + + private static ValidTxnList createMaximalTxnList() { + return new ValidReadTxnList(Long.MAX_VALUE + ":"); + } + + @Test + public void testOriginalReaderPairNoMin() throws Exception { + ReaderKey key = new ReaderKey(); + Reader reader = createMockOriginalReader(); + ReaderPair pair = new OriginalReaderPair(key, reader, 10, null, null, + new Reader.Options()); + assertEquals("first", value(pair.nextRecord)); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(0, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + + pair.next(pair.nextRecord); + assertEquals("second", value(pair.nextRecord)); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(1, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + + pair.next(pair.nextRecord); + assertEquals("third", value(pair.nextRecord)); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(2, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + + pair.next(pair.nextRecord); + assertEquals("fourth", value(pair.nextRecord)); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(3, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + + pair.next(pair.nextRecord); + assertEquals("fifth", value(pair.nextRecord)); + assertEquals(0, key.getTransactionId()); + assertEquals(10, key.getBucketId()); + assertEquals(4, key.getRowId()); + assertEquals(0, key.getCurrentTransactionId()); + + pair.next(pair.nextRecord); + assertEquals(null, pair.nextRecord); + Mockito.verify(pair.recordReader).close(); + } + + @Test + public void testNewBase() throws Exception { + Configuration conf = new Configuration(); + conf.set("columns", "col1"); + conf.set("columns.types", "string"); + Reader reader = Mockito.mock(Reader.class, settings); + RecordReader recordReader = Mockito.mock(RecordReader.class, settings); + + List<OrcProto.Type> types = new 
ArrayList<OrcProto.Type>(); + OrcProto.Type.Builder typeBuilder = OrcProto.Type.newBuilder(); + typeBuilder.setKind(OrcProto.Type.Kind.STRUCT).addSubtypes(1) + .addSubtypes(2).addSubtypes(3).addSubtypes(4).addSubtypes(5) + .addSubtypes(6); + types.add(typeBuilder.build()); + types.add(null); + types.add(null); + types.add(null); + types.add(null); + types.add(null); + typeBuilder.clearSubtypes(); + typeBuilder.addSubtypes(7); + types.add(typeBuilder.build()); + + Mockito.when(reader.getTypes()).thenReturn(types); + Mockito.when(reader.rowsOptions(Mockito.any(Reader.Options.class))) + .thenReturn(recordReader); + + OrcStruct row1 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row1, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 20, 100, "first"); + OrcStruct row2 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row2, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 30, 110, "second"); + OrcStruct row3 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row3, OrcRecordUpdater.INSERT_OPERATION, 10, 20, 40, 120, "third"); + OrcStruct row4 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row4, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 60, 130, "fourth"); + OrcStruct row5 = new OrcStruct(OrcRecordUpdater.FIELDS); + setRow(row5, OrcRecordUpdater.INSERT_OPERATION, 40, 50, 61, 140, "fifth"); + + Mockito.when(recordReader.hasNext()). + thenReturn(true, true, true, true, true, false); + + Mockito.when(recordReader.getProgress()).thenReturn(1.0f); + + Mockito.when(recordReader.next(null)).thenReturn(row1, row4); + Mockito.when(recordReader.next(row1)).thenReturn(row2); + Mockito.when(recordReader.next(row2)).thenReturn(row3); + Mockito.when(recordReader.next(row3)).thenReturn(row5); + + Mockito.when(reader.getMetadataValue(OrcRecordUpdater.ACID_KEY_INDEX_NAME)) + .thenReturn(ByteBuffer.wrap("10,20,30;40,50,60;40,50,61" + .getBytes("UTF-8"))); + Mockito.when(reader.getStripes()) + .thenReturn(createStripes(2, 2, 1)); + + OrcRawRecordMerger merger = new OrcRawRecordMerger(conf, false, reader, + false, 10, createMaximalTxnList(), + new Reader.Options().range(1000, 1000), null); + RecordReader rr = merger.getCurrentReader().recordReader; + assertEquals(0, merger.getOtherReaders().size()); + + assertEquals(new RecordIdentifier(10, 20, 30), merger.getMinKey()); + assertEquals(new RecordIdentifier(40, 50, 60), merger.getMaxKey()); + RecordIdentifier id = merger.createKey(); + OrcStruct event = merger.createValue(); + + assertEquals(true, merger.next(id, event)); + assertEquals(10, id.getTransactionId()); + assertEquals(20, id.getBucketId()); + assertEquals(40, id.getRowId()); + assertEquals("third", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(40, id.getTransactionId()); + assertEquals(50, id.getBucketId()); + assertEquals(60, id.getRowId()); + assertEquals("fourth", getValue(event)); + + assertEquals(false, merger.next(id, event)); + assertEquals(1.0, merger.getProgress(), 0.01); + merger.close(); + Mockito.verify(rr).close(); + Mockito.verify(rr).getProgress(); + + StructObjectInspector eventObjectInspector = + (StructObjectInspector) merger.getObjectInspector(); + List<? 
extends StructField> fields = + eventObjectInspector.getAllStructFieldRefs(); + assertEquals(OrcRecordUpdater.FIELDS, fields.size()); + assertEquals("operation", + fields.get(OrcRecordUpdater.OPERATION).getFieldName()); + assertEquals("currentTransaction", + fields.get(OrcRecordUpdater.CURRENT_TRANSACTION).getFieldName()); + assertEquals("originalTransaction", + fields.get(OrcRecordUpdater.ORIGINAL_TRANSACTION).getFieldName()); + assertEquals("bucket", + fields.get(OrcRecordUpdater.BUCKET).getFieldName()); + assertEquals("rowId", + fields.get(OrcRecordUpdater.ROW_ID).getFieldName()); + StructObjectInspector rowObjectInspector = + (StructObjectInspector) fields.get(OrcRecordUpdater.ROW) + .getFieldObjectInspector(); + assertEquals("col1", + rowObjectInspector.getAllStructFieldRefs().get(0).getFieldName()); + } + + static class MyRow { + Text col1; + RecordIdentifier ROW__ID; + + MyRow(String val) { + col1 = new Text(val); + } + + MyRow(String val, long rowId, long origTxn, int bucket) { + col1 = new Text(val); + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + } + + static String getValue(OrcStruct event) { + return OrcRecordUpdater.getRow(event).getFieldValue(0).toString(); + } + + @Test + public void testEmpty() throws Exception { + final int BUCKET = 0; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testEmpty").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write the empty base + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .inspector(inspector).bucket(BUCKET).writingBase(true) + .maximumTransactionId(100).finalDestination(root); + of.getRecordUpdater(root, options).close(false); + + ValidTxnList txnList = new ValidReadTxnList("200:"); + AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList); + + Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), + BUCKET); + Reader baseReader = OrcFile.createReader(basePath, + OrcFile.readerOptions(conf)); + OrcRawRecordMerger merger = + new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + createMaximalTxnList(), new Reader.Options(), + AcidUtils.getPaths(directory.getCurrentDirectories())); + RecordIdentifier key = merger.createKey(); + OrcStruct value = merger.createValue(); + assertEquals(false, merger.next(key, value)); + } + + /** + * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is + * a base and a delta. 
+ * @throws Exception + */ + @Test + public void testNewBaseAndDelta() throws Exception { + testNewBaseAndDelta(false); + testNewBaseAndDelta(true); + } + private void testNewBaseAndDelta(boolean use130Format) throws Exception { + final int BUCKET = 10; + String[] values = new String[]{"first", "second", "third", "fourth", + "fifth", "sixth", "seventh", "eighth", + "ninth", "tenth"}; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testNewBaseAndDelta").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write the base + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .inspector(inspector).bucket(BUCKET).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } + RecordUpdater ru = of.getRecordUpdater(root, + options.writingBase(true).maximumTransactionId(100)); + for(String v: values) { + ru.insert(0, new MyRow(v)); + } + ru.close(false); + + // write a delta + ru = of.getRecordUpdater(root, options.writingBase(false) + .minimumTransactionId(200).maximumTransactionId(200).recordIdColumn(1)); + ru.update(200, new MyRow("update 1", 0, 0, BUCKET)); + ru.update(200, new MyRow("update 2", 2, 0, BUCKET)); + ru.update(200, new MyRow("update 3", 3, 0, BUCKET)); + ru.delete(200, new MyRow("", 7, 0, BUCKET)); + ru.delete(200, new MyRow("", 8, 0, BUCKET)); + ru.close(false); + + ValidTxnList txnList = new ValidReadTxnList("200:"); + AcidUtils.Directory directory = AcidUtils.getAcidState(root, conf, txnList); + + assertEquals(new Path(root, "base_0000100"), directory.getBaseDirectory()); + assertEquals(new Path(root, use130Format ? 
+ AcidUtils.deltaSubdir(200,200,0) : AcidUtils.deltaSubdir(200,200)), + directory.getCurrentDirectories().get(0).getPath()); + + Path basePath = AcidUtils.createBucketFile(directory.getBaseDirectory(), + BUCKET); + Reader baseReader = OrcFile.createReader(basePath, + OrcFile.readerOptions(conf)); + OrcRawRecordMerger merger = + new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + createMaximalTxnList(), new Reader.Options(), + AcidUtils.getPaths(directory.getCurrentDirectories())); + assertEquals(null, merger.getMinKey()); + assertEquals(null, merger.getMaxKey()); + RecordIdentifier id = merger.createKey(); + OrcStruct event = merger.createValue(); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals("update 1", getValue(event)); + assertFalse(merger.isDelete(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals("second", getValue(event)); + assertFalse(merger.isDelete(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals("update 2", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals("update 3", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals("fifth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals("sixth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals("seventh", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + assertTrue(merger.isDelete(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals("tenth", getValue(event)); + + assertEquals(false, merger.next(id, event)); + merger.close(); + + // make a merger that doesn't collapse events + merger = new OrcRawRecordMerger(conf, false, baseReader, false, BUCKET, + createMaximalTxnList(), new Reader.Options(), + AcidUtils.getPaths(directory.getCurrentDirectories())); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + 
OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 0, 200), id); + assertEquals("update 1", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 0, 0), id); + assertEquals("first", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 1, 0), id); + assertEquals("second", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 2, 200), id); + assertEquals("update 2", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 2, 0), id); + assertEquals("third", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.UPDATE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 3, 200), id); + assertEquals("update 3", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 3, 0), id); + assertEquals("fourth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 4, 0), id); + assertEquals("fifth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 5, 0), id); + assertEquals("sixth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 6, 0), id); + assertEquals("seventh", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 7, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 7, 0), id); + assertEquals("eighth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.DELETE_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 8, 200), id); + assertNull(OrcRecordUpdater.getRow(event)); + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 8, 0), id); + assertEquals("ninth", getValue(event)); + + assertEquals(true, merger.next(id, event)); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, 9, 0), id); + assertEquals("tenth", getValue(event)); + + assertEquals(false, merger.next(id, event)); + merger.close(); + + // try ignoring the 200 transaction and make sure it works 
still + ValidTxnList txns = new ValidReadTxnList("2000:200"); + merger = + new OrcRawRecordMerger(conf, true, baseReader, false, BUCKET, + txns, new Reader.Options(), + AcidUtils.getPaths(directory.getCurrentDirectories())); + for(int i=0; i < values.length; ++i) { + assertEquals(true, merger.next(id, event)); + LOG.info("id = " + id + "event = " + event); + assertEquals(OrcRecordUpdater.INSERT_OPERATION, + OrcRecordUpdater.getOperation(event)); + assertEquals(new ReaderKey(0, BUCKET, i, 0), id); + assertEquals(values[i], getValue(event)); + } + + assertEquals(false, merger.next(id, event)); + merger.close(); + } + + static class BigRow { + int myint; + long mylong; + Text mytext; + float myfloat; + double mydouble; + RecordIdentifier ROW__ID; + + BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble) { + this.myint = myint; + this.mylong = mylong; + this.mytext = new Text(mytext); + this.myfloat = myfloat; + this.mydouble = mydouble; + ROW__ID = null; + } + + BigRow(int myint, long mylong, String mytext, float myfloat, double mydouble, + long rowId, long origTxn, int bucket) { + this.myint = myint; + this.mylong = mylong; + this.mytext = new Text(mytext); + this.myfloat = myfloat; + this.mydouble = mydouble; + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + + BigRow(long rowId, long origTxn, int bucket) { + ROW__ID = new RecordIdentifier(origTxn, bucket, rowId); + } + } + + /** + * Test the OrcRecordUpdater with the OrcRawRecordMerger when there is + * a base and a delta. + * @throws Exception + */ + @Test + public void testRecordReaderOldBaseAndDelta() throws Exception { + final int BUCKET = 10; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testOldBaseAndDelta").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write the base + MemoryManager mgr = new MemoryManager(conf){ + int rowsAddedSinceCheck = 0; + + @Override + synchronized void addedRow(int rows) throws IOException { + rowsAddedSinceCheck += rows; + if (rowsAddedSinceCheck >= 2) { + notifyWriters(); + rowsAddedSinceCheck = 0; + } + } + }; + // make 5 stripes with 2 rows each + Writer writer = OrcFile.createWriter(new Path(root, "0000010_0"), + OrcFile.writerOptions(conf).inspector(inspector).fileSystem(fs) + .blockPadding(false).bufferSize(10000).compress(CompressionKind.NONE) + .stripeSize(1).memory(mgr).version(OrcFile.Version.V_0_11)); + String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3", + "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"}; + for(int i=0; i < values.length; ++i) { + writer.addRow(new BigRow(i, i, values[i], i, i)); + } + writer.close(); + + // write a delta + AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf) + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) + .bucket(BUCKET).inspector(inspector).filesystem(fs).recordIdColumn(5).finalDestination(root); + RecordUpdater ru = of.getRecordUpdater(root, options); + values = new String[]{"0.0", null, null, "1.1", null, null, null, + "ignore.7"}; + for(int i=0; i < values.length; ++i) { + if (values[i] != null) { + ru.update(1, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); + } + } + ru.delete(100, new BigRow(9, 0, BUCKET)); + 
ru.close(false); + + // write a delta + options = options.minimumTransactionId(2).maximumTransactionId(2); + ru = of.getRecordUpdater(root, options); + values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; + for(int i=0; i < values.length; ++i) { + if (values[i] != null) { + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); + } + } + ru.delete(100, new BigRow(8, 0, BUCKET)); + ru.close(false); + + InputFormat inf = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.min.split.size", "1"); + job.set("mapred.max.split.size", "2"); + job.set("mapred.input.dir", root.toString()); + InputSplit[] splits = inf.getSplits(job, 5); + assertEquals(5, splits.length); + org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr; + + // loop through the 5 splits and read each + for(int i=0; i < 4; ++i) { + System.out.println("starting split " + i); + rr = inf.getRecordReader(splits[i], job, Reporter.NULL); + NullWritable key = rr.createKey(); + OrcStruct value = rr.createValue(); + + // there should be exactly two rows per a split + for(int j=0; j < 2; ++j) { + System.out.println("i = " + i + ", j = " + j); + assertEquals(true, rr.next(key, value)); + System.out.println("record = " + value); + assertEquals(i + "." + j, value.getFieldValue(2).toString()); + } + assertEquals(false, rr.next(key, value)); + } + rr = inf.getRecordReader(splits[4], job, Reporter.NULL); + assertEquals(false, rr.next(rr.createKey(), rr.createValue())); + } + + /** + * Test the RecordReader when there is a new base and a delta. + * @throws Exception + */ + @Test + public void testRecordReaderNewBaseAndDelta() throws Exception { + final int BUCKET = 11; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testRecordReaderNewBaseAndDelta").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (BigRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write the base + MemoryManager mgr = new MemoryManager(conf){ + int rowsAddedSinceCheck = 0; + + @Override + synchronized void addedRow(int rows) throws IOException { + rowsAddedSinceCheck += rows; + if (rowsAddedSinceCheck >= 2) { + notifyWriters(); + rowsAddedSinceCheck = 0; + } + } + }; + + // make 5 stripes with 2 rows each + OrcRecordUpdater.OrcOptions options = (OrcRecordUpdater.OrcOptions) + new OrcRecordUpdater.OrcOptions(conf) + .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) + .bucket(BUCKET).inspector(inspector).filesystem(fs); + options.orcOptions(OrcFile.writerOptions(conf) + .stripeSize(1).blockPadding(false).compress(CompressionKind.NONE) + .memory(mgr)); + options.finalDestination(root); + RecordUpdater ru = of.getRecordUpdater(root, options); + String[] values= new String[]{"ignore.1", "0.1", "ignore.2", "ignore.3", + "2.0", "2.1", "3.0", "ignore.4", "ignore.5", "ignore.6"}; + for(int i=0; i < values.length; ++i) { + ru.insert(0, new BigRow(i, i, values[i], i, i)); + } + ru.close(false); + + // write a delta + options.writingBase(false).minimumTransactionId(1).maximumTransactionId(1).recordIdColumn(5); + ru = of.getRecordUpdater(root, options); + values = new String[]{"0.0", null, null, "1.1", null, null, null, + "ignore.7"}; + for(int i=0; i < values.length; ++i) { + if (values[i] != null) { + ru.update(1, new BigRow(i, i, values[i], i, 
i, i, 0, BUCKET)); + } + } + ru.delete(100, new BigRow(9, 0, BUCKET)); + ru.close(false); + + // write a delta + options.minimumTransactionId(2).maximumTransactionId(2); + ru = of.getRecordUpdater(root, options); + values = new String[]{null, null, "1.0", null, null, null, null, "3.1"}; + for(int i=0; i < values.length; ++i) { + if (values[i] != null) { + ru.update(2, new BigRow(i, i, values[i], i, i, i, 0, BUCKET)); + } + } + ru.delete(100, new BigRow(8, 0, BUCKET)); + ru.close(false); + + InputFormat inf = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.min.split.size", "1"); + job.set("mapred.max.split.size", "2"); + job.set("mapred.input.dir", root.toString()); + InputSplit[] splits = inf.getSplits(job, 5); + assertEquals(5, splits.length); + org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr; + + // loop through the 5 splits and read each + for(int i=0; i < 4; ++i) { + System.out.println("starting split " + i); + rr = inf.getRecordReader(splits[i], job, Reporter.NULL); + NullWritable key = rr.createKey(); + OrcStruct value = rr.createValue(); + + // there should be exactly two rows per a split + for(int j=0; j < 2; ++j) { + System.out.println("i = " + i + ", j = " + j); + assertEquals(true, rr.next(key, value)); + System.out.println("record = " + value); + assertEquals(i + "." + j, value.getFieldValue(2).toString()); + } + assertEquals(false, rr.next(key, value)); + } + rr = inf.getRecordReader(splits[4], job, Reporter.NULL); + assertEquals(false, rr.next(rr.createKey(), rr.createValue())); + } + + /** + * Test the RecordReader when there is a new base and a delta. + * @throws Exception + */ + @Test + public void testRecordReaderDelta() throws Exception { + final int BUCKET = 0; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf); + Path root = new Path(tmpDir, "testRecordReaderDelta").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write a delta + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf) + .bucket(BUCKET).inspector(inspector).filesystem(fs) + .writingBase(false).minimumTransactionId(1).maximumTransactionId(1) + .finalDestination(root); + RecordUpdater ru = of.getRecordUpdater(root, options); + String[] values = new String[]{"a", "b", "c", "d", "e"}; + for(int i=0; i < values.length; ++i) { + ru.insert(1, new MyRow(values[i])); + } + ru.close(false); + + // write a delta + options.minimumTransactionId(2).maximumTransactionId(2); + ru = of.getRecordUpdater(root, options); + values = new String[]{"f", "g", "h", "i", "j"}; + for(int i=0; i < values.length; ++i) { + ru.insert(2, new MyRow(values[i])); + } + ru.close(false); + + InputFormat inf = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.min.split.size", "1"); + job.set("mapred.max.split.size", "2"); + job.set("mapred.input.dir", root.toString()); + job.set("bucket_count", "1"); + InputSplit[] splits = inf.getSplits(job, 5); + assertEquals(1, splits.length); + org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr; + rr = inf.getRecordReader(splits[0], job, Reporter.NULL); + values = new String[]{"a", "b", "c", "d", "e", "f", "g", "h", "i", "j"}; + OrcStruct row = rr.createValue(); + for(int i = 0; i < values.length; ++i) { + 
System.out.println("Checking " + i); + assertEquals(true, rr.next(NullWritable.get(), row)); + assertEquals(values[i], row.getFieldValue(0).toString()); + } + assertEquals(false, rr.next(NullWritable.get(), row)); + } + + /** + * Test the RecordReader when the delta has been flushed, but not closed. + * @throws Exception + */ + @Test + public void testRecordReaderIncompleteDelta() throws Exception { + testRecordReaderIncompleteDelta(false); + testRecordReaderIncompleteDelta(true); + } + /** + * + * @param use130Format true means use delta_0001_0001_0000 format, else delta_0001_00001 + */ + private void testRecordReaderIncompleteDelta(boolean use130Format) throws Exception { + final int BUCKET = 1; + Configuration conf = new Configuration(); + OrcOutputFormat of = new OrcOutputFormat(); + FileSystem fs = FileSystem.getLocal(conf).getRaw(); + Path root = new Path(tmpDir, "testRecordReaderIncompleteDelta").makeQualified(fs); + fs.delete(root, true); + ObjectInspector inspector; + synchronized (TestOrcFile.class) { + inspector = ObjectInspectorFactory.getReflectionObjectInspector + (MyRow.class, ObjectInspectorFactory.ObjectInspectorOptions.JAVA); + } + + // write a base + AcidOutputFormat.Options options = + new AcidOutputFormat.Options(conf) + .writingBase(true).minimumTransactionId(0).maximumTransactionId(0) + .bucket(BUCKET).inspector(inspector).filesystem(fs).finalDestination(root); + if(!use130Format) { + options.statementId(-1); + } + RecordUpdater ru = of.getRecordUpdater(root, options); + String[] values= new String[]{"1", "2", "3", "4", "5"}; + for(int i=0; i < values.length; ++i) { + ru.insert(0, new MyRow(values[i])); + } + ru.close(false); + + // write a delta + options.writingBase(false).minimumTransactionId(10) + .maximumTransactionId(19); + ru = of.getRecordUpdater(root, options); + values = new String[]{"6", "7", "8"}; + for(int i=0; i < values.length; ++i) { + ru.insert(1, new MyRow(values[i])); + } + InputFormat inf = new OrcInputFormat(); + JobConf job = new JobConf(); + job.set("mapred.input.dir", root.toString()); + job.set("bucket_count", "2"); + + // read the keys before the delta is flushed + InputSplit[] splits = inf.getSplits(job, 1); + assertEquals(2, splits.length); + org.apache.hadoop.mapred.RecordReader<NullWritable, OrcStruct> rr = + inf.getRecordReader(splits[0], job, Reporter.NULL); + NullWritable key = rr.createKey(); + OrcStruct value = rr.createValue(); + System.out.println("Looking at split " + splits[0]); + for(int i=1; i < 6; ++i) { + System.out.println("Checking row " + i); + assertEquals(true, rr.next(key, value)); + assertEquals(Integer.toString(i), value.getFieldValue(0).toString()); + } + assertEquals(false, rr.next(key, value)); + + ru.flush(); + ru.flush(); + values = new String[]{"9", "10"}; + for(int i=0; i < values.length; ++i) { + ru.insert(3, new MyRow(values[i])); + } + ru.flush(); + + splits = inf.getSplits(job, 1); + assertEquals(2, splits.length); + rr = inf.getRecordReader(splits[0], job, Reporter.NULL); + Path sideFile = new Path(root + "/" + (use130Format ? 
AcidUtils.deltaSubdir(10,19,0) : + AcidUtils.deltaSubdir(10,19)) + "/bucket_00001_flush_length"); + assertEquals(true, fs.exists(sideFile)); + assertEquals(24, fs.getFileStatus(sideFile).getLen()); + + for(int i=1; i < 11; ++i) { + assertEquals(true, rr.next(key, value)); + assertEquals(Integer.toString(i), value.getFieldValue(0).toString()); + } + assertEquals(false, rr.next(key, value)); + } + +} http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java index e72e5cf..3b35d07 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/TestVectorizedORCReader.java @@ -18,11 +18,13 @@ package org.apache.hadoop.hive.ql.io.orc; +import static org.junit.Assert.assertEquals; import java.io.File; import java.sql.Date; import java.sql.Timestamp; import java.util.Calendar; import java.util.Random; +import org.apache.hadoop.io.Text; import junit.framework.Assert; @@ -30,15 +32,23 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.type.HiveDecimal; +import org.apache.hadoop.hive.ql.exec.vector.BytesColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DecimalColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.DoubleColumnVector; +import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector; import org.apache.hadoop.hive.ql.exec.vector.VectorizedRowBatch; +import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; +import org.apache.hadoop.hive.serde2.io.DoubleWritable; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; +import org.apache.hadoop.hive.serde2.io.ShortWritable; import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.io.BooleanWritable; +import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; -import org.apache.hadoop.io.NullWritable; import org.junit.Before; import org.junit.Test; @@ -149,42 +159,61 @@ public class TestVectorizedORCReader { row = (OrcStruct) rr.next(row); for (int j = 0; j < batch.cols.length; j++) { Object a = (row.getFieldValue(j)); - Object b = batch.cols[j].getWritableObject(i); - // Boolean values are stores a 1's and 0's, so convert and compare - if (a instanceof BooleanWritable) { + ColumnVector cv = batch.cols[j]; + // if the value is repeating, use row 0 + int rowId = cv.isRepeating ? 0 : i; + + // make sure the null flag agrees + if (a == null) { + Assert.assertEquals(true, !cv.noNulls && cv.isNull[rowId]); + } else if (a instanceof BooleanWritable) { + + // Boolean values are stores a 1's and 0's, so convert and compare Long temp = (long) (((BooleanWritable) a).get() ? 
1 : 0); - Assert.assertEquals(true, temp.toString().equals(b.toString())); - continue; - } - // Timestamps are stored as long, so convert and compare - if (a instanceof TimestampWritable) { + long b = ((LongColumnVector) cv).vector[rowId]; + Assert.assertEquals(temp.toString(), Long.toString(b)); + } else if (a instanceof TimestampWritable) { + // Timestamps are stored as long, so convert and compare TimestampWritable t = ((TimestampWritable) a); // Timestamp.getTime() is overriden and is // long time = super.getTime(); // return (time + (nanos / 1000000)); Long timeInNanoSec = (t.getTimestamp().getTime() * 1000000) + (t.getTimestamp().getNanos() % 1000000); - Assert.assertEquals(true, timeInNanoSec.toString().equals(b.toString())); - continue; - } + long b = ((LongColumnVector) cv).vector[rowId]; + Assert.assertEquals(timeInNanoSec.toString(), Long.toString(b)); + + } else if (a instanceof DateWritable) { + // Dates are stored as long, so convert and compare - // Dates are stored as long, so convert and compare - if (a instanceof DateWritable) { DateWritable adt = (DateWritable) a; - Assert.assertEquals(adt.get().getTime(), DateWritable.daysToMillis((int) ((LongWritable) b).get())); - continue; - } + long b = ((LongColumnVector) cv).vector[rowId]; + Assert.assertEquals(adt.get().getTime(), + DateWritable.daysToMillis((int) b)); - // Decimals are stored as BigInteger, so convert and compare - if (a instanceof HiveDecimalWritable) { + } else if (a instanceof HiveDecimalWritable) { + // Decimals are stored as BigInteger, so convert and compare HiveDecimalWritable dec = (HiveDecimalWritable) a; + HiveDecimalWritable b = ((DecimalColumnVector) cv).vector[i]; Assert.assertEquals(dec, b); - } - if (null == a) { - Assert.assertEquals(true, (b == null || (b instanceof NullWritable))); + } else if (a instanceof DoubleWritable) { + + double b = ((DoubleColumnVector) cv).vector[rowId]; + assertEquals(a.toString(), Double.toString(b)); + } else if (a instanceof Text) { + BytesColumnVector bcv = (BytesColumnVector) cv; + Text b = new Text(); + b.set(bcv.vector[rowId], bcv.start[rowId], bcv.length[rowId]); + assertEquals(a, b); + } else if (a instanceof IntWritable || + a instanceof LongWritable || + a instanceof ByteWritable || + a instanceof ShortWritable) { + assertEquals(a.toString(), + Long.toString(((LongColumnVector) cv).vector[rowId])); } else { - Assert.assertEquals(true, b.toString().equals(a.toString())); + assertEquals("huh", a.getClass().getName()); } } } http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_change_fileformat.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_change_fileformat.q b/ql/src/test/queries/clientnegative/orc_change_fileformat.q new file mode 100644 index 0000000..a0f89d9 --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_change_fileformat.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=true; +create table src_orc (key tinyint, val string) stored as orc; +alter table src_orc set fileformat textfile; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_change_fileformat_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_change_fileformat_acid.q b/ql/src/test/queries/clientnegative/orc_change_fileformat_acid.q new file mode 100644 index 0000000..0fd287b --- /dev/null +++ 
b/ql/src/test/queries/clientnegative/orc_change_fileformat_acid.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=false; +create table src_orc (key tinyint, val string) stored as orc TBLPROPERTIES ('transactional'='true'); +alter table src_orc set fileformat textfile; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_change_serde.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_change_serde.q b/ql/src/test/queries/clientnegative/orc_change_serde.q new file mode 100644 index 0000000..49d56bd --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_change_serde.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=true; +create table src_orc (key tinyint, val string) stored as orc; +alter table src_orc set serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_change_serde_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_change_serde_acid.q b/ql/src/test/queries/clientnegative/orc_change_serde_acid.q new file mode 100644 index 0000000..d317a28 --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_change_serde_acid.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=false; +create table src_orc (key tinyint, val string) stored as orc TBLPROPERTIES ('transactional'='true'); +alter table src_orc set serde 'org.apache.hadoop.hive.serde2.columnar.ColumnarSerDe'; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_reorder_columns1.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_reorder_columns1.q b/ql/src/test/queries/clientnegative/orc_reorder_columns1.q new file mode 100644 index 0000000..516c170 --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_reorder_columns1.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=true; +create table src_orc (key tinyint, val string) stored as orc; +alter table src_orc change key k tinyint first; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_reorder_columns1_acid.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_reorder_columns1_acid.q b/ql/src/test/queries/clientnegative/orc_reorder_columns1_acid.q new file mode 100644 index 0000000..2c6cc9f --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_reorder_columns1_acid.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=false; +create table src_orc (key tinyint, val string) stored as orc TBLPROPERTIES ('transactional'='true'); +alter table src_orc change key k tinyint first; http://git-wip-us.apache.org/repos/asf/hive/blob/0fd9069e/ql/src/test/queries/clientnegative/orc_reorder_columns2.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientnegative/orc_reorder_columns2.q b/ql/src/test/queries/clientnegative/orc_reorder_columns2.q new file mode 100644 index 0000000..2acabdf --- /dev/null +++ b/ql/src/test/queries/clientnegative/orc_reorder_columns2.q @@ -0,0 +1,3 @@ +SET hive.exec.schema.evolution=true; +create table src_orc (key tinyint, val string) stored as orc; +alter table src_orc change key k tinyint after val;
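
For readers following the TestVectorizedORCReader hunk above: the rewritten assertions read cells directly out of the ColumnVector subclasses instead of getWritableObject(), so they have to honor the vectorization conventions that a repeating vector only populates element 0 and that a cell is null only when noNulls is false and the matching isNull flag is set. The sketch below is not part of this patch; the class and helper names are illustrative, and it only assumes the org.apache.hadoop.hive.ql.exec.vector classes already imported by the test.

    import org.apache.hadoop.hive.ql.exec.vector.LongColumnVector;

    public class ColumnVectorReadSketch {
      /** Returns the long value at logical row i, or null for a SQL NULL cell. */
      static Long readLong(LongColumnVector cv, int i) {
        int rowId = cv.isRepeating ? 0 : i;      // a repeating vector stores one value at index 0
        if (!cv.noNulls && cv.isNull[rowId]) {   // isNull[] is only meaningful when noNulls == false
          return null;
        }
        return cv.vector[rowId];
      }

      public static void main(String[] args) {
        LongColumnVector cv = new LongColumnVector(1024);
        cv.isRepeating = true;
        cv.vector[0] = 42L;                      // every logical row now reads back 42
        System.out.println(readLong(cv, 17));    // prints 42
      }
    }

This is the same rowId selection and null check that the new assertions perform before comparing the vectorized values against the row-mode Writable objects returned by the ORC RecordReader.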
