HIVE-16671 : LLAP IO: BufferUnderflowException may happen in very rare(?) cases due to ORC end-of-CB estimation (Sergey Shelukhin, reviewed by Prasanth Jayachandran)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/5d62dc8a
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/5d62dc8a
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/5d62dc8a

Branch: refs/heads/hive-14535
Commit: 5d62dc8ae853517e4c477516283af67439a26f0d
Parents: 85415f7
Author: sergey <[email protected]>
Authored: Fri May 19 17:22:35 2017 -0700
Committer: sergey <[email protected]>
Committed: Fri May 19 17:22:35 2017 -0700

----------------------------------------------------------------------
 .../ql/io/orc/encoded/EncodedReaderImpl.java    | 81 +++++++++++++++++---
 .../io/orc/encoded/TestEncodedReaderImpl.java   | 77 +++++++++++++++++++
 2 files changed, 149 insertions(+), 9 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
index 5b2e9b5..6cd85d2 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/orc/encoded/EncodedReaderImpl.java
@@ -50,6 +50,8 @@ import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.OrcEncodedColumnBatch;
 import org.apache.hadoop.hive.ql.io.orc.encoded.Reader.PoolFactory;
 import org.apache.orc.OrcProto;
 
+import com.google.common.annotations.VisibleForTesting;
+
 import sun.misc.Cleaner;
 
 
@@ -1229,10 +1231,26 @@ class EncodedReaderImpl implements EncodedReader {
     ByteBuffer slice = null;
     ByteBuffer compressed = current.getChunk();
     long cbStartOffset = current.getOffset();
-    int b0 = compressed.get() & 0xff;
-    int b1 = compressed.get() & 0xff;
-    int b2 = compressed.get() & 0xff;
+    int b0 = -1, b1 = -1, b2 = -1;
+    // First, read the CB header. Due to ORC estimates, ZCR, etc. this can be complex.
+    if (compressed.remaining() >= 3) {
+      // The overwhelming majority of cases will go here. Read 3 bytes. Tada!
+      b0 = compressed.get() & 0xff;
+      b1 = compressed.get() & 0xff;
+      b2 = compressed.get() & 0xff;
+    } else {
+      // Bad luck! Handle the corner cases where 3 bytes are in multiple blocks.
+      int[] bytes = new int[3];
+      current = readLengthBytesFromSmallBuffers(
+          current, cbStartOffset, bytes, badEstimates, isTracingEnabled);
+      if (current == null) return null;
+      compressed = current.getChunk();
+      b0 = bytes[0];
+      b1 = bytes[1];
+      b2 = bytes[2];
+    }
     int chunkLength = (b2 << 15) | (b1 << 7) | (b0 >> 1);
+
     if (chunkLength > bufferSize) {
       throw new IllegalArgumentException("Buffer size too small. size = " +
           bufferSize + " needed = " + chunkLength);
@@ -1252,7 +1270,8 @@ class EncodedReaderImpl implements EncodedReader {
           cbStartOffset, cbEndOffset, chunkLength, current, toDecompress, cacheBuffers);
     }
     if (current.getEnd() < cbEndOffset && !current.hasContiguousNext()) {
-      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, current, 0));
+      badEstimates.add(addIncompleteCompressionBuffer(
+          cbStartOffset, current, 0, isTracingEnabled));
       return null; // This is impossible to read from this chunk.
     }
 
