Repository: orc Updated Branches: refs/heads/master 1712e8f63 -> 541890a1b
ORC-296 : work around HADOOP-15171; also fix stream contract (Sergey Shelukhin, reviewed by Gopal Vijayaraghavan) Project: http://git-wip-us.apache.org/repos/asf/orc/repo Commit: http://git-wip-us.apache.org/repos/asf/orc/commit/541890a1 Tree: http://git-wip-us.apache.org/repos/asf/orc/tree/541890a1 Diff: http://git-wip-us.apache.org/repos/asf/orc/diff/541890a1 Branch: refs/heads/master Commit: 541890a1bb2369dbefd132053c9aa43da37315e0 Parents: 1712e8f Author: sergey <ser...@apache.org> Authored: Fri Feb 2 16:23:28 2018 -0800 Committer: sergey <ser...@apache.org> Committed: Fri Feb 2 16:23:28 2018 -0800 ---------------------------------------------------------------------- .../src/java/org/apache/orc/impl/InStream.java | 31 ++++++++++---------- .../org/apache/orc/impl/HadoopShimsPre2_6.java | 12 ++++++++ 2 files changed, 28 insertions(+), 15 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/orc/blob/541890a1/java/core/src/java/org/apache/orc/impl/InStream.java ---------------------------------------------------------------------- diff --git a/java/core/src/java/org/apache/orc/impl/InStream.java b/java/core/src/java/org/apache/orc/impl/InStream.java index c85aee5..94e9232 100644 --- a/java/core/src/java/org/apache/orc/impl/InStream.java +++ b/java/core/src/java/org/apache/orc/impl/InStream.java @@ -239,36 +239,37 @@ public abstract class InStream extends InputStream { @Override public int read() throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == length) { - return -1; - } - readHeader(); + if (!ensureUncompressed()) { + return -1; } return 0xff & uncompressed.get(); } @Override public int read(byte[] data, int offset, int length) throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == this.length) { - return -1; - } - readHeader(); + if (!ensureUncompressed()) { + return -1; } int actualLength = Math.min(length, uncompressed.remaining()); uncompressed.get(data, offset, actualLength); return actualLength; } - @Override - public int available() throws IOException { - if (uncompressed == null || uncompressed.remaining() == 0) { - if (currentOffset == length) { - return 0; + private boolean ensureUncompressed() throws IOException { + while (uncompressed == null || uncompressed.remaining() == 0) { + if (currentOffset == this.length) { + return false; } readHeader(); } + return true; + } + + @Override + public int available() throws IOException { + if (!ensureUncompressed()) { + return 0; + } return uncompressed.remaining(); } http://git-wip-us.apache.org/repos/asf/orc/blob/541890a1/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java ---------------------------------------------------------------------- diff --git a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java index aa312e6..b702179 100644 --- a/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java +++ b/java/shims/src/java/org/apache/orc/impl/HadoopShimsPre2_6.java @@ -40,12 +40,18 @@ public class HadoopShimsPre2_6 implements HadoopShims { static class SnappyDirectDecompressWrapper implements DirectDecompressor { private final SnappyDirectDecompressor root; + private boolean isFirstCall = true; SnappyDirectDecompressWrapper(SnappyDirectDecompressor root) { this.root = root; } public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { + if (!isFirstCall) { + root.reset(); + } else { + isFirstCall = false; + } root.decompress(input, output); } @@ -62,12 +68,18 @@ public class HadoopShimsPre2_6 implements HadoopShims { static class ZlibDirectDecompressWrapper implements DirectDecompressor { private final ZlibDecompressor.ZlibDirectDecompressor root; + private boolean isFirstCall = true; ZlibDirectDecompressWrapper(ZlibDecompressor.ZlibDirectDecompressor root) { this.root = root; } public void decompress(ByteBuffer input, ByteBuffer output) throws IOException { + if (!isFirstCall) { + root.reset(); + } else { + isFirstCall = false; + } root.decompress(input, output); }