Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/BytesBytesMultiHashMap.java Tue Apr 14 14:47:30 2015 @@ -186,8 +186,9 @@ public final class BytesBytesMultiHashMa public class ThreadSafeGetter { private WriteBuffers.Position position = new WriteBuffers.Position(); - public byte getValueRefs(byte[] key, int length, List<WriteBuffers.ByteSegmentRef> result) { - return BytesBytesMultiHashMap.this.getValueRefs(key, length, result, position); + public byte getValueResult(byte[] key, int offset, int length, + BytesBytesMultiHashMap.Result hashMapResult) { + return BytesBytesMultiHashMap.this.getValueResult(key, offset, length, hashMapResult, position); } public void populateValue(WriteBuffers.ByteSegmentRef valueRef) { @@ -196,6 +197,227 @@ public final class BytesBytesMultiHashMa } } + /** + * The result of looking up a key in the multi-hash map. + * + * This object can read through the 0, 1, or more values found for the key. + */ + public static class Result { + + // Whether there are more than 0 rows. + private boolean hasRows; + + // We need a pointer to the hash map since this class must be static to support having + // multiple hash tables with Hybrid Grace partitioning. + private BytesBytesMultiHashMap hashMap; + + // And, a mutable read position for thread safety when sharing a hash map. + private WriteBuffers.Position readPos; + + // These values come from setValueResult when it finds a key. These values allow this + // class to read (and re-read) the values. + private long firstOffset; + private boolean hasList; + private long offsetAfterListRecordKeyLen; + + // When we have multiple values, we save the next value record's offset here. + private long nextTailOffset; + + // 0-based index of which row we are on. + private long readIndex; + + // A reference to the current row. + private WriteBuffers.ByteSegmentRef byteSegmentRef; + + public Result() { + hasRows = false; + byteSegmentRef = new WriteBuffers.ByteSegmentRef(); + } + + /** + * @return Whether there are 1 or more values. + */ + public boolean hasRows() { + // NOTE: Originally we named this isEmpty, but that name conflicted with another interface. + return hasRows; + } + + /** + * @return Whether there is just 1 value row. + */ + public boolean isSingleRow() { + return !hasList; + } + + /** + * Set internal values for reading the values after finding a key. + * + * @param hashMap + * The hash map we found the key in. + * @param firstOffset + * The absolute offset of the first record in the write buffers. + * @param hasList + * Whether there are multiple values (true) or just a single value (false). + * @param offsetAfterListRecordKeyLen + * The offset of just after the key length in the list record. Or, 0 when single row. + * @param readPos + * Holds mutable read position for thread safety. 
+ */ + public void set(BytesBytesMultiHashMap hashMap, long firstOffset, boolean hasList, + long offsetAfterListRecordKeyLen, WriteBuffers.Position readPos) { + + this.hashMap = hashMap; + this.readPos = readPos; + + this.firstOffset = firstOffset; + this.hasList = hasList; + this.offsetAfterListRecordKeyLen = offsetAfterListRecordKeyLen; + + // Position at first row. + readIndex = 0; + nextTailOffset = -1; + + hasRows = true; + } + + public WriteBuffers.ByteSegmentRef first() { + if (!hasRows) { + return null; + } + + // Position at first row. + readIndex = 0; + nextTailOffset = -1; + + return internalRead(); + } + + public WriteBuffers.ByteSegmentRef next() { + if (!hasRows) { + return null; + } + + return internalRead(); + } + + /** + * Read the current value. + * + * @return + * The ByteSegmentRef to the current value read. + */ + private WriteBuffers.ByteSegmentRef internalRead() { + + if (!hasList) { + + /* + * Single value. + */ + + if (readIndex > 0) { + return null; + } + + // For a non-list (i.e. single value), the offset is for the variable length long (VLong) + // holding the value length (followed by the key length). + hashMap.writeBuffers.setReadPoint(firstOffset, readPos); + int valueLength = (int) hashMap.writeBuffers.readVLong(readPos); + + // The value is before the offset. Make byte segment reference absolute. + byteSegmentRef.reset(firstOffset - valueLength, valueLength); + hashMap.writeBuffers.populateValue(byteSegmentRef); + + readIndex++; + return byteSegmentRef; + } + + /* + * Multiple values. + */ + + if (readIndex == 0) { + // For a list, the value and key lengths of 1st record were overwritten with the + // relative offset to a new list record. + long relativeOffset = hashMap.writeBuffers.readNByteLong(firstOffset, 5, readPos); + + // At the beginning of the list record will be the value length. + hashMap.writeBuffers.setReadPoint(firstOffset + relativeOffset, readPos); + int valueLength = (int) hashMap.writeBuffers.readVLong(readPos); + + // The value is before the list record offset. Make byte segment reference absolute. + byteSegmentRef.reset(firstOffset - valueLength, valueLength); + hashMap.writeBuffers.populateValue(byteSegmentRef); + + readIndex++; + return byteSegmentRef; + } + + if (readIndex == 1) { + // We remembered the offset of just after the key length in the list record. + // Read the absolute offset to the 2nd value. + nextTailOffset = hashMap.writeBuffers.readNByteLong(offsetAfterListRecordKeyLen, 5, readPos); + if (nextTailOffset <= 0) { + throw new Error("Expecting a second value"); + } + } else if (nextTailOffset <= 0) { + return null; + } + + hashMap.writeBuffers.setReadPoint(nextTailOffset, readPos); + + // Get the value length. + int valueLength = (int) hashMap.writeBuffers.readVLong(readPos); + + // Now read the relative offset to next record. Next record is always before the + // previous record in the write buffers (see writeBuffers javadoc). + long delta = hashMap.writeBuffers.readVLong(readPos); + long newTailOffset = delta == 0 ? 0 : (nextTailOffset - delta); + + // The value is before the value record offset. Make byte segment reference absolute. + byteSegmentRef.reset(nextTailOffset - valueLength, valueLength); + hashMap.writeBuffers.populateValue(byteSegmentRef); + + nextTailOffset = newTailOffset; + readIndex++; + return byteSegmentRef; + } + + /** + * @return Whether we have read all the values or not. 
+ */ + public boolean isEof() { + // LOG.info("BytesBytesMultiHashMap isEof hasRows " + hasRows + " hasList " + hasList + " readIndex " + readIndex + " nextTailOffset " + nextTailOffset); + if (!hasRows) { + return true; + } + + if (!hasList) { + return (readIndex > 0); + } else { + // Multiple values. + if (readIndex <= 1) { + // Careful: We have not read the list record and 2nd value yet, so nextTailOffset + // is not valid yet. + return false; + } else { + return (nextTailOffset <= 0); + } + } + } + + /** + * Lets go of any references to a hash map. + */ + public void forget() { + hashMap = null; + readPos = null; + byteSegmentRef.reset(0, 0); + hasRows = false; + readIndex = 0; + nextTailOffset = -1; + } + } + /** The source of keys and values to put into hashtable; avoids byte copying. */ public static interface KvSource { /** Write key into output. */ @@ -264,55 +486,41 @@ public final class BytesBytesMultiHashMa } /** Not thread-safe! Use createGetterForThread. */ - public byte getValueRefs(byte[] key, int length, List<WriteBuffers.ByteSegmentRef> result) { - return getValueRefs(key, length, result, writeBuffers.getReadPosition()); + public byte getValueResult(byte[] key, int offset, int length, Result hashMapResult) { + return getValueResult(key, offset, length, hashMapResult, writeBuffers.getReadPosition()); } /** - * Gets "lazy" values for a key (as a set of byte segments in underlying buffer). + * Finds a key. Values can be read with the supplied result object. + * * @param key Key buffer. - * @param length Length of the key in buffer. - * @param result The list to use to store the results. - * @return the state byte for the key (see class description). + * @param offset the offset to the key in the buffer + * @param hashMapResult The object to fill in that can read the values. + * @param readPos Holds mutable read position for thread safety. + * @return The state byte. */ - private byte getValueRefs(byte[] key, int length, - List<WriteBuffers.ByteSegmentRef> result, WriteBuffers.Position readPos) { + private byte getValueResult(byte[] key, int offset, int length, Result hashMapResult, + WriteBuffers.Position readPos) { + + hashMapResult.forget(); + // First, find first record for the key. - result.clear(); - long ref = findKeyRefToRead(key, length, readPos); + long ref = findKeyRefToRead(key, offset, length, readPos); if (ref == 0) { return 0; } + boolean hasList = Ref.hasList(ref); // This relies on findKeyRefToRead doing key equality check and leaving read ptr where needed. - long lrPtrOffset = hasList ? writeBuffers.getReadPoint(readPos) : 0; + long offsetAfterListRecordKeyLen = hasList ? writeBuffers.getReadPoint(readPos) : 0; - writeBuffers.setReadPoint(getFirstRecordLengthsOffset(ref, readPos), readPos); - int valueLength = (int)writeBuffers.readVLong(readPos); - result.add(new WriteBuffers.ByteSegmentRef(Ref.getOffset(ref) - valueLength, valueLength)); - byte stateByte = Ref.getStateByte(ref); - if (!hasList) { - return stateByte; - } + hashMapResult.set(this, Ref.getOffset(ref), hasList, offsetAfterListRecordKeyLen, + readPos); - // There're multiple records for the key; get the offset of the next one. 
- long nextTailOffset = writeBuffers.readNByteLong(lrPtrOffset, 5, readPos); - // LOG.info("Next tail offset " + nextTailOffset); - - while (nextTailOffset > 0) { - writeBuffers.setReadPoint(nextTailOffset, readPos); - valueLength = (int)writeBuffers.readVLong(readPos); - result.add(new WriteBuffers.ByteSegmentRef(nextTailOffset - valueLength, valueLength)); - // Now read the relative offset to next record. Next record is always before the - // previous record in the write buffers (see writeBuffers javadoc). - long delta = writeBuffers.readVLong(readPos); - nextTailOffset = delta == 0 ? 0 : (nextTailOffset - delta); - } - return stateByte; + return Ref.getStateByte(ref); } - /** * Take the segment reference from {@link #getValueRefs(byte[], int, List)} * result and makes it self-contained - adds byte array where the value is stored, and @@ -418,9 +626,10 @@ public final class BytesBytesMultiHashMa * @param length Read key length. * @return The ref to use for reading. */ - private long findKeyRefToRead(byte[] key, int length, WriteBuffers.Position readPos) { + private long findKeyRefToRead(byte[] key, int offset, int length, + WriteBuffers.Position readPos) { final int bucketMask = (refs.length - 1); - int hashCode = writeBuffers.hashCode(key, 0, length); + int hashCode = writeBuffers.hashCode(key, offset, length); int slot = hashCode & bucketMask; // LOG.info("Read hash code for " + Utils.toStringBinary(key, 0, length) // + " is " + Integer.toBinaryString(hashCode) + " - " + slot); @@ -432,7 +641,7 @@ public final class BytesBytesMultiHashMa if (ref == 0) { return 0; } - if (isSameKey(key, length, ref, hashCode, readPos)) { + if (isSameKey(key, offset, length, ref, hashCode, readPos)) { return ref; } ++metricGetConflict; @@ -502,7 +711,7 @@ public final class BytesBytesMultiHashMa /** * Same as {@link #isSameKey(long, int, long, int)} but for externally stored key. */ - private boolean isSameKey(byte[] key, int length, long ref, int hashCode, + private boolean isSameKey(byte[] key, int offset, int length, long ref, int hashCode, WriteBuffers.Position readPos) { if (!compareHashBits(ref, hashCode)) { return false; // Hash bits don't match. 
@@ -512,7 +721,11 @@ public final class BytesBytesMultiHashMa keyLength = (int)writeBuffers.readVLong(readPos); long keyOffset = Ref.getOffset(ref) - (valueLength + keyLength); // See the comment in the other isSameKey - return writeBuffers.isEqual(key, length, keyOffset, keyLength); + if (offset == 0) { + return writeBuffers.isEqual(key, length, keyOffset, keyLength); + } else { + return writeBuffers.isEqual(key, offset, length, keyOffset, keyLength); + } } private boolean compareHashBits(long ref, int hashCode) { @@ -673,7 +886,6 @@ public final class BytesBytesMultiHashMa public void debugDumpTable() { StringBuilder dump = new StringBuilder(keysAssigned + " keys\n"); TreeMap<Long, Integer> byteIntervals = new TreeMap<Long, Integer>(); - List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>(); int examined = 0; for (int slot = 0; slot < refs.length; ++slot) { long ref = refs[slot]; @@ -696,9 +908,17 @@ public final class BytesBytesMultiHashMa byteIntervals.put(keyOffset - 4, keyLength + 4); writeBuffers.populateValue(fakeRef); System.arraycopy(fakeRef.getBytes(), (int)fakeRef.getOffset(), key, 0, keyLength); - getValueRefs(key, key.length, results); dump.append(Utils.toStringBinary(key, 0, key.length)).append(" ref [").append(dumpRef(ref)) - .append("]: ").append(results.size()).append(" rows\n"); + .append("]: "); + Result hashMapResult = new Result(); + getValueResult(key, 0, key.length, hashMapResult); + List<WriteBuffers.ByteSegmentRef> results = new ArrayList<WriteBuffers.ByteSegmentRef>(); + WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first(); + while (byteSegmentRef != null) { + results.add(hashMapResult.byteSegmentRef); + byteSegmentRef = hashMapResult.next(); + } + dump.append(results.size()).append(" rows\n"); for (int i = 0; i < results.size(); ++i) { WriteBuffers.ByteSegmentRef segment = results.get(i); byteIntervals.put(segment.getOffset(),
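
For orientation, the new lookup path above is read back through the Result object instead of a List of ByteSegmentRefs. A rough usage fragment follows (hashMap and keyBytes are placeholder names for an already-populated map and a serialized key, not identifiers introduced by this change); it mirrors the rewritten loop in debugDumpTable:

    BytesBytesMultiHashMap.Result hashMapResult = new BytesBytesMultiHashMap.Result();
    byte stateByte = hashMap.getValueResult(keyBytes, 0, keyBytes.length, hashMapResult);
    if (hashMapResult.hasRows()) {
      // first()/next() walk the 1..n values for the key; next() returns null when exhausted.
      for (WriteBuffers.ByteSegmentRef ref = hashMapResult.first();
           ref != null;
           ref = hashMapResult.next()) {
        // ref.getBytes()/getOffset()/getLength() address one serialized value row
        // in the underlying write buffers (populateValue has already been applied).
      }
    }
    hashMapResult.forget();  // drop references to the hash map when finished

The same getValueResult signature is also exposed on ThreadSafeGetter for callers that share one hash map across threads.
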
Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/FlatRowContainer.java Tue Apr 14 14:47:30 2015 @@ -208,6 +208,16 @@ public class FlatRowContainer extends Ab } @Override + public boolean hasRows() throws HiveException { + return rowCount() > 0; + } + + @Override + public boolean isSingleRow() throws HiveException { + return rowCount() == 1; + } + + @Override public int rowCount() throws HiveException { return rowLength > 0 ? (array.length / rowLength) : -rowLength; // see rowLength javadoc } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/HybridHashTableContainer.java Tue Apr 14 14:47:30 2015 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.p import com.esotericsoftware.kryo.Kryo; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -566,7 +567,7 @@ public class HybridHashTableContainer im @Override public MapJoinRowContainer getCurrentRows() { - return currentValue.isEmpty() ? null : currentValue; + return !currentValue.hasRows() ? null : currentValue; } @Override @@ -579,8 +580,8 @@ public class HybridHashTableContainer im private class ReusableRowContainer implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> { private byte aliasFilter; - private List<WriteBuffers.ByteSegmentRef> refs; - private int currentRow; + private BytesBytesMultiHashMap.Result hashMapResult; + /** * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row. * This container does not normally support adding rows; this is for the dummy row. @@ -600,6 +601,7 @@ public class HybridHashTableContainer im valueStruct = null; // No rows? } uselessIndirection = new ByteArrayRef(); + hashMapResult = new BytesBytesMultiHashMap.Result(); clearRows(); } @@ -611,57 +613,58 @@ public class HybridHashTableContainer im * the evaluation for this big table row will be postponed. 
*/ public JoinUtil.JoinResult setFromOutput(Output output) throws HiveException { - if (refs == null) { - refs = new ArrayList<WriteBuffers.ByteSegmentRef>(0); - } - int keyHash = WriteBuffers.murmurHash(output.getData(), 0, output.getLength()); partitionId = keyHash & (hashPartitions.length - 1); // If the target hash table is on disk, spill this row to disk as well to be processed later if (isOnDisk(partitionId)) { toSpillPartitionId = partitionId; - refs.clear(); + hashMapResult.forget(); return JoinUtil.JoinResult.SPILL; } else { - byte aliasFilter = hashPartitions[partitionId].hashMap.getValueRefs( - output.getData(), output.getLength(), refs); - this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; - this.dummyRow = null; - if (refs.isEmpty()) { - return JoinUtil.JoinResult.NOMATCH; - } - else { + aliasFilter = hashPartitions[partitionId].hashMap.getValueResult(output.getData(), 0, output.getLength(), hashMapResult); + dummyRow = null; + if (hashMapResult.hasRows()) { return JoinUtil.JoinResult.MATCH; + } else { + aliasFilter = (byte) 0xff; + return JoinUtil.JoinResult.NOMATCH; } } } - public boolean isEmpty() { - return refs.isEmpty() && (dummyRow == null); + @Override + public boolean hasRows() { + return hashMapResult.hasRows() || (dummyRow != null); + } + + @Override + public boolean isSingleRow() { + if (!hashMapResult.hasRows()) { + return (dummyRow != null); + } + return hashMapResult.isSingleRow(); } // Implementation of row container @Override - public RowIterator<List<Object>> rowIter() throws HiveException { - currentRow = -1; + public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException { return this; } @Override public int rowCount() throws HiveException { - return dummyRow != null ? 1 : refs.size(); + // For performance reasons we do not want to chase the values to the end to determine + // the count. Use hasRows and isSingleRow instead. + throw new UnsupportedOperationException("Getting the row count not supported"); } @Override public void clearRows() { // Doesn't clear underlying hashtable - if (refs != null) { - refs.clear(); - } + hashMapResult.forget(); dummyRow = null; - currentRow = -1; aliasFilter = (byte) 0xff; } @@ -678,36 +681,47 @@ public class HybridHashTableContainer im // Implementation of row iterator @Override public List<Object> first() throws HiveException { - currentRow = 0; - return next(); - } - - @Override - public List<Object> next() throws HiveException { + // A little strange that we forget the dummy row on read. 
if (dummyRow != null) { List<Object> result = dummyRow; dummyRow = null; return result; } - if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows"); - if (refs.size() == currentRow) return null; - WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++); + + WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first(); + if (byteSegmentRef == null) { + return null; + } else { + return uppack(byteSegmentRef); + } + + } + + @Override + public List<Object> next() throws HiveException { + + WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.next(); + if (byteSegmentRef == null) { + return null; + } else { + return uppack(byteSegmentRef); + } + + } + + private List<Object> uppack(WriteBuffers.ByteSegmentRef ref) throws HiveException { if (ref.getLength() == 0) { return EMPTY_LIST; // shortcut, 0 length means no fields } - if (ref.getBytes() == null) { - // partitionId is derived from previously calculated value in setFromOutput() - hashPartitions[partitionId].hashMap.populateValue(ref); - } uselessIndirection.setData(ref.getBytes()); valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength()); - return valueStruct.getFieldsAsList(); + return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that? } @Override public void addRow(List<Object> t) { - if (dummyRow != null || !refs.isEmpty()) { + if (dummyRow != null || hashMapResult.hasRows()) { throw new RuntimeException("Cannot add rows when not empty"); } dummyRow = t; Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinBytesTableContainer.java Tue Apr 14 14:47:30 2015 @@ -417,7 +417,7 @@ public class MapJoinBytesTableContainer @Override public MapJoinRowContainer getCurrentRows() { - return currentValue.isEmpty() ? null : currentValue; + return !currentValue.hasRows() ? null : currentValue; } @Override @@ -430,10 +430,11 @@ public class MapJoinBytesTableContainer private class ReusableRowContainer implements MapJoinRowContainer, AbstractRowContainer.RowIterator<List<Object>> { private byte aliasFilter; + /** Hash table wrapper specific to the container. */ private final BytesBytesMultiHashMap.ThreadSafeGetter threadSafeHashMapGetter; - private List<WriteBuffers.ByteSegmentRef> refs; - private int currentRow; + private BytesBytesMultiHashMap.Result hashMapResult; + /** * Sometimes, when container is empty in multi-table mapjoin, we need to add a dummy row. * This container does not normally support adding rows; this is for the dummy row. 
@@ -452,48 +453,55 @@ public class MapJoinBytesTableContainer } uselessIndirection = new ByteArrayRef(); threadSafeHashMapGetter = hashMap.createGetterForThread(); + hashMapResult = new BytesBytesMultiHashMap.Result(); clearRows(); } public JoinUtil.JoinResult setFromOutput(Output output) { - if (refs == null) { - refs = new ArrayList<WriteBuffers.ByteSegmentRef>(); - } - byte aliasFilter = threadSafeHashMapGetter.getValueRefs( - output.getData(), output.getLength(), refs); - this.aliasFilter = refs.isEmpty() ? (byte) 0xff : aliasFilter; - this.dummyRow = null; - if (refs.isEmpty()) { - return JoinUtil.JoinResult.NOMATCH; - } else { + + aliasFilter = threadSafeHashMapGetter.getValueResult( + output.getData(), 0, output.getLength(), hashMapResult); + dummyRow = null; + if (hashMapResult.hasRows()) { return JoinUtil.JoinResult.MATCH; + } else { + aliasFilter = (byte) 0xff; + return JoinUtil.JoinResult.NOMATCH; } + + } + + @Override + public boolean hasRows() { + return hashMapResult.hasRows() || (dummyRow != null); } - public boolean isEmpty() { - return refs.isEmpty() && (dummyRow == null); + @Override + public boolean isSingleRow() { + if (!hashMapResult.hasRows()) { + return (dummyRow != null); + } + return hashMapResult.isSingleRow(); } // Implementation of row container @Override public AbstractRowContainer.RowIterator<List<Object>> rowIter() throws HiveException { - currentRow = -1; return this; } @Override public int rowCount() throws HiveException { - return dummyRow != null ? 1 : refs.size(); + // For performance reasons we do not want to chase the values to the end to determine + // the count. Use hasRows and isSingleRow instead. + throw new UnsupportedOperationException("Getting the row count not supported"); } @Override public void clearRows() { // Doesn't clear underlying hashtable - if (refs != null) { - refs.clear(); - } + hashMapResult.forget(); dummyRow = null; - currentRow = -1; aliasFilter = (byte) 0xff; } @@ -510,30 +518,39 @@ public class MapJoinBytesTableContainer // Implementation of row iterator @Override public List<Object> first() throws HiveException { - currentRow = 0; - return nextInternal(); - } - @Override - public List<Object> next() throws HiveException { - return nextInternal(); - } - - private List<Object> nextInternal() throws HiveException { + // A little strange that we forget the dummy row on read. if (dummyRow != null) { List<Object> result = dummyRow; dummyRow = null; return result; } - if (currentRow < 0 || refs.size() < currentRow) throw new HiveException("No rows"); - if (refs.size() == currentRow) return null; - WriteBuffers.ByteSegmentRef ref = refs.get(currentRow++); + + WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.first(); + if (byteSegmentRef == null) { + return null; + } else { + return uppack(byteSegmentRef); + } + + } + + @Override + public List<Object> next() throws HiveException { + + WriteBuffers.ByteSegmentRef byteSegmentRef = hashMapResult.next(); + if (byteSegmentRef == null) { + return null; + } else { + return uppack(byteSegmentRef); + } + + } + + private List<Object> uppack(WriteBuffers.ByteSegmentRef ref) throws HiveException { if (ref.getLength() == 0) { return EMPTY_LIST; // shortcut, 0 length means no fields } - if (ref.getBytes() == null) { - threadSafeHashMapGetter.populateValue(ref); - } uselessIndirection.setData(ref.getBytes()); valueStruct.init(uselessIndirection, (int)ref.getOffset(), ref.getLength()); return valueStruct.getFieldsAsList(); // TODO: should we unset bytes after that? 
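
Because rowCount() on this reusable container now throws UnsupportedOperationException, callers are expected to test hasRows()/isSingleRow() and then walk the iterator. A rough fragment (rows is a placeholder for a MapJoinRowContainer such as the one returned by getCurrentRows(), and the surrounding method is assumed to declare throws HiveException):

    if (rows.hasRows()) {
      boolean single = rows.isSingleRow();  // cheap check; does not chase the value list
      AbstractRowContainer.RowIterator<List<Object>> it = rows.rowIter();
      for (List<Object> row = it.first(); row != null; row = it.next()) {
        // process one value row
      }
    }
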
@@ -541,7 +558,7 @@ public class MapJoinBytesTableContainer @Override public void addRow(List<Object> t) { - if (dummyRow != null || !refs.isEmpty()) { + if (dummyRow != null || hashMapResult.hasRows()) { throw new RuntimeException("Cannot add rows when not empty"); } dummyRow = t; Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/MapJoinEagerRowContainer.java Tue Apr 14 14:47:30 2015 @@ -81,6 +81,16 @@ public class MapJoinEagerRowContainer return null; } + @Override + public boolean hasRows() { + return list.size() > 0; + } + + @Override + public boolean isSingleRow() { + return list.size() == 1; + } + /** * Get the number of elements in the RowContainer. * Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/RowContainer.java Tue Apr 14 14:47:30 2015 @@ -331,6 +331,17 @@ public class RowContainer<ROW extends Li } } + + @Override + public boolean hasRows() { + return size > 0; + } + + @Override + public boolean isSingleRow() { + return size == 1; + } + /** * Get the number of elements in the RowContainer. 
* Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/persistence/UnwrapRowContainer.java Tue Apr 14 14:47:30 2015 @@ -95,6 +95,17 @@ public class UnwrapRowContainer internal.addRow(t); } + + @Override + public boolean hasRows() throws HiveException { + return internal.hasRows(); + } + + @Override + public boolean isSingleRow() throws HiveException { + return internal.isSingleRow(); + } + @Override public int rowCount() throws HiveException { return internal.rowCount(); Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordProcessor.java Tue Apr 14 14:47:30 2015 @@ -232,7 +232,7 @@ public class ReduceRecordProcessor exte sources[tag] = new ReduceRecordSource(); sources[tag].init(jconf, redWork.getReducer(), redWork.getVectorMode(), keyTableDesc, valueTableDesc, reader, tag == bigTablePosition, (byte) tag, - redWork.getAllScratchColumnVectorTypeMaps()); + redWork.getVectorScratchColumnTypeMap()); ois[tag] = sources[tag].getObjectInspector(); } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/ReduceRecordSource.java Tue Apr 14 14:47:30 2015 @@ -114,7 +114,7 @@ public class ReduceRecordSource implemen void init(JobConf jconf, Operator<?> reducer, boolean vectorized, TableDesc keyTableDesc, TableDesc valueTableDesc, KeyValuesReader reader, boolean handleGroupKey, byte tag, - Map<String, Map<Integer, String>> scratchColumnVectorTypes) + Map<Integer, String> vectorScratchColumnTypeMap) throws Exception { ObjectInspector keyObjectInspector; @@ -180,10 +180,8 @@ public class ReduceRecordSource implemen } rowObjectInspector = ObjectInspectorFactory.getStandardStructObjectInspector(colNames, ois); - Map<Integer, String> reduceShuffleScratchColumnTypeMap = - scratchColumnVectorTypes.get("_REDUCE_SHUFFLE_"); batchContext = new VectorizedRowBatchCtx(); - batchContext.init(reduceShuffleScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector); + batchContext.init(vectorScratchColumnTypeMap, (StructObjectInspector) rowObjectInspector); batch = 
batchContext.createVectorizedRowBatch(); } else { ois.add(keyObjectInspector); Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/TezTask.java Tue Apr 14 14:47:30 2015 @@ -64,6 +64,7 @@ import org.apache.tez.dag.api.Vertex; import org.apache.tez.dag.api.VertexGroup; import org.apache.tez.dag.api.client.DAGClient; import org.apache.tez.dag.api.client.StatusGetOpts; +import org.json.JSONObject; /** * @@ -279,6 +280,16 @@ public class TezTask extends Task<TezWor // the name of the dag is what is displayed in the AM/Job UI DAG dag = DAG.create(work.getName()); + + // set some info for the query + JSONObject json = new JSONObject().put("context", "Hive").put("description", ctx.getCmd()); + String dagInfo = json.toString(); + + if (LOG.isDebugEnabled()) { + LOG.debug("DagInfo: " + dagInfo); + } + dag.setDAGInfo(dagInfo); + dag.setCredentials(conf.getCredentials()); setAccessControlsForCurrentUser(dag); Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorColumnAssignFactory.java Tue Apr 14 14:47:30 2015 @@ -573,6 +573,12 @@ public class VectorColumnAssignFactory { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.INTERVAL_DAY_TIME); } else if (writables[i] instanceof BooleanWritable) { vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.BOOLEAN); + } else if (writables[i] instanceof HiveDecimalWritable) { + vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.DECIMAL); + } else if (writables[i] instanceof HiveCharWritable) { + vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.CHAR); + } else if (writables[i] instanceof HiveVarcharWritable) { + vcas[i] = buildObjectAssign(outputBatch, i, PrimitiveCategory.VARCHAR); } else { throw new HiveException("Unimplemented vector assigner for writable type " + writables[i].getClass()); Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorGroupByOperator.java Tue Apr 14 14:47:30 2015 @@ -766,8 +766,7 @@ public class VectorGroupByOperator exten isVectorOutput = 
desc.getVectorDesc().isVectorOutput(); - vOutContext = new VectorizationContext(desc.getOutputColumnNames()); - vOutContext.setFileKey(vContext.getFileKey() + "/_GROUPBY_"); + vOutContext = new VectorizationContext(getName(), desc.getOutputColumnNames()); } public VectorGroupByOperator() { Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorMapJoinOperator.java Tue Apr 14 14:47:30 2015 @@ -120,8 +120,7 @@ public class VectorMapJoinOperator exten bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable)); // We are making a new output vectorized row batch. - vOutContext = new VectorizationContext(desc.getOutputColumnNames()); - vOutContext.setFileKey(vContext.getFileKey() + "/MAP_JOIN_" + desc.getBigTableAlias()); + vOutContext = new VectorizationContext(getName(), desc.getOutputColumnNames()); } @Override Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSMBMapJoinOperator.java Tue Apr 14 14:47:30 2015 @@ -117,8 +117,7 @@ public class VectorSMBMapJoinOperator ex bigTableValueExpressions = vContext.getVectorExpressions(exprs.get(posBigTable)); // We are making a new output vectorized row batch. - vOutContext = new VectorizationContext(desc.getOutputColumnNames()); - vOutContext.setFileKey(vContext.getFileKey() + "/SMB_JOIN_" + desc.getBigTableAlias()); + vOutContext = new VectorizationContext(getName(), desc.getOutputColumnNames()); } @Override Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorSelectOperator.java Tue Apr 14 14:47:30 2015 @@ -68,10 +68,7 @@ public class VectorSelectOperator extend * Create a new vectorization context to create a new projection, but keep * same output column manager must be inherited to track the scratch the columns. */ - vOutContext = new VectorizationContext(vContext); - - // Set a fileKey, although this operator doesn't use it. 
- vOutContext.setFileKey(vContext.getFileKey() + "/_SELECT_"); + vOutContext = new VectorizationContext(getName(), vContext); vOutContext.resetProjectionColumns(); for (int i=0; i < colList.size(); ++i) { Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizationContext.java Tue Apr 14 14:47:30 2015 @@ -31,8 +31,10 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.TreeMap; +import java.util.TreeSet; import java.util.regex.Pattern; +import org.apache.commons.lang.ArrayUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.hive.serde2.io.HiveDecimalWritable; @@ -128,6 +130,9 @@ public class VectorizationContext { private static final Log LOG = LogFactory.getLog( VectorizationContext.class.getName()); + private String contextName; + private int level; + VectorExpressionDescriptor vMap; private List<Integer> projectedColumns; @@ -140,7 +145,10 @@ public class VectorizationContext { // Convenient constructor for initial batch creation takes // a list of columns names and maps them to 0..n-1 indices. - public VectorizationContext(List<String> initialColumnNames) { + public VectorizationContext(String contextName, List<String> initialColumnNames) { + this.contextName = contextName; + level = 0; + LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level + " initialColumnNames " + initialColumnNames.toString()); this.projectionColumnNames = initialColumnNames; projectedColumns = new ArrayList<Integer>(); @@ -157,8 +165,11 @@ public class VectorizationContext { // Constructor to with the individual addInitialColumn method // followed by a call to finishedAddingInitialColumns. - public VectorizationContext() { - projectedColumns = new ArrayList<Integer>(); + public VectorizationContext(String contextName) { + this.contextName = contextName; + level = 0; + LOG.info("VectorizationContext consructor contextName " + contextName + " level " + level); + projectedColumns = new ArrayList<Integer>(); projectionColumnNames = new ArrayList<String>(); projectionColumnMap = new HashMap<String, Integer>(); this.ocm = new OutputColumnManager(0); @@ -169,7 +180,10 @@ public class VectorizationContext { // Constructor useful making a projection vectorization context. // Use with resetProjectionColumns and addProjectionColumn. // Keeps existing output column map, etc. 
- public VectorizationContext(VectorizationContext vContext) { + public VectorizationContext(String contextName, VectorizationContext vContext) { + this.contextName = contextName; + level = vContext.level + 1; + LOG.info("VectorizationContext consructor reference contextName " + contextName + " level " + level); this.projectedColumns = new ArrayList<Integer>(); this.projectionColumnNames = new ArrayList<String>(); this.projectionColumnMap = new HashMap<String, Integer>(); @@ -238,13 +252,6 @@ public class VectorizationContext { //Map column number to type private OutputColumnManager ocm; - // File key is used by operators to retrieve the scratch vectors - // from mapWork at runtime. The operators that modify the structure of - // a vector row batch, need to allocate scratch vectors as well. Every - // operator that creates a new Vectorization context should set a unique - // fileKey. - private String fileKey = null; - // Set of UDF classes for type casting data types in row-mode. private static Set<Class<?>> castExpressionUdfs = new HashSet<Class<?>>(); static { @@ -268,14 +275,6 @@ public class VectorizationContext { castExpressionUdfs.add(UDFToShort.class); } - public String getFileKey() { - return fileKey; - } - - public void setFileKey(String fileKey) { - this.fileKey = fileKey; - } - protected int getInputColumnIndex(String name) throws HiveException { if (name == null) { throw new HiveException("Null column name"); @@ -316,6 +315,7 @@ public class VectorizationContext { // We need to differentiate DECIMAL columns by their precision and scale... String normalizedTypeName = getNormalizedName(hiveTypeName); int relativeCol = allocateOutputColumnInternal(normalizedTypeName); + // LOG.info("allocateOutputColumn for hiveTypeName " + hiveTypeName + " column " + (initialOutputCol + relativeCol)); return initialOutputCol + relativeCol; } @@ -357,6 +357,22 @@ public class VectorizationContext { usedOutputColumns.remove(index-initialOutputCol); } } + + public int[] currentScratchColumns() { + TreeSet<Integer> treeSet = new TreeSet(); + for (Integer col : usedOutputColumns) { + treeSet.add(initialOutputCol + col); + } + return ArrayUtils.toPrimitive(treeSet.toArray(new Integer[0])); + } + } + + public int allocateScratchColumn(String hiveTypeName) { + return ocm.allocateOutputColumn(hiveTypeName); + } + + public int[] currentScratchColumns() { + return ocm.currentScratchColumns(); } private VectorExpression getColumnVectorExpression(ExprNodeColumnDesc @@ -2106,6 +2122,10 @@ public class VectorizationContext { "\" for type: \"" + inputType.name() + " (reduce-side = " + isReduce + ")"); } + public int firstOutputColumnIndex() { + return firstOutputColumnIndex; + } + public Map<Integer, String> getScratchColumnTypeMap() { Map<Integer, String> map = new HashMap<Integer, String>(); for (int i = 0; i < ocm.outputColCount; i++) { @@ -2117,7 +2137,7 @@ public class VectorizationContext { public String toString() { StringBuilder sb = new StringBuilder(32); - sb.append("Context key ").append(getFileKey()).append(", "); + sb.append("Context name ").append(contextName).append(", level " + level + ", "); Comparator<Integer> comparerInteger = new Comparator<Integer>() { @Override @@ -2129,11 +2149,11 @@ public class VectorizationContext { for (Map.Entry<String, Integer> entry : projectionColumnMap.entrySet()) { sortedColumnMap.put(entry.getValue(), entry.getKey()); } - sb.append("sortedProjectionColumnMap ").append(sortedColumnMap).append(", "); + sb.append("sorted projectionColumnMap 
").append(sortedColumnMap).append(", "); Map<Integer, String> sortedScratchColumnTypeMap = new TreeMap<Integer, String>(comparerInteger); sortedScratchColumnTypeMap.putAll(getScratchColumnTypeMap()); - sb.append("sortedScratchColumnTypeMap ").append(sortedScratchColumnTypeMap); + sb.append("sorted scratchColumnTypeMap ").append(sortedScratchColumnTypeMap); return sb.toString(); } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedBatchUtil.java Tue Apr 14 14:47:30 2015 @@ -20,6 +20,7 @@ package org.apache.hadoop.hive.ql.exec.v import java.io.IOException; import java.sql.Timestamp; +import java.util.ArrayList; import java.util.LinkedList; import java.util.List; @@ -29,7 +30,6 @@ import org.apache.hadoop.hive.common.typ import org.apache.hadoop.hive.common.type.HiveIntervalDayTime; import org.apache.hadoop.hive.common.type.HiveIntervalYearMonth; import org.apache.hadoop.hive.common.type.HiveVarchar; -import org.apache.hadoop.hive.ql.exec.vector.expressions.StringExpr; import org.apache.hadoop.hive.ql.metadata.HiveException; import org.apache.hadoop.hive.serde2.io.ByteWritable; import org.apache.hadoop.hive.serde2.io.DateWritable; @@ -43,10 +43,15 @@ import org.apache.hadoop.hive.serde2.io. import org.apache.hadoop.hive.serde2.io.TimestampWritable; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector.Category; +import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory; import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector; +import org.apache.hadoop.hive.serde2.objectinspector.StandardStructObjectInspector; import org.apache.hadoop.hive.serde2.objectinspector.StructField; import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector; import org.apache.hadoop.hive.serde2.typeinfo.DecimalTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.PrimitiveTypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo; +import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.BytesWritable; import org.apache.hadoop.io.DataOutputBuffer; @@ -542,5 +547,98 @@ public class VectorizedBatchUtil { poi.getPrimitiveCategory()); } } -} + public static StandardStructObjectInspector convertToStandardStructObjectInspector( + StructObjectInspector structObjectInspector) throws HiveException { + + List<? 
extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); + List<ObjectInspector> oids = new ArrayList<ObjectInspector>(); + ArrayList<String> columnNames = new ArrayList<String>(); + + for(StructField field : fields) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString( + field.getFieldObjectInspector().getTypeName()); + ObjectInspector standardWritableObjectInspector = + TypeInfoUtils.getStandardWritableObjectInspectorFromTypeInfo(typeInfo); + oids.add(standardWritableObjectInspector); + columnNames.add(field.getFieldName()); + } + return ObjectInspectorFactory.getStandardStructObjectInspector(columnNames,oids); + } + + public static PrimitiveTypeInfo[] primitiveTypeInfosFromStructObjectInspector( + StructObjectInspector structObjectInspector) throws HiveException { + + List<? extends StructField> fields = structObjectInspector.getAllStructFieldRefs(); + PrimitiveTypeInfo[] result = new PrimitiveTypeInfo[fields.size()]; + + int i = 0; + for(StructField field : fields) { + TypeInfo typeInfo = TypeInfoUtils.getTypeInfoFromTypeString( + field.getFieldObjectInspector().getTypeName()); + result[i++] = (PrimitiveTypeInfo) typeInfo; + } + return result; + } + + + public static String displayBytes(byte[] bytes, int start, int length) { + StringBuilder sb = new StringBuilder(); + for (int i = start; i < start + length; i++) { + char ch = (char) bytes[i]; + if (ch < ' ' || ch > '~') { + sb.append(String.format("\\%03d", (int) (bytes[i] & 0xff))); + } else { + sb.append(ch); + } + } + return sb.toString(); + } + + public static void debugDisplayOneRow(VectorizedRowBatch batch, int index, String prefix) { + StringBuffer sb = new StringBuffer(); + sb.append(prefix + " row " + index + " "); + for (int i = 0; i < batch.projectionSize; i++) { + int column = batch.projectedColumns[i]; + ColumnVector colVector = batch.cols[column]; + if (colVector == null) { + sb.append("(null colVector " + column + ")"); + } else { + boolean isRepeating = colVector.isRepeating; + index = (isRepeating ? 0 : index); + if (colVector.noNulls || !colVector.isNull[index]) { + if (colVector instanceof LongColumnVector) { + sb.append(((LongColumnVector) colVector).vector[index]); + } else if (colVector instanceof DoubleColumnVector) { + sb.append(((DoubleColumnVector) colVector).vector[index]); + } else if (colVector instanceof BytesColumnVector) { + BytesColumnVector bytesColumnVector = (BytesColumnVector) colVector; + byte[] bytes = bytesColumnVector.vector[index]; + int start = bytesColumnVector.start[index]; + int length = bytesColumnVector.length[index]; + if (bytes == null) { + sb.append("(Unexpected null bytes with start " + start + " length " + length + ")"); + } else { + sb.append(displayBytes(bytes, start, length)); + } + } else if (colVector instanceof DecimalColumnVector) { + sb.append(((DecimalColumnVector) colVector).vector[index].toString()); + } else { + sb.append("Unknown"); + } + } else { + sb.append("NULL"); + } + } + sb.append(" "); + } + System.out.println(sb.toString()); + } + + public static void debugDisplayBatch(VectorizedRowBatch batch, String prefix) throws HiveException { + for (int i = 0; i < batch.size; i++) { + int index = (batch.selectedInUse ? 
batch.selected[i] : i); + debugDisplayOneRow(batch, index, prefix); + } + } +} Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatch.java Tue Apr 14 14:47:30 2015 @@ -190,6 +190,7 @@ public class VectorizedRowBatch implemen * - sets size to 0 * - sets endOfFile to false * - resets each column + * - inits each column */ public void reset() { selectedInUse = false; @@ -198,6 +199,7 @@ public class VectorizedRowBatch implemen for (ColumnVector vc : cols) { if (vc != null) { vc.reset(); + vc.init(); } } } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/exec/vector/VectorizedRowBatchCtx.java Tue Apr 14 14:47:30 2015 @@ -42,7 +42,6 @@ import org.apache.hadoop.hive.ql.exec.Ut import org.apache.hadoop.hive.ql.io.HiveFileFormatUtils; import org.apache.hadoop.hive.ql.io.IOPrepareCache; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.plan.MapredWork; import org.apache.hadoop.hive.ql.plan.PartitionDesc; import org.apache.hadoop.hive.serde.serdeConstants; import org.apache.hadoop.hive.serde2.ColumnProjectionUtils; @@ -174,9 +173,8 @@ public class VectorizedRowBatchCtx { split.getPath(), IOPrepareCache.get().getPartitionDescMap()); String partitionPath = split.getPath().getParent().toString(); - scratchColumnTypeMap = Utilities - .getMapWorkAllScratchColumnVectorTypeMaps(hiveConf) - .get(partitionPath); + scratchColumnTypeMap = Utilities.getMapWorkVectorScratchColumnTypeMap(hiveConf); + // LOG.info("VectorizedRowBatchCtx init scratchColumnTypeMap " + scratchColumnTypeMap.toString()); Properties partProps = (part.getPartSpec() == null || part.getPartSpec().isEmpty()) ? @@ -631,7 +629,7 @@ public class VectorizedRowBatchCtx { for (int i = origNumCols; i < newNumCols; i++) { String typeName = scratchColumnTypeMap.get(i); if (typeName == null) { - throw new HiveException("No type found for column type entry " + i); + throw new HiveException("No type entry found for column " + i + " in map " + scratchColumnTypeMap.toString()); } vrb.cols[i] = allocateColumnVector(typeName, VectorizedRowBatch.DEFAULT_SIZE); @@ -646,7 +644,7 @@ public class VectorizedRowBatchCtx { * @param decimalType The given decimal type string. * @return An integer array of size 2 with first element set to precision and second set to scale. 
*/ - private int[] getScalePrecisionFromDecimalType(String decimalType) { + private static int[] getScalePrecisionFromDecimalType(String decimalType) { Pattern p = Pattern.compile("\\d+"); Matcher m = p.matcher(decimalType); m.find(); @@ -657,7 +655,7 @@ public class VectorizedRowBatchCtx { return precScale; } - private ColumnVector allocateColumnVector(String type, int defaultSize) { + public static ColumnVector allocateColumnVector(String type, int defaultSize) { if (type.equalsIgnoreCase("double")) { return new DoubleColumnVector(defaultSize); } else if (VectorizationContext.isStringFamily(type)) { @@ -675,18 +673,4 @@ public class VectorizedRowBatchCtx { throw new Error("Cannot allocate vector column for " + type); } } - - public VectorColumnAssign[] buildObjectAssigners(VectorizedRowBatch outputBatch) - throws HiveException { - List<? extends StructField> fieldRefs = rowOI.getAllStructFieldRefs(); - assert outputBatch.numCols == fieldRefs.size(); - VectorColumnAssign[] assigners = new VectorColumnAssign[fieldRefs.size()]; - for(int i = 0; i < assigners.length; ++i) { - StructField fieldRef = fieldRefs.get(i); - ObjectInspector fieldOI = fieldRef.getFieldObjectInspector(); - assigners[i] = VectorColumnAssignFactory.buildObjectAssign( - outputBatch, i, fieldOI); - } - return assigners; - } } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/VectorizedParquetInputFormat.java Tue Apr 14 14:47:30 2015 @@ -109,7 +109,9 @@ public class VectorizedParquetInputForma @Override public boolean next(NullWritable key, VectorizedRowBatch outputBatch) throws IOException { - assert(outputBatch.numCols == assigners.length); + if (assigners != null) { + assert(outputBatch.numCols == assigners.length); + } outputBatch.reset(); int maxSize = outputBatch.getMaxSize(); try { Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/io/parquet/read/ParquetRecordReaderWrapper.java Tue Apr 14 14:47:30 2015 @@ -43,6 +43,8 @@ import org.apache.hadoop.mapred.Reporter import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptID; +import parquet.filter2.compat.FilterCompat; +import parquet.filter2.compat.RowGroupFilter; import parquet.filter2.predicate.FilterPredicate; import parquet.hadoop.ParquetFileReader; import parquet.hadoop.ParquetInputFormat; @@ -72,6 +74,7 @@ public class ParquetRecordReaderWrapper private boolean skipTimestampConversion = false; private JobConf jobConf; private final 
ProjectionPusher projectionPusher; + private List<BlockMetaData> filtedBlocks; public ParquetRecordReaderWrapper( final ParquetInputFormat<ArrayWritable> newInputFormat, @@ -100,8 +103,6 @@ public class ParquetRecordReaderWrapper taskAttemptID = new TaskAttemptID(); } - setFilter(jobConf); - // create a TaskInputOutputContext Configuration conf = jobConf; if (skipTimestampConversion ^ HiveConf.getBoolVar( @@ -136,13 +137,13 @@ public class ParquetRecordReaderWrapper } } - public void setFilter(final JobConf conf) { + public FilterCompat.Filter setFilter(final JobConf conf) { String serializedPushdown = conf.get(TableScanDesc.FILTER_EXPR_CONF_STR); String columnNamesString = conf.get(ColumnProjectionUtils.READ_COLUMN_NAMES_CONF_STR); if (serializedPushdown == null || columnNamesString == null || serializedPushdown.isEmpty() || columnNamesString.isEmpty()) { - return; + return null; } FilterPredicate p = @@ -151,9 +152,11 @@ public class ParquetRecordReaderWrapper if (p != null) { LOG.debug("Predicate filter for parquet is " + p.toString()); ParquetInputFormat.setFilterPredicate(conf, p); + return FilterCompat.get(p); } else { LOG.debug("No predicate filter can be generated for " + TableScanDesc.FILTER_EXPR_CONF_STR + " with the value of " + serializedPushdown); + return null; } } @@ -244,6 +247,7 @@ public class ParquetRecordReaderWrapper if (oldSplit instanceof FileSplit) { final Path finalPath = ((FileSplit) oldSplit).getPath(); jobConf = projectionPusher.pushProjectionsAndFilters(conf, finalPath.getParent()); + FilterCompat.Filter filter = setFilter(jobConf); final ParquetMetadata parquetMetadata = ParquetFileReader.readFooter(jobConf, finalPath); final List<BlockMetaData> blocks = parquetMetadata.getBlocks(); @@ -264,24 +268,43 @@ public class ParquetRecordReaderWrapper } if (splitGroup.isEmpty()) { LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit); - split = null; - } else { - if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { - skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); + return null; + } + + if (filter != null) { + filtedBlocks = RowGroupFilter.filterRowGroups(filter, splitGroup, fileMetaData.getSchema()); + if (filtedBlocks.isEmpty()) { + LOG.debug("All row groups are dropped due to filter predicates"); + return null; + } + + long droppedBlocks = splitGroup.size() - filtedBlocks.size(); + if (droppedBlocks > 0) { + LOG.debug("Dropping " + droppedBlocks + " row groups that do not pass filter predicate"); } - split = new ParquetInputSplit(finalPath, - splitStart, - splitLength, - ((FileSplit) oldSplit).getLocations(), - splitGroup, - readContext.getRequestedSchema().toString(), - fileMetaData.getSchema().toString(), - fileMetaData.getKeyValueMetaData(), - readContext.getReadSupportMetadata()); + } else { + filtedBlocks = splitGroup; + } + + if (HiveConf.getBoolVar(conf, HiveConf.ConfVars.HIVE_PARQUET_TIMESTAMP_SKIP_CONVERSION)) { + skipTimestampConversion = !Strings.nullToEmpty(fileMetaData.getCreatedBy()).startsWith("parquet-mr"); } + split = new ParquetInputSplit(finalPath, + splitStart, + splitLength, + ((FileSplit) oldSplit).getLocations(), + filtedBlocks, + readContext.getRequestedSchema().toString(), + fileMetaData.getSchema().toString(), + fileMetaData.getKeyValueMetaData(), + readContext.getReadSupportMetadata()); + return split; } else { throw new IllegalArgumentException("Unknown split type: " + oldSplit); } - return split; + } + + public 
List<BlockMetaData> getFiltedBlocks() { + return filtedBlocks; } } Modified: hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java URL: http://svn.apache.org/viewvc/hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java?rev=1673437&r1=1673436&r2=1673437&view=diff ============================================================================== --- hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java (original) +++ hive/branches/hbase-metastore/ql/src/java/org/apache/hadoop/hive/ql/lib/DefaultGraphWalker.java Tue Apr 14 14:47:30 2015 @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.lib; import java.util.ArrayList; import java.util.Collection; import java.util.HashMap; +import java.util.IdentityHashMap; import java.util.List; import java.util.Set; import java.util.Stack; @@ -37,7 +38,7 @@ public class DefaultGraphWalker implemen protected Stack<Node> opStack; protected final List<Node> toWalk = new ArrayList<Node>(); - protected final HashMap<Node, Object> retMap = new HashMap<Node, Object>(); + protected final IdentityHashMap<Node, Object> retMap = new IdentityHashMap<Node, Object>(); protected final Dispatcher dispatcher; /**