@@ -1304,12 +1323,56 @@ class EncodedReaderImpl implements EncodedReader {
         }
         tmp.removeSelf();
       } else {
-        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, extraChunkCount));
+        badEstimates.add(addIncompleteCompressionBuffer(
+            cbStartOffset, tmp, extraChunkCount, isTracingEnabled));
+        return null; // This is impossible to read from this chunk.
+      }
+    }
+  }
+
+
+  @VisibleForTesting
+  static BufferChunk readLengthBytesFromSmallBuffers(BufferChunk first, long cbStartOffset,
+      int[] result, List<IncompleteCb> badEstimates, boolean isTracingEnabled) throws IOException {
+    if (!first.hasContiguousNext()) {
+      badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, first, 0, isTracingEnabled));
+      return null; // This is impossible to read from this chunk.
+    }
+    int ix = readLengthBytes(first.getChunk(), result, 0);
+    assert ix < 3; // Otherwise we wouldn't be here.
+    DiskRangeList current = first.next;
+    first.removeSelf();
+    while (true) {
+      if (!(current instanceof BufferChunk)) {
+        throw new IOException(
+            "Trying to extend compressed block into uncompressed block " + current);
+      }
+      BufferChunk currentBc = (BufferChunk) current;
+      ix = readLengthBytes(currentBc.getChunk(), result, ix);
+      if (ix == 3) return currentBc; // Done, we have 3 bytes. Continue reading this buffer.
+      DiskRangeList tmp = current;
+      current = current.hasContiguousNext() ? current.next : null;
+      if (current != null) {
+        if (isTracingEnabled) {
+          LOG.trace("Removing partial CB " + tmp + " from ranges after copying its contents");
+        }
+        tmp.removeSelf();
+      } else {
+        badEstimates.add(addIncompleteCompressionBuffer(cbStartOffset, tmp, -1, isTracingEnabled));
         return null; // This is impossible to read from this chunk.
       }
     }
   }
 
+  private static int readLengthBytes(ByteBuffer compressed, int[] bytes, int ix) {
+    int byteCount = compressed.remaining();
+    while (byteCount > 0 && ix < 3) {
+      bytes[ix++] = compressed.get() & 0xff;
+      --byteCount;
+    }
+    return ix;
+  }
+
   private void releaseBuffers(Collection<ByteBuffer> toRelease, boolean isFromDataReader) {
     if (toRelease == null) return;
     for (ByteBuffer buf : toRelease) {
@@ -1342,12 +1405,12 @@ class EncodedReaderImpl implements EncodedReader {
   }
 
 
-  private IncompleteCb addIncompleteCompressionBuffer(
-      long cbStartOffset, DiskRangeList target, int extraChunkCount) {
+  private static IncompleteCb addIncompleteCompressionBuffer(long cbStartOffset,
+      DiskRangeList target, int extraChunkCountToLog, boolean isTracingEnabled) {
     IncompleteCb icb = new IncompleteCb(cbStartOffset, target.getEnd());
     if (isTracingEnabled) {
-      LOG.trace("Replacing " + target + " (and " + extraChunkCount + " previous chunks) with "
-          + icb + " in the buffers");
+      LOG.trace("Replacing " + target + " (and " + extraChunkCountToLog
+          + " previous chunks) with " + icb + " in the buffers");
     }
     target.replaceSelfWith(icb);
     return icb;

http://git-wip-us.apache.org/repos/asf/hive/blob/5d62dc8a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
----------------------------------------------------------------------
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
new file mode 100644
index 0000000..28ca441
--- /dev/null
+++ b/ql/src/test/org/apache/hadoop/hive/ql/io/orc/encoded/TestEncodedReaderImpl.java
@@ -0,0 +1,77 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.io.orc.encoded;
+
+import static org.junit.Assert.*;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.orc.impl.BufferChunk;
+import org.junit.Test;
+
+public class TestEncodedReaderImpl {
+  @Test
+  public void testReadLength() throws IOException {
+    ByteBuffer one = ByteBuffer.wrap(new byte[] { 1 }), two = ByteBuffer.wrap(new byte[] { 2 }),
+        three = ByteBuffer.wrap(new byte[] { 3 }), twoThree = ByteBuffer.wrap(new byte[] { 2, 3 }),
+        oneTwo = ByteBuffer.wrap(new byte[] { 1, 2 });
+    BufferChunk bc = new BufferChunk(one, 0);
+    int[] result = new int[3];
+    List<IncompleteCb> l = new ArrayList<>();
+    BufferChunk rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    bc.insertAfter(new BufferChunk(two, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    one.position(0);
+    two.position(0);
+    bc.insertAfter(new BufferChunk(two, 1)).insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    one.position(0);
+    bc.insertAfter(new BufferChunk(twoThree, 1));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+    bc = new BufferChunk(oneTwo, 0);
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNull(rv);
+    three.position(0);
+    bc.insertAfter(new BufferChunk(three, 2));
+    Arrays.fill(result, -1);
+    rv = EncodedReaderImpl.readLengthBytesFromSmallBuffers(bc, 0l, result, l, true);
+    assertNotNull(rv);
+    for (int i = 0; i < result.length; ++i) {
+      assertEquals(i + 1, result[i]);
+    }
+  }
+}
