Modified: hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml URL: http://svn.apache.org/viewvc/hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml (original) +++ hive/branches/vectorization/ql/src/test/results/compiler/plan/union.q.xml Mon Sep 23 20:40:54 2013 @@ -163,6 +163,9 @@ </void> </object> </void> + <void property="conf"> + <object class="org.apache.hadoop.hive.ql.plan.TableScanDesc"/> + </void> <void property="counterNames"> <object class="java.util.ArrayList"> <void method="add"> @@ -179,6 +182,26 @@ </void> </object> </void> + <void property="neededColumnIDs"> + <object class="java.util.ArrayList"> + <void method="add"> + <int>0</int> + </void> + <void method="add"> + <int>1</int> + </void> + </object> + </void> + <void property="neededColumns"> + <object class="java.util.ArrayList"> + <void method="add"> + <string>_col0</string> + </void> + <void method="add"> + <string>_col1</string> + </void> + </object> + </void> <void property="operatorId"> <string>TS_11</string> </void>
Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/ColumnProjectionUtils.java Mon Sep 23 20:40:54 2013 @@ -32,87 +32,57 @@ public final class ColumnProjectionUtils public static final String READ_COLUMN_IDS_CONF_STR = "hive.io.file.readcolumn.ids"; public static final String READ_COLUMN_NAMES_CONF_STR = "hive.io.file.readcolumn.names"; + private static final String READ_COLUMN_IDS_CONF_STR_DEFAULT = ""; + private static final String READ_ALL_COLUMNS = "hive.io.file.read.all.columns"; + private static final boolean READ_ALL_COLUMNS_DEFAULT = true; /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * + * Sets the <em>READ_ALL_COLUMNS</em> flag and removes any previously + * set column ids. */ - public static void setReadColumnIDs(Configuration conf, List<Integer> ids) { - String id = toReadColumnIDString(ids); - setReadColumnIDConf(conf, id); + public static void setReadAllColumns(Configuration conf) { + conf.setBoolean(READ_ALL_COLUMNS, true); + setReadColumnIDConf(conf, READ_COLUMN_IDS_CONF_STR_DEFAULT); } /** - * Sets read columns' ids(start from zero) for RCFile's Reader. Once a column - * is included in the list, RCFile's reader will not skip its value. - * + * Returns the <em>READ_ALL_COLUMNS</em> columns flag. */ - public static void appendReadColumnIDs(Configuration conf, List<Integer> ids) { - String id = toReadColumnIDString(ids); - if (id != null) { - String old = conf.get(READ_COLUMN_IDS_CONF_STR, null); - String newConfStr = id; - if (old != null) { - newConfStr = newConfStr + StringUtils.COMMA_STR + old; - } - - setReadColumnIDConf(conf, newConfStr); - } - } - - public static void appendReadColumnNames(Configuration conf, - List<String> cols) { - if (cols != null) { - String old = conf.get(READ_COLUMN_NAMES_CONF_STR, ""); - StringBuilder result = new StringBuilder(old); - boolean first = old.isEmpty(); - for(String col: cols) { - if (first) { - first = false; - } else { - result.append(','); - } - result.append(col); - } - conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString()); - } + public static boolean isReadAllColumns(Configuration conf) { + return conf.getBoolean(READ_ALL_COLUMNS, READ_ALL_COLUMNS_DEFAULT); } - private static void setReadColumnIDConf(Configuration conf, String id) { - if (id == null || id.length() <= 0) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); - return; - } - - conf.set(READ_COLUMN_IDS_CONF_STR, id); + /** + * Appends read columns' ids (start from zero). Once a column + * is included in the list, a underlying record reader of a columnar file format + * (e.g. RCFile and ORC) can know what columns are needed. + */ + public static void appendReadColumns(Configuration conf, List<Integer> ids) { + String id = toReadColumnIDString(ids); + String old = conf.get(READ_COLUMN_IDS_CONF_STR, null); + String newConfStr = id; + if (old != null) { + newConfStr = newConfStr + StringUtils.COMMA_STR + old; + } + setReadColumnIDConf(conf, newConfStr); + // Set READ_ALL_COLUMNS to false + conf.setBoolean(READ_ALL_COLUMNS, false); } - private static String toReadColumnIDString(List<Integer> ids) { - String id = null; - if (ids != null) { - for (int i = 0; i < ids.size(); i++) { - if (i == 0) { - id = "" + ids.get(i); - } else { - id = id + StringUtils.COMMA_STR + ids.get(i); - } - } - } - return id; + public static void appendReadColumns( + Configuration conf, List<Integer> ids, List<String> names) { + appendReadColumns(conf, ids); + appendReadColumnNames(conf, names); } /** * Returns an array of column ids(start from zero) which is set in the given * parameter <tt>conf</tt>. */ - public static ArrayList<Integer> getReadColumnIDs(Configuration conf) { - if (conf == null) { - return new ArrayList<Integer>(0); - } - String skips = conf.get(READ_COLUMN_IDS_CONF_STR, ""); + public static List<Integer> getReadColumnIDs(Configuration conf) { + String skips = conf.get(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT); String[] list = StringUtils.split(skips); - ArrayList<Integer> result = new ArrayList<Integer>(list.length); + List<Integer> result = new ArrayList<Integer>(list.length); for (String element : list) { // it may contain duplicates, remove duplicates Integer toAdd = Integer.parseInt(element); @@ -123,11 +93,39 @@ public final class ColumnProjectionUtils return result; } - /** - * Clears the read column ids set in the conf, and will read all columns. - */ - public static void setFullyReadColumns(Configuration conf) { - conf.set(READ_COLUMN_IDS_CONF_STR, ""); + private static void setReadColumnIDConf(Configuration conf, String id) { + if (id.trim().isEmpty()) { + conf.set(READ_COLUMN_IDS_CONF_STR, READ_COLUMN_IDS_CONF_STR_DEFAULT); + } else { + conf.set(READ_COLUMN_IDS_CONF_STR, id); + } + } + + private static void appendReadColumnNames(Configuration conf, List<String> cols) { + String old = conf.get(READ_COLUMN_NAMES_CONF_STR, ""); + StringBuilder result = new StringBuilder(old); + boolean first = old.isEmpty(); + for(String col: cols) { + if (first) { + first = false; + } else { + result.append(','); + } + result.append(col); + } + conf.set(READ_COLUMN_NAMES_CONF_STR, result.toString()); + } + + private static String toReadColumnIDString(List<Integer> ids) { + String id = ""; + for (int i = 0; i < ids.size(); i++) { + if (i == 0) { + id = id + ids.get(i); + } else { + id = id + StringUtils.COMMA_STR + ids.get(i); + } + } + return id; } private ColumnProjectionUtils() { Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroDeserializer.java Mon Sep 23 20:40:54 2013 @@ -21,8 +21,10 @@ import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.IOException; import java.nio.ByteBuffer; +import java.rmi.server.UID; import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; @@ -52,6 +54,20 @@ import org.apache.hadoop.io.Writable; class AvroDeserializer { private static final Log LOG = LogFactory.getLog(AvroDeserializer.class); /** + * Set of already seen and valid record readers IDs which doesn't need re-encoding + */ + private final HashSet<UID> noEncodingNeeded = new HashSet<UID>(); + /** + * Map of record reader ID and the associated re-encoder. It contains only the record readers + * that record needs to be re-encoded. + */ + private final HashMap<UID, SchemaReEncoder> reEncoderCache = new HashMap<UID, SchemaReEncoder>(); + /** + * Flag to print the re-encoding warning message only once. Avoid excessive logging for each + * record encoding. + */ + private static boolean warnedOnce = false; + /** * When encountering a record with an older schema than the one we're trying * to read, it is necessary to re-encode with a reader against the newer schema. * Because Hive doesn't provide a way to pass extra information to the @@ -64,16 +80,15 @@ class AvroDeserializer { private final ByteArrayOutputStream baos = new ByteArrayOutputStream(); private final GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(); private BinaryDecoder binaryDecoder = null; - private final InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>> gdrCache - = new InstanceCache<ReaderWriterSchemaPair, GenericDatumReader<GenericRecord>>() { - @Override - protected GenericDatumReader<GenericRecord> makeInstance(ReaderWriterSchemaPair hv) { - return new GenericDatumReader<GenericRecord>(hv.getWriter(), hv.getReader()); - } - }; - public GenericRecord reencode(GenericRecord r, Schema readerSchema) - throws AvroSerdeException { + GenericDatumReader<GenericRecord> gdr = null; + + public SchemaReEncoder(Schema writer, Schema reader) { + gdr = new GenericDatumReader<GenericRecord>(writer, reader); + } + + public GenericRecord reencode(GenericRecord r) + throws AvroSerdeException { baos.reset(); BinaryEncoder be = EncoderFactory.get().directBinaryEncoder(baos, null); @@ -84,8 +99,6 @@ class AvroDeserializer { binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder(bais, binaryDecoder); - ReaderWriterSchemaPair pair = new ReaderWriterSchemaPair(r.getSchema(), readerSchema); - GenericDatumReader<GenericRecord> gdr = gdrCache.retrieve(pair); return gdr.read(r, binaryDecoder); } catch (IOException e) { @@ -95,7 +108,6 @@ class AvroDeserializer { } private List<Object> row; - private SchemaReEncoder reEncoder; /** * Deserialize an Avro record, recursing into its component fields and @@ -127,14 +139,31 @@ class AvroDeserializer { AvroGenericRecordWritable recordWritable = (AvroGenericRecordWritable) writable; GenericRecord r = recordWritable.getRecord(); - // Check if we're working with an evolved schema - if(!r.getSchema().equals(readerSchema)) { - LOG.warn("Received different schemas. Have to re-encode: " + - r.getSchema().toString(false)); - if(reEncoder == null) { - reEncoder = new SchemaReEncoder(); + UID recordReaderId = recordWritable.getRecordReaderID(); + //If the record reader (from which the record is originated) is already seen and valid, + //no need to re-encode the record. + if(!noEncodingNeeded.contains(recordReaderId)) { + SchemaReEncoder reEncoder = null; + //Check if the record record is already encoded once. If it does + //reuse the encoder. + if(reEncoderCache.containsKey(recordReaderId)) { + reEncoder = reEncoderCache.get(recordReaderId); //Reuse the re-encoder + } else if (!r.getSchema().equals(readerSchema)) { //Evolved schema? + //Create and store new encoder in the map for re-use + reEncoder = new SchemaReEncoder(r.getSchema(), readerSchema); + reEncoderCache.put(recordReaderId, reEncoder); + } else{ + LOG.info("Adding new valid RRID :" + recordReaderId); + noEncodingNeeded.add(recordReaderId); + } + if(reEncoder != null) { + if (!warnedOnce) { + LOG.warn("Received different schemas. Have to re-encode: " + + r.getSchema().toString(false) + "\nSIZE" + reEncoderCache + " ID " + recordReaderId); + warnedOnce = true; + } + r = reEncoder.reencode(r); } - r = reEncoder.reencode(r, readerSchema); } workerBase(row, columnNames, columnTypes, r); @@ -288,4 +317,13 @@ class AvroDeserializer { return map; } + + public HashSet<UID> getNoEncodingNeeded() { + return noEncodingNeeded; + } + + public HashMap<UID, SchemaReEncoder> getReEncoderCache() { + return reEncoderCache; + } + } Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/avro/AvroGenericRecordWritable.java Mon Sep 23 20:40:54 2013 @@ -17,6 +17,13 @@ */ package org.apache.hadoop.hive.serde2.avro; +import java.io.DataInput; +import java.io.DataOutput; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.rmi.server.UID; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericDatumReader; @@ -28,12 +35,6 @@ import org.apache.avro.io.DecoderFactory import org.apache.avro.io.EncoderFactory; import org.apache.hadoop.io.Writable; -import java.io.DataInput; -import java.io.DataOutput; -import java.io.DataOutputStream; -import java.io.IOException; -import java.io.InputStream; - /** * Wrapper around an Avro GenericRecord. Necessary because Hive's deserializer * will happily deserialize any object - as long as it's a writable. @@ -41,6 +42,10 @@ import java.io.InputStream; public class AvroGenericRecordWritable implements Writable{ GenericRecord record; private BinaryDecoder binaryDecoder; + /** + * Unique Id determine which record reader created this record + */ + private UID recordReaderID; // There are two areas of exploration for optimization here. // 1. We're serializing the schema with every object. If we assume the schema @@ -68,6 +73,7 @@ public class AvroGenericRecordWritable i // Write schema since we need it to pull the data out. (see point #1 above) String schemaString = record.getSchema().toString(false); out.writeUTF(schemaString); + recordReaderID.write(out); // Write record to byte buffer GenericDatumWriter<GenericRecord> gdw = new GenericDatumWriter<GenericRecord>(); @@ -80,9 +86,18 @@ public class AvroGenericRecordWritable i @Override public void readFields(DataInput in) throws IOException { Schema schema = Schema.parse(in.readUTF()); + recordReaderID = UID.read(in); record = new GenericData.Record(schema); binaryDecoder = DecoderFactory.defaultFactory().createBinaryDecoder((InputStream) in, binaryDecoder); GenericDatumReader<GenericRecord> gdr = new GenericDatumReader<GenericRecord>(schema); record = gdr.read(record, binaryDecoder); } + + public UID getRecordReaderID() { + return recordReaderID; + } + + public void setRecordReaderID(UID recordReaderID) { + this.recordReaderID = recordReaderID; + } } Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarSerDe.java Mon Sep 23 20:40:54 2013 @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.serde2.columnar; import java.io.IOException; +import java.util.ArrayList; import java.util.Arrays; import java.util.List; import java.util.Properties; @@ -78,9 +79,9 @@ public class ColumnarSerDe extends Colum * @see SerDe#initialize(Configuration, Properties) */ @Override - public void initialize(Configuration job, Properties tbl) throws SerDeException { + public void initialize(Configuration conf, Properties tbl) throws SerDeException { - serdeParams = LazySimpleSerDe.initSerdeParams(job, tbl, getClass().getName()); + serdeParams = LazySimpleSerDe.initSerdeParams(conf, tbl, getClass().getName()); // Create the ObjectInspectors for the fields. Note: Currently // ColumnarObject uses same ObjectInpector as LazyStruct @@ -89,14 +90,20 @@ public class ColumnarSerDe extends Colum .getSeparators(), serdeParams.getNullSequence(), serdeParams .isEscaped(), serdeParams.getEscapeChar()); - java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(job); - - cachedLazyStruct = new ColumnarStruct(cachedObjectInspector, notSkipIDs, - serdeParams.getNullSequence()); - int size = serdeParams.getColumnTypes().size(); + List<Integer> notSkipIDs = new ArrayList<Integer>(); + if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) { + for (int i = 0; i < size; i++ ) { + notSkipIDs.add(i); + } + } else { + notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf); + } + cachedLazyStruct = new ColumnarStruct( + cachedObjectInspector, notSkipIDs, serdeParams.getNullSequence()); + super.initialize(size); - LOG.debug("ColumnarSerDe initialized with: columnNames=" + LOG.info("ColumnarSerDe initialized with: columnNames=" + serdeParams.getColumnNames() + " columnTypes=" + serdeParams.getColumnTypes() + " separator=" + Arrays.asList(serdeParams.getSeparators()) + " nullstring=" Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStruct.java Mon Sep 23 20:40:54 2013 @@ -18,7 +18,7 @@ package org.apache.hadoop.hive.serde2.columnar; -import java.util.ArrayList; +import java.util.List; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -49,22 +49,10 @@ public class ColumnarStruct extends Colu * * @param oi * the ObjectInspector representing the type of this LazyStruct. - */ - public ColumnarStruct(ObjectInspector oi) { - this(oi, null, null); - } - - /** - * Construct a ColumnarStruct object with the TypeInfo. It creates the first - * level object at the first place - * - * @param oi - * the ObjectInspector representing the type of this LazyStruct. * @param notSkippedColumnIDs * the column ids that should not be skipped */ - public ColumnarStruct(ObjectInspector oi, - ArrayList<Integer> notSkippedColumnIDs, Text nullSequence) { + public ColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs, Text nullSequence) { super(oi, notSkippedColumnIDs); if (nullSequence != null) { this.nullSequence = nullSequence; @@ -84,7 +72,7 @@ public class ColumnarStruct extends Colu } return fieldLen; } - + @Override protected LazyObjectBase createLazyObjectBase(ObjectInspector objectInspector) { return LazyFactory.createLazyObject(objectInspector); Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/ColumnarStructBase.java Mon Sep 23 20:40:54 2013 @@ -122,22 +122,13 @@ public abstract class ColumnarStructBase private FieldInfo[] fieldInfoList = null; private ArrayList<Object> cachedList; - public ColumnarStructBase(ObjectInspector oi, - ArrayList<Integer> notSkippedColumnIDs) { + public ColumnarStructBase(ObjectInspector oi, List<Integer> notSkippedColumnIDs) { List<? extends StructField> fieldRefs = ((StructObjectInspector) oi) .getAllStructFieldRefs(); int num = fieldRefs.size(); fieldInfoList = new FieldInfo[num]; - // if no columns is set to be skipped, add all columns in - // 'notSkippedColumnIDs' - if (notSkippedColumnIDs == null || notSkippedColumnIDs.size() == 0) { - for (int i = 0; i < num; i++) { - notSkippedColumnIDs.add(i); - } - } - for (int i = 0; i < num; i++) { ObjectInspector foi = fieldRefs.get(i).getFieldObjectInspector(); fieldInfoList[i] = new FieldInfo( Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarSerDe.java Mon Sep 23 20:40:54 2013 @@ -17,22 +17,23 @@ */ package org.apache.hadoop.hive.serde2.columnar; +import java.util.ArrayList; import java.util.List; import java.util.Properties; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; import org.apache.hadoop.hive.serde2.SerDeException; -import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe.SerDeParameters; +import org.apache.hadoop.hive.serde2.lazy.LazyUtils; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinarySerDe; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; -import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; import org.apache.hadoop.io.Writable; @@ -66,9 +67,17 @@ public class LazyBinaryColumnarSerDe ext cachedObjectInspector = LazyBinaryFactory.createColumnarStructInspector( columnNames, columnTypes); - java.util.ArrayList<Integer> notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf); - cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs); int size = columnTypes.size(); + List<Integer> notSkipIDs = new ArrayList<Integer>(); + if (conf == null || ColumnProjectionUtils.isReadAllColumns(conf)) { + for (int i = 0; i < size; i++ ) { + notSkipIDs.add(i); + } + } else { + notSkipIDs = ColumnProjectionUtils.getReadColumnIDs(conf); + } + cachedLazyStruct = new LazyBinaryColumnarStruct(cachedObjectInspector, notSkipIDs); + super.initialize(size); } Modified: hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java (original) +++ hive/branches/vectorization/serde/src/java/org/apache/hadoop/hive/serde2/columnar/LazyBinaryColumnarStruct.java Mon Sep 23 20:40:54 2013 @@ -18,26 +18,22 @@ package org.apache.hadoop.hive.serde2.columnar; -import java.util.ArrayList; +import java.util.List; import org.apache.hadoop.hive.serde2.lazy.ByteArrayRef; import org.apache.hadoop.hive.serde2.lazy.LazyObjectBase; import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryFactory; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils; -import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; -import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector.PrimitiveCategory; public class LazyBinaryColumnarStruct extends ColumnarStructBase { - public LazyBinaryColumnarStruct(ObjectInspector oi, ArrayList<Integer> notSkippedColumnIDs) { + public LazyBinaryColumnarStruct(ObjectInspector oi, List<Integer> notSkippedColumnIDs) { super(oi, notSkippedColumnIDs); } - static VInt vInt = new LazyBinaryUtils.VInt(); - @Override protected int getLength(ObjectInspector objectInspector, ByteArrayRef cachedByteArrayRef, int start, int length) { @@ -48,8 +44,8 @@ public class LazyBinaryColumnarStruct ex if (category.equals(Category.PRIMITIVE)) { PrimitiveCategory primitiveCategory = ((PrimitiveObjectInspector) objectInspector) .getPrimitiveCategory(); - if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) && - (cachedByteArrayRef.getData()[start] + if (primitiveCategory.equals(PrimitiveCategory.STRING) && (length == 1) && + (cachedByteArrayRef.getData()[start] == LazyBinaryColumnarSerDe.INVALID_UTF__SINGLE_BYTE[0])) { return 0; } Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java (original) +++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestAvroDeserializer.java Mon Sep 23 20:40:54 2013 @@ -23,6 +23,7 @@ import static org.junit.Assert.assertTru import java.io.IOException; import java.nio.ByteBuffer; +import java.rmi.server.UID; import java.util.ArrayList; import java.util.HashMap; import java.util.Hashtable; @@ -500,4 +501,64 @@ public class TestAvroDeserializer { assertEquals(expected, soi.getPrimitiveJavaObject(rowElement)); } } + + @Test + public void verifyCaching() throws SerDeException, IOException { + Schema s = Schema.parse(TestAvroObjectInspectorGenerator.RECORD_SCHEMA); + GenericData.Record record = new GenericData.Record(s); + GenericData.Record innerRecord = new GenericData.Record(s.getField("aRecord").schema()); + innerRecord.put("int1", 42); + innerRecord.put("boolean1", true); + innerRecord.put("long1", 42432234234l); + record.put("aRecord", innerRecord); + assertTrue(GENERIC_DATA.validate(s, record)); + + AvroGenericRecordWritable garw = Utils.serializeAndDeserializeRecord(record); + UID recordReaderID = new UID(); + garw.setRecordReaderID(recordReaderID); + AvroObjectInspectorGenerator aoig = new AvroObjectInspectorGenerator(s); + + AvroDeserializer de = new AvroDeserializer(); + ArrayList<Object> row = + (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s); + + assertEquals(1, de.getNoEncodingNeeded().size()); + assertEquals(0, de.getReEncoderCache().size()); + + // Read the record with the same record reader ID + row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s); + + //Expecting not to change the size of internal structures + assertEquals(1, de.getNoEncodingNeeded().size()); + assertEquals(0, de.getReEncoderCache().size()); + + //Read the record with **different** record reader ID + garw.setRecordReaderID(new UID()); //New record reader ID + row = (ArrayList<Object>) de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, s); + + //Expecting to change the size of internal structures + assertEquals(2, de.getNoEncodingNeeded().size()); + assertEquals(0, de.getReEncoderCache().size()); + + //Read the record with **different** record reader ID and **evolved** schema + Schema evolvedSchema = Schema.parse(s.toString()); + evolvedSchema.getField("aRecord").schema().addProp("Testing", "meaningless"); + garw.setRecordReaderID(recordReaderID = new UID()); //New record reader ID + row = + (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema); + + //Expecting to change the size of internal structures + assertEquals(2, de.getNoEncodingNeeded().size()); + assertEquals(1, de.getReEncoderCache().size()); + + //Read the record with existing record reader ID and same **evolved** schema + garw.setRecordReaderID(recordReaderID); //Reuse record reader ID + row = + (ArrayList<Object>)de.deserialize(aoig.getColumnNames(), aoig.getColumnTypes(), garw, evolvedSchema); + + //Expecting NOT to change the size of internal structures + assertEquals(2, de.getNoEncodingNeeded().size()); + assertEquals(1, de.getReEncoderCache().size()); + + } } Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java (original) +++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestGenericAvroRecordWritable.java Mon Sep 23 20:40:54 2013 @@ -17,18 +17,19 @@ */ package org.apache.hadoop.hive.serde2.avro; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericRecord; -import org.junit.Test; +import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.rmi.server.UID; -import static org.junit.Assert.assertEquals; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericData; +import org.apache.avro.generic.GenericRecord; +import org.junit.Test; public class TestGenericAvroRecordWritable { private static final String schemaJSON = "{\n" + @@ -59,12 +60,14 @@ public class TestGenericAvroRecordWritab assertEquals("Doctor", gr.get("last")); AvroGenericRecordWritable garw = new AvroGenericRecordWritable(gr); + garw.setRecordReaderID(new UID()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream daos = new DataOutputStream(baos); garw.write(daos); AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable(gr); + garw2.setRecordReaderID(new UID()); ByteArrayInputStream bais = new ByteArrayInputStream(baos.toByteArray()); DataInputStream dais = new DataInputStream(bais); Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java (original) +++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/TestSchemaReEncoder.java Mon Sep 23 20:40:54 2013 @@ -17,15 +17,15 @@ */ package org.apache.hadoop.hive.serde2.avro; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + import org.apache.avro.Schema; import org.apache.avro.generic.GenericData; import org.apache.avro.generic.GenericRecord; import org.apache.hadoop.hive.serde2.SerDeException; import org.junit.Test; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - public class TestSchemaReEncoder { @Test public void schemasCanAddFields() throws SerDeException { @@ -62,8 +62,8 @@ public class TestSchemaReEncoder { GenericRecord record = new GenericData.Record(originalSchema); record.put("text", "it is a far better thing I do, yadda, yadda"); assertTrue(GenericData.get().validate(originalSchema, record)); - AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder(); - GenericRecord r2 = schemaReEncoder.reencode(record, evolvedSchema); + AvroDeserializer.SchemaReEncoder schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema); + GenericRecord r2 = schemaReEncoder.reencode(record); assertTrue(GenericData.get().validate(evolvedSchema, r2)); assertEquals("Hi!", r2.get("new_kid").toString()); @@ -104,7 +104,8 @@ public class TestSchemaReEncoder { record.put("a", 19); assertTrue(GenericData.get().validate(originalSchema2, record)); - r2 = schemaReEncoder.reencode(record, evolvedSchema2); + schemaReEncoder = new AvroDeserializer.SchemaReEncoder(record.getSchema(), evolvedSchema2); + r2 = schemaReEncoder.reencode(record); assertTrue(GenericData.get().validate(evolvedSchema2, r2)); assertEquals(42l, r2.get("b")); } Modified: hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java URL: http://svn.apache.org/viewvc/hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java?rev=1525692&r1=1525691&r2=1525692&view=diff ============================================================================== --- hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java (original) +++ hive/branches/vectorization/serde/src/test/org/apache/hadoop/hive/serde2/avro/Utils.java Mon Sep 23 20:40:54 2013 @@ -17,13 +17,14 @@ */ package org.apache.hadoop.hive.serde2.avro; -import org.apache.avro.generic.GenericData; - import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.DataOutputStream; import java.io.IOException; +import java.rmi.server.UID; + +import org.apache.avro.generic.GenericData; class Utils { // Force Avro to serialize and de-serialize the record to make sure it has a @@ -31,6 +32,7 @@ class Utils { public static AvroGenericRecordWritable serializeAndDeserializeRecord(GenericData.Record record) throws IOException { AvroGenericRecordWritable garw = new AvroGenericRecordWritable(record); + garw.setRecordReaderID(new UID()); ByteArrayOutputStream baos = new ByteArrayOutputStream(); DataOutputStream daos = new DataOutputStream(baos); garw.write(daos); @@ -39,6 +41,7 @@ class Utils { DataInputStream dais = new DataInputStream(bais); AvroGenericRecordWritable garw2 = new AvroGenericRecordWritable(); + garw2.setRecordReaderID(new UID()); garw2.readFields(dais); return garw2; }
