http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java b/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java deleted file mode 100644 index 09108b2..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthByteWriter.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; - -/** - * A streamFactory that writes a sequence of bytes. A control byte is written before - * each run with positive values 0 to 127 meaning 3 to 130 repetitions. If the - * control byte is -1 to -128, 1 to 128 literal byte values follow. 
- */ -public class RunLengthByteWriter { - static final int MIN_REPEAT_SIZE = 3; - static final int MAX_LITERAL_SIZE = 128; - static final int MAX_REPEAT_SIZE= 127 + MIN_REPEAT_SIZE; - private final PositionedOutputStream output; - private final byte[] literals = new byte[MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private boolean repeat = false; - private int tailRunLength = 0; - - public RunLengthByteWriter(PositionedOutputStream output) { - this.output = output; - } - - private void writeValues() throws IOException { - if (numLiterals != 0) { - if (repeat) { - output.write(numLiterals - MIN_REPEAT_SIZE); - output.write(literals, 0, 1); - } else { - output.write(-numLiterals); - output.write(literals, 0, numLiterals); - } - repeat = false; - tailRunLength = 0; - numLiterals = 0; - } - } - - public void flush() throws IOException { - writeValues(); - output.flush(); - } - - public void write(byte value) throws IOException { - if (numLiterals == 0) { - literals[numLiterals++] = value; - tailRunLength = 1; - } else if (repeat) { - if (value == literals[0]) { - numLiterals += 1; - if (numLiterals == MAX_REPEAT_SIZE) { - writeValues(); - } - } else { - writeValues(); - literals[numLiterals++] = value; - tailRunLength = 1; - } - } else { - if (value == literals[numLiterals - 1]) { - tailRunLength += 1; - } else { - tailRunLength = 1; - } - if (tailRunLength == MIN_REPEAT_SIZE) { - if (numLiterals + 1 == MIN_REPEAT_SIZE) { - repeat = true; - numLiterals += 1; - } else { - numLiterals -= MIN_REPEAT_SIZE - 1; - writeValues(); - literals[0] = value; - repeat = true; - numLiterals = MIN_REPEAT_SIZE; - } - } else { - literals[numLiterals++] = value; - if (numLiterals == MAX_LITERAL_SIZE) { - writeValues(); - } - } - } - } - - public void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } -}
http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java deleted file mode 100644 index b91a263..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReader.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.EOFException; -import java.io.IOException; - -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; - -/** - * A reader that reads a sequence of integers. 
- * */ -public class RunLengthIntegerReader implements IntegerReader { - private InStream input; - private final boolean signed; - private final long[] literals = - new long[RunLengthIntegerWriter.MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private int delta = 0; - private int used = 0; - private boolean repeat = false; - private SerializationUtils utils; - - public RunLengthIntegerReader(InStream input, boolean signed) throws IOException { - this.input = input; - this.signed = signed; - this.utils = new SerializationUtils(); - } - - private void readValues(boolean ignoreEof) throws IOException { - int control = input.read(); - if (control == -1) { - if (!ignoreEof) { - throw new EOFException("Read past end of RLE integer from " + input); - } - used = numLiterals = 0; - return; - } else if (control < 0x80) { - numLiterals = control + RunLengthIntegerWriter.MIN_REPEAT_SIZE; - used = 0; - repeat = true; - delta = input.read(); - if (delta == -1) { - throw new EOFException("End of stream in RLE Integer from " + input); - } - // convert from 0 to 255 to -128 to 127 by converting to a signed byte - delta = (byte) (0 + delta); - if (signed) { - literals[0] = utils.readVslong(input); - } else { - literals[0] = utils.readVulong(input); - } - } else { - repeat = false; - numLiterals = 0x100 - control; - used = 0; - for(int i=0; i < numLiterals; ++i) { - if (signed) { - literals[i] = utils.readVslong(input); - } else { - literals[i] = utils.readVulong(input); - } - } - } - } - - @Override - public boolean hasNext() throws IOException { - return used != numLiterals || input.available() > 0; - } - - @Override - public long next() throws IOException { - long result; - if (used == numLiterals) { - readValues(false); - } - if (repeat) { - result = literals[0] + (used++) * delta; - } else { - result = literals[used++]; - } - return result; - } - - @Override - public void nextVector(ColumnVector previous, - long[] data, - int previousLen) throws IOException { - 
previous.isRepeating = true; - for (int i = 0; i < previousLen; i++) { - if (!previous.isNull[i]) { - data[i] = next(); - } else { - // The default value of null for int type in vectorized - // processing is 1, so set that if the value is null - data[i] = 1; - } - - // The default value for nulls in Vectorization for int types is 1 - // and given that non null value can also be 1, we need to check for isNull also - // when determining the isRepeating flag. - if (previous.isRepeating - && i > 0 - && (data[0] != data[i] || previous.isNull[0] != previous.isNull[i])) { - previous.isRepeating = false; - } - } - } - - @Override - public void nextVector(ColumnVector vector, - int[] data, - int size) throws IOException { - if (vector.noNulls) { - for(int r=0; r < data.length && r < size; ++r) { - data[r] = (int) next(); - } - } else if (!(vector.isRepeating && vector.isNull[0])) { - for(int r=0; r < data.length && r < size; ++r) { - if (!vector.isNull[r]) { - data[r] = (int) next(); - } else { - data[r] = 1; - } - } - } - } - - @Override - public void seek(PositionProvider index) throws IOException { - input.seek(index); - int consumed = (int) index.getNext(); - if (consumed != 0) { - // a loop is required for cases where we break the run into two parts - while (consumed > 0) { - readValues(false); - used = consumed; - consumed -= numLiterals; - } - } else { - used = 0; - numLiterals = 0; - } - } - - @Override - public void skip(long numValues) throws IOException { - while (numValues > 0) { - if (used == numLiterals) { - readValues(false); - } - long consume = Math.min(numValues, numLiterals - used); - used += consume; - numValues -= consume; - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java 
b/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java deleted file mode 100644 index 610d9b5..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerReaderV2.java +++ /dev/null @@ -1,406 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.EOFException; -import java.io.IOException; -import java.util.Arrays; - -import org.apache.hadoop.hive.ql.exec.vector.ColumnVector; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * A reader that reads a sequence of light weight compressed integers. Refer - * {@link RunLengthIntegerWriterV2} for description of various lightweight - * compression techniques. 
- */ -public class RunLengthIntegerReaderV2 implements IntegerReader { - public static final Logger LOG = LoggerFactory.getLogger(RunLengthIntegerReaderV2.class); - - private InStream input; - private final boolean signed; - private final long[] literals = new long[RunLengthIntegerWriterV2.MAX_SCOPE]; - private boolean isRepeating = false; - private int numLiterals = 0; - private int used = 0; - private final boolean skipCorrupt; - private final SerializationUtils utils; - private RunLengthIntegerWriterV2.EncodingType currentEncoding; - - public RunLengthIntegerReaderV2(InStream input, boolean signed, - boolean skipCorrupt) throws IOException { - this.input = input; - this.signed = signed; - this.skipCorrupt = skipCorrupt; - this.utils = new SerializationUtils(); - } - - private final static RunLengthIntegerWriterV2.EncodingType[] encodings = RunLengthIntegerWriterV2.EncodingType.values(); - private void readValues(boolean ignoreEof) throws IOException { - // read the first 2 bits and determine the encoding type - isRepeating = false; - int firstByte = input.read(); - if (firstByte < 0) { - if (!ignoreEof) { - throw new EOFException("Read past end of RLE integer from " + input); - } - used = numLiterals = 0; - return; - } - currentEncoding = encodings[(firstByte >>> 6) & 0x03]; - switch (currentEncoding) { - case SHORT_REPEAT: readShortRepeatValues(firstByte); break; - case DIRECT: readDirectValues(firstByte); break; - case PATCHED_BASE: readPatchedBaseValues(firstByte); break; - case DELTA: readDeltaValues(firstByte); break; - default: throw new IOException("Unknown encoding " + currentEncoding); - } - } - - private void readDeltaValues(int firstByte) throws IOException { - - // extract the number of fixed bits - int fb = (firstByte >>> 1) & 0x1f; - if (fb != 0) { - fb = utils.decodeBitWidth(fb); - } - - // extract the blob run length - int len = (firstByte & 0x01) << 8; - len |= input.read(); - - // read the first value stored as vint - long firstVal = 0; - if 
(signed) { - firstVal = utils.readVslong(input); - } else { - firstVal = utils.readVulong(input); - } - - // store first value to result buffer - long prevVal = firstVal; - literals[numLiterals++] = firstVal; - - // if fixed bits is 0 then all values have fixed delta - if (fb == 0) { - // read the fixed delta value stored as vint (deltas can be negative even - // if all number are positive) - long fd = utils.readVslong(input); - if (fd == 0) { - isRepeating = true; - assert numLiterals == 1; - Arrays.fill(literals, numLiterals, numLiterals + len, literals[0]); - numLiterals += len; - } else { - // add fixed deltas to adjacent values - for(int i = 0; i < len; i++) { - literals[numLiterals++] = literals[numLiterals - 2] + fd; - } - } - } else { - long deltaBase = utils.readVslong(input); - // add delta base and first value - literals[numLiterals++] = firstVal + deltaBase; - prevVal = literals[numLiterals - 1]; - len -= 1; - - // write the unpacked values, add it to previous value and store final - // value to result buffer. 
if the delta base value is negative then it - // is a decreasing sequence else an increasing sequence - utils.readInts(literals, numLiterals, len, fb, input); - while (len > 0) { - if (deltaBase < 0) { - literals[numLiterals] = prevVal - literals[numLiterals]; - } else { - literals[numLiterals] = prevVal + literals[numLiterals]; - } - prevVal = literals[numLiterals]; - len--; - numLiterals++; - } - } - } - - private void readPatchedBaseValues(int firstByte) throws IOException { - - // extract the number of fixed bits - int fbo = (firstByte >>> 1) & 0x1f; - int fb = utils.decodeBitWidth(fbo); - - // extract the run length of data blob - int len = (firstByte & 0x01) << 8; - len |= input.read(); - // runs are always one off - len += 1; - - // extract the number of bytes occupied by base - int thirdByte = input.read(); - int bw = (thirdByte >>> 5) & 0x07; - // base width is one off - bw += 1; - - // extract patch width - int pwo = thirdByte & 0x1f; - int pw = utils.decodeBitWidth(pwo); - - // read fourth byte and extract patch gap width - int fourthByte = input.read(); - int pgw = (fourthByte >>> 5) & 0x07; - // patch gap width is one off - pgw += 1; - - // extract the length of the patch list - int pl = fourthByte & 0x1f; - - // read the next base width number of bytes to extract base value - long base = utils.bytesToLongBE(input, bw); - long mask = (1L << ((bw * 8) - 1)); - // if MSB of base value is 1 then base is negative value else positive - if ((base & mask) != 0) { - base = base & ~mask; - base = -base; - } - - // unpack the data blob - long[] unpacked = new long[len]; - utils.readInts(unpacked, 0, len, fb, input); - - // unpack the patch blob - long[] unpackedPatch = new long[pl]; - - if ((pw + pgw) > 64 && !skipCorrupt) { - throw new IOException("Corruption in ORC data encountered. 
To skip" + - " reading corrupted data, set hive.exec.orc.skip.corrupt.data to" + - " true"); - } - int bitSize = utils.getClosestFixedBits(pw + pgw); - utils.readInts(unpackedPatch, 0, pl, bitSize, input); - - // apply the patch directly when decoding the packed data - int patchIdx = 0; - long currGap = 0; - long currPatch = 0; - long patchMask = ((1L << pw) - 1); - currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & patchMask; - long actualGap = 0; - - // special case: gap is >255 then patch value will be 0. - // if gap is <=255 then patch value cannot be 0 - while (currGap == 255 && currPatch == 0) { - actualGap += 255; - patchIdx++; - currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & patchMask; - } - // add the left over gap - actualGap += currGap; - - // unpack data blob, patch it (if required), add base to get final result - for(int i = 0; i < unpacked.length; i++) { - if (i == actualGap) { - // extract the patch value - long patchedVal = unpacked[i] | (currPatch << fb); - - // add base to patched value - literals[numLiterals++] = base + patchedVal; - - // increment the patch to point to next entry in patch list - patchIdx++; - - if (patchIdx < pl) { - // read the next gap and patch - currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & patchMask; - actualGap = 0; - - // special case: gap is >255 then patch will be 0. if gap is - // <=255 then patch cannot be 0 - while (currGap == 255 && currPatch == 0) { - actualGap += 255; - patchIdx++; - currGap = unpackedPatch[patchIdx] >>> pw; - currPatch = unpackedPatch[patchIdx] & patchMask; - } - // add the left over gap - actualGap += currGap; - - // next gap is relative to the current gap - actualGap += i; - } - } else { - // no patching required. 
add base to unpacked value to get final value - literals[numLiterals++] = base + unpacked[i]; - } - } - - } - - private void readDirectValues(int firstByte) throws IOException { - - // extract the number of fixed bits - int fbo = (firstByte >>> 1) & 0x1f; - int fb = utils.decodeBitWidth(fbo); - - // extract the run length - int len = (firstByte & 0x01) << 8; - len |= input.read(); - // runs are one off - len += 1; - - // write the unpacked values and zigzag decode to result buffer - utils.readInts(literals, numLiterals, len, fb, input); - if (signed) { - for(int i = 0; i < len; i++) { - literals[numLiterals] = utils.zigzagDecode(literals[numLiterals]); - numLiterals++; - } - } else { - numLiterals += len; - } - } - - private void readShortRepeatValues(int firstByte) throws IOException { - - // read the number of bytes occupied by the value - int size = (firstByte >>> 3) & 0x07; - // #bytes are one off - size += 1; - - // read the run length - int len = firstByte & 0x07; - // run lengths values are stored only after MIN_REPEAT value is met - len += RunLengthIntegerWriterV2.MIN_REPEAT; - - // read the repeated value which is store using fixed bytes - long val = utils.bytesToLongBE(input, size); - - if (signed) { - val = utils.zigzagDecode(val); - } - - if (numLiterals != 0) { - // Currently this always holds, which makes peekNextAvailLength simpler. - // If this changes, peekNextAvailLength should be adjusted accordingly. - throw new AssertionError("readValues called with existing values present"); - } - // repeat the value for length times - isRepeating = true; - // TODO: this is not so useful and V1 reader doesn't do that. Fix? 
Same if delta == 0 - for(int i = 0; i < len; i++) { - literals[i] = val; - } - numLiterals = len; - } - - @Override - public boolean hasNext() throws IOException { - return used != numLiterals || input.available() > 0; - } - - @Override - public long next() throws IOException { - long result; - if (used == numLiterals) { - numLiterals = 0; - used = 0; - readValues(false); - } - result = literals[used++]; - return result; - } - - @Override - public void seek(PositionProvider index) throws IOException { - input.seek(index); - int consumed = (int) index.getNext(); - if (consumed != 0) { - // a loop is required for cases where we break the run into two - // parts - while (consumed > 0) { - numLiterals = 0; - readValues(false); - used = consumed; - consumed -= numLiterals; - } - } else { - used = 0; - numLiterals = 0; - } - } - - @Override - public void skip(long numValues) throws IOException { - while (numValues > 0) { - if (used == numLiterals) { - numLiterals = 0; - used = 0; - readValues(false); - } - long consume = Math.min(numValues, numLiterals - used); - used += consume; - numValues -= consume; - } - } - - @Override - public void nextVector(ColumnVector previous, - long[] data, - int previousLen) throws IOException { - previous.isRepeating = true; - for (int i = 0; i < previousLen; i++) { - if (!previous.isNull[i]) { - data[i] = next(); - } else { - // The default value of null for int type in vectorized - // processing is 1, so set that if the value is null - data[i] = 1; - } - - // The default value for nulls in Vectorization for int types is 1 - // and given that non null value can also be 1, we need to check for isNull also - // when determining the isRepeating flag. 
- if (previous.isRepeating - && i > 0 - && (data[0] != data[i] || - previous.isNull[0] != previous.isNull[i])) { - previous.isRepeating = false; - } - } - } - - @Override - public void nextVector(ColumnVector vector, - int[] data, - int size) throws IOException { - if (vector.noNulls) { - for(int r=0; r < data.length && r < size; ++r) { - data[r] = (int) next(); - } - } else if (!(vector.isRepeating && vector.isNull[0])) { - for(int r=0; r < data.length && r < size; ++r) { - if (!vector.isNull[r]) { - data[r] = (int) next(); - } else { - data[r] = 1; - } - } - } - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java deleted file mode 100644 index 3e5f2e2..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriter.java +++ /dev/null @@ -1,143 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; - -/** - * A streamFactory that writes a sequence of integers. 
A control byte is written before - * each run with positive values 0 to 127 meaning 3 to 130 repetitions, each - * repetition is offset by a delta. If the control byte is -1 to -128, 1 to 128 - * literal vint values follow. - */ -public class RunLengthIntegerWriter implements IntegerWriter { - static final int MIN_REPEAT_SIZE = 3; - static final int MAX_DELTA = 127; - static final int MIN_DELTA = -128; - static final int MAX_LITERAL_SIZE = 128; - private static final int MAX_REPEAT_SIZE = 127 + MIN_REPEAT_SIZE; - private final PositionedOutputStream output; - private final boolean signed; - private final long[] literals = new long[MAX_LITERAL_SIZE]; - private int numLiterals = 0; - private long delta = 0; - private boolean repeat = false; - private int tailRunLength = 0; - private SerializationUtils utils; - - public RunLengthIntegerWriter(PositionedOutputStream output, - boolean signed) { - this.output = output; - this.signed = signed; - this.utils = new SerializationUtils(); - } - - private void writeValues() throws IOException { - if (numLiterals != 0) { - if (repeat) { - output.write(numLiterals - MIN_REPEAT_SIZE); - output.write((byte) delta); - if (signed) { - utils.writeVslong(output, literals[0]); - } else { - utils.writeVulong(output, literals[0]); - } - } else { - output.write(-numLiterals); - for(int i=0; i < numLiterals; ++i) { - if (signed) { - utils.writeVslong(output, literals[i]); - } else { - utils.writeVulong(output, literals[i]); - } - } - } - repeat = false; - numLiterals = 0; - tailRunLength = 0; - } - } - - @Override - public void flush() throws IOException { - writeValues(); - output.flush(); - } - - @Override - public void write(long value) throws IOException { - if (numLiterals == 0) { - literals[numLiterals++] = value; - tailRunLength = 1; - } else if (repeat) { - if (value == literals[0] + delta * numLiterals) { - numLiterals += 1; - if (numLiterals == MAX_REPEAT_SIZE) { - writeValues(); - } - } else { - writeValues(); - 
literals[numLiterals++] = value; - tailRunLength = 1; - } - } else { - if (tailRunLength == 1) { - delta = value - literals[numLiterals - 1]; - if (delta < MIN_DELTA || delta > MAX_DELTA) { - tailRunLength = 1; - } else { - tailRunLength = 2; - } - } else if (value == literals[numLiterals - 1] + delta) { - tailRunLength += 1; - } else { - delta = value - literals[numLiterals - 1]; - if (delta < MIN_DELTA || delta > MAX_DELTA) { - tailRunLength = 1; - } else { - tailRunLength = 2; - } - } - if (tailRunLength == MIN_REPEAT_SIZE) { - if (numLiterals + 1 == MIN_REPEAT_SIZE) { - repeat = true; - numLiterals += 1; - } else { - numLiterals -= MIN_REPEAT_SIZE - 1; - long base = literals[numLiterals]; - writeValues(); - literals[0] = base; - repeat = true; - numLiterals = MIN_REPEAT_SIZE; - } - } else { - literals[numLiterals++] = value; - if (numLiterals == MAX_LITERAL_SIZE) { - writeValues(); - } - } - } - } - - @Override - public void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } - -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java b/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java deleted file mode 100644 index fab2801..0000000 --- a/orc/src/java/org/apache/orc/impl/RunLengthIntegerWriterV2.java +++ /dev/null @@ -1,831 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.orc.impl; - -import java.io.IOException; - -/** - * A writer that performs light weight compression over sequence of integers. - * <p> - * There are four types of lightweight integer compression - * <ul> - * <li>SHORT_REPEAT</li> - * <li>DIRECT</li> - * <li>PATCHED_BASE</li> - * <li>DELTA</li> - * </ul> - * </p> - * The description and format for these types are as below: - * <p> - * <b>SHORT_REPEAT:</b> Used for short repeated integer sequences. - * <ul> - * <li>1 byte header - * <ul> - * <li>2 bits for encoding type</li> - * <li>3 bits for bytes required for repeating value</li> - * <li>3 bits for repeat count (MIN_REPEAT + run length)</li> - * </ul> - * </li> - * <li>Blob - repeat value (fixed bytes)</li> - * </ul> - * </p> - * <p> - * <b>DIRECT:</b> Used for random integer sequences whose number of bit - * requirement doesn't vary a lot. - * <ul> - * <li>2 bytes header - * <ul> - * 1st byte - * <li>2 bits for encoding type</li> - * <li>5 bits for fixed bit width of values in blob</li> - * <li>1 bit for storing MSB of run length</li> - * </ul> - * <ul> - * 2nd byte - * <li>8 bits for lower run length bits</li> - * </ul> - * </li> - * <li>Blob - stores the direct values using fixed bit width. The length of the - * data blob is (fixed width * run length) bits long</li> - * </ul> - * </p> - * <p> - * <b>PATCHED_BASE:</b> Used for random integer sequences whose number of bit - * requirement varies beyond a threshold. 
- * <ul> - * <li>4 bytes header - * <ul> - * 1st byte - * <li>2 bits for encoding type</li> - * <li>5 bits for fixed bit width of values in blob</li> - * <li>1 bit for storing MSB of run length</li> - * </ul> - * <ul> - * 2nd byte - * <li>8 bits for lower run length bits</li> - * </ul> - * <ul> - * 3rd byte - * <li>3 bits for bytes required to encode base value</li> - * <li>5 bits for patch width</li> - * </ul> - * <ul> - * 4th byte - * <li>3 bits for patch gap width</li> - * <li>5 bits for patch length</li> - * </ul> - * </li> - * <li>Base value - Stored using fixed number of bytes. If MSB is set, base - * value is negative else positive. Length of base value is (base width * 8) - * bits.</li> - * <li>Data blob - Base reduced values as stored using fixed bit width. Length - * of data blob is (fixed width * run length) bits.</li> - * <li>Patch blob - Patch blob is a list of gap and patch value. Each entry in - * the patch list is (patch width + patch gap width) bits long. Gap between the - * subsequent elements to be patched are stored in upper part of entry whereas - * patch values are stored in lower part of entry. Length of patch blob is - * ((patch width + patch gap width) * patch length) bits.</li> - * </ul> - * </p> - * <p> - * <b>DELTA</b> Used for monotonically increasing or decreasing sequences, - * sequences with fixed delta values or long repeated sequences. - * <ul> - * <li>2 bytes header - * <ul> - * 1st byte - * <li>2 bits for encoding type</li> - * <li>5 bits for fixed bit width of values in blob</li> - * <li>1 bit for storing MSB of run length</li> - * </ul> - * <ul> - * 2nd byte - * <li>8 bits for lower run length bits</li> - * </ul> - * </li> - * <li>Base value - zigzag encoded value written as varint</li> - * <li>Delta base - zigzag encoded value written as varint</li> - * <li>Delta blob - only positive values. 
monotonicity and orderness are decided - * based on the sign of the base value and delta base</li> - * </ul> - * </p> - */ -public class RunLengthIntegerWriterV2 implements IntegerWriter { - - public enum EncodingType { - SHORT_REPEAT, DIRECT, PATCHED_BASE, DELTA - } - - static final int MAX_SCOPE = 512; - static final int MIN_REPEAT = 3; - private static final int MAX_SHORT_REPEAT_LENGTH = 10; - private long prevDelta = 0; - private int fixedRunLength = 0; - private int variableRunLength = 0; - private final long[] literals = new long[MAX_SCOPE]; - private final PositionedOutputStream output; - private final boolean signed; - private EncodingType encoding; - private int numLiterals; - private final long[] zigzagLiterals = new long[MAX_SCOPE]; - private final long[] baseRedLiterals = new long[MAX_SCOPE]; - private final long[] adjDeltas = new long[MAX_SCOPE]; - private long fixedDelta; - private int zzBits90p; - private int zzBits100p; - private int brBits95p; - private int brBits100p; - private int bitsDeltaMax; - private int patchWidth; - private int patchGapWidth; - private int patchLength; - private long[] gapVsPatchList; - private long min; - private boolean isFixedDelta; - private SerializationUtils utils; - private boolean alignedBitpacking; - - RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed) { - this(output, signed, true); - } - - public RunLengthIntegerWriterV2(PositionedOutputStream output, boolean signed, - boolean alignedBitpacking) { - this.output = output; - this.signed = signed; - this.alignedBitpacking = alignedBitpacking; - this.utils = new SerializationUtils(); - clear(); - } - - private void writeValues() throws IOException { - if (numLiterals != 0) { - - if (encoding.equals(EncodingType.SHORT_REPEAT)) { - writeShortRepeatValues(); - } else if (encoding.equals(EncodingType.DIRECT)) { - writeDirectValues(); - } else if (encoding.equals(EncodingType.PATCHED_BASE)) { - writePatchedBaseValues(); - } else { - 
writeDeltaValues(); - } - - // clear all the variables - clear(); - } - } - - private void writeDeltaValues() throws IOException { - int len = 0; - int fb = bitsDeltaMax; - int efb = 0; - - if (alignedBitpacking) { - fb = utils.getClosestAlignedFixedBits(fb); - } - - if (isFixedDelta) { - // if fixed run length is greater than threshold then it will be fixed - // delta sequence with delta value 0 else fixed delta sequence with - // non-zero delta value - if (fixedRunLength > MIN_REPEAT) { - // ex. sequence: 2 2 2 2 2 2 2 2 - len = fixedRunLength - 1; - fixedRunLength = 0; - } else { - // ex. sequence: 4 6 8 10 12 14 16 - len = variableRunLength - 1; - variableRunLength = 0; - } - } else { - // fixed width 0 is used for long repeating values. - // sequences that require only 1 bit to encode will have an additional bit - if (fb == 1) { - fb = 2; - } - efb = utils.encodeBitWidth(fb); - efb = efb << 1; - len = variableRunLength - 1; - variableRunLength = 0; - } - - // extract the 9th bit of run length - final int tailBits = (len & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 8 bits of runlength - final int headerSecondByte = len & 0xff; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - - // store the first value from zigzag literal array - if (signed) { - utils.writeVslong(output, literals[0]); - } else { - utils.writeVulong(output, literals[0]); - } - - if (isFixedDelta) { - // if delta is fixed then we don't need to store delta blob - utils.writeVslong(output, fixedDelta); - } else { - // store the first value as delta value using zigzag encoding - utils.writeVslong(output, adjDeltas[0]); - - // adjacent delta values are bit packed. The length of adjDeltas array is - // always one less than the number of literals (delta difference for n - // elements is n-1). 
We have already written one element, write the - // remaining numLiterals - 2 elements here - utils.writeInts(adjDeltas, 1, numLiterals - 2, fb, output); - } - } - - private void writePatchedBaseValues() throws IOException { - - // NOTE: Aligned bit packing cannot be applied for PATCHED_BASE encoding - // because patch is applied to MSB bits. For example: If fixed bit width of - // base value is 7 bits and if patch is 3 bits, the actual value is - // constructed by shifting the patch to left by 7 positions. - // actual_value = patch << 7 | base_value - // So, if we align base_value then actual_value can not be reconstructed. - - // write the number of fixed bits required in next 5 bits - final int fb = brBits95p; - final int efb = utils.encodeBitWidth(fb) << 1; - - // adjust variable run length, they are one off - variableRunLength -= 1; - - // extract the 9th bit of run length - final int tailBits = (variableRunLength & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 8 bits of runlength - final int headerSecondByte = variableRunLength & 0xff; - - // if the min value is negative toggle the sign - final boolean isNegative = min < 0 ? true : false; - if (isNegative) { - min = -min; - } - - // find the number of bytes required for base and shift it by 5 bits - // to accommodate patch width. The additional bit is used to store the sign - // of the base value. - final int baseWidth = utils.findClosestNumBits(min) + 1; - final int baseBytes = baseWidth % 8 == 0 ? 
baseWidth / 8 : (baseWidth / 8) + 1; - final int bb = (baseBytes - 1) << 5; - - // if the base value is negative then set MSB to 1 - if (isNegative) { - min |= (1L << ((baseBytes * 8) - 1)); - } - - // third byte contains 3 bits for number of bytes occupied by base - // and 5 bits for patchWidth - final int headerThirdByte = bb | utils.encodeBitWidth(patchWidth); - - // fourth byte contains 3 bits for page gap width and 5 bits for - // patch length - final int headerFourthByte = (patchGapWidth - 1) << 5 | patchLength; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - output.write(headerThirdByte); - output.write(headerFourthByte); - - // write the base value using fixed bytes in big endian order - for(int i = baseBytes - 1; i >= 0; i--) { - byte b = (byte) ((min >>> (i * 8)) & 0xff); - output.write(b); - } - - // base reduced literals are bit packed - int closestFixedBits = utils.getClosestFixedBits(fb); - - utils.writeInts(baseRedLiterals, 0, numLiterals, closestFixedBits, - output); - - // write patch list - closestFixedBits = utils.getClosestFixedBits(patchGapWidth + patchWidth); - - utils.writeInts(gapVsPatchList, 0, gapVsPatchList.length, closestFixedBits, - output); - - // reset run length - variableRunLength = 0; - } - - /** - * Store the opcode in 2 MSB bits - * @return opcode - */ - private int getOpcode() { - return encoding.ordinal() << 6; - } - - private void writeDirectValues() throws IOException { - - // write the number of fixed bits required in next 5 bits - int fb = zzBits100p; - - if (alignedBitpacking) { - fb = utils.getClosestAlignedFixedBits(fb); - } - - final int efb = utils.encodeBitWidth(fb) << 1; - - // adjust variable run length - variableRunLength -= 1; - - // extract the 9th bit of run length - final int tailBits = (variableRunLength & 0x100) >>> 8; - - // create first byte of the header - final int headerFirstByte = getOpcode() | efb | tailBits; - - // second byte of the header stores the remaining 
8 bits of runlength - final int headerSecondByte = variableRunLength & 0xff; - - // write header - output.write(headerFirstByte); - output.write(headerSecondByte); - - // bit packing the zigzag encoded literals - utils.writeInts(zigzagLiterals, 0, numLiterals, fb, output); - - // reset run length - variableRunLength = 0; - } - - private void writeShortRepeatValues() throws IOException { - // get the value that is repeating, compute the bits and bytes required - long repeatVal = 0; - if (signed) { - repeatVal = utils.zigzagEncode(literals[0]); - } else { - repeatVal = literals[0]; - } - - final int numBitsRepeatVal = utils.findClosestNumBits(repeatVal); - final int numBytesRepeatVal = numBitsRepeatVal % 8 == 0 ? numBitsRepeatVal >>> 3 - : (numBitsRepeatVal >>> 3) + 1; - - // write encoding type in top 2 bits - int header = getOpcode(); - - // write the number of bytes required for the value - header |= ((numBytesRepeatVal - 1) << 3); - - // write the run length - fixedRunLength -= MIN_REPEAT; - header |= fixedRunLength; - - // write the header - output.write(header); - - // write the repeating value in big endian byte order - for(int i = numBytesRepeatVal - 1; i >= 0; i--) { - int b = (int) ((repeatVal >>> (i * 8)) & 0xff); - output.write(b); - } - - fixedRunLength = 0; - } - - private void determineEncoding() { - - // we need to compute zigzag values for DIRECT encoding if we decide to - // break early for delta overflows or for shorter runs - computeZigZagLiterals(); - - zzBits100p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 1.0); - - // not a big win for shorter runs to determine encoding - if (numLiterals <= MIN_REPEAT) { - encoding = EncodingType.DIRECT; - return; - } - - // DELTA encoding check - - // for identifying monotonic sequences - boolean isIncreasing = true; - boolean isDecreasing = true; - this.isFixedDelta = true; - - this.min = literals[0]; - long max = literals[0]; - final long initialDelta = literals[1] - literals[0]; - long currDelta 
= initialDelta; - long deltaMax = initialDelta; - this.adjDeltas[0] = initialDelta; - - for (int i = 1; i < numLiterals; i++) { - final long l1 = literals[i]; - final long l0 = literals[i - 1]; - currDelta = l1 - l0; - min = Math.min(min, l1); - max = Math.max(max, l1); - - isIncreasing &= (l0 <= l1); - isDecreasing &= (l0 >= l1); - - isFixedDelta &= (currDelta == initialDelta); - if (i > 1) { - adjDeltas[i - 1] = Math.abs(currDelta); - deltaMax = Math.max(deltaMax, adjDeltas[i - 1]); - } - } - - // its faster to exit under delta overflow condition without checking for - // PATCHED_BASE condition as encoding using DIRECT is faster and has less - // overhead than PATCHED_BASE - if (!utils.isSafeSubtract(max, min)) { - encoding = EncodingType.DIRECT; - return; - } - - // invariant - subtracting any number from any other in the literals after - // this point won't overflow - - // if min is equal to max then the delta is 0, this condition happens for - // fixed values run >10 which cannot be encoded with SHORT_REPEAT - if (min == max) { - assert isFixedDelta : min + "==" + max + - ", isFixedDelta cannot be false"; - assert currDelta == 0 : min + "==" + max + ", currDelta should be zero"; - fixedDelta = 0; - encoding = EncodingType.DELTA; - return; - } - - if (isFixedDelta) { - assert currDelta == initialDelta - : "currDelta should be equal to initialDelta for fixed delta encoding"; - encoding = EncodingType.DELTA; - fixedDelta = currDelta; - return; - } - - // if initialDelta is 0 then we cannot delta encode as we cannot identify - // the sign of deltas (increasing or decreasing) - if (initialDelta != 0) { - // stores the number of bits required for packing delta blob in - // delta encoding - bitsDeltaMax = utils.findClosestNumBits(deltaMax); - - // monotonic condition - if (isIncreasing || isDecreasing) { - encoding = EncodingType.DELTA; - return; - } - } - - // PATCHED_BASE encoding check - - // percentile values are computed for the zigzag encoded values. 
if the - // number of bit requirement between 90th and 100th percentile varies - // beyond a threshold then we need to patch the values. if the variation - // is not significant then we can use direct encoding - - zzBits90p = utils.percentileBits(zigzagLiterals, 0, numLiterals, 0.9); - int diffBitsLH = zzBits100p - zzBits90p; - - // if the difference between 90th percentile and 100th percentile fixed - // bits is > 1 then we need patch the values - if (diffBitsLH > 1) { - - // patching is done only on base reduced values. - // remove base from literals - for (int i = 0; i < numLiterals; i++) { - baseRedLiterals[i] = literals[i] - min; - } - - // 95th percentile width is used to determine max allowed value - // after which patching will be done - brBits95p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 0.95); - - // 100th percentile is used to compute the max patch width - brBits100p = utils.percentileBits(baseRedLiterals, 0, numLiterals, 1.0); - - // after base reducing the values, if the difference in bits between - // 95th percentile and 100th percentile value is zero then there - // is no point in patching the values, in which case we will - // fallback to DIRECT encoding. - // The decision to use patched base was based on zigzag values, but the - // actual patching is done on base reduced literals. - if ((brBits100p - brBits95p) != 0) { - encoding = EncodingType.PATCHED_BASE; - preparePatchedBlob(); - return; - } else { - encoding = EncodingType.DIRECT; - return; - } - } else { - // if difference in bits between 95th percentile and 100th percentile is - // 0, then patch length will become 0. 
Hence we will fallback to direct - encoding = EncodingType.DIRECT; - return; - } - } - - private void computeZigZagLiterals() { - // populate zigzag encoded literals - long zzEncVal = 0; - for (int i = 0; i < numLiterals; i++) { - if (signed) { - zzEncVal = utils.zigzagEncode(literals[i]); - } else { - zzEncVal = literals[i]; - } - zigzagLiterals[i] = zzEncVal; - } - } - - private void preparePatchedBlob() { - // mask will be max value beyond which patch will be generated - long mask = (1L << brBits95p) - 1; - - // since we are considering only 95 percentile, the size of gap and - // patch array can contain only be 5% values - patchLength = (int) Math.ceil((numLiterals * 0.05)); - - int[] gapList = new int[patchLength]; - long[] patchList = new long[patchLength]; - - // #bit for patch - patchWidth = brBits100p - brBits95p; - patchWidth = utils.getClosestFixedBits(patchWidth); - - // if patch bit requirement is 64 then it will not possible to pack - // gap and patch together in a long. To make sure gap and patch can be - // packed together adjust the patch width - if (patchWidth == 64) { - patchWidth = 56; - brBits95p = 8; - mask = (1L << brBits95p) - 1; - } - - int gapIdx = 0; - int patchIdx = 0; - int prev = 0; - int gap = 0; - int maxGap = 0; - - for(int i = 0; i < numLiterals; i++) { - // if value is above mask then create the patch and record the gap - if (baseRedLiterals[i] > mask) { - gap = i - prev; - if (gap > maxGap) { - maxGap = gap; - } - - // gaps are relative, so store the previous patched value index - prev = i; - gapList[gapIdx++] = gap; - - // extract the most significant bits that are over mask bits - long patch = baseRedLiterals[i] >>> brBits95p; - patchList[patchIdx++] = patch; - - // strip off the MSB to enable safe bit packing - baseRedLiterals[i] &= mask; - } - } - - // adjust the patch length to number of entries in gap list - patchLength = gapIdx; - - // if the element to be patched is the first and only element then - // max gap will be 0, 
but to store the gap as 0 we need atleast 1 bit - if (maxGap == 0 && patchLength != 0) { - patchGapWidth = 1; - } else { - patchGapWidth = utils.findClosestNumBits(maxGap); - } - - // special case: if the patch gap width is greater than 256, then - // we need 9 bits to encode the gap width. But we only have 3 bits in - // header to record the gap width. To deal with this case, we will save - // two entries in patch list in the following way - // 256 gap width => 0 for patch value - // actual gap - 256 => actual patch value - // We will do the same for gap width = 511. If the element to be patched is - // the last element in the scope then gap width will be 511. In this case we - // will have 3 entries in the patch list in the following way - // 255 gap width => 0 for patch value - // 255 gap width => 0 for patch value - // 1 gap width => actual patch value - if (patchGapWidth > 8) { - patchGapWidth = 8; - // for gap = 511, we need two additional entries in patch list - if (maxGap == 511) { - patchLength += 2; - } else { - patchLength += 1; - } - } - - // create gap vs patch list - gapIdx = 0; - patchIdx = 0; - gapVsPatchList = new long[patchLength]; - for(int i = 0; i < patchLength; i++) { - long g = gapList[gapIdx++]; - long p = patchList[patchIdx++]; - while (g > 255) { - gapVsPatchList[i++] = (255L << patchWidth); - g -= 255; - } - - // store patch value in LSBs and gap in MSBs - gapVsPatchList[i] = (g << patchWidth) | p; - } - } - - /** - * clears all the variables - */ - private void clear() { - numLiterals = 0; - encoding = null; - prevDelta = 0; - fixedDelta = 0; - zzBits90p = 0; - zzBits100p = 0; - brBits95p = 0; - brBits100p = 0; - bitsDeltaMax = 0; - patchGapWidth = 0; - patchLength = 0; - patchWidth = 0; - gapVsPatchList = null; - min = 0; - isFixedDelta = true; - } - - @Override - public void flush() throws IOException { - if (numLiterals != 0) { - if (variableRunLength != 0) { - determineEncoding(); - writeValues(); - } else if (fixedRunLength != 0) { 
- if (fixedRunLength < MIN_REPEAT) { - variableRunLength = fixedRunLength; - fixedRunLength = 0; - determineEncoding(); - writeValues(); - } else if (fixedRunLength >= MIN_REPEAT - && fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { - encoding = EncodingType.SHORT_REPEAT; - writeValues(); - } else { - encoding = EncodingType.DELTA; - isFixedDelta = true; - writeValues(); - } - } - } - output.flush(); - } - - @Override - public void write(long val) throws IOException { - if (numLiterals == 0) { - initializeLiterals(val); - } else { - if (numLiterals == 1) { - prevDelta = val - literals[0]; - literals[numLiterals++] = val; - // if both values are same count as fixed run else variable run - if (val == literals[0]) { - fixedRunLength = 2; - variableRunLength = 0; - } else { - fixedRunLength = 0; - variableRunLength = 2; - } - } else { - long currentDelta = val - literals[numLiterals - 1]; - if (prevDelta == 0 && currentDelta == 0) { - // fixed delta run - - literals[numLiterals++] = val; - - // if variable run is non-zero then we are seeing repeating - // values at the end of variable run in which case keep - // updating variable and fixed runs - if (variableRunLength > 0) { - fixedRunLength = 2; - } - fixedRunLength += 1; - - // if fixed run met the minimum condition and if variable - // run is non-zero then flush the variable run and shift the - // tail fixed runs to start of the buffer - if (fixedRunLength >= MIN_REPEAT && variableRunLength > 0) { - numLiterals -= MIN_REPEAT; - variableRunLength -= MIN_REPEAT - 1; - // copy the tail fixed runs - long[] tailVals = new long[MIN_REPEAT]; - System.arraycopy(literals, numLiterals, tailVals, 0, MIN_REPEAT); - - // determine variable encoding and flush values - determineEncoding(); - writeValues(); - - // shift tail fixed runs to beginning of the buffer - for(long l : tailVals) { - literals[numLiterals++] = l; - } - } - - // if fixed runs reached max repeat length then write values - if (fixedRunLength == MAX_SCOPE) { - 
determineEncoding(); - writeValues(); - } - } else { - // variable delta run - - // if fixed run length is non-zero and if it satisfies the - // short repeat conditions then write the values as short repeats - // else use delta encoding - if (fixedRunLength >= MIN_REPEAT) { - if (fixedRunLength <= MAX_SHORT_REPEAT_LENGTH) { - encoding = EncodingType.SHORT_REPEAT; - writeValues(); - } else { - encoding = EncodingType.DELTA; - isFixedDelta = true; - writeValues(); - } - } - - // if fixed run length is <MIN_REPEAT and current value is - // different from previous then treat it as variable run - if (fixedRunLength > 0 && fixedRunLength < MIN_REPEAT) { - if (val != literals[numLiterals - 1]) { - variableRunLength = fixedRunLength; - fixedRunLength = 0; - } - } - - // after writing values re-initialize the variables - if (numLiterals == 0) { - initializeLiterals(val); - } else { - // keep updating variable run lengths - prevDelta = val - literals[numLiterals - 1]; - literals[numLiterals++] = val; - variableRunLength += 1; - - // if variable run length reach the max scope, write it - if (variableRunLength == MAX_SCOPE) { - determineEncoding(); - writeValues(); - } - } - } - } - } - } - - private void initializeLiterals(long val) { - literals[numLiterals++] = val; - fixedRunLength = 1; - variableRunLength = 1; - } - - @Override - public void getPosition(PositionRecorder recorder) throws IOException { - output.getPosition(recorder); - recorder.addPosition(numLiterals); - } -} http://git-wip-us.apache.org/repos/asf/hive/blob/d7f71fb4/orc/src/java/org/apache/orc/impl/SchemaEvolution.java ---------------------------------------------------------------------- diff --git a/orc/src/java/org/apache/orc/impl/SchemaEvolution.java b/orc/src/java/org/apache/orc/impl/SchemaEvolution.java deleted file mode 100644 index bb5bcf7..0000000 --- a/orc/src/java/org/apache/orc/impl/SchemaEvolution.java +++ /dev/null @@ -1,399 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) 
under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.orc.impl; - -import java.util.ArrayList; -import java.util.Arrays; -import java.util.List; - -import org.apache.orc.TypeDescription; - -/** - * Take the file types and the (optional) configuration column names/types and see if there - * has been schema evolution. - */ -public class SchemaEvolution { - // indexed by reader column id - private final TypeDescription[] readerFileTypes; - // indexed by reader column id - private final boolean[] readerIncluded; - // the offset to the first column id ignoring any ACID columns - private final int readerColumnOffset; - // indexed by file column id - private final boolean[] fileIncluded; - private final TypeDescription fileSchema; - private final TypeDescription readerSchema; - private boolean hasConversion; - // indexed by reader column id - private final boolean[] ppdSafeConversion; - - public SchemaEvolution(TypeDescription fileSchema, boolean[] includedCols) { - this(fileSchema, null, includedCols); - } - - public SchemaEvolution(TypeDescription fileSchema, - TypeDescription readerSchema, - boolean[] includeCols) { - this.readerIncluded = includeCols == null ? 
null : Arrays.copyOf(includeCols, includeCols.length); - this.hasConversion = false; - this.fileSchema = fileSchema; - boolean isAcid = checkAcidSchema(fileSchema); - this.readerColumnOffset = isAcid ? acidEventFieldNames.size() : 0; - if (readerSchema != null) { - if (isAcid) { - this.readerSchema = createEventSchema(readerSchema); - } else { - this.readerSchema = readerSchema; - } - if (readerIncluded != null && - readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { - throw new IllegalArgumentException("Include vector the wrong length: " + - this.readerSchema.toJson() + " with include length " + - readerIncluded.length); - } - this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; - this.fileIncluded = new boolean[fileSchema.getMaximumId() + 1]; - buildConversionFileTypesArray(fileSchema, this.readerSchema); - } else { - this.readerSchema = fileSchema; - this.readerFileTypes = new TypeDescription[this.readerSchema.getMaximumId() + 1]; - this.fileIncluded = readerIncluded; - if (readerIncluded != null && - readerIncluded.length + readerColumnOffset != this.readerSchema.getMaximumId() + 1) { - throw new IllegalArgumentException("Include vector the wrong length: " + - this.readerSchema.toJson() + " with include length " + - readerIncluded.length); - } - buildSameSchemaFileTypesArray(); - } - this.ppdSafeConversion = populatePpdSafeConversion(); - } - - public TypeDescription getReaderSchema() { - return readerSchema; - } - - /** - * Returns the non-ACID (aka base) reader type description. - * - * @return the reader type ignoring the ACID rowid columns, if any - */ - public TypeDescription getReaderBaseSchema() { - return readerSchema.findSubtype(readerColumnOffset); - } - - /** - * Is there Schema Evolution data type conversion? 
- * @return - */ - public boolean hasConversion() { - return hasConversion; - } - - public TypeDescription getFileType(TypeDescription readerType) { - return getFileType(readerType.getId()); - } - - /** - * Get whether each column is included from the reader's point of view. - * @return a boolean array indexed by reader column id - */ - public boolean[] getReaderIncluded() { - return readerIncluded; - } - - /** - * Get whether each column is included from the file's point of view. - * @return a boolean array indexed by file column id - */ - public boolean[] getFileIncluded() { - return fileIncluded; - } - - /** - * Get the file type by reader type id. - * @param id reader column id - * @return - */ - public TypeDescription getFileType(int id) { - return readerFileTypes[id]; - } - - /** - * Check if column is safe for ppd evaluation - * @param colId reader column id - * @return true if the specified column is safe for ppd evaluation else false - */ - public boolean isPPDSafeConversion(final int colId) { - if (hasConversion()) { - if (colId < 0 || colId >= ppdSafeConversion.length) { - return false; - } - return ppdSafeConversion[colId]; - } - - // when there is no schema evolution PPD is safe - return true; - } - - private boolean[] populatePpdSafeConversion() { - if (fileSchema == null || readerSchema == null || readerFileTypes == null) { - return null; - } - - boolean[] result = new boolean[readerSchema.getMaximumId() + 1]; - boolean safePpd = validatePPDConversion(fileSchema, readerSchema); - result[readerSchema.getId()] = safePpd; - List<TypeDescription> children = readerSchema.getChildren(); - if (children != null) { - for (TypeDescription child : children) { - TypeDescription fileType = getFileType(child.getId()); - safePpd = validatePPDConversion(fileType, child); - result[child.getId()] = safePpd; - } - } - return result; - } - - private boolean validatePPDConversion(final TypeDescription fileType, - final TypeDescription readerType) { - if (fileType == 
null) { - return false; - } - if (fileType.getCategory().isPrimitive()) { - if (fileType.getCategory().equals(readerType.getCategory())) { - // for decimals alone do equality check to not mess up with precision change - if (fileType.getCategory().equals(TypeDescription.Category.DECIMAL) && - !fileType.equals(readerType)) { - return false; - } - return true; - } - - // only integer and string evolutions are safe - // byte -> short -> int -> long - // string <-> char <-> varchar - // NOTE: Float to double evolution is not safe as floats are stored as doubles in ORC's - // internal index, but when doing predicate evaluation for queries like "select * from - // orc_float where f = 74.72" the constant on the filter is converted from string -> double - // so the precisions will be different and the comparison will fail. - // Soon, we should convert all sargs that compare equality between floats or - // doubles to range predicates. - - // Similarly string -> char and varchar -> char and vice versa is not possible, as ORC stores - // char with padded spaces in its internal index. 
- switch (fileType.getCategory()) { - case BYTE: - if (readerType.getCategory().equals(TypeDescription.Category.SHORT) || - readerType.getCategory().equals(TypeDescription.Category.INT) || - readerType.getCategory().equals(TypeDescription.Category.LONG)) { - return true; - } - break; - case SHORT: - if (readerType.getCategory().equals(TypeDescription.Category.INT) || - readerType.getCategory().equals(TypeDescription.Category.LONG)) { - return true; - } - break; - case INT: - if (readerType.getCategory().equals(TypeDescription.Category.LONG)) { - return true; - } - break; - case STRING: - if (readerType.getCategory().equals(TypeDescription.Category.VARCHAR)) { - return true; - } - break; - case VARCHAR: - if (readerType.getCategory().equals(TypeDescription.Category.STRING)) { - return true; - } - break; - default: - break; - } - } - return false; - } - - /** - * Should we read the given reader column? - * @param readerId the id of column in the extended reader schema - * @return true if the column should be read - */ - public boolean includeReaderColumn(int readerId) { - return readerIncluded == null || - readerId <= readerColumnOffset || - readerIncluded[readerId - readerColumnOffset]; - } - - void buildConversionFileTypesArray(TypeDescription fileType, - TypeDescription readerType) { - // if the column isn't included, don't map it - int readerId = readerType.getId(); - if (!includeReaderColumn(readerId)) { - return; - } - boolean isOk = true; - // check the easy case first - if (fileType.getCategory() == readerType.getCategory()) { - switch (readerType.getCategory()) { - case BOOLEAN: - case BYTE: - case SHORT: - case INT: - case LONG: - case DOUBLE: - case FLOAT: - case STRING: - case TIMESTAMP: - case BINARY: - case DATE: - // these are always a match - break; - case CHAR: - case VARCHAR: - // We do conversion when same CHAR/VARCHAR type but different maxLength. 
- if (fileType.getMaxLength() != readerType.getMaxLength()) { - hasConversion = true; - } - break; - case DECIMAL: - // We do conversion when same DECIMAL type but different precision/scale. - if (fileType.getPrecision() != readerType.getPrecision() || - fileType.getScale() != readerType.getScale()) { - hasConversion = true; - } - break; - case UNION: - case MAP: - case LIST: { - // these must be an exact match - List<TypeDescription> fileChildren = fileType.getChildren(); - List<TypeDescription> readerChildren = readerType.getChildren(); - if (fileChildren.size() == readerChildren.size()) { - for(int i=0; i < fileChildren.size(); ++i) { - buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i)); - } - } else { - isOk = false; - } - break; - } - case STRUCT: { - // allow either side to have fewer fields than the other - List<TypeDescription> fileChildren = fileType.getChildren(); - List<TypeDescription> readerChildren = readerType.getChildren(); - if (fileChildren.size() != readerChildren.size()) { - hasConversion = true; - } - int jointSize = Math.min(fileChildren.size(), readerChildren.size()); - for(int i=0; i < jointSize; ++i) { - buildConversionFileTypesArray(fileChildren.get(i), readerChildren.get(i)); - } - break; - } - default: - throw new IllegalArgumentException("Unknown type " + readerType); - } - } else { - /* - * Check for the few cases where will not convert.... 
- */ - - isOk = ConvertTreeReaderFactory.canConvert(fileType, readerType); - hasConversion = true; - } - if (isOk) { - if (readerFileTypes[readerId] != null) { - throw new RuntimeException("reader to file type entry already assigned"); - } - readerFileTypes[readerId] = fileType; - fileIncluded[fileType.getId()] = true; - } else { - throw new IllegalArgumentException( - String.format( - "ORC does not support type conversion from file type %s (%d) to reader type %s (%d)", - fileType.toString(), fileType.getId(), - readerType.toString(), readerId)); - } - } - - /** - * Use to make a reader to file type array when the schema is the same. - * @return - */ - private void buildSameSchemaFileTypesArray() { - buildSameSchemaFileTypesArrayRecurse(readerSchema); - } - - void buildSameSchemaFileTypesArrayRecurse(TypeDescription readerType) { - int id = readerType.getId(); - if (!includeReaderColumn(id)) { - return; - } - if (readerFileTypes[id] != null) { - throw new RuntimeException("reader to file type entry already assigned"); - } - readerFileTypes[id] = readerType; - List<TypeDescription> children = readerType.getChildren(); - if (children != null) { - for (TypeDescription child : children) { - buildSameSchemaFileTypesArrayRecurse(child); - } - } - } - - private static boolean checkAcidSchema(TypeDescription type) { - if (type.getCategory().equals(TypeDescription.Category.STRUCT)) { - List<String> rootFields = type.getFieldNames(); - if (acidEventFieldNames.equals(rootFields)) { - return true; - } - } - return false; - } - - /** - * @param typeDescr - * @return ORC types for the ACID event based on the row's type description - */ - public static TypeDescription createEventSchema(TypeDescription typeDescr) { - TypeDescription result = TypeDescription.createStruct() - .addField("operation", TypeDescription.createInt()) - .addField("originalTransaction", TypeDescription.createLong()) - .addField("bucket", TypeDescription.createInt()) - .addField("rowId", 
TypeDescription.createLong()) - .addField("currentTransaction", TypeDescription.createLong()) - .addField("row", typeDescr.clone()); - return result; - } - - public static final List<String> acidEventFieldNames= new ArrayList<String>(); - static { - acidEventFieldNames.add("operation"); - acidEventFieldNames.add("originalTransaction"); - acidEventFieldNames.add("bucket"); - acidEventFieldNames.add("rowId"); - acidEventFieldNames.add("currentTransaction"); - acidEventFieldNames.add("row"); - } -}