http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java new file mode 100644 index 0000000..7237b2e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/RunLengthIntegerWriterV2.java @@ -0,0 +1,832 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.IOException; + +/** + * A writer that performs light weight compression over sequence of integers. + * <p> + * There are four types of lightweight integer compression + * <ul> + * <li>SHORT_REPEAT</li> + * <li>DIRECT</li> + * <li>PATCHED_BASE</li> + * <li>DELTA</li> + * </ul> + * </p> + * The description and format for these types are as below: + * <p> + * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences. 
+ * <ul> + * <li>1 byte header + * <ul> + * <li>2 bits for encoding type</li> + * <li>3 bits for bytes required for repeating value</li> + * <li>3 bits for repeat count (MIN_REPEAT + run length)</li> + * </ul> + * </li> + * <li>Blob - repeat value (fixed bytes)</li> + * </ul> + * </p> + * <p> + * <b>DIRECT:</b> Used for random integer sequences whose number of bit + * requirement doesn't vary a lot. + * <ul> + * <li>2 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * </li> + * <li>Blob - stores the direct values using fixed bit width. The length of the + * data blob is (fixed width * run length) bits long</li> + * </ul> + * </p> + * <p> + * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit + * requirement varies beyond a threshold. + * <ul> + * <li>4 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * <ul> + * 3rd byte + * <li>3 bits for bytes required to encode base value</li> + * <li>5 bits for patch width</li> + * </ul> + * <ul> + * 4th byte + * <li>3 bits for patch gap width</li> + * <li>5 bits for patch length</li> + * </ul> + * </li> + * <li>Base value - Stored using fixed number of bytes. If MSB is set, base + * value is negative else positive. Length of base value is (base width * 8) + * bits.</li> + * <li>Data blob - Base reduced values as stored using fixed bit width. Length + * of data blob is (fixed width * run length) bits.</li> + * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in + * the patch list is (patch width + patch gap width) bits long. 
Gap between the + * subsequent elements to be patched are stored in upper part of entry whereas + * patch values are stored in lower part of entry. Length of patch blob is + * ((patch width + patch gap width) * patch length) bits.</li> + * </ul> + * </p> + * <p> + * <b>DELTA</b> Used for monotonically increasing or decreasing sequences, + * sequences with fixed delta values or long repeated sequences. + * <ul> + * <li>2 bytes header + * <ul> + * 1st byte + * <li>2 bits for encoding type</li> + * <li>5 bits for fixed bit width of values in blob</li> + * <li>1 bit for storing MSB of run length</li> + * </ul> + * <ul> + * 2nd byte + * <li>8 bits for lower run length bits</li> + * </ul> + * </li> + * <li>Base value - encoded as varint</li> + * <li>Delta base - encoded as varint</li> + * <li>Delta blob - only positive values. monotonicity and orderness are decided + * based on the sign of the base value and delta base</li> + * </ul> + * </p> + */ +class RunLengthIntegerWriterV2 implements IntegerWriter { + + public enum EncodingType { + SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA + } + + static final int MAX_SCOPE = 512; + static final int MIN_REPEAT = 3; + private static final int MAX_SHORT_REPEAT_LENGTH = 10; + private long prevDelta = 0; + private int fixedRunLength = 0; + private int variableRunLength = 0; + private final long[] literals = new long[MAX_SCOPE]; + private final PositionedOutputStream output; + private final boolean signed; + private EncodingType encoding; + private int numLiterals; + private final long[] zigzagLiterals = new long[MAX_SCOPE]; + private final long[] baseRedLiterals = new long[MAX_SCOPE]; + private final long[] adjDeltas = new long[MAX_SCOPE]; + private long fixedDelta; + private int zzBits90p; + private int zzBits100p; + private int brBits95p; + private int brBits100p; + private int bitsDeltaMax; + private int patchWidth; + private int patchGapWidth; + private int patchLength; + private long[] gapVsPatchList; + private long min; + private 
boolean isFixedDelta; + private SerializationUtils utils; + private boolean alignedBitpacking; + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { + this(output, signed, true); + } + + RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, + boolean alignedBitpacking) { + this.output = output; + this.signed = signed; + this.alignedBitpacking = alignedBitpacking; + this.utils = new SerializationUtils(); + clear(); + } + + private void writeValues() throws IOException { + if (numLiterals != 0) { + + if (encoding.equals(EncodingType.SHORT_REPEAT)) { + writeShortRepeatValues(); + } else if (encoding.equals(EncodingType.DIRECT)) { + writeDirectValues(); + } else if (encoding.equals(EncodingType.PATCHED_BASE)) { + writePatchedBaseValues(); + } else { + writeDeltaValues(); + } + + // clear all the variables + clear(); + } + } + + private void writeDeltaValues() throws IOException { + int len = 0; + int fb = bitsDeltaMax; + int efb = 0; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + if (isFixedDelta) { + // if fixed run length is greater than threshold then it will be fixed + // delta sequence with delta value 0 else fixed delta sequence with + // non-zero delta value + if (fixedRunLength > MIN_REPEAT) { + // ex. sequence: 2 2 2 2 2 2 2 2 + len = fixedRunLength - 1; + fixedRunLength = 0; + } else { + // ex. sequence: 4 6 8 10 12 14 16 + len = variableRunLength - 1; + variableRunLength = 0; + } + } else { + // fixed width 0 is used for long repeating values. 
+ // sequences that require only 1 bit to encode will have an additional bit + if (fb == 1) { + fb = 2; + } + efb = utils.encodeBitWidth(fb); + efb = efb << 1; + len = variableRunLength - 1; + variableRunLength = 0; + } + + // extract the 9th bit of run length + final int tailBits = (len & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = len & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // store the first value from zigzag literal array + if (signed) { + utils.writeVslong(output, literals[0]); + } else { + utils.writeVulong(output, literals[0]); + } + + if (isFixedDelta) { + // if delta is fixed then we don't need to store delta blob + utils.writeVslong(output, fixedDelta); + } else { + // store the first value as delta value using zigzag encoding + utils.writeVslong(output, adjDeltas[0]); + + // adjacent delta values are bit packed. The length of adjDeltas array is + // always one less than the number of literals (delta difference for n + // elements is n-1). We have already written one element, write the + // remaining numLiterals - 2 elements here + utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output); + } + } + + private void writePatchedBaseValues() throws IOException { + + // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding + // because patch is applied to MSB bits. For example: If fixed bit width of + // base value is 7 bits and if patch is 3 bits, the actual value is + // constructed by shifting the patch to left by 7 positions. + // actual_value = patch << 7 | base_value + // So, if we align base_value then actual_value can not be reconstructed. 
+ + // write the number of fixed bits required in next 5 bits + final int fb = brBits95p; + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length, they are one off + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // if the min value is negative toggle the sign + final boolean isNegative = min < 0 ? true : false; + if (isNegative) { + min = -min; + } + + // find the number of bytes required for base and shift it by 5 bits + // to accommodate patch width. The additional bit is used to store the sign + // of the base value. + final int baseWidth = utils.findClosestNumBits(min) + 1; + final int baseBytes = baseWidth % 8 == 0 ? baseWidth / 8 : (baseWidth / 8) + 1; + final int bb = (baseBytes - 1) << 5; + + // if the base value is negative then set MSB to 1 + if (isNegative) { + min |= (1L << ((baseBytes * 8) - 1)); + } + + // third byte contains 3 bits for number of bytes occupied by base + // and 5 bits for patchWidth + final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); + + // fourth byte contains 3 bits for page gap width and 5 bits for + // patch length + final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + output.write(headerThirdByte); + output.write(headerFourthByte); + + // write the base value using fixed bytes in big endian order + for(int i = baseBytes - 1; i >= 0; i--) { + byte b = (byte) ((min >>> (i * 8)) & 0xff); + output.write(b); + } + + // base reduced literals are bit packed + int closestFixedBits = utils.getClosestFixedBits(fb); + + utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits, + 
output); + + // write patch list + closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); + + utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, + output); + + // reset run length + variableRunLength = 0; + } + + /** + * Store the opcode in 2 MSB bits + * @return opcode + */ + private int getOpcode() { + return encoding.ordinal() << 6; + } + + private void writeDirectValues() throws IOException { + + // write the number of fixed bits required in next 5 bits + int fb = zzBits100p; + + if (alignedBitpacking) { + fb = utils.getClosestAlignedFixedBits(fb); + } + + final int efb = utils.encodeBitWidth(fb) << 1; + + // adjust variable run length + variableRunLength -= 1; + + // extract the 9th bit of run length + final int tailBits = (variableRunLength & 0x100) >>> 8; + + // create first byte of the header + final int headerFirstByte = getOpcode() | efb | tailBits; + + // second byte of the header stores the remaining 8 bits of runlength + final int headerSecondByte = variableRunLength & 0xff; + + // write header + output.write(headerFirstByte); + output.write(headerSecondByte); + + // bit packing the zigzag encoded literals + utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output); + + // reset run length + variableRunLength = 0; + } + + private void writeShortRepeatValues() throws IOException { + // get the value that is repeating, compute the bits and bytes required + long repeatVal = 0; + if (signed) { + repeatVal = utils.zigzagEncode(literals[0]); + } else { + repeatVal = literals[0]; + } + + final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); + final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? 
numBitsRepeatVal >>> 3 + : (numBitsRepeatVal >>> 3) + 1; + + // write encoding type in top 2 bits + int header = getOpcode(); + + // write the number of bytes required for the value + header |= ((numBytesRepeatVal - 1) << 3); + + // write the run length + fixedRunLength -= MIN_REPEAT; + header |= fixedRunLength; + + // write the header + output.write(header); + + // write the repeating value in big endian byte order + for(int i = numBytesRepeatVal - 1; i >= 0; i--) { + int b = (int) ((repeatVal >>> (i * 8)) & 0xff); + output.write(b); + } + + fixedRunLength = 0; + } + + private void determineEncoding() { + + // we need to compute zigzag values for DIRECT encoding if we decide to + // break early for delta overflows or for shorter runs + computeZigZagLiterals(); + + zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); + + // not a big win for shorter runs to determine encoding + if (numLiterals <= MIN_REPEAT) { + encoding = EncodingType.DIRECT; + return; + } + + // DELTA encoding check + + // for identifying monotonic sequences + boolean isIncreasing = true; + boolean isDecreasing = true; + this.isFixedDelta = true; + + this.min = literals[0]; + long max = literals[0]; + final long initialDelta = literals[1] - literals[0]; + long currDelta = initialDelta; + long deltaMax = initialDelta; + this.adjDeltas[0] = initialDelta; + + for (int i = 1; i < numLiterals; i++) { + final long l1 = literals[i]; + final long l0 = literals[i - 1]; + currDelta = l1 - l0; + min = Math.min(min, l1); + max = Math.max(max, l1); + + isIncreasing &= (l0 <= l1); + isDecreasing &= (l0 >= l1); + + isFixedDelta &= (currDelta == initialDelta); + if (i > 1) { + adjDeltas[i - 1] = Math.abs(currDelta); + deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); + } + } + + // its faster to exit under delta overflow condition without checking for + // PATCHED_BASE condition as encoding using DIRECT is faster and has less + // overhead than PATCHED_BASE + if (!utils.isSafeSubtract(max, 
min)) { + encoding = EncodingType.DIRECT; + return; + } + + // invariant - subtracting any number from any other in the literals after + // this point won't overflow + + // if initialDelta is 0 then we cannot delta encode as we cannot identify + // the sign of deltas (increasing or decreasing) + if (initialDelta != 0) { + + // if min is equal to max then the delta is 0, this condition happens for + // fixed values run >10 which cannot be encoded with SHORT_REPEAT + if (min == max) { + assert isFixedDelta : min + "==" + max + + ", isFixedDelta cannot be false"; + assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; + fixedDelta = 0; + encoding = EncodingType.DELTA; + return; + } + + if (isFixedDelta) { + assert currDelta == initialDelta + : "currDelta should be equal to initialDelta for fixed delta encoding"; + encoding = EncodingType.DELTA; + fixedDelta = currDelta; + return; + } + + // stores the number of bits required for packing delta blob in + // delta encoding + bitsDeltaMax = utils.findClosestNumBits(deltaMax); + + // monotonic condition + if (isIncreasing || isDecreasing) { + encoding = EncodingType.DELTA; + return; + } + } + + // PATCHED_BASE encoding check + + // percentile values are computed for the zigzag encoded values. if the + // number of bit requirement between 90th and 100th percentile varies + // beyond a threshold then we need to patch the values. if the variation + // is not significant then we can use direct encoding + + zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); + int diffBitsLH = zzBits100p - zzBits90p; + + // if the difference between 90th percentile and 100th percentile fixed + // bits is > 1 then we need patch the values + if (diffBitsLH > 1) { + + // patching is done only on base reduced values. 
+ // remove base from literals + for (int i = 0; i < numLiterals; i++) { + baseRedLiterals[i] = literals[i] - min; + } + + // 95th percentile width is used to determine max allowed value + // after which patching will be done + brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); + + // 100th percentile is used to compute the max patch width + brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); + + // after base reducing the values, if the difference in bits between + // 95th percentile and 100th percentile value is zero then there + // is no point in patching the values, in which case we will + // fallback to DIRECT encoding. + // The decision to use patched base was based on zigzag values, but the + // actual patching is done on base reduced literals. + if ((brBits100p - brBits95p) != 0) { + encoding = EncodingType.PATCHED_BASE; + preparePatchedBlob(); + return; + } else { + encoding = EncodingType.DIRECT; + return; + } + } else { + // if difference in bits between 95th percentile and 100th percentile is + // 0, then patch length will become 0. 
Hence we will fallback to direct + encoding = EncodingType.DIRECT; + return; + } + } + + private void computeZigZagLiterals() { + // populate zigzag encoded literals + long zzEncVal = 0; + for (int i = 0; i < numLiterals; i++) { + if (signed) { + zzEncVal = utils.zigzagEncode(literals[i]); + } else { + zzEncVal = literals[i]; + } + zigzagLiterals[i] = zzEncVal; + } + } + + private void preparePatchedBlob() { + // mask will be max value beyond which patch will be generated + long mask = (1L << brBits95p) - 1; + + // since we are considering only 95 percentile, the size of gap and + // patch array can contain only be 5% values + patchLength = (int) Math.ceil((numLiterals * 0.05)); + + int[] gapList = new int[patchLength]; + long[] patchList = new long[patchLength]; + + // #bit for patch + patchWidth = brBits100p - brBits95p; + patchWidth = utils.getClosestFixedBits(patchWidth); + + // if patch bit requirement is 64 then it will not possible to pack + // gap and patch together in a long. To make sure gap and patch can be + // packed together adjust the patch width + if (patchWidth == 64) { + patchWidth = 56; + brBits95p = 8; + mask = (1L << brBits95p) - 1; + } + + int gapIdx = 0; + int patchIdx = 0; + int prev = 0; + int gap = 0; + int maxGap = 0; + + for(int i = 0; i < numLiterals; i++) { + // if value is above mask then create the patch and record the gap + if (baseRedLiterals[i] > mask) { + gap = i - prev; + if (gap > maxGap) { + maxGap = gap; + } + + // gaps are relative, so store the previous patched value index + prev = i; + gapList[gapIdx++] = gap; + + // extract the most significant bits that are over mask bits + long patch = baseRedLiterals[i] >>> brBits95p; + patchList[patchIdx++] = patch; + + // strip off the MSB to enable safe bit packing + baseRedLiterals[i] &= mask; + } + } + + // adjust the patch length to number of entries in gap list + patchLength = gapIdx; + + // if the element to be patched is the first and only element then + // max gap will be 0, 
but to store the gap as 0 we need atleast 1 bit + if (maxGap == 0 && patchLength != 0) { + patchGapWidth = 1; + } else { + patchGapWidth = utils.findClosestNumBits(maxGap); + } + + // special case: if the patch gap width is greater than 256, then + // we need 9 bits to encode the gap width. But we only have 3 bits in + // header to record the gap width. To deal with this case, we will save + // two entries in patch list in the following way + // 256 gap width => 0 for patch value + // actual gap - 256 => actual patch value + // We will do the same for gap width = 511. If the element to be patched is + // the last element in the scope then gap width will be 511. In this case we + // will have 3 entries in the patch list in the following way + // 255 gap width => 0 for patch value + // 255 gap width => 0 for patch value + // 1 gap width => actual patch value + if (patchGapWidth > 8) { + patchGapWidth = 8; + // for gap = 511, we need two additional entries in patch list + if (maxGap == 511) { + patchLength += 2; + } else { + patchLength += 1; + } + } + + // create gap vs patch list + gapIdx = 0; + patchIdx = 0; + gapVsPatchList = new long[patchLength]; + for(int i = 0; i < patchLength; i++) { + long g = gapList[gapIdx++]; + long p = patchList[patchIdx++]; + while (g > 255) { + gapVsPatchList[i++] = (255L << patchWidth); + g -= 255; + } + + // store patch value in LSBs and gap in MSBs + gapVsPatchList[i] = (g << patchWidth) | p; + } + } + + /** + * clears all the variables + */ + private void clear() { + numLiterals = 0; + encoding = null; + prevDelta = 0; + fixedDelta = 0; + zzBits90p = 0; + zzBits100p = 0; + brBits95p = 0; + brBits100p = 0; + bitsDeltaMax = 0; + patchGapWidth = 0; + patchLength = 0; + patchWidth = 0; + gapVsPatchList = null; + min = 0; + isFixedDelta = true; + } + + @Override + public void flush() throws IOException { + if (numLiterals != 0) { + if (variableRunLength != 0) { + determineEncoding(); + writeValues(); + } else if (fixedRunLength != 0) { 
+ if (fixedRunLength < MIN_REPEAT) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + determineEncoding(); + writeValues(); + } else if (fixedRunLength >= MIN_REPEAT + && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + } + output.flush(); + } + + @Override + public void write(long val) throws IOException { + if (numLiterals == 0) { + initializeLiterals(val); + } else { + if (numLiterals == 1) { + prevDelta = val - literals[0]; + literals[numLiterals++] = val; + // if both values are same count as fixed run else variable run + if (val == literals[0]) { + fixedRunLength = 2; + variableRunLength = 0; + } else { + fixedRunLength = 0; + variableRunLength = 2; + } + } else { + long currentDelta = val - literals[numLiterals - 1]; + if (prevDelta == 0 && currentDelta == 0) { + // fixed delta run + + literals[numLiterals++] = val; + + // if variable run is non-zero then we are seeing repeating + // values at the end of variable run in which case keep + // updating variable and fixed runs + if (variableRunLength > 0) { + fixedRunLength = 2; + } + fixedRunLength += 1; + + // if fixed run met the minimum condition and if variable + // run is non-zero then flush the variable run and shift the + // tail fixed runs to start of the buffer + if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { + numLiterals -= MIN_REPEAT; + variableRunLength -= MIN_REPEAT - 1; + // copy the tail fixed runs + long[] tailVals = new long[MIN_REPEAT]; + System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); + + // determine variable encoding and flush values + determineEncoding(); + writeValues(); + + // shift tail fixed runs to beginning of the buffer + for(long l : tailVals) { + literals[numLiterals++] = l; + } + } + + // if fixed runs reached max repeat length then write values + if (fixedRunLength == MAX_SCOPE) { + 
determineEncoding(); + writeValues(); + } + } else { + // variable delta run + + // if fixed run length is non-zero and if it satisfies the + // short repeat conditions then write the values as short repeats + // else use delta encoding + if (fixedRunLength >= MIN_REPEAT) { + if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { + encoding = EncodingType.SHORT_REPEAT; + writeValues(); + } else { + encoding = EncodingType.DELTA; + isFixedDelta = true; + writeValues(); + } + } + + // if fixed run length is <MIN_REPEAT and current value is + // different from previous then treat it as variable run + if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) { + if (val != literals[numLiterals - 1]) { + variableRunLength = fixedRunLength; + fixedRunLength = 0; + } + } + + // after writing values re-initialize the variables + if (numLiterals == 0) { + initializeLiterals(val); + } else { + // keep updating variable run lengths + prevDelta = val - literals[numLiterals - 1]; + literals[numLiterals++] = val; + variableRunLength += 1; + + // if variable run length reach the max scope, write it + if (variableRunLength == MAX_SCOPE) { + determineEncoding(); + writeValues(); + } + } + } + } + } + } + + private void initializeLiterals(long val) { + literals[numLiterals++] = val; + fixedRunLength = 1; + variableRunLength = 1; + } + + @Override + public void getPosition(PositionRecorder recorder) throws IOException { + output.getPosition(recorder); + recorder.addPosition(numLiterals); + } +}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java new file mode 100644 index 0000000..53687b7 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SerializationUtils.java @@ -0,0 +1,844 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.tajo.storage.thirdparty.orc; + +import java.io.EOFException; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; +import java.math.BigInteger; + +final class SerializationUtils { + + private final static int BUFFER_SIZE = 64; + private final byte[] readBuffer; + private final byte[] writeBuffer; + + public SerializationUtils() { + this.readBuffer = new byte[BUFFER_SIZE]; + this.writeBuffer = new byte[BUFFER_SIZE]; + } + + void writeVulong(OutputStream output, long value) throws IOException { + while (true) { + if ((value & ~0x7f) == 0) { + output.write((byte) value); + return; + } else { + output.write((byte) (0x80 | (value & 0x7f))); + value >>>= 7; + } + } + } + + void writeVslong(OutputStream output, long value) throws IOException { + writeVulong(output, (value << 1) ^ (value >> 63)); + } + + + long readVulong(InputStream in) throws IOException { + long result = 0; + long b; + int offset = 0; + do { + b = in.read(); + if (b == -1) { + throw new EOFException("Reading Vulong past EOF"); + } + result |= (0x7f & b) << offset; + offset += 7; + } while (b >= 0x80); + return result; + } + + long readVslong(InputStream in) throws IOException { + long result = readVulong(in); + return (result >>> 1) ^ -(result & 1); + } + + float readFloat(InputStream in) throws IOException { + int ser = in.read() | (in.read() << 8) | (in.read() << 16) | + (in.read() << 24); + return Float.intBitsToFloat(ser); + } + + void writeFloat(OutputStream output, float value) throws IOException { + int ser = Float.floatToIntBits(value); + output.write(ser & 0xff); + output.write((ser >> 8) & 0xff); + output.write((ser >> 16) & 0xff); + output.write((ser >> 24) & 0xff); + } + + double readDouble(InputStream in) throws IOException { + return Double.longBitsToDouble(readLongLE(in)); + } + + long readLongLE(InputStream in) throws IOException { + in.read(readBuffer, 0, 8); + return (((readBuffer[0] & 0xff) << 0) + + ((readBuffer[1] & 0xff) 
<< 8) + + ((readBuffer[2] & 0xff) << 16) + + ((long) (readBuffer[3] & 0xff) << 24) + + ((long) (readBuffer[4] & 0xff) << 32) + + ((long) (readBuffer[5] & 0xff) << 40) + + ((long) (readBuffer[6] & 0xff) << 48) + + ((long) (readBuffer[7] & 0xff) << 56)); + } + + void writeDouble(OutputStream output, double value) throws IOException { + writeLongLE(output, Double.doubleToLongBits(value)); + } + + private void writeLongLE(OutputStream output, long value) throws IOException { + writeBuffer[0] = (byte) ((value >> 0) & 0xff); + writeBuffer[1] = (byte) ((value >> 8) & 0xff); + writeBuffer[2] = (byte) ((value >> 16) & 0xff); + writeBuffer[3] = (byte) ((value >> 24) & 0xff); + writeBuffer[4] = (byte) ((value >> 32) & 0xff); + writeBuffer[5] = (byte) ((value >> 40) & 0xff); + writeBuffer[6] = (byte) ((value >> 48) & 0xff); + writeBuffer[7] = (byte) ((value >> 56) & 0xff); + output.write(writeBuffer, 0, 8); + } + + /** + * Write the arbitrarily sized signed BigInteger in vint format. + * + * Signed integers are encoded using the low bit as the sign bit using zigzag + * encoding. + * + * Each byte uses the low 7 bits for data and the high bit for stop/continue. + * + * Bytes are stored LSB first. 
+   * @param output the stream to write to
+   * @param value the value to output
+   * @throws IOException if writing to the stream fails
+   */
+  static void writeBigInteger(OutputStream output,
+                              BigInteger value) throws IOException {
+    // encode the signed number as a positive integer
+    // (non-negative v becomes 2v and negative v becomes -2v - 1, so the
+    // lowest bit carries the sign)
+    value = value.shiftLeft(1);
+    int sign = value.signum();
+    if (sign < 0) {
+      value = value.negate();
+      value = value.subtract(BigInteger.ONE);
+    }
+    int length = value.bitLength();
+    while (true) {
+      // take the next 63 bits; they are emitted as up to nine 7-bit groups,
+      // least significant group first, with 0x80 set on every byte except
+      // the very last one written
+      long lowBits = value.longValue() & 0x7fffffffffffffffL;
+      length -= 63;
+      // write out the next 63 bits worth of data
+      for(int i=0; i < 9; ++i) {
+        // if this is the last byte, leave the high bit off
+        if (length <= 0 && (lowBits & ~0x7f) == 0) {
+          output.write((byte) lowBits);
+          return;
+        } else {
+          output.write((byte) (0x80 | (lowBits & 0x7f)));
+          lowBits >>>= 7;
+        }
+      }
+      value = value.shiftRight(63);
+    }
+  }
+
+  /**
+   * Read a signed, arbitrarily sized BigInteger in the vint format written
+   * by {@link #writeBigInteger}.
+   * @param input the stream to read from
+   * @return the read BigInteger
+   * @throws IOException if the stream fails, or an {@code EOFException} if
+   *     it ends before the final (high-bit-clear) byte
+   */
+  static BigInteger readBigInteger(InputStream input) throws IOException {
+    BigInteger result = BigInteger.ZERO;
+    // 'work' collects up to 63 bits before they are merged into 'result'
+    long work = 0;
+    int offset = 0;
+    long b;
+    do {
+      b = input.read();
+      if (b == -1) {
+        throw new EOFException("Reading BigInteger past EOF from " + input);
+      }
+      work |= (0x7f & b) << (offset % 63);
+      offset += 7;
+      // if we've read 63 bits, roll them into the result
+      if (offset == 63) {
+        result = BigInteger.valueOf(work);
+        work = 0;
+      } else if (offset % 63 == 0) {
+        result = result.or(BigInteger.valueOf(work).shiftLeft(offset-63));
+        work = 0;
+      }
+    } while (b >= 0x80); // a set high bit means another byte follows
+    if (work != 0) {
+      result = result.or(BigInteger.valueOf(work).shiftLeft((offset/63)*63));
+    }
+    // convert back to a signed number
+    // (an odd value encodes a negative number; this undoes the mapping
+    // applied by writeBigInteger)
+    boolean isNegative = result.testBit(0);
+    if (isNegative) {
+      result = result.add(BigInteger.ONE);
+      result = result.negate();
+    }
+    result = result.shiftRight(1);
+    return result;
+  }
+
+  /**
+   * The fixed bit widths supported by the encoder (32 entries: 1..24, then
+   * 26, 28, 30, 32, 40, 48, 56, 64). encodeBitWidth/decodeBitWidth map
+   * widths to and from these ordinals, so reordering the constants would
+   * change the encoded values.
+   */
+  enum FixedBitSizes {
+    ONE, TWO, THREE, FOUR, FIVE, SIX, SEVEN, EIGHT, NINE, TEN, ELEVEN, TWELVE,
+    THIRTEEN, FOURTEEN, FIFTEEN, SIXTEEN, SEVENTEEN, EIGHTEEN, NINETEEN,
+    TWENTY, TWENTYONE, TWENTYTWO, TWENTYTHREE, TWENTYFOUR, TWENTYSIX,
+    TWENTYEIGHT, THIRTY, THIRTYTWO, FORTY, FORTYEIGHT, FIFTYSIX, SIXTYFOUR;
+  }
+
+  /**
+   * Count the number of bits required to encode the given value, rounded up
+   * to the closest width supported by {@link #getClosestFixedBits}.
+   * The value is treated as unsigned, so any negative value needs 64 bits;
+   * zero is reported as 1 bit.
+   * @param value the value to measure
+   * @return bits required to store value
+   */
+  int findClosestNumBits(long value) {
+    int count = 0;
+    while (value != 0) {
+      count++;
+      value = value >>> 1;
+    }
+    return getClosestFixedBits(count);
+  }
+
+  /**
+   * zigzag encode the given value (0, -1, 1, -2, ... map to 0, 1, 2, 3, ...)
+   * @param val the signed value
+   * @return zigzag encoded value
+   */
+  long zigzagEncode(long val) {
+    return (val << 1) ^ (val >> 63);
+  }
+
+  /**
+   * zigzag decode the given value; inverse of {@link #zigzagEncode}
+   * @param val the zigzag encoded value
+   * @return zigzag decoded value
+   */
+  long zigzagDecode(long val) {
+    return (val >>> 1) ^ -(val & 1);
+  }
+
+  /**
+   * Compute the bits required to represent the pth percentile value of
+   * data[offset .. offset + length).
+   * @param data - array
+   * @param offset - index of the first value to consider
+   * @param length - number of values to consider
+   * @param p - percentile, in the range (0.0, 1.0]; any other value makes
+   *     this method return -1
+   * @return pth percentile bits
+   */
+  int percentileBits(long[] data, int offset, int length, double p) {
+    if ((p > 1.0) || (p <= 0.0)) {
+      return -1;
+    }
+
+    // histogram that stores the encoded bit requirement of each value,
+    // one bucket per FixedBitSizes entry (32 distinct encoded widths)
+    int[] hist = new int[32];
+
+    // compute the histogram
+    for(int i = offset; i < (offset + length); i++) {
+      int idx = encodeBitWidth(findClosestNumBits(data[i]));
+      hist[idx] += 1;
+    }
+
+    // number of values allowed to exceed the returned width
+    int perLen = (int) (length * (1.0 - p));
+
+    // return the bits required by pth percentile length
+    // (walk from the widest bucket down until more than perLen values
+    // have been counted)
+    for(int i = hist.length - 1; i >= 0; i--) {
+      perLen -= hist[i];
+      if (perLen < 0) {
+        return decodeBitWidth(i);
+      }
+    }
+
+    return 0;
+  }
+
+  /**
+   * Calculate the number of bytes required, rounding up to a whole byte
+   * @param n - number of values
+   * @param numBits - bit width
+   * @return number of bytes required
+   */
+  int getTotalBytesRequired(int n, int numBits) {
+    return (n * numBits + 7) / 8;
+  }
+
+  /**
+   * For a given fixed bit this function will return the closest available fixed
+   * bit: 1..24 are returned unchanged, larger counts round up to 26, 28, 30,
+   * 32, 40, 48, 56 or 64. Zero maps to 1; negative counts fall through to 64.
+   * @param n the number of bits needed
+   * @return closest valid fixed bit
+   */
+  int getClosestFixedBits(int n) {
+    if (n == 0) {
+      return 1;
+    }
+
+    if (n >= 1 && n <= 24) {
+      return n;
+    } else if (n > 24 && n <= 26) {
+      return 26;
+    } else if (n > 26 && n <= 28) {
+      return 28;
+    } else if (n > 28 && n <= 30) {
+      return 30;
+    } else if (n > 30 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Round a bit count up to the closest width in {1, 2, 4, 8, 16, 24, 32, 40,
+   * 48, 56, 64}, which are exactly the widths that writeInts packs with an
+   * unrolled fast path. 0 maps to 1; negative counts fall through to 64.
+   * @param n the number of bits needed
+   * @return the closest aligned width that can hold n bits
+   */
+  public int getClosestAlignedFixedBits(int n) {
+    if (n == 0 || n == 1) {
+      return 1;
+    } else if (n > 1 && n <= 2) {
+      return 2;
+    } else if (n > 2 && n <= 4) {
+      return 4;
+    } else if (n > 4 && n <= 8) {
+      return 8;
+    } else if (n > 8 && n <= 16) {
+      return 16;
+    } else if (n > 16 && n <= 24) {
+      return 24;
+    } else if (n > 24 && n <= 32) {
+      return 32;
+    } else if (n > 32 && n <= 40) {
+      return 40;
+    } else if (n > 40 && n <= 48) {
+      return 48;
+    } else if (n > 48 && n <= 56) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Finds the closest available fixed bit width match and returns its encoded
+   * value (ordinal)
+   * @param n - fixed bit width to encode
+   * @return encoded fixed bit width
+   */
+  int encodeBitWidth(int n) {
+    n = getClosestFixedBits(n);
+
+    // widths 1..24 have ordinals 0..23
+    if (n >= 1 && n <= 24) {
+      return n - 1;
+    } else if (n > 24 && n <= 26) {
+      return FixedBitSizes.TWENTYSIX.ordinal();
+    } else if (n > 26 && n <= 28) {
+      return FixedBitSizes.TWENTYEIGHT.ordinal();
+    } else if (n > 28 && n <= 30) {
+      return FixedBitSizes.THIRTY.ordinal();
+    } else if (n > 30 && n <= 32) {
+      return FixedBitSizes.THIRTYTWO.ordinal();
+    } else if (n > 32 && n <= 40) {
+      return FixedBitSizes.FORTY.ordinal();
+    } else if (n > 40 && n <= 48) {
+      return FixedBitSizes.FORTYEIGHT.ordinal();
+    } else if (n > 48 && n <= 56) {
+      return FixedBitSizes.FIFTYSIX.ordinal();
+    } else {
+      return FixedBitSizes.SIXTYFOUR.ordinal();
+    }
+  }
+
+  /**
+   * Decodes the ordinal fixed bit value to actual fixed bit width value.
+   * Any ordinal not listed below (including out-of-range values) decodes
+   * to 64.
+   * @param n - encoded fixed bit width
+   * @return decoded fixed bit width
+   */
+  int decodeBitWidth(int n) {
+    if (n >= FixedBitSizes.ONE.ordinal()
+        && n <= FixedBitSizes.TWENTYFOUR.ordinal()) {
+      return n + 1;
+    } else if (n == FixedBitSizes.TWENTYSIX.ordinal()) {
+      return 26;
+    } else if (n == FixedBitSizes.TWENTYEIGHT.ordinal()) {
+      return 28;
+    } else if (n == FixedBitSizes.THIRTY.ordinal()) {
+      return 30;
+    } else if (n == FixedBitSizes.THIRTYTWO.ordinal()) {
+      return 32;
+    } else if (n == FixedBitSizes.FORTY.ordinal()) {
+      return 40;
+    } else if (n == FixedBitSizes.FORTYEIGHT.ordinal()) {
+      return 48;
+    } else if (n == FixedBitSizes.FIFTYSIX.ordinal()) {
+      return 56;
+    } else {
+      return 64;
+    }
+  }
+
+  /**
+   * Bitpack and write the input values to underlying output stream.
+   * Values are packed most significant bit first and the final byte is zero
+   * padded. Widths 1, 2, 4, 8, 16, 24, 32, 40, 48, 56 and 64 use unrolled
+   * fast paths; any other width uses the generic bit loop below.
+   * Nothing is written if input is null or empty, offset is negative, len is
+   * less than 1 or bitSize is less than 1.
+   * NOTE(review): the generic path does not mask values to bitSize bits, so
+   * callers presumably pass values that already fit; confirm.
+   * @param input - values to write
+   * @param offset - offset
+   * @param len - length
+   * @param bitSize - bit width
+   * @param output - output stream
+   * @throws IOException if writing to the stream fails
+   */
+  void writeInts(long[] input, int offset, int len, int bitSize,
+                 OutputStream output) throws IOException {
+    if (input == null || input.length < 1 || offset < 0 || len < 1
+        || bitSize < 1) {
+      return;
+    }
+
+    switch (bitSize) {
+    case 1:
+      unrolledBitPack1(input, offset, len, output);
+      return;
+    case 2:
+      unrolledBitPack2(input, offset, len, output);
+      return;
+    case 4:
+      unrolledBitPack4(input, offset, len, output);
+      return;
+    case 8:
+      unrolledBitPack8(input, offset, len, output);
+      return;
+    case 16:
+      unrolledBitPack16(input, offset, len, output);
+      return;
+    case 24:
+      unrolledBitPack24(input, offset, len, output);
+      return;
+    case 32:
+      unrolledBitPack32(input, offset, len, output);
+      return;
+    case 40:
+      unrolledBitPack40(input, offset, len, output);
+      return;
+    case 48:
+      unrolledBitPack48(input, offset, len, output);
+      return;
+    case 56:
+      unrolledBitPack56(input, offset, len, output);
+      return;
+    case 64:
+      unrolledBitPack64(input, offset, len, output);
+      return;
+    default:
+      break;
+    }
+
+    // generic path: bitsLeft is the number of free bits in 'current'
+    int bitsLeft = 8;
+    byte current = 0;
+    for(int i = offset; i < (offset + len); i++) {
+      long value = input[i];
+      int bitsToWrite = bitSize;
+      while (bitsToWrite > bitsLeft) {
+        // add the bits to the bottom of the current word
+        current |= value >>> (bitsToWrite - bitsLeft);
+        // subtract out the bits we just added
+        bitsToWrite -= bitsLeft;
+        // zero out the bits above bitsToWrite
+        value &= (1L << bitsToWrite) - 1;
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+      bitsLeft -= bitsToWrite;
+      current |= value << bitsLeft;
+      if (bitsLeft == 0) {
+        output.write(current);
+        current = 0;
+        bitsLeft = 8;
+      }
+    }
+
+    // flush
+    if (bitsLeft != 8) {
+      output.write(current);
+      current = 0;
+      bitsLeft = 8;
+    }
+  }
+
+  /**
+   * Packs eight 1-bit values per byte, the first value in the most
+   * significant bit; a trailing partial byte is zero padded.
+   */
+  private void unrolledBitPack1(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 1) << 7)
+          | ((input[i + 1] & 1) << 6)
+          | ((input[i + 2] & 1) << 5)
+          | ((input[i + 3] & 1) << 4)
+          | ((input[i + 4] & 1) << 3)
+          | ((input[i + 5] & 1) << 2)
+          | ((input[i + 6] & 1) << 1)
+          | (input[i + 7]) & 1);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 7;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 1) << startShift);
+        startShift -= 1;
+      }
+      output.write(val);
+    }
+  }
+
+  /**
+   * Packs four 2-bit values per byte, the first value in the top two bits;
+   * a trailing partial byte is zero padded.
+   */
+  private void unrolledBitPack2(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 4;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 3) << 6)
+          | ((input[i + 1] & 3) << 4)
+          | ((input[i + 2] & 3) << 2)
+          | (input[i + 3]) & 3);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 6;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 3) << startShift);
+        startShift -= 2;
+      }
+      output.write(val);
+    }
+  }
+
+  /**
+   * Packs two 4-bit values per byte, the first value in the high nibble;
+   * a trailing partial byte is zero padded.
+   */
+  private void unrolledBitPack4(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    final int numHops = 2;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int val = 0;
+    for (int i = offset; i < endUnroll; i = i + numHops) {
+      val = (int) (val | ((input[i] & 15) << 4) | (input[i + 1]) & 15);
+      output.write(val);
+      val = 0;
+    }
+
+    if (remainder > 0) {
+      int startShift = 4;
+      for (int i = endUnroll; i < endOffset; i++) {
+        val = (int) (val | (input[i] & 15) << startShift);
+        startShift -= 4;
+      }
+      output.write(val);
+    }
+  }
+
+  // The unrolledBitPackN methods below (N a multiple of 8) write each value
+  // as its low N/8 bytes in big-endian order.
+  private void unrolledBitPack8(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 1);
+  }
+
+  private void unrolledBitPack16(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 2);
+  }
+
+  private void unrolledBitPack24(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 3);
+  }
+
+  private void unrolledBitPack32(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 4);
+  }
+
+  private void unrolledBitPack40(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 5);
+  }
+
+  private void unrolledBitPack48(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 6);
+  }
+
+  private void unrolledBitPack56(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 7);
+  }
+
+  private void unrolledBitPack64(long[] input, int offset, int len,
+      OutputStream output) throws IOException {
+    unrolledBitPackBytes(input, offset, len, output, 8);
+  }
+
+  /**
+   * Write len values as numBytes big-endian bytes each: full groups of eight
+   * values go through writeLongBE, any leftover values through
+   * writeRemainingLongs.
+   */
+  private void unrolledBitPackBytes(long[] input, int offset, int len, OutputStream output, int numBytes) throws IOException {
+    final int numHops = 8;
+    final int remainder = len % numHops;
+    final int endOffset = offset + len;
+    final int endUnroll = endOffset - remainder;
+    int i = offset;
+    for (; i < endUnroll; i = i + numHops) {
+      writeLongBE(output, input, i, numHops, numBytes);
+    }
+
+    if (remainder > 0) {
+      writeRemainingLongs(output, i, input, remainder, numBytes);
+    }
+  }
+
+  /**
+   * Fill the instance field {@code writeBuffer} (declared outside this view)
+   * with the last {@code remainder} (fewer than eight) values, numBytes
+   * big-endian bytes each, then write remainder * numBytes bytes of it.
+   * NOTE(review): callers only pass numBytes 1..8; any other value hits the
+   * default branch and still writes (stale) buffer bytes.
+   */
+  private void writeRemainingLongs(OutputStream output, int offset, long[] input, int remainder,
+      int numBytes) throws IOException {
+    // remember the original count; 'remainder' is consumed by the loops
+    final int numHops = remainder;
+
+    int idx = 0;
+    switch (numBytes) {
+    case 1:
+      while (remainder > 0) {
+        writeBuffer[idx] = (byte) (input[offset + idx] & 255);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 2:
+      while (remainder > 0) {
+        writeLongBE2(output, input[offset + idx], idx * 2);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 3:
+      while (remainder > 0) {
+        writeLongBE3(output, input[offset + idx], idx * 3);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 4:
+      while (remainder > 0) {
+        writeLongBE4(output, input[offset + idx], idx * 4);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 5:
+      while (remainder > 0) {
+        writeLongBE5(output, input[offset + idx], idx * 5);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 6:
+      while (remainder > 0) {
+        writeLongBE6(output, input[offset + idx], idx * 6);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 7:
+      while (remainder > 0) {
+        writeLongBE7(output, input[offset + idx], idx * 7);
+        remainder--;
+        idx++;
+      }
+      break;
+    case 8:
+      while (remainder > 0) {
+        writeLongBE8(output, input[offset + idx], idx * 8);
+        remainder--;
+        idx++;
+      }
+      break;
+    default:
+      break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  /**
+   * Fill {@code writeBuffer} with eight values starting at input[offset],
+   * numBytes big-endian bytes each (up to 64 bytes for numBytes == 8), then
+   * write numHops * numBytes bytes of it. Callers pass numHops == 8.
+   */
+  private void writeLongBE(OutputStream output, long[] input, int offset, int numHops, int numBytes) throws IOException {
+
+    switch (numBytes) {
+    case 1:
+      writeBuffer[0] = (byte) (input[offset + 0] & 255);
+      writeBuffer[1] = (byte) (input[offset + 1] & 255);
+      writeBuffer[2] = (byte) (input[offset + 2] & 255);
+      writeBuffer[3] = (byte) (input[offset + 3] & 255);
+      writeBuffer[4] = (byte) (input[offset + 4] & 255);
+      writeBuffer[5] = (byte) (input[offset + 5] & 255);
+      writeBuffer[6] = (byte) (input[offset + 6] & 255);
+      writeBuffer[7] = (byte) (input[offset + 7] & 255);
+      break;
+    case 2:
+      writeLongBE2(output, input[offset + 0], 0);
+      writeLongBE2(output, input[offset + 1], 2);
+      writeLongBE2(output, input[offset + 2], 4);
+      writeLongBE2(output, input[offset + 3], 6);
+      writeLongBE2(output, input[offset + 4], 8);
+      writeLongBE2(output, input[offset + 5], 10);
+      writeLongBE2(output, input[offset + 6], 12);
+      writeLongBE2(output, input[offset + 7], 14);
+      break;
+    case 3:
+      writeLongBE3(output, input[offset + 0], 0);
+      writeLongBE3(output, input[offset + 1], 3);
+      writeLongBE3(output, input[offset + 2], 6);
+      writeLongBE3(output, input[offset + 3], 9);
+      writeLongBE3(output, input[offset + 4], 12);
+      writeLongBE3(output, input[offset + 5], 15);
+      writeLongBE3(output, input[offset + 6], 18);
+      writeLongBE3(output, input[offset + 7], 21);
+      break;
+    case 4:
+      writeLongBE4(output, input[offset + 0], 0);
+      writeLongBE4(output, input[offset + 1], 4);
+      writeLongBE4(output, input[offset + 2], 8);
+      writeLongBE4(output, input[offset + 3], 12);
+      writeLongBE4(output, input[offset + 4], 16);
+      writeLongBE4(output, input[offset + 5], 20);
+      writeLongBE4(output, input[offset + 6], 24);
+      writeLongBE4(output, input[offset + 7], 28);
+      break;
+    case 5:
+      writeLongBE5(output, input[offset + 0], 0);
+      writeLongBE5(output, input[offset + 1], 5);
+      writeLongBE5(output, input[offset + 2], 10);
+      writeLongBE5(output, input[offset + 3], 15);
+      writeLongBE5(output, input[offset + 4], 20);
+      writeLongBE5(output, input[offset + 5], 25);
+      writeLongBE5(output, input[offset + 6], 30);
+      writeLongBE5(output, input[offset + 7], 35);
+      break;
+    case 6:
+      writeLongBE6(output, input[offset + 0], 0);
+      writeLongBE6(output, input[offset + 1], 6);
+      writeLongBE6(output, input[offset + 2], 12);
+      writeLongBE6(output, input[offset + 3], 18);
+      writeLongBE6(output, input[offset + 4], 24);
+      writeLongBE6(output, input[offset + 5], 30);
+      writeLongBE6(output, input[offset + 6], 36);
+      writeLongBE6(output, input[offset + 7], 42);
+      break;
+    case 7:
+      writeLongBE7(output, input[offset + 0], 0);
+      writeLongBE7(output, input[offset + 1], 7);
+      writeLongBE7(output, input[offset + 2], 14);
+      writeLongBE7(output, input[offset + 3], 21);
+      writeLongBE7(output, input[offset + 4], 28);
+      writeLongBE7(output, input[offset + 5], 35);
+      writeLongBE7(output, input[offset + 6], 42);
+      writeLongBE7(output, input[offset + 7], 49);
+      break;
+    case 8:
+      writeLongBE8(output, input[offset + 0], 0);
+      writeLongBE8(output, input[offset + 1], 8);
+      writeLongBE8(output, input[offset + 2], 16);
+      writeLongBE8(output, input[offset + 3], 24);
+      writeLongBE8(output, input[offset + 4], 32);
+      writeLongBE8(output, input[offset + 5], 40);
+      writeLongBE8(output, input[offset + 6], 48);
+      writeLongBE8(output, input[offset + 7], 56);
+      break;
+    default:
+      break;
+    }
+
+    final int toWrite = numHops * numBytes;
+    output.write(writeBuffer, 0, toWrite);
+  }
+
+  // The writeLongBE2..writeLongBE8 helpers store the low N bytes of val into
+  // writeBuffer[wbOffset .. wbOffset + N), most significant byte first.
+  // Their 'output' parameter is unused; nothing is written to the stream.
+
+  /** Store the low 2 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE2(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 0);
+  }
+
+  /** Store the low 3 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE3(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 0);
+  }
+
+  /** Store the low 4 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE4(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 0);
+  }
+
+  /** Store the low 5 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE5(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 0);
+  }
+
+  /** Store the low 6 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE6(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 0);
+  }
+
+  /** Store the low 7 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE7(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 0);
+  }
+
+  /** Store all 8 bytes of val, big-endian, at writeBuffer[wbOffset]. */
+  private void writeLongBE8(OutputStream output, long val, int wbOffset) {
+    writeBuffer[wbOffset + 0] =  (byte) (val >>> 56);
+    writeBuffer[wbOffset + 1] =  (byte) (val >>> 48);
+    writeBuffer[wbOffset + 2] =  (byte) (val >>> 40);
+    writeBuffer[wbOffset + 3] =  (byte) (val >>> 32);
+    writeBuffer[wbOffset + 4] =  (byte) (val >>> 24);
+    writeBuffer[wbOffset + 5] =  (byte) (val >>> 16);
+    writeBuffer[wbOffset + 6] =  (byte) (val >>> 8);
+    writeBuffer[wbOffset + 7] =  (byte) (val >>> 0);
+  }
+
+  // Do not want to use Guava LongMath.checkedSubtract() here as it will throw
+  // ArithmeticException in case of overflow
+  /**
+   * Return true when {@code left - right} does not overflow a long: either
+   * the operands have the same sign (such a subtraction cannot overflow) or
+   * the difference keeps left's sign. The non-short-circuit {@code |} just
+   * evaluates both checks.
+   */
+  public boolean isSafeSubtract(long left, long right) {
+    return (left ^ right) >= 0 | (left ^ (left - right)) >= 0;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
----------------------------------------------------------------------
diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
new file mode 100644
index 0000000..285a32a
--- /dev/null
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/SnappyCodec.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.tajo.storage.thirdparty.orc;
+
+import org.apache.hadoop.hive.shims.HadoopShims.DirectCompressionType;
+import org.apache.hadoop.hive.shims.HadoopShims.DirectDecompressorShim;
+import org.apache.hadoop.hive.shims.ShimLoader;
+import org.iq80.snappy.Snappy;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.EnumSet;
+
+/**
+ * Snappy compression codec. Heap buffers go through org.iq80.snappy.Snappy;
+ * when both buffers are direct, decompression goes through the Hive shims'
+ * direct decompressor instead.
+ */
+class SnappyCodec implements CompressionCodec, DirectDecompressionCodec {
+
+  // cached result of isAvailable(); null until the first check
+  Boolean direct = null;
+
+  /**
+   * Compress the remaining bytes of {@code in}. If the compressed form is not
+   * smaller than the input, nothing is written and false is returned.
+   * Otherwise the compressed bytes go to {@code out}, spilling into
+   * {@code overflow} once {@code out} is full, and true is returned.
+   * Uses array() on all three buffers, so they must be array-backed; the
+   * size of {@code overflow} is not checked here.
+   */
+  @Override
+  public boolean compress(ByteBuffer in, ByteBuffer out,
+                          ByteBuffer overflow) throws IOException {
+    int inBytes = in.remaining();
+    // I should work on a patch for Snappy to support an overflow buffer
+    // to prevent the extra buffer copy.
+    byte[] compressed = new byte[Snappy.maxCompressedLength(inBytes)];
+    int outBytes =
+        Snappy.compress(in.array(), in.arrayOffset() + in.position(), inBytes,
+            compressed, 0);
+    if (outBytes < inBytes) {
+      int remaining = out.remaining();
+      if (remaining >= outBytes) {
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), outBytes);
+        out.position(out.position() + outBytes);
+      } else {
+        // fill 'out' completely, then put the rest at the start of 'overflow'
+        System.arraycopy(compressed, 0, out.array(), out.arrayOffset() +
+            out.position(), remaining);
+        out.position(out.limit());
+        System.arraycopy(compressed, remaining, overflow.array(),
+            overflow.arrayOffset(), outBytes - remaining);
+        overflow.position(outBytes - remaining);
+      }
+      return true;
+    } else {
+      return false;
+    }
+  }
+
+  /**
+   * Decompress {@code in} into {@code out} and flip {@code out} for reading.
+   * Uses the direct decompressor when both buffers are direct, otherwise
+   * decompresses through the buffers' backing arrays.
+   */
+  @Override
+  public void decompress(ByteBuffer in, ByteBuffer out) throws IOException {
+    if(in.isDirect() && out.isDirect()) {
+      directDecompress(in, out);
+      return;
+    }
+    int inOffset = in.position();
+    int uncompressLen =
+        Snappy.uncompress(in.array(), in.arrayOffset() + inOffset,
+        in.limit() - inOffset, out.array(), out.arrayOffset() + out.position());
+    out.position(uncompressLen + out.position());
+    out.flip();
+  }
+
+  /**
+   * Report (and cache) whether the Hive shims provide a direct Snappy
+   * decompressor. An UnsatisfiedLinkError while probing counts as
+   * unavailable.
+   */
+  @Override
+  public boolean isAvailable() {
+    if (direct == null) {
+      try {
+        if (ShimLoader.getHadoopShims().getDirectDecompressor(
+            DirectCompressionType.SNAPPY) != null) {
+          direct = Boolean.valueOf(true);
+        } else {
+          direct = Boolean.valueOf(false);
+        }
+      } catch (UnsatisfiedLinkError ule) {
+        direct = Boolean.valueOf(false);
+      }
+    }
+    return direct.booleanValue();
+  }
+
+  /**
+   * Decompress direct buffers via the shim's direct decompressor, then flip
+   * {@code out} for reading. The shim is looked up on every call.
+   * NOTE(review): getDirectDecompressor may return null (isAvailable checks
+   * for that); callers presumably check isAvailable() first.
+   */
+  @Override
+  public void directDecompress(ByteBuffer in, ByteBuffer out)
+      throws IOException {
+    DirectDecompressorShim decompressShim = ShimLoader.getHadoopShims()
+        .getDirectDecompressor(DirectCompressionType.SNAPPY);
+    decompressShim.decompress(in, out);
+    out.flip(); // flip for read
+  }
+
+  @Override
+  public CompressionCodec modify(EnumSet<Modifier> modifiers) {
+    // snappy allows no modifications
+    return this;
+  }
+}
http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java new file mode 100644 index 0000000..3821645 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StreamName.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +/** + * The name of a stream within a stripe. 
+ */ +class StreamName implements Comparable<StreamName> { + private final int column; + private final OrcProto.Stream.Kind kind; + + public enum Area { + DATA, INDEX + } + + public StreamName(int column, OrcProto.Stream.Kind kind) { + this.column = column; + this.kind = kind; + } + + public boolean equals(Object obj) { + if (obj != null && obj instanceof StreamName) { + StreamName other = (StreamName) obj; + return other.column == column && other.kind == kind; + } else { + return false; + } + } + + @Override + public int compareTo(StreamName streamName) { + if (streamName == null) { + return -1; + } + Area area = getArea(kind); + Area otherArea = StreamName.getArea(streamName.kind); + if (area != otherArea) { + return -area.compareTo(otherArea); + } + if (column != streamName.column) { + return column < streamName.column ? -1 : 1; + } + return kind.compareTo(streamName.kind); + } + + public int getColumn() { + return column; + } + + public OrcProto.Stream.Kind getKind() { + return kind; + } + + public Area getArea() { + return getArea(kind); + } + + public static Area getArea(OrcProto.Stream.Kind kind) { + switch (kind) { + case ROW_INDEX: + case DICTIONARY_COUNT: + case BLOOM_FILTER: + return Area.INDEX; + default: + return Area.DATA; + } + } + + @Override + public String toString() { + return "Stream for column " + column + " kind " + kind; + } + + @Override + public int hashCode() { + return column * 101 + kind.getNumber(); + } +} + http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java new file mode 100644 index 0000000..4248664 --- /dev/null 
+++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringColumnStatistics.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +/** + * Statistics for string columns. + */ +public interface StringColumnStatistics extends ColumnStatistics { + /** + * Get the minimum string. + * @return the minimum + */ + String getMinimum(); + + /** + * Get the maximum string. 
+ * @return the maximum + */ + String getMaximum(); + + /** + * Get the total length of all strings + * @return the sum (total length) + */ + long getSum(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java new file mode 100644 index 0000000..8835cef --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StringRedBlackTree.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.hadoop.io.Text; + +import java.io.IOException; +import java.io.OutputStream; + +/** + * A red-black tree that stores strings. The strings are stored as UTF-8 bytes + * and an offset for each entry. 
+ */ +class StringRedBlackTree extends RedBlackTree { + private final DynamicByteArray byteArray = new DynamicByteArray(); + private final DynamicIntArray keyOffsets; + private String newKey; + + public StringRedBlackTree(int initialCapacity) { + super(initialCapacity); + keyOffsets = new DynamicIntArray(initialCapacity); + } + + public int add(String value) { + newKey = value; + return addNewKey(); + } + + private int addNewKey() { + // if the newKey is actually new, add it to our byteArray and store the offset & length + if (add()) { + int len = newKey.length(); + keyOffsets.add(byteArray.add(newKey.getBytes(), 0, len)); + } + return lastAdd; + } + + public int add(Text value) { + newKey = value.toString(); + return addNewKey(); + } + + @Override + protected int compareValue(int position) { + int start = keyOffsets.get(position); + int end; + if (position + 1 == keyOffsets.size()) { + end = byteArray.size(); + } else { + end = keyOffsets.get(position+1); + } + return byteArray.compare(newKey.getBytes(), 0, newKey.length(), + start, end - start); + } + + /** + * The information about each node. + */ + public interface VisitorContext { + /** + * Get the position where the key was originally added. + * @return the number returned by add. + */ + int getOriginalPosition(); + + /** + * Write the bytes for the string to the given output stream. + * @param out the stream to write to. + * @throws IOException + */ + void writeBytes(OutputStream out) throws IOException; + + /** + * Get the original string. + * @return the string + */ + Text getText(); + + /** + * Get the number of bytes. + * @return the string's length in bytes + */ + int getLength(); + } + + /** + * The interface for visitors. + */ + public interface Visitor { + /** + * Called once for each node of the tree in sort order. 
+ * @param context the information about each node + * @throws IOException + */ + void visit(VisitorContext context) throws IOException; + } + + private class VisitorContextImpl implements VisitorContext { + private int originalPosition; + private int start; + private int end; + private final Text text = new Text(); + + public int getOriginalPosition() { + return originalPosition; + } + + public Text getText() { + byteArray.setText(text, start, end - start); + return text; + } + + public void writeBytes(OutputStream out) throws IOException { + byteArray.write(out, start, end - start); + } + + public int getLength() { + return end - start; + } + + void setPosition(int position) { + originalPosition = position; + start = keyOffsets.get(originalPosition); + if (position + 1 == keyOffsets.size()) { + end = byteArray.size(); + } else { + end = keyOffsets.get(originalPosition + 1); + } + } + } + + private void recurse(int node, Visitor visitor, VisitorContextImpl context + ) throws IOException { + if (node != NULL) { + recurse(getLeft(node), visitor, context); + context.setPosition(node); + visitor.visit(context); + recurse(getRight(node), visitor, context); + } + } + + /** + * Visit all of the nodes in the tree in sorted order. + * @param visitor the action to be applied to each node + * @throws IOException + */ + public void visit(Visitor visitor) throws IOException { + recurse(root, visitor, new VisitorContextImpl()); + } + + /** + * Reset the table to empty. + */ + public void clear() { + super.clear(); + byteArray.clear(); + keyOffsets.clear(); + } + + public void getText(Text result, int originalPosition) { + int offset = keyOffsets.get(originalPosition); + int length; + if (originalPosition + 1 == keyOffsets.size()) { + length = byteArray.size() - offset; + } else { + length = keyOffsets.get(originalPosition + 1) - offset; + } + byteArray.setText(result, offset, length); + } + + /** + * Get the size of the character data in the table. 
+ * @return the bytes used by the table + */ + public int getCharacterSize() { + return byteArray.size(); + } + + /** + * Calculate the approximate size in memory. + * @return the number of bytes used in storing the tree. + */ + public long getSizeInBytes() { + return byteArray.getSizeInBytes() + keyOffsets.getSizeInBytes() + + super.getSizeInBytes(); + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java new file mode 100644 index 0000000..62819c1 --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeInformation.java @@ -0,0 +1,59 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.tajo.storage.thirdparty.orc; + +/** + * Information about the stripes in an ORC file that is provided by the Reader. 
+ */ +public interface StripeInformation { + /** + * Get the byte offset of the start of the stripe. + * @return the bytes from the start of the file + */ + long getOffset(); + + /** + * Get the total length of the stripe in bytes. + * @return the number of bytes in the stripe + */ + long getLength(); + + /** + * Get the length of the stripe's indexes. + * @return the number of bytes in the index + */ + long getIndexLength(); + + /** + * Get the length of the stripe's data. + * @return the number of bytes in the stripe + */ + long getDataLength(); + + /** + * Get the length of the stripe's tail section, which contains its index. + * @return the number of bytes in the tail + */ + long getFooterLength(); + + /** + * Get the number of rows in the stripe. + * @return a count of the number of rows + */ + long getNumberOfRows(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java new file mode 100644 index 0000000..013fc8e --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/StripeStatistics.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import java.util.List; + +public class StripeStatistics { + private final List<OrcProto.ColumnStatistics> cs; + + StripeStatistics(List<OrcProto.ColumnStatistics> list) { + this.cs = list; + } + + /** + * Return list of column statistics + * + * @return column stats + */ + public ColumnStatistics[] getColumnStatistics() { + ColumnStatistics[] result = new ColumnStatistics[cs.size()]; + for (int i = 0; i < result.length; ++i) { + result[i] = ColumnStatisticsImpl.deserialize(cs.get(i)); + } + return result; + } +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java new file mode 100644 index 0000000..6fad0ac --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/TimestampColumnStatistics.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import java.sql.Timestamp; + +/** + * Statistics for Timestamp columns. + */ +public interface TimestampColumnStatistics extends ColumnStatistics { + /** + * Get the minimum value for the column. + * @return minimum value + */ + Timestamp getMinimum(); + + /** + * Get the maximum value for the column. + * @return maximum value + */ + Timestamp getMaximum(); +} http://git-wip-us.apache.org/repos/asf/tajo/blob/8763d42b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java ---------------------------------------------------------------------- diff --git a/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java new file mode 100644 index 0000000..669b44f --- /dev/null +++ b/tajo-storage/tajo-storage-hdfs/src/main/java/org/apache/tajo/storage/thirdparty/orc/Writer.java @@ -0,0 +1,99 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.tajo.storage.thirdparty.orc; + +import org.apache.tajo.storage.Tuple; + +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; + +/** + * The interface for writing ORC files. + */ +public interface Writer { + /** + * Add arbitrary meta-data to the ORC file. This may be called at any point + * until the Writer is closed. If the same key is passed a second time, the + * second value will replace the first. + * @param key a key to label the data with. + * @param value the contents of the metadata. + */ + void addUserMetadata(String key, ByteBuffer value); + + void addTuple(Tuple tuple) throws IOException; + + /** + * Flush all of the buffers and close the file. No methods on this writer + * should be called afterwards. + * @throws IOException + */ + void close() throws IOException; + + /** + * Return the deserialized data size. Raw data size will be compute when + * writing the file footer. Hence raw data size value will be available only + * after closing the writer. + * + * @return raw data size + */ + long getRawDataSize(); + + /** + * Return the number of rows in file. Row count gets updated when flushing + * the stripes. To get accurate row count this method should be called after + * closing the writer. 
+ * + * @return row count + */ + long getNumberOfRows(); + + /** + * Write an intermediate footer on the file such that if the file is + * truncated to the returned offset, it would be a valid ORC file. + * @return the offset that would be a valid end location for an ORC file + */ + long writeIntermediateFooter() throws IOException; + + /** + * Fast stripe append to ORC file. This interface is used for fast ORC file + * merge with other ORC files. When merging, the file to be merged should pass + * stripe in binary form along with stripe information and stripe statistics. + * After appending last stripe of a file, use appendUserMetadata() to append + * any user metadata. + * @param stripe - stripe as byte array + * @param offset - offset within byte array + * @param length - length of stripe within byte array + * @param stripeInfo - stripe information + * @param stripeStatistics - stripe statistics (Protobuf objects can be + * merged directly) + * @throws IOException + */ + public void appendStripe(byte[] stripe, int offset, int length, + StripeInformation stripeInfo, + OrcProto.StripeStatistics stripeStatistics) throws IOException; + + /** + * When fast stripe append is used for merging ORC stripes, after appending + * the last stripe from a file, this interface must be used to merge any + * user metadata. + * @param userMetadata - user metadata + */ + public void appendUserMetadata(List<OrcProto.UserMetadataItem> userMetadata); +}
