http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..fab2801 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java @@ -0,0 +1,831 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + * <p> + * There are four types of lightweight integer compression + * <ul> + * <li>SHORT_REPEAT</li> + * <li>DIRECT</li> + * <li>PATCHED_BASE</li> + * <li>DELTA</li> + * </ul> + * </p> + * The description and format for these types are as below: + * <p> + * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences. 
+ * <ul> + * <li>1 byte header + * <ul> + * <li>2 bits for encoding type</li> + * <li>3 bits for bytes required for repeating value</li> + * <li>3 bits for repeat count (MIN_REPEAT + run length)</li> + * </ul> + * </li> + * <li>Blob - repeat value (fixed bytes)</li> + * </ul> + * </p> + * <p> + * <b>DIRECT:</b> Used for random integer sequences whose number of bit + * requirement doesn't vary a lot. + * <ul> + * <li>2 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * </li> + * <li>Blob - stores the direct values using fixed bit width. The length of the + * data blob is (fixed width * run length) bits long</li> + * </ul> + * </p> + * <p> + * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit + * requirement varies beyond a threshold. + * <ul> + * <li>4 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * <ul> + * 3rd byte + * <li>3 bits for bytes required to encode base value</li> + * <li>5 bits for patch width</li> + * </ul> + * <ul> + * 4th byte + * <li>3 bits for patch gap width</li> + * <li>5 bits for patch length</li> + * </ul> + * </li> + * <li>Base value - Stored using fixed number of bytes. If MSB is set, base + * value is negative else positive. Length of base value is (base width * 8) + * bits.</li> + * <li>Data blob - Base reduced values as stored using fixed bit width. Length + * of data blob is (fixed width * run length) bits.</li> + * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in + * the patch list is (patch width + patch gap width) bits long. 
Gap between the + * subsequent elements to be patched are stored in upper part of entry whereas + * patch values are stored in lower part of entry. Length of patch blob is + * ((patch width + patch gap width) * patch length) bits.</li> + * </ul> + * </p> + * <p> + * <b>DELTA</b> Used for monotonically increasing or decreasing sequences, + * sequences with fixed delta values or long repeated sequences. + * <ul> + * <li>2 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * </li> + * <li>Base value - zigzag encoded value written as varint</li> + * <li>Delta base - zigzag encoded value written as varint</li> + * <li>Delta blob - only positive values. monotonicity and orderness are decided + * based on the sign of the base value and delta base</li> + * </ul> + * </p> + */ +public class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private final long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private final long[] zigzagLiterals = new long[MAX_SCOPE]; + private final long[] baseRedLiterals = new long[MAX_SCOPE]; + private final long[] adjDeltas = new long[MAX_SCOPE]; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private 
long[] gapVsPatchList; + private long min; + private boolean isFixedDelta; + private SerializationUtils utils; + private boolean alignedBitpacking; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this(output, signed, true); + } + + public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, + boolean alignedBitpacking) { + this.output = output; + this.signed = signed; + this.alignedBitpacking = alignedBitpacking; + this.utils = new SerializationUtils(); + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + if (isFixedDelta) { + // if fixed run length is greater than threshold then it will be fixed + // delta sequence with delta value 0 else fixed delta sequence with + // non-zero delta value + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for long repeating values. 
+ // sequences that require only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = utils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + final int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + utils.writeVslong(output, literals[0]); + } else { + utils.writeVulong(output, literals[0]); + } + + if (isFixedDelta) { + // if delta is fixed then we don't need to store delta blob + utils.writeVslong(output, fixedDelta); + } else { + // store the first value as delta value using zigzag encoding + utils.writeVslong(output, adjDeltas[0]); + + // adjacent delta values are bit packed. The length of adjDeltas array is + // always one less than the number of literals (delta difference for n + // elements is n-1). We have already written one element, write the + // remaining numLiterals - 2 elements here + utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding + // because patch is applied to MSB bits. For example: If fixed bit width of + // base value is 7 bits and if patch is 3 bits, the actual value is + // constructed by shifting the patch to left by 7 positions. + // actual_value = patch << 7 | base_value + // So, if we align base_value then actual_value can not be reconstructed. 
+ + // write the number of fixed bits required in next 5 bits + final int fb = brBits95p; + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // if the min value is negative toggle the sign + final boolean isNegative = min < 0 ? true : false; + if (isNegative) { + min = -min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + final int baseWidth = utils.findClosestNumBits(min) + 1; + final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; + final int bb = (baseBytes - 1) << 5; + + // if the base value is negative then set MSB to 1 + if (isNegative) { + min |= (1L << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for(int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((min >>> (i * 8)) & 0xff); + output.write(b); + } + + // base reduced literals are bit packed + int closestFixedBits = utils.getClosestFixedBits(fb); + + utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits, + 
output); + + // write patch list + closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); + + utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, + output); + + // reset run length + variableRunLength = 0; + } + + /** + * Store the opcode in 2 MSB bits + * @return opcode + */ + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = zzBits100p; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the zigzag encoded literals + utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output); + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = utils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); + final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? 
numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the repeating value in big endian byte order + for(int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } + + private void determineEncoding() { + + // we need to compute zigzag values for DIRECT encoding if we decide to + // break early for delta overflows or for shorter runs + computeZigZagLiterals(); + + zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); + + // not a big win for shorter runs to determine encoding + if (numLiterals <= MIN_REPEAT) { + encoding = EncodingType.DIRECT; + return; + } + + // DELTA encoding check + + // for identifying monotonic sequences + boolean isIncreasing = true; + boolean isDecreasing = true; + this.isFixedDelta = true; + + this.min = literals[0]; + long max = literals[0]; + final long initialDelta = literals[1] - literals[0]; + long currDelta = initialDelta; + long deltaMax = initialDelta; + this.adjDeltas[0] = initialDelta; + + for (int i = 1; i < numLiterals; i++) { + final long l1 = literals[i]; + final long l0 = literals[i - 1]; + currDelta = l1 - l0; + min = Math.min(min, l1); + max = Math.max(max, l1); + + isIncreasing &= (l0 <= l1); + isDecreasing &= (l0 >= l1); + + isFixedDelta &= (currDelta == initialDelta); + if (i > 1) { + adjDeltas[i - 1] = Math.abs(currDelta); + deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); + } + } + + // its faster to exit under delta overflow condition without checking for + // PATCHED_BASE condition as encoding using DIRECT is faster and has less + // overhead than PATCHED_BASE + if (!utils.isSafeSubtract(max, 
min)) { + encoding = EncodingType.DIRECT; + return; + } + + // invariant - subtracting any number from any other in the literals after + // this point won't overflow + + // if min is equal to max then the delta is 0, this condition happens for + // fixed values run >10 which cannot be encoded with SHORT_REPEAT + if (min == max) { + assert isFixedDelta : min + "==" + max + + ", isFixedDelta cannot be false"; + assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; + fixedDelta = 0; + encoding = EncodingType.DELTA; + return; + } + + if (isFixedDelta) { + assert currDelta == initialDelta + : "currDelta should be equal to initialDelta for fixed delta encoding"; + encoding = EncodingType.DELTA; + fixedDelta = currDelta; + return; + } + + // if initialDelta is 0 then we cannot delta encode as we cannot identify + // the sign of deltas (increasing or decreasing) + if (initialDelta != 0) { + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = utils.findClosestNumBits(deltaMax); + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + } + + // PATCHED_BASE encoding check + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct encoding + + zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (diffBitsLH > 1) { + + // patching is done only on base reduced values. 
+ // remove base from literals + for (int i = 0; i < numLiterals; i++) { + baseRedLiterals[i] = literals[i] - min; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); + + // 100th percentile is used to compute the max patch width + brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. + if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } else { + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
Hence we will fallback to direct + encoding = EncodingType.DIRECT; + return; + } + } + + private void computeZigZagLiterals() { + // populate zigzag encoded literals + long zzEncVal = 0; + for (int i = 0; i < numLiterals; i++) { + if (signed) { + zzEncVal = utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[i] = zzEncVal; + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + long mask = (1L << brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((numLiterals * 0.05)); + + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = utils.getClosestFixedBits(patchWidth); + + // if patch bit requirement is 64 then it will not possible to pack + // gap and patch together in a long. To make sure gap and patch can be + // packed together adjust the patch width + if (patchWidth == 64) { + patchWidth = 56; + brBits95p = 8; + mask = (1L << brBits95p) - 1; + } + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for(int i = 0; i < numLiterals; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value index + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask bits + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, 
but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = utils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in patch list in the following way + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + // We will do the same for gap width = 511. If the element to be patched is + // the last element in the scope then gap width will be 511. In this case we + // will have 3 entries in the patch list in the following way + // 255 gap width => 0 for patch value + // 255 gap width => 0 for patch value + // 1 gap width => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for(int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255L << patchWidth); + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + min = 0; + isFixedDelta = true; + } + + @Override + public void flush() throws IOException { + if (numLiterals != 0) { + if (variableRunLength != 0) { + determineEncoding(); + writeValues(); + } else if (fixedRunLength != 0) { 
+ if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + } + output.flush(); + } + + @Override + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; + System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for(long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + 
determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if it satisfies the + // short repeat conditions then write the values as short repeats + // else use delta encoding + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + + // if fixed run length is <MIN_REPEAT and current value is + // different from previous then treat it as variable run + if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java new file mode 100644 index 0000000..2c80aaa --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/SchemaEvolution.java @@ -0,0 +1,190 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.orc.TypeDescription; + +/** + * Take the file types and the (optional) configuration column names/types and see if there + * has been schema evolution. 
+ */ +public class SchemaEvolution { + private final Map<TypeDescription, TypeDescription> readerToFile; + private final boolean[] included; + private final TypeDescription readerSchema; + private static final Log LOG = LogFactory.getLog(SchemaEvolution.class); + + public SchemaEvolution(TypeDescription readerSchema, boolean[] included) { + this.included = included; + readerToFile = null; + this.readerSchema = readerSchema; + } + + public SchemaEvolution(TypeDescription fileSchema, + TypeDescription readerSchema, + boolean[] included) throws IOException { + readerToFile = new HashMap<>(readerSchema.getMaximumId() + 1); + this.included = included; + if (checkAcidSchema(fileSchema)) { + this.readerSchema = createEventSchema(readerSchema); + } else { + this.readerSchema = readerSchema; + } + buildMapping(fileSchema, this.readerSchema); + } + + public TypeDescription getReaderSchema() { + return readerSchema; + } + + public TypeDescription getFileType(TypeDescription readerType) { + TypeDescription result; + if (readerToFile == null) { + if (included == null || included[readerType.getId()]) { + result = readerType; + } else { + result = null; + } + } else { + result = readerToFile.get(readerType); + } + return result; + } + + void buildMapping(TypeDescription fileType, + TypeDescription readerType) throws IOException { + // if the column isn't included, don't map it + if (included != null && !included[readerType.getId()]) { + return; + } + boolean isOk = true; + // check the easy case first + if (fileType.getCategory() == readerType.getCategory()) { + switch (readerType.getCategory()) { + case BOOLEAN: + case BYTE: + case SHORT: + case INT: + case LONG: + case DOUBLE: + case FLOAT: + case STRING: + case TIMESTAMP: + case BINARY: + case DATE: + // these are always a match + break; + case CHAR: + case VARCHAR: + // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL) + isOk = fileType.getMaxLength() == readerType.getMaxLength(); + break; + 
case DECIMAL: + // HIVE-13648: Look at ORC data type conversion edge cases (CHAR, VARCHAR, DECIMAL) + // TODO we don't enforce scale and precision checks, but probably should + break; + case UNION: + case MAP: + case LIST: { + // these must be an exact match + List<TypeDescription> fileChildren = fileType.getChildren(); + List<TypeDescription> readerChildren = readerType.getChildren(); + if (fileChildren.size() == readerChildren.size()) { + for(int i=0; i < fileChildren.size(); ++i) { + buildMapping(fileChildren.get(i), readerChildren.get(i)); + } + } else { + isOk = false; + } + break; + } + case STRUCT: { + // allow either side to have fewer fields than the other + List<TypeDescription> fileChildren = fileType.getChildren(); + List<TypeDescription> readerChildren = readerType.getChildren(); + int jointSize = Math.min(fileChildren.size(), readerChildren.size()); + for(int i=0; i < jointSize; ++i) { + buildMapping(fileChildren.get(i), readerChildren.get(i)); + } + break; + } + default: + throw new IllegalArgumentException("Unknown type " + readerType); + } + } else { + /* + * Check for the few cases where will not convert.... 
+ */ + + isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType); + } + if (isOk) { + readerToFile.put(readerType, fileType); + } else { + throw new IOException( + String.format( + "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)", + fileType.toString(), fileType.getId(), + readerType.toString(), readerType.getId())); + } + } + + private static boolean checkAcidSchema(TypeDescription type) { + if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { + List<String> rootFields = type.getFieldNames(); + if (acidEventFieldNames.equals(rootFields)) { + return true; + } + } + return false; + } + + /** + * @param typeDescr + * @return ORC types for the ACID event based on the row's type description + */ + public static TypeDescription createEventSchema(TypeDescription typeDescr) { + TypeDescription result = TypeDescription.createStruct() + .addField("operation", TypeDescription.createInt()) + .addField("originalTransaction", TypeDescription.createLong()) + .addField("bucket", TypeDescription.createInt()) + .addField("rowId", TypeDescription.createLong()) + .addField("currentTransaction", TypeDescription.createLong()) + .addField("row", typeDescr.clone()); + return result; + } + + public static final List<String> acidEventFieldNames= new ArrayList<String>(); + static { + acidEventFieldNames.add("operation"); + acidEventFieldNames.add("originalTransaction"); + acidEventFieldNames.add("bucket"); + acidEventFieldNames.add("rowId"); + acidEventFieldNames.add("currentTransaction"); + acidEventFieldNames.add("row"); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/SerializationUtils.java b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java new file mode 100644 index 0000000..2e5a59b --- /dev/null +++ 
b/java/core/src/java/org/apache/orc/impl/SerializationUtils.java @@ -0,0 +1,1311 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigInteger; + +public final class SerializationUtils { + + private final static int BUFFER_SIZE = 64; + private final byte[] readBuffer; + private final byte[] writeBuffer; + + public SerializationUtils() { + this.readBuffer = new byte[BUFFER_SIZE]; + this.writeBuffer = new byte[BUFFER_SIZE]; + } + + public void writeVulong(OutputStream output, + long value) throws IOException { + while (true) { + if ((value & ~0x7f) == 0) { + output.write((byte) value); + return; + } else { + output.write((byte) (0x80 | (value & 0x7f))); + value >>>= 7; + } + } + } + + public void writeVslong(OutputStream output, + long value) throws IOException { + writeVulong(output, (value << 1) ^ (value >> 63)); + } + + + public long readVulong(InputStream in) throws IOException { + long result = 0; + long b; + int offset = 0; + do { + b = in.read(); + if (b == -1) { + throw new EOFException("Reading Vulong past EOF"); + } + result |= (0x7f & 
b) << offset; + offset += 7; + } while (b >= 0x80); + return result; + } + + public long readVslong(InputStream in) throws IOException { + long result = readVulong(in); + return (result >>> 1) ^ -(result & 1); + } + + public float readFloat(InputStream in) throws IOException { + readFully(in, readBuffer, 0, 4); + int val = (((readBuffer[0] & 0xff) << 0) + + ((readBuffer[1] & 0xff) << 8) + + ((readBuffer[2] & 0xff) << 16) + + ((readBuffer[3] & 0xff) << 24)); + return Float.intBitsToFloat(val); + } + + public void writeFloat(OutputStream output, + float value) throws IOException { + int ser = Float.floatToIntBits(value); + writeBuffer[0] = (byte) ((ser >> 0) & 0xff); + writeBuffer[1] = (byte) ((ser >> 8) & 0xff); + writeBuffer[2] = (byte) ((ser >> 16) & 0xff); + writeBuffer[3] = (byte) ((ser >> 24) & 0xff); + output.write(writeBuffer, 0, 4); + } + + public double readDouble(InputStream in) throws IOException { + return Double.longBitsToDouble(readLongLE(in)); + } + + public long readLongLE(InputStream in) throws IOException { + readFully(in, readBuffer, 0, 8); + return (((readBuffer[0] & 0xff) << 0) + + ((readBuffer[1] & 0xff) << 8) + + ((readBuffer[2] & 0xff) << 16) + + ((long) (readBuffer[3] & 0xff) << 24) + + ((long) (readBuffer[4] & 0xff) << 32) + + ((long) (readBuffer[5] & 0xff) << 40) + + ((long) (readBuffer[6] & 0xff) << 48) + + ((long) (readBuffer[7] & 0xff) << 56)); + } + + private void readFully(final InputStream in, final byte[] buffer, final int off, final int len) + throws IOException { + int n = 0; + while (n < len) { + int count = in.read(buffer, off + n, len - n); + if (count < 0) { + throw new EOFException("Read past EOF for " + in); + } + n += count; + } + } + + public void writeDouble(OutputStream output, + double value) throws IOException { + writeLongLE(output, Double.doubleToLongBits(value)); + } + + private void writeLongLE(OutputStream output, long value) throws IOException { + writeBuffer[0] = (byte) ((value >> 0) & 0xff); + writeBuffer[1] = 
(byte) ((value >> 8) & 0xff); + writeBuffer[2] = (byte) ((value >> 16) & 0xff); + writeBuffer[3] = (byte) ((value >> 24) & 0xff); + writeBuffer[4] = (byte) ((value >> 32) & 0xff); + writeBuffer[5] = (byte) ((value >> 40) & 0xff); + writeBuffer[6] = (byte) ((value >> 48) & 0xff); + writeBuffer[7] = (byte) ((value >> 56) & 0xff); + output.write(writeBuffer, 0, 8); + } + + /** + * Write the arbitrarily sized signed BigInteger in vint format. + * + * Signed integers are encoded using the low bit as the sign bit using zigzag + * encoding. + * + * Each byte uses the low 7 bits for data and the high bit for stop/continue. + * + * Bytes are stored LSB first. + * @param output the stream to write to + * @param value the value to output + * @throws IOException + */ + public static void writeBigInteger(OutputStream output, + BigInteger value) throws IOException { + // encode the signed number as a positive integer + value = value.shiftLeft(1); + int sign = value.signum(); + if (sign < 0) { + value = value.negate(); + value = value.subtract(BigInteger.ONE); + } + int length = value.bitLength(); + while (true) { + long lowBits = value.longValue() & 0x7fffffffffffffffL; + length -= 63; + // write out the next 63 bits worth of data + for(int i=0; i < 9; ++i) { + // if this is the last byte, leave the high bit off + if (length <= 0 && (lowBits & ~0x7f) == 0) { + output.write((byte) lowBits); + return; + } else { + output.write((byte) (0x80 | (lowBits & 0x7f))); + lowBits >>>= 7; + } + } + value = value.shiftRight(63); + } + } + + /** + * Read the signed arbitrary sized BigInteger BigInteger in vint format + * @param input the stream to read from + * @return the read BigInteger + * @throws IOException + */ + public static BigInteger readBigInteger(InputStream input) throws IOException { + BigInteger result = BigInteger.ZERO; + long work = 0; + int offset = 0; + long b; + do { + b = input.read(); + if (b == -1) { + throw new EOFException("Reading BigInteger past EOF from " + input); 
+ } + work |= (0x7f & b) << (offset % 63); + offset += 7; + // if we've read 63 bits, roll them into the result + if (offset == 63) { + result = BigInteger.valueOf(work); + work = 0; + } else if (offset % 63 == 0) { + result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63)); + work = 0; + } + } while (b >= 0x80); + if (work != 0) { + result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63)); + } + // convert back to a signed number + boolean isNegative = result.testBit(0); + if (isNegative) { + result = result.add(BigInteger.ONE); + result = result.negate(); + } + result = result.shiftRight(1); + return result; + } + + public enum FixedBitSizes { + ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE, + THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN, + TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX, + TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR; + } + + /** + * Count the number of bits required to encode the given value + * @param value + * @return bits required to store value + */ + public int findClosestNumBits(long value) { + int count = 0; + while (value != 0) { + count++; + value = value >>> 1; + } + return getClosestFixedBits(count); + } + + /** + * zigzag encode the given value + * @param val + * @return zigzag encoded value + */ + public long zigzagEncode(long val) { + return (val << 1) ^ (val >> 63); + } + + /** + * zigzag decode the given value + * @param val + * @return zizag decoded value + */ + public long zigzagDecode(long val) { + return (val >>> 1) ^ -(val & 1); + } + + /** + * Compute the bits required to represent pth percentile value + * @param data - array + * @param p - percentile value (>=0.0 to <=1.0) + * @return pth percentile bits + */ + public int percentileBits(long[] data, int offset, int length, + double p) { + if ((p > 1.0) || (p <= 0.0)) { + return -1; + } + + // histogram that store the encoded bit requirement for each values. 
+ // maximum number of bits that can encoded is 32 (refer FixedBitSizes) + int[] hist = new int[32]; + + // compute the histogram + for(int i = offset; i < (offset + length); i++) { + int idx = encodeBitWidth(findClosestNumBits(data[i])); + hist[idx] += 1; + } + + int perLen = (int) (length * (1.0 - p)); + + // return the bits required by pth percentile length + for(int i = hist.length - 1; i >= 0; i--) { + perLen -= hist[i]; + if (perLen < 0) { + return decodeBitWidth(i); + } + } + + return 0; + } + + /** + * Read n bytes in big endian order and convert to long + * @return long value + */ + public long bytesToLongBE(InStream input, int n) throws IOException { + long out = 0; + long val = 0; + while (n > 0) { + n--; + // store it in a long and then shift else integer overflow will occur + val = input.read(); + out |= (val << (n * 8)); + } + return out; + } + + /** + * Calculate the number of bytes required + * @param n - number of values + * @param numBits - bit width + * @return number of bytes required + */ + int getTotalBytesRequired(int n, int numBits) { + return (n * numBits + 7) / 8; + } + + /** + * For a given fixed bit this function will return the closest available fixed + * bit + * @param n + * @return closest valid fixed bit + */ + public int getClosestFixedBits(int n) { + if (n == 0) { + return 1; + } + + if (n >= 1 && n <= 24) { + return n; + } else if (n > 24 && n <= 26) { + return 26; + } else if (n > 26 && n <= 28) { + return 28; + } else if (n > 28 && n <= 30) { + return 30; + } else if (n > 30 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + + public int getClosestAlignedFixedBits(int n) { + if (n == 0 || n == 1) { + return 1; + } else if (n > 1 && n <= 2) { + return 2; + } else if (n > 2 && n <= 4) { + return 4; + } else if (n > 4 && n <= 8) { + return 8; + } else if (n > 8 && n <= 16) { + return 
16; + } else if (n > 16 && n <= 24) { + return 24; + } else if (n > 24 && n <= 32) { + return 32; + } else if (n > 32 && n <= 40) { + return 40; + } else if (n > 40 && n <= 48) { + return 48; + } else if (n > 48 && n <= 56) { + return 56; + } else { + return 64; + } + } + + /** + * Finds the closest available fixed bit width match and returns its encoded + * value (ordinal) + * @param n - fixed bit width to encode + * @return encoded fixed bit width + */ + public int encodeBitWidth(int n) { + n = getClosestFixedBits(n); + + if (n >= 1 && n <= 24) { + return n - 1; + } else if (n > 24 && n <= 26) { + return FixedBitSizes.TWENTYSIX.ordinal(); + } else if (n > 26 && n <= 28) { + return FixedBitSizes.TWENTYEIGHT.ordinal(); + } else if (n > 28 && n <= 30) { + return FixedBitSizes.THIRTY.ordinal(); + } else if (n > 30 && n <= 32) { + return FixedBitSizes.THIRTYTWO.ordinal(); + } else if (n > 32 && n <= 40) { + return FixedBitSizes.FORTY.ordinal(); + } else if (n > 40 && n <= 48) { + return FixedBitSizes.FORTYEIGHT.ordinal(); + } else if (n > 48 && n <= 56) { + return FixedBitSizes.FIFTYSIX.ordinal(); + } else { + return FixedBitSizes.SIXTYFOUR.ordinal(); + } + } + + /** + * Decodes the ordinal fixed bit value to actual fixed bit width value + * @param n - encoded fixed bit width + * @return decoded fixed bit width + */ + public int decodeBitWidth(int n) { + if (n >= FixedBitSizes.ONE.ordinal() + && n <= FixedBitSizes.TWENTYFOUR.ordinal()) { + return n + 1; + } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) { + return 26; + } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) { + return 28; + } else if (n == FixedBitSizes.THIRTY.ordinal()) { + return 30; + } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) { + return 32; + } else if (n == FixedBitSizes.FORTY.ordinal()) { + return 40; + } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) { + return 48; + } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) { + return 56; + } else { + return 64; + } + } + + /** + * Bitpack 
and write the input values to underlying output stream + * @param input - values to write + * @param offset - offset + * @param len - length + * @param bitSize - bit width + * @param output - output stream + * @throws IOException + */ + public void writeInts(long[] input, int offset, int len, int bitSize, + OutputStream output) throws IOException { + if (input == null || input.length < 1 || offset < 0 || len < 1 + || bitSize < 1) { + return; + } + + switch (bitSize) { + case 1: + unrolledBitPack1(input, offset, len, output); + return; + case 2: + unrolledBitPack2(input, offset, len, output); + return; + case 4: + unrolledBitPack4(input, offset, len, output); + return; + case 8: + unrolledBitPack8(input, offset, len, output); + return; + case 16: + unrolledBitPack16(input, offset, len, output); + return; + case 24: + unrolledBitPack24(input, offset, len, output); + return; + case 32: + unrolledBitPack32(input, offset, len, output); + return; + case 40: + unrolledBitPack40(input, offset, len, output); + return; + case 48: + unrolledBitPack48(input, offset, len, output); + return; + case 56: + unrolledBitPack56(input, offset, len, output); + return; + case 64: + unrolledBitPack64(input, offset, len, output); + return; + default: + break; + } + + int bitsLeft = 8; + byte current = 0; + for(int i = offset; i < (offset + len); i++) { + long value = input[i]; + int bitsToWrite = bitSize; + while (bitsToWrite > bitsLeft) { + // add the bits to the bottom of the current word + current |= value >>> (bitsToWrite - bitsLeft); + // subtract out the bits we just added + bitsToWrite -= bitsLeft; + // zero out the bits above bitsToWrite + value &= (1L << bitsToWrite) - 1; + output.write(current); + current = 0; + bitsLeft = 8; + } + bitsLeft -= bitsToWrite; + current |= value << bitsLeft; + if (bitsLeft == 0) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + // flush + if (bitsLeft != 8) { + output.write(current); + current = 0; + bitsLeft = 8; + } + } + + 
private void unrolledBitPack1(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 1) << 7) + | ((input[i + 1] & 1) << 6) + | ((input[i + 2] & 1) << 5) + | ((input[i + 3] & 1) << 4) + | ((input[i + 4] & 1) << 3) + | ((input[i + 5] & 1) << 2) + | ((input[i + 6] & 1) << 1) + | (input[i + 7]) & 1); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 7; + for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 1) << startShift); + startShift -= 1; + } + output.write(val); + } + } + + private void unrolledBitPack2(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 4; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 3) << 6) + | ((input[i + 1] & 3) << 4) + | ((input[i + 2] & 3) << 2) + | (input[i + 3]) & 3); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 6; + for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 3) << startShift); + startShift -= 2; + } + output.write(val); + } + } + + private void unrolledBitPack4(long[] input, int offset, int len, + OutputStream output) throws IOException { + final int numHops = 2; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15); + output.write(val); + val = 0; + } + + if (remainder > 0) { + int startShift = 4; + 
for (int i = endUnroll; i < endOffset; i++) { + val = (int) (val | (input[i] & 15) << startShift); + startShift -= 4; + } + output.write(val); + } + } + + private void unrolledBitPack8(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 1); + } + + private void unrolledBitPack16(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 2); + } + + private void unrolledBitPack24(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 3); + } + + private void unrolledBitPack32(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 4); + } + + private void unrolledBitPack40(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 5); + } + + private void unrolledBitPack48(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 6); + } + + private void unrolledBitPack56(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 7); + } + + private void unrolledBitPack64(long[] input, int offset, int len, + OutputStream output) throws IOException { + unrolledBitPackBytes(input, offset, len, output, 8); + } + + private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int i = offset; + for (; i < endUnroll; i = i + numHops) { + writeLongBE(output, input, i, numHops, numBytes); + } + + if (remainder > 0) { + writeRemainingLongs(output, i, input, 
remainder, numBytes); + } + } + + private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder, + int numBytes) throws IOException { + final int numHops = remainder; + + int idx = 0; + switch (numBytes) { + case 1: + while (remainder > 0) { + writeBuffer[idx] = (byte) (input[offset + idx] & 255); + remainder--; + idx++; + } + break; + case 2: + while (remainder > 0) { + writeLongBE2(output, input[offset + idx], idx * 2); + remainder--; + idx++; + } + break; + case 3: + while (remainder > 0) { + writeLongBE3(output, input[offset + idx], idx * 3); + remainder--; + idx++; + } + break; + case 4: + while (remainder > 0) { + writeLongBE4(output, input[offset + idx], idx * 4); + remainder--; + idx++; + } + break; + case 5: + while (remainder > 0) { + writeLongBE5(output, input[offset + idx], idx * 5); + remainder--; + idx++; + } + break; + case 6: + while (remainder > 0) { + writeLongBE6(output, input[offset + idx], idx * 6); + remainder--; + idx++; + } + break; + case 7: + while (remainder > 0) { + writeLongBE7(output, input[offset + idx], idx * 7); + remainder--; + idx++; + } + break; + case 8: + while (remainder > 0) { + writeLongBE8(output, input[offset + idx], idx * 8); + remainder--; + idx++; + } + break; + default: + break; + } + + final int toWrite = numHops * numBytes; + output.write(writeBuffer, 0, toWrite); + } + + private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException { + + switch (numBytes) { + case 1: + writeBuffer[0] = (byte) (input[offset + 0] & 255); + writeBuffer[1] = (byte) (input[offset + 1] & 255); + writeBuffer[2] = (byte) (input[offset + 2] & 255); + writeBuffer[3] = (byte) (input[offset + 3] & 255); + writeBuffer[4] = (byte) (input[offset + 4] & 255); + writeBuffer[5] = (byte) (input[offset + 5] & 255); + writeBuffer[6] = (byte) (input[offset + 6] & 255); + writeBuffer[7] = (byte) (input[offset + 7] & 255); + break; + case 2: + writeLongBE2(output, 
input[offset + 0], 0); + writeLongBE2(output, input[offset + 1], 2); + writeLongBE2(output, input[offset + 2], 4); + writeLongBE2(output, input[offset + 3], 6); + writeLongBE2(output, input[offset + 4], 8); + writeLongBE2(output, input[offset + 5], 10); + writeLongBE2(output, input[offset + 6], 12); + writeLongBE2(output, input[offset + 7], 14); + break; + case 3: + writeLongBE3(output, input[offset + 0], 0); + writeLongBE3(output, input[offset + 1], 3); + writeLongBE3(output, input[offset + 2], 6); + writeLongBE3(output, input[offset + 3], 9); + writeLongBE3(output, input[offset + 4], 12); + writeLongBE3(output, input[offset + 5], 15); + writeLongBE3(output, input[offset + 6], 18); + writeLongBE3(output, input[offset + 7], 21); + break; + case 4: + writeLongBE4(output, input[offset + 0], 0); + writeLongBE4(output, input[offset + 1], 4); + writeLongBE4(output, input[offset + 2], 8); + writeLongBE4(output, input[offset + 3], 12); + writeLongBE4(output, input[offset + 4], 16); + writeLongBE4(output, input[offset + 5], 20); + writeLongBE4(output, input[offset + 6], 24); + writeLongBE4(output, input[offset + 7], 28); + break; + case 5: + writeLongBE5(output, input[offset + 0], 0); + writeLongBE5(output, input[offset + 1], 5); + writeLongBE5(output, input[offset + 2], 10); + writeLongBE5(output, input[offset + 3], 15); + writeLongBE5(output, input[offset + 4], 20); + writeLongBE5(output, input[offset + 5], 25); + writeLongBE5(output, input[offset + 6], 30); + writeLongBE5(output, input[offset + 7], 35); + break; + case 6: + writeLongBE6(output, input[offset + 0], 0); + writeLongBE6(output, input[offset + 1], 6); + writeLongBE6(output, input[offset + 2], 12); + writeLongBE6(output, input[offset + 3], 18); + writeLongBE6(output, input[offset + 4], 24); + writeLongBE6(output, input[offset + 5], 30); + writeLongBE6(output, input[offset + 6], 36); + writeLongBE6(output, input[offset + 7], 42); + break; + case 7: + writeLongBE7(output, input[offset + 0], 0); + 
writeLongBE7(output, input[offset + 1], 7); + writeLongBE7(output, input[offset + 2], 14); + writeLongBE7(output, input[offset + 3], 21); + writeLongBE7(output, input[offset + 4], 28); + writeLongBE7(output, input[offset + 5], 35); + writeLongBE7(output, input[offset + 6], 42); + writeLongBE7(output, input[offset + 7], 49); + break; + case 8: + writeLongBE8(output, input[offset + 0], 0); + writeLongBE8(output, input[offset + 1], 8); + writeLongBE8(output, input[offset + 2], 16); + writeLongBE8(output, input[offset + 3], 24); + writeLongBE8(output, input[offset + 4], 32); + writeLongBE8(output, input[offset + 5], 40); + writeLongBE8(output, input[offset + 6], 48); + writeLongBE8(output, input[offset + 7], 56); + break; + default: + break; + } + + final int toWrite = numHops * numBytes; + output.write(writeBuffer, 0, toWrite); + } + + private void writeLongBE2(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 8); + writeBuffer[wbOffset + 1] = (byte) (val >>> 0); + } + + private void writeLongBE3(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 16); + writeBuffer[wbOffset + 1] = (byte) (val >>> 8); + writeBuffer[wbOffset + 2] = (byte) (val >>> 0); + } + + private void writeLongBE4(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 24); + writeBuffer[wbOffset + 1] = (byte) (val >>> 16); + writeBuffer[wbOffset + 2] = (byte) (val >>> 8); + writeBuffer[wbOffset + 3] = (byte) (val >>> 0); + } + + private void writeLongBE5(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 32); + writeBuffer[wbOffset + 1] = (byte) (val >>> 24); + writeBuffer[wbOffset + 2] = (byte) (val >>> 16); + writeBuffer[wbOffset + 3] = (byte) (val >>> 8); + writeBuffer[wbOffset + 4] = (byte) (val >>> 0); + } + + private void writeLongBE6(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 40); 
+ writeBuffer[wbOffset + 1] = (byte) (val >>> 32); + writeBuffer[wbOffset + 2] = (byte) (val >>> 24); + writeBuffer[wbOffset + 3] = (byte) (val >>> 16); + writeBuffer[wbOffset + 4] = (byte) (val >>> 8); + writeBuffer[wbOffset + 5] = (byte) (val >>> 0); + } + + private void writeLongBE7(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 48); + writeBuffer[wbOffset + 1] = (byte) (val >>> 40); + writeBuffer[wbOffset + 2] = (byte) (val >>> 32); + writeBuffer[wbOffset + 3] = (byte) (val >>> 24); + writeBuffer[wbOffset + 4] = (byte) (val >>> 16); + writeBuffer[wbOffset + 5] = (byte) (val >>> 8); + writeBuffer[wbOffset + 6] = (byte) (val >>> 0); + } + + private void writeLongBE8(OutputStream output, long val, int wbOffset) { + writeBuffer[wbOffset + 0] = (byte) (val >>> 56); + writeBuffer[wbOffset + 1] = (byte) (val >>> 48); + writeBuffer[wbOffset + 2] = (byte) (val >>> 40); + writeBuffer[wbOffset + 3] = (byte) (val >>> 32); + writeBuffer[wbOffset + 4] = (byte) (val >>> 24); + writeBuffer[wbOffset + 5] = (byte) (val >>> 16); + writeBuffer[wbOffset + 6] = (byte) (val >>> 8); + writeBuffer[wbOffset + 7] = (byte) (val >>> 0); + } + + /** + * Read bitpacked integers from input stream + * @param buffer - input buffer + * @param offset - offset + * @param len - length + * @param bitSize - bit width + * @param input - input stream + * @throws IOException + */ + public void readInts(long[] buffer, int offset, int len, int bitSize, + InStream input) throws IOException { + int bitsLeft = 0; + int current = 0; + + switch (bitSize) { + case 1: + unrolledUnPack1(buffer, offset, len, input); + return; + case 2: + unrolledUnPack2(buffer, offset, len, input); + return; + case 4: + unrolledUnPack4(buffer, offset, len, input); + return; + case 8: + unrolledUnPack8(buffer, offset, len, input); + return; + case 16: + unrolledUnPack16(buffer, offset, len, input); + return; + case 24: + unrolledUnPack24(buffer, offset, len, input); + return; + case 32: 
+ unrolledUnPack32(buffer, offset, len, input); + return; + case 40: + unrolledUnPack40(buffer, offset, len, input); + return; + case 48: + unrolledUnPack48(buffer, offset, len, input); + return; + case 56: + unrolledUnPack56(buffer, offset, len, input); + return; + case 64: + unrolledUnPack64(buffer, offset, len, input); + return; + default: + break; + } + + for(int i = offset; i < (offset + len); i++) { + long result = 0; + int bitsLeftToRead = bitSize; + while (bitsLeftToRead > bitsLeft) { + result <<= bitsLeft; + result |= current & ((1 << bitsLeft) - 1); + bitsLeftToRead -= bitsLeft; + current = input.read(); + bitsLeft = 8; + } + + // handle the left over bits + if (bitsLeftToRead > 0) { + result <<= bitsLeftToRead; + bitsLeft -= bitsLeftToRead; + result |= (current >> bitsLeft) & ((1 << bitsLeftToRead) - 1); + } + buffer[i] = result; + } + } + + + private void unrolledUnPack1(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 8; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = input.read(); + buffer[i] = (val >>> 7) & 1; + buffer[i + 1] = (val >>> 6) & 1; + buffer[i + 2] = (val >>> 5) & 1; + buffer[i + 3] = (val >>> 4) & 1; + buffer[i + 4] = (val >>> 3) & 1; + buffer[i + 5] = (val >>> 2) & 1; + buffer[i + 6] = (val >>> 1) & 1; + buffer[i + 7] = val & 1; + } + + if (remainder > 0) { + int startShift = 7; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 1; + startShift -= 1; + } + } + } + + private void unrolledUnPack2(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 4; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + 
numHops) { + val = input.read(); + buffer[i] = (val >>> 6) & 3; + buffer[i + 1] = (val >>> 4) & 3; + buffer[i + 2] = (val >>> 2) & 3; + buffer[i + 3] = val & 3; + } + + if (remainder > 0) { + int startShift = 6; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 3; + startShift -= 2; + } + } + } + + private void unrolledUnPack4(long[] buffer, int offset, int len, + InStream input) throws IOException { + final int numHops = 2; + final int remainder = len % numHops; + final int endOffset = offset + len; + final int endUnroll = endOffset - remainder; + int val = 0; + for (int i = offset; i < endUnroll; i = i + numHops) { + val = input.read(); + buffer[i] = (val >>> 4) & 15; + buffer[i + 1] = val & 15; + } + + if (remainder > 0) { + int startShift = 4; + val = input.read(); + for (int i = endUnroll; i < endOffset; i++) { + buffer[i] = (val >>> startShift) & 15; + startShift -= 4; + } + } + } + + private void unrolledUnPack8(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 1); + } + + private void unrolledUnPack16(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 2); + } + + private void unrolledUnPack24(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 3); + } + + private void unrolledUnPack32(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 4); + } + + private void unrolledUnPack40(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 5); + } + + private void unrolledUnPack48(long[] buffer, int offset, int len, + InStream input) throws IOException { + unrolledUnPackBytes(buffer, offset, len, input, 6); + } + + private void unrolledUnPack56(long[] 
buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 7);
+  }
+
+  /**
+   * Unpacks {@code len} big-endian 8-byte values from {@code input} into
+   * {@code buffer}, starting at index {@code offset}.
+   */
+  private void unrolledUnPack64(long[] buffer, int offset, int len,
+      InStream input) throws IOException {
+    unrolledUnPackBytes(buffer, offset, len, input, 8);
+  }
+
+  /**
+   * Unpacks {@code len} big-endian values, each {@code numBytes} bytes wide,
+   * from {@code input} into {@code buffer[offset, offset + len)}.
+   *
+   * <p>Full groups of eight values are decoded by {@link #readLongBE}; the
+   * final {@code len % 8} values, if any, are decoded by
+   * {@link #readRemainingLongs}.
+   */
+  private void unrolledUnPackBytes(long[] buffer, int offset, int len, InStream input, int numBytes)
+      throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int i = offset;
+    for (; i < endUnroll; i = i + numHops) {
+      readLongBE(input, buffer, i, numHops, numBytes);
+    }
+
+    if (remainder > 0) {
+      readRemainingLongs(buffer, i, input, remainder, numBytes);
+    }
+  }
+
+  /**
+   * Reads exactly {@code toRead} bytes from {@code in} into the start of
+   * {@code readBuffer}, retrying on short reads.
+   *
+   * @throws java.io.EOFException if the stream ends before {@code toRead}
+   *     bytes have been read ({@code read} returning -1 must never be added
+   *     to the running byte count)
+   */
+  private void fillReadBuffer(InStream in, int toRead) throws IOException {
+    int bytesRead = 0;
+    while (bytesRead < toRead) {
+      final int n = in.read(readBuffer, bytesRead, toRead - bytesRead);
+      if (n < 0) {
+        throw new java.io.EOFException("Read past end of stream: needed " + toRead
+            + " bytes, got " + bytesRead);
+      }
+      bytesRead += n;
+    }
+  }
+
+  /**
+   * Decodes the trailing {@code remainder} values (fewer than eight) of
+   * {@code numBytes} bytes each into {@code buffer}, starting at
+   * {@code offset}.
+   */
+  private void readRemainingLongs(long[] buffer, int offset, InStream input, int remainder,
+      int numBytes) throws IOException {
+    final int toRead = remainder * numBytes;
+    // bulk read to buffer
+    fillReadBuffer(input, toRead);
+
+    int idx = 0;
+    switch (numBytes) {
+    case 1:
+      while (remainder > 0) {
+        buffer[offset++] = readBuffer[idx] & 255;
+        remainder--;
+        idx++;
+      }
+      break;
+    case 2:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE2(input, idx * 2);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 3:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE3(input, idx * 3);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 4:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE4(input, idx * 4);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 5:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE5(input, idx * 5);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 6:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE6(input, idx * 6);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 7:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE7(input, idx * 7);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 8:
+      while (remainder > 0) {
+        buffer[offset++] = readLongBE8(input, idx * 8);
+        remainder--;
+        idx++;
+      }
+      break;
+    default:
+      // widths outside 1..8 are not expected; they leave buffer unchanged
+      break;
+    }
+  }
+
+  /**
+   * Decodes one group of eight big-endian values of {@code numBytes} bytes
+   * each into {@code buffer[start, start + 8)}.
+   *
+   * <p>The cases below always write exactly eight slots, so {@code numHops}
+   * must be 8 (the value {@link #unrolledUnPackBytes} passes).
+   */
+  private void readLongBE(InStream in, long[] buffer, int start, int numHops, int numBytes)
+      throws IOException {
+    final int toRead = numHops * numBytes;
+    // bulk read to buffer
+    fillReadBuffer(in, toRead);
+
+    switch (numBytes) {
+    case 1:
+      buffer[start + 0] = readBuffer[0] & 255;
+      buffer[start + 1] = readBuffer[1] & 255;
+      buffer[start + 2] = readBuffer[2] & 255;
+      buffer[start + 3] = readBuffer[3] & 255;
+      buffer[start + 4] = readBuffer[4] & 255;
+      buffer[start + 5] = readBuffer[5] & 255;
+      buffer[start + 6] = readBuffer[6] & 255;
+      buffer[start + 7] = readBuffer[7] & 255;
+      break;
+    case 2:
+      buffer[start + 0] = readLongBE2(in, 0);
+      buffer[start + 1] = readLongBE2(in, 2);
+      buffer[start + 2] = readLongBE2(in, 4);
+      buffer[start + 3] = readLongBE2(in, 6);
+      buffer[start + 4] = readLongBE2(in, 8);
+      buffer[start + 5] = readLongBE2(in, 10);
+      buffer[start + 6] = readLongBE2(in, 12);
+      buffer[start + 7] = readLongBE2(in, 14);
+      break;
+    case 3:
+      buffer[start + 0] = readLongBE3(in, 0);
+      buffer[start + 1] = readLongBE3(in, 3);
+      buffer[start + 2] = readLongBE3(in, 6);
+      buffer[start + 3] = readLongBE3(in, 9);
+      buffer[start + 4] = readLongBE3(in, 12);
+      buffer[start + 5] = readLongBE3(in, 15);
+      buffer[start + 6] = readLongBE3(in, 18);
+      buffer[start + 7] = readLongBE3(in, 21);
+      break;
+    case 4:
+      buffer[start + 0] = readLongBE4(in, 0);
+      buffer[start + 1] = readLongBE4(in, 4);
+      buffer[start + 2] = readLongBE4(in, 8);
+      buffer[start + 3] = readLongBE4(in, 12);
+      buffer[start + 4] = readLongBE4(in, 16);
+      buffer[start + 5] = readLongBE4(in, 20);
+      buffer[start + 6] = readLongBE4(in, 24);
+      buffer[start + 7] = readLongBE4(in, 28);
+      break;
+    case 5:
+      buffer[start + 0] = readLongBE5(in, 0);
+      buffer[start + 1] = readLongBE5(in, 5);
+      buffer[start + 2] = readLongBE5(in, 10);
+      buffer[start + 3] = readLongBE5(in, 15);
+      buffer[start + 4] = readLongBE5(in, 20);
+      buffer[start + 5] = readLongBE5(in, 25);
+      buffer[start + 6] = readLongBE5(in, 30);
+      buffer[start + 7] = readLongBE5(in, 35);
+      break;
+    case 6:
+      buffer[start + 0] = readLongBE6(in, 0);
+      buffer[start + 1] = readLongBE6(in, 6);
+      buffer[start + 2] = readLongBE6(in, 12);
+      buffer[start + 3] = readLongBE6(in, 18);
+      buffer[start + 4] = readLongBE6(in, 24);
+      buffer[start + 5] = readLongBE6(in, 30);
+      buffer[start + 6] = readLongBE6(in, 36);
+      buffer[start + 7] = readLongBE6(in, 42);
+      break;
+    case 7:
+      buffer[start + 0] = readLongBE7(in, 0);
+      buffer[start + 1] = readLongBE7(in, 7);
+      buffer[start + 2] = readLongBE7(in, 14);
+      buffer[start + 3] = readLongBE7(in, 21);
+      buffer[start + 4] = readLongBE7(in, 28);
+      buffer[start + 5] = readLongBE7(in, 35);
+      buffer[start + 6] = readLongBE7(in, 42);
+      buffer[start + 7] = readLongBE7(in, 49);
+      break;
+    case 8:
+      buffer[start + 0] = readLongBE8(in, 0);
+      buffer[start + 1] = readLongBE8(in, 8);
+      buffer[start + 2] = readLongBE8(in, 16);
+      buffer[start + 3] = readLongBE8(in, 24);
+      buffer[start + 4] = readLongBE8(in, 32);
+      buffer[start + 5] = readLongBE8(in, 40);
+      buffer[start + 6] = readLongBE8(in, 48);
+      buffer[start + 7] = readLongBE8(in, 56);
+      break;
+    default:
+      // widths outside 1..8 are not expected; they leave buffer unchanged
+      break;
+    }
+  }
+
+  // readLongBE2..readLongBE8 assemble an unsigned big-endian integer from
+  // 2..8 consecutive bytes of readBuffer starting at rbOffset. The InStream
+  // argument is not used; all bytes come from readBuffer. Shifts of 24 bits
+  // or more cast to long first so the high byte cannot go negative as an int.
+  private long readLongBE2(InStream in, int rbOffset) {
+    return (((readBuffer[rbOffset] & 255) << 8)
+        + ((readBuffer[rbOffset + 1] & 255) << 0));
+  }
+
+  private long readLongBE3(InStream in, int rbOffset) {
+    return (((readBuffer[rbOffset] & 255) << 16)
+        + ((readBuffer[rbOffset + 1] & 255) << 8)
+        + ((readBuffer[rbOffset + 2] & 255) << 0));
+  }
+
+  private long readLongBE4(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 24)
+        + ((readBuffer[rbOffset + 1] & 255) << 16)
+        + ((readBuffer[rbOffset + 2] & 255) << 8)
+        + ((readBuffer[rbOffset + 3] & 255) << 0));
+  }
+
+  private long readLongBE5(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 24)
+        + ((readBuffer[rbOffset + 2] & 255) << 16)
+        + ((readBuffer[rbOffset + 3] & 255) << 8)
+        + ((readBuffer[rbOffset + 4] & 255) << 0));
+  }
+
+  private long readLongBE6(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 24)
+        + ((readBuffer[rbOffset + 3] & 255) << 16)
+        + ((readBuffer[rbOffset + 4] & 255) << 8)
+        + ((readBuffer[rbOffset + 5] & 255) << 0));
+  }
+
+  private long readLongBE7(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 48)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 3] & 255) << 24)
+        + ((readBuffer[rbOffset + 4] & 255) << 16)
+        + ((readBuffer[rbOffset + 5] & 255) << 8)
+        + ((readBuffer[rbOffset + 6] & 255) << 0));
+  }
+
+  private long readLongBE8(InStream in, int rbOffset) {
+    return (((long) (readBuffer[rbOffset] & 255) << 56)
+        + ((long) (readBuffer[rbOffset + 1] & 255) << 48)
+        + ((long) (readBuffer[rbOffset + 2] & 255) << 40)
+        + ((long) (readBuffer[rbOffset + 3] & 255) << 32)
+        + ((long) (readBuffer[rbOffset + 4] & 255) << 24)
+        + ((readBuffer[rbOffset + 5] & 255) << 16)
+        + ((readBuffer[rbOffset + 6] & 255) << 8)
+        + ((readBuffer[rbOffset + 7] & 255) << 0));
+  }
+
+  // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
+  // ArithmeticException in case of overflow
+  /**
+   * Returns whether {@code left - right} can be computed without overflowing
+   * a long. It is safe when the operands have the same sign, or when the
+   * wrapped result keeps the sign of {@code left}.
+   */
+  public boolean isSafeSubtract(long left, long right) {
+    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java
---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java b/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java new file mode 100644 index 0000000..f9e29eb --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/SettableUncompressedStream.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.orc.impl; + +import java.util.List; + +import org.apache.hadoop.hive.common.DiskRangeInfo; +import org.apache.hadoop.hive.common.io.DiskRange; +import org.apache.orc.impl.InStream; + +/** + * An uncompressed stream whose underlying byte buffer can be set. + */ +public class SettableUncompressedStream extends InStream.UncompressedStream { + + public SettableUncompressedStream(String name, List<DiskRange> input, long length) { + super(name, input, length); + setOffset(input); + } + + public void setBuffers(DiskRangeInfo diskRangeInfo) { + reset(diskRangeInfo.getDiskRanges(), diskRangeInfo.getTotalLength()); + setOffset(diskRangeInfo.getDiskRanges()); + } + + private void setOffset(List<DiskRange> list) { + currentOffset = list.isEmpty() ? 
0 : list.get(0).getOffset(); + } +} http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/SnappyCodec.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/SnappyCodec.java b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java new file mode 100644 index 0000000..dd4f30c --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/SnappyCodec.java @@ -0,0 +1,108 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.orc.CompressionCodec; +import org.iq80.snappy.Snappy; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.EnumSet; + +public class SnappyCodec implements CompressionCodec, DirectDecompressionCodec { + private static final HadoopShims SHIMS = HadoopShims.Factory.get(); + + Boolean direct = null; + + @Override + public boolean compress(ByteBuffer in, ByteBuffer out, + ByteBuffer overflow) throws IOException { + int inBytes = in.remaining(); + // I should work on a patch for Snappy to support an overflow buffer + // to prevent the extra buffer copy. 
+ byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)]; + int outBytes = + Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes, + compressed, 0); + if (outBytes < inBytes) { + int remaining = out.remaining(); + if (remaining >= outBytes) { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), outBytes); + out.position(out.position() + outBytes); + } else { + System.arraycopy(compressed, 0, out.array(), out.arrayOffset() + + out.position(), remaining); + out.position(out.limit()); + System.arraycopy(compressed, remaining, overflow.array(), + overflow.arrayOffset(), outBytes - remaining); + overflow.position(outBytes - remaining); + } + return true; + } else { + return false; + } + } + + @Override + public void decompress(ByteBuffer in, ByteBuffer out) throws IOException { + if(in.isDirect() && out.isDirect()) { + directDecompress(in, out); + return; + } + int inOffset = in.position(); + int uncompressLen = + Snappy.uncompress(in.array(), in.arrayOffset() + inOffset, + in.limit() - inOffset, out.array(), out.arrayOffset() + out.position()); + out.position(uncompressLen + out.position()); + out.flip(); + } + + @Override + public boolean isAvailable() { + if (direct == null) { + try { + if (SHIMS.getDirectDecompressor( + HadoopShims.DirectCompressionType.SNAPPY) != null) { + direct = Boolean.valueOf(true); + } else { + direct = Boolean.valueOf(false); + } + } catch (UnsatisfiedLinkError ule) { + direct = Boolean.valueOf(false); + } + } + return direct.booleanValue(); + } + + @Override + public void directDecompress(ByteBuffer in, ByteBuffer out) + throws IOException { + HadoopShims.DirectDecompressor decompressShim = + SHIMS.getDirectDecompressor(HadoopShims.DirectCompressionType.SNAPPY); + decompressShim.decompress(in, out); + out.flip(); // flip for read + } + + @Override + public CompressionCodec modify(EnumSet<Modifier> modifiers) { + // snappy allows no modifications + return this; + } +} 
http://git-wip-us.apache.org/repos/asf/orc/blob/3283d238/java/core/src/java/org/apache/orc/impl/StreamName.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/StreamName.java b/java/core/src/java/org/apache/orc/impl/StreamName.java new file mode 100644 index 0000000..b3fd145 --- /dev/null +++ b/java/core/src/java/org/apache/orc/impl/StreamName.java @@ -0,0 +1,97 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.orc.impl; + +import org.apache.orc.OrcProto; + +/** + * The name of a stream within a stripe. 
+ */ +public class StreamName implements Comparable<StreamName> { + private final int column; + private final OrcProto.Stream.Kind kind; + + public static enum Area { + DATA, INDEX + } + + public StreamName(int column, OrcProto.Stream.Kind kind) { + this.column = column; + this.kind = kind; + } + + public boolean equals(Object obj) { + if (obj != null && obj instanceof StreamName) { + StreamName other = (StreamName) obj; + return other.column == column && other.kind == kind; + } else { + return false; + } + } + + @Override + public int compareTo(StreamName streamName) { + if (streamName == null) { + return -1; + } + Area area = getArea(kind); + Area otherArea = streamName.getArea(streamName.kind); + if (area != otherArea) { + return -area.compareTo(otherArea); + } + if (column != streamName.column) { + return column < streamName.column ? -1 : 1; + } + return kind.compareTo(streamName.kind); + } + + public int getColumn() { + return column; + } + + public OrcProto.Stream.Kind getKind() { + return kind; + } + + public Area getArea() { + return getArea(kind); + } + + public static Area getArea(OrcProto.Stream.Kind kind) { + switch (kind) { + case ROW_INDEX: + case DICTIONARY_COUNT: + case BLOOM_FILTER: + return Area.INDEX; + default: + return Area.DATA; + } + } + + @Override + public String toString() { + return "Stream for column " + column + " kind " + kind; + } + + @Override + public int hashCode() { + return column * 101 + kind.getNumber(); + } +} +
