http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java new file mode 100644 index 0000000..1067957 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RecordReaderUtils.java @@ -0,0 +1,578 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.orc.impl; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.TreeMap; + +import com.google.common.collect.Lists; +import org.apache.commons.lang.builder.HashCodeBuilder; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.hadoop.hive.common.io.DiskRangeList; +import org.apache.hadoop.hive.common.io.DiskRangeList.CreateHelper; +import org.apache.hadoop.hive.common.io.DiskRangeList.MutateHelper; +import org.apache.orc.CompressionCodec; +import org.apache.orc.DataReader; +import org.apache.orc.OrcProto; + +import com.google.common.collect.ComparisonChain; +import org.apache.orc.StripeInformation; + +/** + * Stateless methods shared between RecordReaderImpl and EncodedReaderImpl. + */ +public class RecordReaderUtils { + private static final HadoopShims SHIMS = HadoopShims.Factory.get(); + + private static class DefaultDataReader implements DataReader { + private FSDataInputStream file = null; + private final ByteBufferAllocatorPool pool; + private HadoopShims.ZeroCopyReaderShim zcr = null; + private final FileSystem fs; + private final Path path; + private final boolean useZeroCopy; + private final CompressionCodec codec; + private final int bufferSize; + private final int typeCount; + + private DefaultDataReader(DefaultDataReader other) { + this.pool = other.pool; + this.bufferSize = other.bufferSize; + this.typeCount = other.typeCount; + this.fs = other.fs; + this.path = other.path; + this.useZeroCopy = other.useZeroCopy; + this.codec = other.codec; + } + + private DefaultDataReader(DataReaderProperties properties) { + this.fs = properties.getFileSystem(); + this.path = properties.getPath(); + this.useZeroCopy = properties.getZeroCopy(); + this.codec = 
WriterImpl.createCodec(properties.getCompression()); + this.bufferSize = properties.getBufferSize(); + this.typeCount = properties.getTypeCount(); + if (useZeroCopy) { + this.pool = new ByteBufferAllocatorPool(); + } else { + this.pool = null; + } + } + + @Override + public void open() throws IOException { + this.file = fs.open(path); + if (useZeroCopy) { + zcr = RecordReaderUtils.createZeroCopyShim(file, codec, pool); + } else { + zcr = null; + } + } + + @Override + public OrcIndex readRowIndex(StripeInformation stripe, + OrcProto.StripeFooter footer, + boolean[] included, + OrcProto.RowIndex[] indexes, + boolean[] sargColumns, + OrcProto.BloomFilterIndex[] bloomFilterIndices + ) throws IOException { + if (file == null) { + open(); + } + if (footer == null) { + footer = readStripeFooter(stripe); + } + if (indexes == null) { + indexes = new OrcProto.RowIndex[typeCount]; + } + if (bloomFilterIndices == null) { + bloomFilterIndices = new OrcProto.BloomFilterIndex[typeCount]; + } + long offset = stripe.getOffset(); + List<OrcProto.Stream> streams = footer.getStreamsList(); + for (int i = 0; i < streams.size(); i++) { + OrcProto.Stream stream = streams.get(i); + OrcProto.Stream nextStream = null; + if (i < streams.size() - 1) { + nextStream = streams.get(i+1); + } + int col = stream.getColumn(); + int len = (int) stream.getLength(); + // row index stream and bloom filter are interlaced, check if the sarg column contains bloom + // filter and combine the io to read row index and bloom filters for that column together + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.ROW_INDEX)) { + boolean readBloomFilter = false; + if (sargColumns != null && sargColumns[col] && + nextStream.getKind() == OrcProto.Stream.Kind.BLOOM_FILTER) { + len += nextStream.getLength(); + i += 1; + readBloomFilter = true; + } + if ((included == null || included[col]) && indexes[col] == null) { + byte[] buffer = new byte[len]; + file.readFully(offset, buffer, 0, buffer.length); + 
ByteBuffer bb = ByteBuffer.wrap(buffer); + indexes[col] = OrcProto.RowIndex.parseFrom(InStream.create("index", + Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), stream.getLength(), + codec, bufferSize)); + if (readBloomFilter) { + bb.position((int) stream.getLength()); + bloomFilterIndices[col] = OrcProto.BloomFilterIndex.parseFrom(InStream.create( + "bloom_filter", Lists.<DiskRange>newArrayList(new BufferChunk(bb, 0)), + nextStream.getLength(), codec, bufferSize)); + } + } + } + offset += len; + } + + OrcIndex index = new OrcIndex(indexes, bloomFilterIndices); + return index; + } + + @Override + public OrcProto.StripeFooter readStripeFooter(StripeInformation stripe) throws IOException { + if (file == null) { + open(); + } + long offset = stripe.getOffset() + stripe.getIndexLength() + stripe.getDataLength(); + int tailLength = (int) stripe.getFooterLength(); + + // read the footer + ByteBuffer tailBuf = ByteBuffer.allocate(tailLength); + file.readFully(offset, tailBuf.array(), tailBuf.arrayOffset(), tailLength); + return OrcProto.StripeFooter.parseFrom(InStream.createCodedInputStream("footer", + Lists.<DiskRange>newArrayList(new BufferChunk(tailBuf, 0)), + tailLength, codec, bufferSize)); + } + + @Override + public DiskRangeList readFileData( + DiskRangeList range, long baseOffset, boolean doForceDirect) throws IOException { + return RecordReaderUtils.readDiskRanges(file, zcr, baseOffset, range, doForceDirect); + } + + @Override + public void close() throws IOException { + if (pool != null) { + pool.clear(); + } + // close both zcr and file + try (HadoopShims.ZeroCopyReaderShim myZcr = zcr) { + if (file != null) { + file.close(); + } + } + } + + @Override + public boolean isTrackingDiskRanges() { + return zcr != null; + } + + @Override + public void releaseBuffer(ByteBuffer buffer) { + zcr.releaseBuffer(buffer); + } + + @Override + public DataReader clone() { + return new DefaultDataReader(this); + } + + } + + public static DataReader 
createDefaultDataReader(DataReaderProperties properties) { + return new DefaultDataReader(properties); + } + + public static boolean[] findPresentStreamsByColumn( + List<OrcProto.Stream> streamList, List<OrcProto.Type> types) { + boolean[] hasNull = new boolean[types.size()]; + for(OrcProto.Stream stream: streamList) { + if (stream.hasKind() && (stream.getKind() == OrcProto.Stream.Kind.PRESENT)) { + hasNull[stream.getColumn()] = true; + } + } + return hasNull; + } + + /** + * Does region A overlap region B? The end points are inclusive on both sides. + * @param leftA A's left point + * @param rightA A's right point + * @param leftB B's left point + * @param rightB B's right point + * @return Does region A overlap region B? + */ + static boolean overlap(long leftA, long rightA, long leftB, long rightB) { + if (leftA <= leftB) { + return rightA >= leftB; + } + return rightB >= leftA; + } + + public static void addEntireStreamToRanges( + long offset, long length, CreateHelper list, boolean doMergeBuffers) { + list.addOrMerge(offset, offset + length, doMergeBuffers, false); + } + + public static void addRgFilteredStreamToRanges(OrcProto.Stream stream, + boolean[] includedRowGroups, boolean isCompressed, OrcProto.RowIndex index, + OrcProto.ColumnEncoding encoding, OrcProto.Type type, int compressionSize, boolean hasNull, + long offset, long length, CreateHelper list, boolean doMergeBuffers) { + for (int group = 0; group < includedRowGroups.length; ++group) { + if (!includedRowGroups[group]) continue; + int posn = getIndexPosition( + encoding.getKind(), type.getKind(), stream.getKind(), isCompressed, hasNull); + long start = index.getEntry(group).getPositions(posn); + final long nextGroupOffset; + boolean isLast = group == (includedRowGroups.length - 1); + nextGroupOffset = isLast ? 
length : index.getEntry(group + 1).getPositions(posn); + + start += offset; + long end = offset + estimateRgEndOffset( + isCompressed, isLast, nextGroupOffset, length, compressionSize); + list.addOrMerge(start, end, doMergeBuffers, true); + } + } + + public static long estimateRgEndOffset(boolean isCompressed, boolean isLast, + long nextGroupOffset, long streamLength, int bufferSize) { + // figure out the worst case last location + // if adjacent groups have the same compressed block offset then stretch the slop + // by factor of 2 to safely accommodate the next compression block. + // One for the current compression block and another for the next compression block. + long slop = isCompressed ? 2 * (OutStream.HEADER_SIZE + bufferSize) : WORST_UNCOMPRESSED_SLOP; + return isLast ? streamLength : Math.min(streamLength, nextGroupOffset + slop); + } + + private static final int BYTE_STREAM_POSITIONS = 1; + private static final int RUN_LENGTH_BYTE_POSITIONS = BYTE_STREAM_POSITIONS + 1; + private static final int BITFIELD_POSITIONS = RUN_LENGTH_BYTE_POSITIONS + 1; + private static final int RUN_LENGTH_INT_POSITIONS = BYTE_STREAM_POSITIONS + 1; + + /** + * Get the offset in the index positions for the column that the given + * stream starts. + * @param columnEncoding the encoding of the column + * @param columnType the type of the column + * @param streamType the kind of the stream + * @param isCompressed is the file compressed + * @param hasNulls does the column have a PRESENT stream? + * @return the number of positions that will be used for that stream + */ + public static int getIndexPosition(OrcProto.ColumnEncoding.Kind columnEncoding, + OrcProto.Type.Kind columnType, + OrcProto.Stream.Kind streamType, + boolean isCompressed, + boolean hasNulls) { + if (streamType == OrcProto.Stream.Kind.PRESENT) { + return 0; + } + int compressionValue = isCompressed ? 1 : 0; + int base = hasNulls ? 
(BITFIELD_POSITIONS + compressionValue) : 0; + switch (columnType) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case FLOAT: + case DOUBLE: + case DATE: + case STRUCT: + case MAP: + case LIST: + case UNION: + return base; + case CHAR: + case VARCHAR: + case STRING: + if (columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY || + columnEncoding == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2) { + return base; + } else { + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } else { + return base + BYTE_STREAM_POSITIONS + compressionValue; + } + } + case BINARY: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case DECIMAL: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + BYTE_STREAM_POSITIONS + compressionValue; + case TIMESTAMP: + if (streamType == OrcProto.Stream.Kind.DATA) { + return base; + } + return base + RUN_LENGTH_INT_POSITIONS + compressionValue; + default: + throw new IllegalArgumentException("Unknown type " + columnType); + } + } + + // for uncompressed streams, what is the most overlap with the following set + // of rows (long vint literal group). + static final int WORST_UNCOMPRESSED_SLOP = 2 + 8 * 512; + + /** + * Is this stream part of a dictionary? + * @return is this part of a dictionary? + */ + public static boolean isDictionary(OrcProto.Stream.Kind kind, + OrcProto.ColumnEncoding encoding) { + assert kind != OrcProto.Stream.Kind.DICTIONARY_COUNT; + OrcProto.ColumnEncoding.Kind encodingKind = encoding.getKind(); + return kind == OrcProto.Stream.Kind.DICTIONARY_DATA || + (kind == OrcProto.Stream.Kind.LENGTH && + (encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY || + encodingKind == OrcProto.ColumnEncoding.Kind.DICTIONARY_V2)); + } + + /** + * Build a string representation of a list of disk ranges. 
+ * @param range ranges to stringify + * @return the resulting string + */ + public static String stringifyDiskRanges(DiskRangeList range) { + StringBuilder buffer = new StringBuilder(); + buffer.append("["); + boolean isFirst = true; + while (range != null) { + if (!isFirst) { + buffer.append(", {"); + } else { + buffer.append("{"); + } + isFirst = false; + buffer.append(range.toString()); + buffer.append("}"); + range = range.next; + } + buffer.append("]"); + return buffer.toString(); + } + + /** + * Read the list of ranges from the file. + * @param file the file to read + * @param base the base of the stripe + * @param range the disk ranges within the stripe to read + * @return the bytes read for each disk range, which is the same length as + * ranges + * @throws IOException + */ + static DiskRangeList readDiskRanges(FSDataInputStream file, + HadoopShims.ZeroCopyReaderShim zcr, + long base, + DiskRangeList range, + boolean doForceDirect) throws IOException { + if (range == null) return null; + DiskRangeList prev = range.prev; + if (prev == null) { + prev = new MutateHelper(range); + } + while (range != null) { + if (range.hasData()) { + range = range.next; + continue; + } + int len = (int) (range.getEnd() - range.getOffset()); + long off = range.getOffset(); + if (zcr != null) { + file.seek(base + off); + boolean hasReplaced = false; + while (len > 0) { + ByteBuffer partial = zcr.readBuffer(len, false); + BufferChunk bc = new BufferChunk(partial, off); + if (!hasReplaced) { + range.replaceSelfWith(bc); + hasReplaced = true; + } else { + range.insertAfter(bc); + } + range = bc; + int read = partial.remaining(); + len -= read; + off += read; + } + } else { + // Don't use HDFS ByteBuffer API because it has no readFully, and is buggy and pointless. 
+ byte[] buffer = new byte[len]; + file.readFully((base + off), buffer, 0, buffer.length); + ByteBuffer bb = null; + if (doForceDirect) { + bb = ByteBuffer.allocateDirect(len); + bb.put(buffer); + bb.position(0); + bb.limit(len); + } else { + bb = ByteBuffer.wrap(buffer); + } + range = range.replaceSelfWith(new BufferChunk(bb, range.getOffset())); + } + range = range.next; + } + return prev.next; + } + + + static List<DiskRange> getStreamBuffers(DiskRangeList range, long offset, long length) { + // This assumes sorted ranges (as do many other parts of ORC code. + ArrayList<DiskRange> buffers = new ArrayList<DiskRange>(); + if (length == 0) return buffers; + long streamEnd = offset + length; + boolean inRange = false; + while (range != null) { + if (!inRange) { + if (range.getEnd() <= offset) { + range = range.next; + continue; // Skip until we are in range. + } + inRange = true; + if (range.getOffset() < offset) { + // Partial first buffer, add a slice of it. + buffers.add(range.sliceAndShift(offset, Math.min(streamEnd, range.getEnd()), -offset)); + if (range.getEnd() >= streamEnd) break; // Partial first buffer is also partial last buffer. + range = range.next; + continue; + } + } else if (range.getOffset() >= streamEnd) { + break; + } + if (range.getEnd() > streamEnd) { + // Partial last buffer (may also be the first buffer), add a slice of it. + buffers.add(range.sliceAndShift(range.getOffset(), streamEnd, -offset)); + break; + } + // Buffer that belongs entirely to one stream. + // TODO: ideally we would want to reuse the object and remove it from the list, but we cannot + // because bufferChunks is also used by clearStreams for zcr. Create a useless dup. 
+ buffers.add(range.sliceAndShift(range.getOffset(), range.getEnd(), -offset)); + if (range.getEnd() == streamEnd) break; + range = range.next; + } + return buffers; + } + + static HadoopShims.ZeroCopyReaderShim createZeroCopyShim(FSDataInputStream file, + CompressionCodec codec, ByteBufferAllocatorPool pool) throws IOException { + if ((codec == null || ((codec instanceof DirectDecompressionCodec) + && ((DirectDecompressionCodec) codec).isAvailable()))) { + /* codec is null or is available */ + return SHIMS.getZeroCopyReader(file, pool); + } + return null; + } + + // this is an implementation copied from ElasticByteBufferPool in hadoop-2, + // which lacks a clear()/clean() operation + public final static class ByteBufferAllocatorPool implements HadoopShims.ByteBufferPoolShim { + private static final class Key implements Comparable<Key> { + private final int capacity; + private final long insertionGeneration; + + Key(int capacity, long insertionGeneration) { + this.capacity = capacity; + this.insertionGeneration = insertionGeneration; + } + + @Override + public int compareTo(Key other) { + return ComparisonChain.start().compare(capacity, other.capacity) + .compare(insertionGeneration, other.insertionGeneration).result(); + } + + @Override + public boolean equals(Object rhs) { + if (rhs == null) { + return false; + } + try { + Key o = (Key) rhs; + return (compareTo(o) == 0); + } catch (ClassCastException e) { + return false; + } + } + + @Override + public int hashCode() { + return new HashCodeBuilder().append(capacity).append(insertionGeneration) + .toHashCode(); + } + } + + private final TreeMap<Key, ByteBuffer> buffers = new TreeMap<Key, ByteBuffer>(); + + private final TreeMap<Key, ByteBuffer> directBuffers = new TreeMap<Key, ByteBuffer>(); + + private long currentGeneration = 0; + + private final TreeMap<Key, ByteBuffer> getBufferTree(boolean direct) { + return direct ? 
directBuffers : buffers; + } + + public void clear() { + buffers.clear(); + directBuffers.clear(); + } + + @Override + public ByteBuffer getBuffer(boolean direct, int length) { + TreeMap<Key, ByteBuffer> tree = getBufferTree(direct); + Map.Entry<Key, ByteBuffer> entry = tree.ceilingEntry(new Key(length, 0)); + if (entry == null) { + return direct ? ByteBuffer.allocateDirect(length) : ByteBuffer + .allocate(length); + } + tree.remove(entry.getKey()); + return entry.getValue(); + } + + @Override + public void putBuffer(ByteBuffer buffer) { + TreeMap<Key, ByteBuffer> tree = getBufferTree(buffer.isDirect()); + while (true) { + Key key = new Key(buffer.capacity(), currentGeneration++); + if (!tree.containsKey(key)) { + tree.put(key, buffer); + return; + } + // Buffers are indexed by (capacity, generation). + // If our key is not unique on the first try, we try again + } + } + } +}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RedBlackTree.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RedBlackTree.java b/java/core/src/java/org/apache/orc/impl/RedBlackTree.java new file mode 100644 index 0000000..41aa4b9 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RedBlackTree.java @@ -0,0 +1,311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.orc.impl.DynamicIntArray; + +/** + * A memory efficient red-black tree that does not allocate any objects per + * an element. This class is abstract and assumes that the child class + * handles the key and comparisons with the key. + */ +abstract class RedBlackTree { + public static final int NULL = -1; + + // Various values controlling the offset of the data within the array. 
+ private static final int LEFT_OFFSET = 0; + private static final int RIGHT_OFFSET = 1; + private static final int ELEMENT_SIZE = 2; + + protected int size = 0; + private final DynamicIntArray data; + protected int root = NULL; + protected int lastAdd = 0; + private boolean wasAdd = false; + + /** + * Create a set with the given initial capacity. + */ + public RedBlackTree(int initialCapacity) { + data = new DynamicIntArray(initialCapacity * ELEMENT_SIZE); + } + + /** + * Insert a new node into the data array, growing the array as necessary. + * + * @return Returns the position of the new node. + */ + private int insert(int left, int right, boolean isRed) { + int position = size; + size += 1; + setLeft(position, left, isRed); + setRight(position, right); + return position; + } + + /** + * Compare the value at the given position to the new value. + * @return 0 if the values are the same, -1 if the new value is smaller and + * 1 if the new value is larger. + */ + protected abstract int compareValue(int position); + + /** + * Is the given node red as opposed to black? To prevent having an extra word + * in the data array, we just the low bit on the left child index. + */ + protected boolean isRed(int position) { + return position != NULL && + (data.get(position * ELEMENT_SIZE + LEFT_OFFSET) & 1) == 1; + } + + /** + * Set the red bit true or false. + */ + private void setRed(int position, boolean isRed) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + if (isRed) { + data.set(offset, data.get(offset) | 1); + } else { + data.set(offset, data.get(offset) & ~1); + } + } + + /** + * Get the left field of the given position. + */ + protected int getLeft(int position) { + return data.get(position * ELEMENT_SIZE + LEFT_OFFSET) >> 1; + } + + /** + * Get the right field of the given position. + */ + protected int getRight(int position) { + return data.get(position * ELEMENT_SIZE + RIGHT_OFFSET); + } + + /** + * Set the left field of the given position. 
+ * Note that we are storing the node color in the low bit of the left pointer. + */ + private void setLeft(int position, int left) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + data.set(offset, (left << 1) | (data.get(offset) & 1)); + } + + /** + * Set the left field of the given position. + * Note that we are storing the node color in the low bit of the left pointer. + */ + private void setLeft(int position, int left, boolean isRed) { + int offset = position * ELEMENT_SIZE + LEFT_OFFSET; + data.set(offset, (left << 1) | (isRed ? 1 : 0)); + } + + /** + * Set the right field of the given position. + */ + private void setRight(int position, int right) { + data.set(position * ELEMENT_SIZE + RIGHT_OFFSET, right); + } + + /** + * Insert or find a given key in the tree and rebalance the tree correctly. + * Rebalancing restores the red-black aspect of the tree to maintain the + * invariants: + * 1. If a node is red, both of its children are black. + * 2. Each child of a node has the same black height (the number of black + * nodes between it and the leaves of the tree). + * + * Inserted nodes are at the leaves and are red, therefore there is at most a + * violation of rule 1 at the node we just put in. Instead of always keeping + * the parents, this routine passing down the context. + * + * The fix is broken down into 6 cases (1.{1,2,3} and 2.{1,2,3} that are + * left-right mirror images of each other). See Algorighms by Cormen, + * Leiserson, and Rivest for the explaination of the subcases. + * + * @param node The node that we are fixing right now. + * @param fromLeft Did we come down from the left? + * @param parent Nodes' parent + * @param grandparent Parent's parent + * @param greatGrandparent Grandparent's parent + * @return Does parent also need to be checked and/or fixed? 
+ */ + private boolean add(int node, boolean fromLeft, int parent, + int grandparent, int greatGrandparent) { + if (node == NULL) { + if (root == NULL) { + lastAdd = insert(NULL, NULL, false); + root = lastAdd; + wasAdd = true; + return false; + } else { + lastAdd = insert(NULL, NULL, true); + node = lastAdd; + wasAdd = true; + // connect the new node into the tree + if (fromLeft) { + setLeft(parent, node); + } else { + setRight(parent, node); + } + } + } else { + int compare = compareValue(node); + boolean keepGoing; + + // Recurse down to find where the node needs to be added + if (compare < 0) { + keepGoing = add(getLeft(node), true, node, parent, grandparent); + } else if (compare > 0) { + keepGoing = add(getRight(node), false, node, parent, grandparent); + } else { + lastAdd = node; + wasAdd = false; + return false; + } + + // we don't need to fix the root (because it is always set to black) + if (node == root || !keepGoing) { + return false; + } + } + + + // Do we need to fix this node? Only if there are two reds right under each + // other. 
+ if (isRed(node) && isRed(parent)) { + if (parent == getLeft(grandparent)) { + int uncle = getRight(grandparent); + if (isRed(uncle)) { + // case 1.1 + setRed(parent, false); + setRed(uncle, false); + setRed(grandparent, true); + return true; + } else { + if (node == getRight(parent)) { + // case 1.2 + // swap node and parent + int tmp = node; + node = parent; + parent = tmp; + // left-rotate on node + setLeft(grandparent, parent); + setRight(node, getLeft(parent)); + setLeft(parent, node); + } + + // case 1.2 and 1.3 + setRed(parent, false); + setRed(grandparent, true); + + // right-rotate on grandparent + if (greatGrandparent == NULL) { + root = parent; + } else if (getLeft(greatGrandparent) == grandparent) { + setLeft(greatGrandparent, parent); + } else { + setRight(greatGrandparent, parent); + } + setLeft(grandparent, getRight(parent)); + setRight(parent, grandparent); + return false; + } + } else { + int uncle = getLeft(grandparent); + if (isRed(uncle)) { + // case 2.1 + setRed(parent, false); + setRed(uncle, false); + setRed(grandparent, true); + return true; + } else { + if (node == getLeft(parent)) { + // case 2.2 + // swap node and parent + int tmp = node; + node = parent; + parent = tmp; + // right-rotate on node + setRight(grandparent, parent); + setLeft(node, getRight(parent)); + setRight(parent, node); + } + // case 2.2 and 2.3 + setRed(parent, false); + setRed(grandparent, true); + // left-rotate on grandparent + if (greatGrandparent == NULL) { + root = parent; + } else if (getRight(greatGrandparent) == grandparent) { + setRight(greatGrandparent, parent); + } else { + setLeft(greatGrandparent, parent); + } + setRight(grandparent, getLeft(parent)); + setLeft(parent, grandparent); + return false; + } + } + } else { + return true; + } + } + + /** + * Add the new key to the tree. + * @return true if the element is a new one. 
+ */ + protected boolean add() { + add(root, false, NULL, NULL, NULL); + if (wasAdd) { + setRed(root, false); + return true; + } else { + return false; + } + } + + /** + * Get the number of elements in the set. + */ + public int size() { + return size; + } + + /** + * Reset the table to empty. + */ + public void clear() { + root = NULL; + size = 0; + data.clear(); + } + + /** + * Get the buffer size in bytes. + */ + public long getSizeInBytes() { + return data.getSizeInBytes(); + } +} + http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java new file mode 100644 index 0000000..24bd051 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteReader.java @@ -0,0 +1,174 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +/** + * A reader that reads a sequence of bytes. 
A control byte is read before
 * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the
 * byte is -1 to -128, 1 to 128 literal byte values follow.
 */
public class RunLengthByteReader {
  private InStream input;
  // Holds the current run: literals[0] when repeat is true, otherwise up to
  // MAX_LITERAL_SIZE literal byte values.
  private final byte[] literals =
      new byte[RunLengthByteWriter.MAX_LITERAL_SIZE];
  // Number of values in the current run.
  private int numLiterals = 0;
  // How many values of the current run have already been consumed.
  private int used = 0;
  // True when the current run is a repetition of literals[0].
  private boolean repeat = false;

  public RunLengthByteReader(InStream input) throws IOException {
    this.input = input;
  }

  /** Replace the underlying stream; decode state is NOT reset here. */
  public void setInStream(InStream input) {
    this.input = input;
  }

  /**
   * Decode the next run (control byte plus payload) into literals.
   * @param ignoreEof if true, EOF at the control byte leaves the reader empty
   *                  instead of throwing
   * @throws EOFException on a truncated run, or on EOF when ignoreEof is false
   */
  private void readValues(boolean ignoreEof) throws IOException {
    int control = input.read();
    used = 0;
    if (control == -1) {
      if (!ignoreEof) {
        throw new EOFException("Read past end of buffer RLE byte from " + input);
      }
      used = numLiterals = 0;
      return;
    } else if (control < 0x80) {
      // Control 0..127 encodes (control + MIN_REPEAT_SIZE) copies of the
      // single byte that follows.
      repeat = true;
      numLiterals = control + RunLengthByteWriter.MIN_REPEAT_SIZE;
      int val = input.read();
      if (val == -1) {
        throw new EOFException("Reading RLE byte got EOF");
      }
      literals[0] = (byte) val;
    } else {
      // Control 128..255 encodes (256 - control) literal bytes that follow.
      repeat = false;
      numLiterals = 0x100 - control;
      int bytes = 0;
      // Loop because a single read may return fewer bytes than requested.
      while (bytes < numLiterals) {
        int result = input.read(literals, bytes, numLiterals - bytes);
        if (result == -1) {
          throw new EOFException("Reading RLE byte literal got EOF in " + this);
        }
        bytes += result;
      }
    }
  }

  /** True when values remain in the current run or more bytes are buffered. */
  public boolean hasNext() throws IOException {
    return used != numLiterals || input.available() > 0;
  }

  /** Return the next decoded byte, refilling the run buffer as needed. */
  public byte next() throws IOException {
    byte result;
    if (used == numLiterals) {
      readValues(false);
    }
    if (repeat) {
      result = literals[0];
    } else {
      result = literals[used];
    }
    ++used;
    return result;
  }

  /**
   * Read size values into data, skipping slots flagged null in previous and
   * updating previous.isRepeating as values are seen.
   */
  public void nextVector(ColumnVector previous, long[] data, long size)
      throws IOException {
    previous.isRepeating = true;
    for (int i = 0; i < size; i++) {
      if (!previous.isNull[i]) {
        data[i] = next();
      } else {
        // The default value of null for int types in vectorized
        // processing is 1, so set that if the value is null
        data[i] = 1;
      }

      // The default value for nulls in Vectorization for int types is 1
      // and given that non null value can also be 1, we need to check for isNull also
      // when determining the isRepeating flag.
      if (previous.isRepeating
          && i > 0
          && ((data[0] != data[i]) ||
              (previous.isNull[0] != previous.isNull[i]))) {
        previous.isRepeating = false;
      }
    }
  }

  /**
   * Read the next size bytes into the data array, skipping over any slots
   * where isNull is true.
   * @param isNull if non-null, skip any rows where isNull[r] is true
   * @param data the array to read into
   * @param size the number of elements to read
   * @throws IOException if the underlying stream fails or hits EOF mid-run
   */
  public void nextVector(boolean[] isNull, int[] data,
                         long size) throws IOException {
    if (isNull == null) {
      for(int i=0; i < size; ++i) {
        data[i] = next();
      }
    } else {
      for(int i=0; i < size; ++i) {
        if (!isNull[i]) {
          data[i] = next();
        }
      }
    }
  }

  /**
   * Position the reader from a recorded index. The position after the stream
   * seek is the count of values to skip into the run at that point.
   */
  public void seek(PositionProvider index) throws IOException {
    input.seek(index);
    int consumed = (int) index.getNext();
    if (consumed != 0) {
      // a loop is required for cases where we break the run into two parts;
      // readValues resets used to 0 each pass, then used is set to whatever
      // of the skip count remains.
      while (consumed > 0) {
        readValues(false);
        used = consumed;
        consumed -= numLiterals;
      }
    } else {
      used = 0;
      numLiterals = 0;
    }
  }

  /** Skip over the next items values, decoding runs as needed. */
  public void skip(long items) throws IOException {
    while (items > 0) {
      if (used == numLiterals) {
        readValues(false);
      }
      long consume = Math.min(items, numLiterals - used);
      used += consume;
      items -= consume;
    }
  }

  @Override
  public String toString() {
    return "byte rle " + (repeat ? "repeat" : "literal") + " used: " +
        used + "/" + numLiterals + " from " + input;
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
new file mode 100644
index 0000000..09108b2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthByteWriter.java
@@ -0,0 +1,106 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.impl;

import java.io.IOException;

/**
 * A streamFactory that writes a sequence of bytes. A control byte is written before
 * each run with positive values 0 to 127 meaning 3 to 130 repetitions
 * (MIN_REPEAT_SIZE is 3). If the
 * byte is -1 to -128, 1 to 128 literal byte values follow.
+ */ +public class RunLengthByteWriter { + static final int MIN_REPEAT_SIZE = 3; + static final int MAX_LITERAL_SIZE = 128; + static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE; + private final PositionedOutputStream output; + private final byte[] literals = new byte[MAX_LITERAL_SIZE]; + private int numLiterals = 0; + private boolean repeat = false; + private int tailRunLength = 0; + + public RunLengthByteWriter(PositionedOutputStream output) { + this.output = output; + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + if (repeat) { + output.write(numLiterals - MIN_REPEAT_SIZE); + output.write(literals, 0, 1); + } else { + output.write(-numLiterals); + output.write(literals, 0, numLiterals); + } + repeat = false; + tailRunLength = 0; + numLiterals = 0; + } + } + + public void flush() throws IOException { + writeValues(); + output.flush(); + } + + public void write(byte value) throws IOException { + if (numLiterals == 0) { + literals[numLiterals++] = value; + tailRunLength = 1; + } else if (repeat) { + if (value == literals[0]) { + numLiterals += 1; + if (numLiterals == MAX_REPEAT_SIZE) { + writeValues(); + } + } else { + writeValues(); + literals[numLiterals++] = value; + tailRunLength = 1; + } + } else { + if (value == literals[numLiterals - 1]) { + tailRunLength += 1; + } else { + tailRunLength = 1; + } + if (tailRunLength == MIN_REPEAT_SIZE) { + if (numLiterals + 1 == MIN_REPEAT_SIZE) { + repeat = true; + numLiterals += 1; + } else { + numLiterals -= MIN_REPEAT_SIZE - 1; + writeValues(); + literals[0] = value; + repeat = true; + numLiterals = MIN_REPEAT_SIZE; + } + } else { + literals[numLiterals++] = value; + if (numLiterals == MAX_LITERAL_SIZE) { + writeValues(); + } + } + } + } + + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +} 
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java new file mode 100644 index 0000000..b91a263 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReader.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.EOFException; +import java.io.IOException; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; + +/** + * A reader that reads a sequence of integers. 
 */
public class RunLengthIntegerReader implements IntegerReader {
  private InStream input;
  // True when values were written as signed vints (zigzag-free vslong).
  private final boolean signed;
  // Decoded values of the current run; for repeat runs only literals[0] is
  // set and values are reconstructed as literals[0] + used * delta.
  private final long[] literals =
      new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE];
  // Number of values in the current run.
  private int numLiterals = 0;
  // Per-step increment of a repeat run (signed byte range).
  private int delta = 0;
  // How many values of the current run have been consumed.
  private int used = 0;
  // True when the current run is a delta-repeat run.
  private boolean repeat = false;
  private SerializationUtils utils;

  public RunLengthIntegerReader(InStream input, boolean signed) throws IOException {
    this.input = input;
    this.signed = signed;
    this.utils = new SerializationUtils();
  }

  /**
   * Decode the next run (control byte plus payload) into literals.
   * @param ignoreEof if true, EOF at the control byte leaves the reader empty
   *                  instead of throwing
   * @throws EOFException on a truncated run, or on EOF when ignoreEof is false
   */
  private void readValues(boolean ignoreEof) throws IOException {
    int control = input.read();
    if (control == -1) {
      if (!ignoreEof) {
        throw new EOFException("Read past end of RLE integer from " + input);
      }
      used = numLiterals = 0;
      return;
    } else if (control < 0x80) {
      // Control 0..127: a run of (control + MIN_REPEAT_SIZE) values, each
      // offset by a signed byte delta from the previous one.
      numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE;
      used = 0;
      repeat = true;
      delta = input.read();
      if (delta == -1) {
        throw new EOFException("End of stream in RLE Integer from " + input);
      }
      // convert from 0 to 255 to -128 to 127 by converting to a signed byte
      delta = (byte) (0 + delta);
      if (signed) {
        literals[0] = utils.readVslong(input);
      } else {
        literals[0] = utils.readVulong(input);
      }
    } else {
      // Control 128..255: (256 - control) literal vint values follow.
      repeat = false;
      numLiterals = 0x100 - control;
      used = 0;
      for(int i=0; i < numLiterals; ++i) {
        if (signed) {
          literals[i] = utils.readVslong(input);
        } else {
          literals[i] = utils.readVulong(input);
        }
      }
    }
  }

  @Override
  public boolean hasNext() throws IOException {
    return used != numLiterals || input.available() > 0;
  }

  /** Return the next decoded value, refilling the run buffer as needed. */
  @Override
  public long next() throws IOException {
    long result;
    if (used == numLiterals) {
      readValues(false);
    }
    if (repeat) {
      // Repeat runs step by delta from the base value.
      result = literals[0] + (used++) * delta;
    } else {
      result = literals[used++];
    }
    return result;
  }

  /**
   * Fill data for previousLen rows, substituting 1 for null slots and
   * maintaining previous.isRepeating.
   */
  @Override
  public void nextVector(ColumnVector previous,
                         long[] data,
                         int previousLen) throws IOException {
    previous.isRepeating = true;
    for (int i = 0; i < previousLen; i++) {
      if (!previous.isNull[i]) {
        data[i] = next();
      } else {
        // The default value of null for int type in vectorized
        // processing is 1, so set that if the value is null
        data[i] = 1;
      }

      // The default value for nulls in Vectorization for int types is 1
      // and given that non null value can also be 1, we need to check for isNull also
      // when determining the isRepeating flag.
      if (previous.isRepeating
          && i > 0
          && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) {
        previous.isRepeating = false;
      }
    }
  }

  /**
   * Fill an int[] for size rows, narrowing each value to int and substituting
   * 1 for null slots; skips reading entirely when the vector is all-null.
   */
  @Override
  public void nextVector(ColumnVector vector,
                         int[] data,
                         int size) throws IOException {
    if (vector.noNulls) {
      for(int r=0; r < data.length && r < size; ++r) {
        data[r] = (int) next();
      }
    } else if (!(vector.isRepeating && vector.isNull[0])) {
      for(int r=0; r < data.length && r < size; ++r) {
        if (!vector.isNull[r]) {
          data[r] = (int) next();
        } else {
          data[r] = 1;
        }
      }
    }
  }

  /**
   * Position the reader from a recorded index. The position after the stream
   * seek is the count of values to skip into the run at that point.
   */
  @Override
  public void seek(PositionProvider index) throws IOException {
    input.seek(index);
    int consumed = (int) index.getNext();
    if (consumed != 0) {
      // a loop is required for cases where we break the run into two parts
      while (consumed > 0) {
        readValues(false);
        used = consumed;
        consumed -= numLiterals;
      }
    } else {
      used = 0;
      numLiterals = 0;
    }
  }

  /** Skip over the next numValues values, decoding runs as needed. */
  @Override
  public void skip(long numValues) throws IOException {
    while (numValues > 0) {
      if (used == numLiterals) {
        readValues(false);
      }
      long consume = Math.min(numValues, numLiterals - used);
      used += consume;
      numValues -= consume;
    }
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java
b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java new file mode 100644 index 0000000..610d9b5 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java @@ -0,0 +1,406 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.util.Arrays; + +import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A reader that reads a sequence of light weight compressed integers. Refer + * {@link RunLengthIntegerWriterV2} for description of various lightweight + * compression techniques. 
 */
public class RunLengthIntegerReaderV2 implements IntegerReader {
  public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class);

  private InStream input;
  // True when values were zigzag-encoded as signed integers.
  private final boolean signed;
  // Decoded values of the current run; readValues APPENDS to this buffer, so
  // callers must reset numLiterals/used before refilling.
  private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE];
  // True when the most recent run decoded to a single repeated value.
  private boolean isRepeating = false;
  private int numLiterals = 0;
  private int used = 0;
  // When true, tolerate the (pw + pgw) > 64 corruption in patched-base runs.
  private final boolean skipCorrupt;
  private final SerializationUtils utils;
  private RunLengthIntegerWriterV2.EncodingType currentEncoding;

  public RunLengthIntegerReaderV2(InStream input, boolean signed,
                                  boolean skipCorrupt) throws IOException {
    this.input = input;
    this.signed = signed;
    this.skipCorrupt = skipCorrupt;
    this.utils = new SerializationUtils();
  }

  // Cached to avoid re-allocating EncodingType.values() per run.
  private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values();

  /**
   * Decode the next run, dispatching on the encoding stored in the top two
   * bits of the header byte.
   * @param ignoreEof if true, EOF at the header byte leaves the reader empty
   *                  instead of throwing
   */
  private void readValues(boolean ignoreEof) throws IOException {
    // read the first 2 bits and determine the encoding type
    isRepeating = false;
    int firstByte = input.read();
    if (firstByte < 0) {
      if (!ignoreEof) {
        throw new EOFException("Read past end of RLE integer from " + input);
      }
      used = numLiterals = 0;
      return;
    }
    currentEncoding = encodings[(firstByte >>> 6) & 0x03];
    switch (currentEncoding) {
    case SHORT_REPEAT: readShortRepeatValues(firstByte); break;
    case DIRECT: readDirectValues(firstByte); break;
    case PATCHED_BASE: readPatchedBaseValues(firstByte); break;
    case DELTA: readDeltaValues(firstByte); break;
    default: throw new IOException("Unknown encoding " + currentEncoding);
    }
  }

  /**
   * Decode a DELTA run: a first value, a delta base, and (when the bit width
   * is non-zero) a packed blob of per-step magnitudes.
   */
  private void readDeltaValues(int firstByte) throws IOException {

    // extract the number of fixed bits
    int fb = (firstByte >>> 1) & 0x1f;
    if (fb != 0) {
      fb = utils.decodeBitWidth(fb);
    }

    // extract the blob run length
    int len = (firstByte & 0x01) << 8;
    len |= input.read();

    // read the first value stored as vint
    long firstVal = 0;
    if (signed) {
      firstVal = utils.readVslong(input);
    } else {
      firstVal = utils.readVulong(input);
    }

    // store first value to result buffer
    long prevVal = firstVal;
    literals[numLiterals++] = firstVal;

    // if fixed bits is 0 then all values have fixed delta
    if (fb == 0) {
      // read the fixed delta value stored as vint (deltas can be negative even
      // if all number are positive)
      long fd = utils.readVslong(input);
      if (fd == 0) {
        isRepeating = true;
        assert numLiterals == 1;
        Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]);
        numLiterals += len;
      } else {
        // add fixed deltas to adjacent values; note numLiterals++ on the LHS
        // is evaluated before the RHS, so [numLiterals - 2] is the value just
        // written
        for(int i = 0; i < len; i++) {
          literals[numLiterals++] = literals[numLiterals - 2] + fd;
        }
      }
    } else {
      long deltaBase = utils.readVslong(input);
      // add delta base and first value
      literals[numLiterals++] = firstVal + deltaBase;
      prevVal = literals[numLiterals - 1];
      len -= 1;

      // write the unpacked values, add it to previous value and store final
      // value to result buffer. if the delta base value is negative then it
      // is a decreasing sequence else an increasing sequence
      utils.readInts(literals, numLiterals, len, fb, input);
      while (len > 0) {
        if (deltaBase < 0) {
          literals[numLiterals] = prevVal - literals[numLiterals];
        } else {
          literals[numLiterals] = prevVal + literals[numLiterals];
        }
        prevVal = literals[numLiterals];
        len--;
        numLiterals++;
      }
    }
  }

  /**
   * Decode a PATCHED_BASE run: a base value plus a bit-packed data blob, with
   * a separate patch list restoring the high bits of outlier values.
   */
  private void readPatchedBaseValues(int firstByte) throws IOException {

    // extract the number of fixed bits
    int fbo = (firstByte >>> 1) & 0x1f;
    int fb = utils.decodeBitWidth(fbo);

    // extract the run length of data blob
    int len = (firstByte & 0x01) << 8;
    len |= input.read();
    // runs are always one off
    len += 1;

    // extract the number of bytes occupied by base
    int thirdByte = input.read();
    int bw = (thirdByte >>> 5) & 0x07;
    // base width is one off
    bw += 1;

    // extract patch width
    int pwo = thirdByte & 0x1f;
    int pw = utils.decodeBitWidth(pwo);

    // read fourth byte and extract patch gap width
    int fourthByte = input.read();
    int pgw = (fourthByte >>> 5) & 0x07;
    // patch gap width is one off
    pgw += 1;

    // extract the length of the patch list
    int pl = fourthByte & 0x1f;

    // read the next base width number of bytes to extract base value
    long base = utils.bytesToLongBE(input, bw);
    long mask = (1L << ((bw * 8) - 1));
    // if MSB of base value is 1 then base is negative value else positive
    if ((base & mask) != 0) {
      base = base & ~mask;
      base = -base;
    }

    // unpack the data blob
    long[] unpacked = new long[len];
    utils.readInts(unpacked, 0, len, fb, input);

    // unpack the patch blob
    long[] unpackedPatch = new long[pl];

    if ((pw + pgw) > 64 && !skipCorrupt) {
      throw new IOException("Corruption in ORC data encountered. To skip" +
          " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" +
          " true");
    }
    int bitSize = utils.getClosestFixedBits(pw + pgw);
    utils.readInts(unpackedPatch, 0, pl, bitSize, input);

    // apply the patch directly when decoding the packed data
    int patchIdx = 0;
    long currGap = 0;
    long currPatch = 0;
    long patchMask = ((1L << pw) - 1);
    currGap = unpackedPatch[patchIdx] >>> pw;
    currPatch = unpackedPatch[patchIdx] & patchMask;
    long actualGap = 0;

    // special case: gap is >255 then patch value will be 0.
    // if gap is <=255 then patch value cannot be 0
    while (currGap == 255 && currPatch == 0) {
      actualGap += 255;
      patchIdx++;
      currGap = unpackedPatch[patchIdx] >>> pw;
      currPatch = unpackedPatch[patchIdx] & patchMask;
    }
    // add the left over gap
    actualGap += currGap;

    // unpack data blob, patch it (if required), add base to get final result
    for(int i = 0; i < unpacked.length; i++) {
      if (i == actualGap) {
        // extract the patch value
        long patchedVal = unpacked[i] | (currPatch << fb);

        // add base to patched value
        literals[numLiterals++] = base + patchedVal;

        // increment the patch to point to next entry in patch list
        patchIdx++;

        if (patchIdx < pl) {
          // read the next gap and patch
          currGap = unpackedPatch[patchIdx] >>> pw;
          currPatch = unpackedPatch[patchIdx] & patchMask;
          actualGap = 0;

          // special case: gap is >255 then patch will be 0. if gap is
          // <=255 then patch cannot be 0
          while (currGap == 255 && currPatch == 0) {
            actualGap += 255;
            patchIdx++;
            currGap = unpackedPatch[patchIdx] >>> pw;
            currPatch = unpackedPatch[patchIdx] & patchMask;
          }
          // add the left over gap
          actualGap += currGap;

          // next gap is relative to the current gap
          actualGap += i;
        }
      } else {
        // no patching required. add base to unpacked value to get final value
        literals[numLiterals++] = base + unpacked[i];
      }
    }

  }

  /**
   * Decode a DIRECT run: bit-packed values, zigzag-decoded when signed.
   */
  private void readDirectValues(int firstByte) throws IOException {

    // extract the number of fixed bits
    int fbo = (firstByte >>> 1) & 0x1f;
    int fb = utils.decodeBitWidth(fbo);

    // extract the run length
    int len = (firstByte & 0x01) << 8;
    len |= input.read();
    // runs are one off
    len += 1;

    // write the unpacked values and zigzag decode to result buffer
    utils.readInts(literals, numLiterals, len, fb, input);
    if (signed) {
      for(int i = 0; i < len; i++) {
        literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]);
        numLiterals++;
      }
    } else {
      numLiterals += len;
    }
  }

  /**
   * Decode a SHORT_REPEAT run: a single fixed-width value repeated 3 to 10
   * times. Requires an empty buffer on entry.
   */
  private void readShortRepeatValues(int firstByte) throws IOException {

    // read the number of bytes occupied by the value
    int size = (firstByte >>> 3) & 0x07;
    // #bytes are one off
    size += 1;

    // read the run length
    int len = firstByte & 0x07;
    // run lengths values are stored only after MIN_REPEAT value is met
    len += RunLengthIntegerWriterV2.MIN_REPEAT;

    // read the repeated value which is store using fixed bytes
    long val = utils.bytesToLongBE(input, size);

    if (signed) {
      val = utils.zigzagDecode(val);
    }

    if (numLiterals != 0) {
      // Currently this always holds, which makes peekNextAvailLength simpler.
      // If this changes, peekNextAvailLength should be adjusted accordingly.
      throw new AssertionError("readValues called with existing values present");
    }
    // repeat the value for length times
    isRepeating = true;
    // TODO: this is not so useful and V1 reader doesn't do that. Fix? Same if delta == 0
    for(int i = 0; i < len; i++) {
      literals[i] = val;
    }
    numLiterals = len;
  }

  @Override
  public boolean hasNext() throws IOException {
    return used != numLiterals || input.available() > 0;
  }

  /** Return the next decoded value, refilling the run buffer as needed. */
  @Override
  public long next() throws IOException {
    long result;
    if (used == numLiterals) {
      // readValues appends, so the buffer must be reset first.
      numLiterals = 0;
      used = 0;
      readValues(false);
    }
    result = literals[used++];
    return result;
  }

  /**
   * Position the reader from a recorded index. The position after the stream
   * seek is the count of values to skip into the run at that point.
   */
  @Override
  public void seek(PositionProvider index) throws IOException {
    input.seek(index);
    int consumed = (int) index.getNext();
    if (consumed != 0) {
      // a loop is required for cases where we break the run into two
      // parts
      while (consumed > 0) {
        numLiterals = 0;
        readValues(false);
        used = consumed;
        consumed -= numLiterals;
      }
    } else {
      used = 0;
      numLiterals = 0;
    }
  }

  /** Skip over the next numValues values, decoding runs as needed. */
  @Override
  public void skip(long numValues) throws IOException {
    while (numValues > 0) {
      if (used == numLiterals) {
        numLiterals = 0;
        used = 0;
        readValues(false);
      }
      long consume = Math.min(numValues, numLiterals - used);
      used += consume;
      numValues -= consume;
    }
  }

  /**
   * Fill data for previousLen rows, substituting 1 for null slots and
   * maintaining previous.isRepeating.
   */
  @Override
  public void nextVector(ColumnVector previous,
                         long[] data,
                         int previousLen) throws IOException {
    previous.isRepeating = true;
    for (int i = 0; i < previousLen; i++) {
      if (!previous.isNull[i]) {
        data[i] = next();
      } else {
        // The default value of null for int type in vectorized
        // processing is 1, so set that if the value is null
        data[i] = 1;
      }

      // The default value for nulls in Vectorization for int types is 1
      // and given that non null value can also be 1, we need to check for isNull also
      // when determining the isRepeating flag.
      if (previous.isRepeating
          && i > 0
          && (data[0] != data[i] ||
              previous.isNull[0] != previous.isNull[i])) {
        previous.isRepeating = false;
      }
    }
  }

  /**
   * Fill an int[] for size rows, narrowing each value to int and substituting
   * 1 for null slots; skips reading entirely when the vector is all-null.
   */
  @Override
  public void nextVector(ColumnVector vector,
                         int[] data,
                         int size) throws IOException {
    if (vector.noNulls) {
      for(int r=0; r < data.length && r < size; ++r) {
        data[r] = (int) next();
      }
    } else if (!(vector.isRepeating && vector.isNull[0])) {
      for(int r=0; r < data.length && r < size; ++r) {
        if (!vector.isNull[r]) {
          data[r] = (int) next();
        } else {
          data[r] = 1;
        }
      }
    }
  }
}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
----------------------------------------------------------------------
diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
new file mode 100644
index 0000000..3e5f2e2
--- /dev/null
+++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java
@@ -0,0 +1,143 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.orc.impl;

import java.io.IOException;

/**
 * A streamFactory that writes a sequence of integers. A control byte is written before
 * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each
 * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128
 * literal vint values follow.
 */
public class RunLengthIntegerWriter implements IntegerWriter {
  // Shortest delta run worth encoding as a repeat.
  static final int MIN_REPEAT_SIZE = 3;
  // Per-step delta must fit in a signed byte.
  static final int MAX_DELTA = 127;
  static final int MIN_DELTA = -128;
  // Longest literal run a single control byte can describe.
  static final int MAX_LITERAL_SIZE = 128;
  // Longest repeat run a single control byte can describe.
  private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE;
  private final PositionedOutputStream output;
  // True when values are written as signed vints (vslong) rather than vulong.
  private final boolean signed;
  // Pending values: literals[0] (the run base) when repeat is true, otherwise
  // up to MAX_LITERAL_SIZE literal values.
  private final long[] literals = new long[MAX_LITERAL_SIZE];
  // Count of pending values (repeat length when repeat is true).
  private int numLiterals = 0;
  // Per-step increment of the current (or candidate) delta run.
  private long delta = 0;
  // True when the pending values form a delta-repeat run.
  private boolean repeat = false;
  // Length of the trailing arithmetic run inside the literal buffer;
  // reaching MIN_REPEAT_SIZE converts the tail into a repeat run.
  private int tailRunLength = 0;
  private SerializationUtils utils;

  public RunLengthIntegerWriter(PositionedOutputStream output,
                                boolean signed) {
    this.output = output;
    this.signed = signed;
    this.utils = new SerializationUtils();
  }

  /**
   * Emit the pending run: control byte, then (for repeats) the delta byte and
   * base value, or (for literals) the vint values; resets buffer state.
   */
  private void writeValues() throws IOException {
    if (numLiterals != 0) {
      if (repeat) {
        // Repeat control bytes are biased by MIN_REPEAT_SIZE.
        output.write(numLiterals - MIN_REPEAT_SIZE);
        output.write((byte) delta);
        if (signed) {
          utils.writeVslong(output, literals[0]);
        } else {
          utils.writeVulong(output, literals[0]);
        }
      } else {
        // Literal runs use a negative control byte.
        output.write(-numLiterals);
        for(int i=0; i < numLiterals; ++i) {
          if (signed) {
            utils.writeVslong(output, literals[i]);
          } else {
            utils.writeVulong(output, literals[i]);
          }
        }
      }
      repeat = false;
      numLiterals = 0;
      tailRunLength = 0;
    }
  }

  /** Flush any pending run and the underlying stream. */
  @Override
  public void flush() throws IOException {
    writeValues();
    output.flush();
  }

  /**
   * Buffer one value, growing the current delta-repeat run, extending the
   * literal buffer, or converting an arithmetic tail into a repeat run.
   */
  @Override
  public void write(long value) throws IOException {
    if (numLiterals == 0) {
      literals[numLiterals++] = value;
      tailRunLength = 1;
    } else if (repeat) {
      // Continue the repeat only if the value extends the arithmetic sequence.
      if (value == literals[0] + delta * numLiterals) {
        numLiterals += 1;
        if (numLiterals == MAX_REPEAT_SIZE) {
          writeValues();
        }
      } else {
        writeValues();
        literals[numLiterals++] = value;
        tailRunLength = 1;
      }
    } else {
      if (tailRunLength == 1) {
        // Start a candidate run; a delta outside the signed-byte range is not
        // encodable, so the run cannot begin here.
        delta = value - literals[numLiterals - 1];
        if (delta < MIN_DELTA || delta > MAX_DELTA) {
          tailRunLength = 1;
        } else {
          tailRunLength = 2;
        }
      } else if (value == literals[numLiterals - 1] + delta) {
        tailRunLength += 1;
      } else {
        delta = value - literals[numLiterals - 1];
        if (delta < MIN_DELTA || delta > MAX_DELTA) {
          tailRunLength = 1;
        } else {
          tailRunLength = 2;
        }
      }
      if (tailRunLength == MIN_REPEAT_SIZE) {
        if (numLiterals + 1 == MIN_REPEAT_SIZE) {
          // The whole buffer is the run; just switch modes.
          repeat = true;
          numLiterals += 1;
        } else {
          // Flush the literals that precede the run, then restart the buffer
          // as a repeat run based at the run's first value.
          numLiterals -= MIN_REPEAT_SIZE - 1;
          long base = literals[numLiterals];
          writeValues();
          literals[0] = base;
          repeat = true;
          numLiterals = MIN_REPEAT_SIZE;
        }
      } else {
        literals[numLiterals++] = value;
        if (numLiterals == MAX_LITERAL_SIZE) {
          writeValues();
        }
      }
    }
  }

  /**
   * Record the stream position plus the count of values still buffered, so a
   * reader can seek to the exact value.
   */
  @Override
  public void getPosition(PositionRecorder recorder) throws IOException {
    output.getPosition(recorder);
    recorder.addPosition(numLiterals);
  }

}
