Repository: knox
Updated Branches:
  refs/heads/master a3614db44 -> bcb2e1f01


KNOX-1518 - Large HDFS file downloads are incomplete when content is gzipped


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/bcb2e1f0
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/bcb2e1f0
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/bcb2e1f0

Branch: refs/heads/master
Commit: bcb2e1f0196548d9cc54ca94e8fe79198ef61906
Parents: a3614db
Author: Phil Zampino <pzamp...@apache.org>
Authored: Mon Oct 8 17:35:36 2018 -0400
Committer: Phil Zampino <pzamp...@apache.org>
Committed: Mon Oct 8 17:35:36 2018 -0400

----------------------------------------------------------------------
 .../filter/rewrite/impl/UrlRewriteResponse.java | 107 ++++++++++++++++---
 1 file changed, 93 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/bcb2e1f0/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
----------------------------------------------------------------------
diff --git a/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java b/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
index b59aed3..ce16a9a 100644
--- a/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
+++ b/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
@@ -53,7 +53,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import java.util.zip.ZipException;
 
 import static org.apache.knox.gateway.filter.rewrite.impl.UrlRewriteUtil.getRewriteFilterConfig;
 import static org.apache.knox.gateway.filter.rewrite.impl.UrlRewriteUtil.pickFirstRuleWithEqualsIgnoreCasePathMatch;
@@ -151,41 +150,48 @@ public class UrlRewriteResponse extends GatewayResponseWrapper implements Params
 
   @Override
   public void streamResponse( InputStream input, OutputStream output ) throws IOException {
-    InputStream inStream;
+    InputStream inStream;
     OutputStream outStream;
+
     boolean isGzip = false;
-    BufferedInputStream inBuffer = new BufferedInputStream(input);
+
+    BufferedInputStream inBuffer = new BufferedInputStream(input, STREAM_BUFFER_SIZE);
+
     try {
       // Use this way to check whether the input stream is gzip compressed, in case
       // the content encoding header is unknown, as it could be unset in inbound response
       inBuffer.mark(STREAM_BUFFER_SIZE);
-      inStream = new GZIPInputStream(inBuffer);
+      inStream = new GZIPInputStream(new GZIPInputStreamHelperInputStream(inBuffer), STREAM_BUFFER_SIZE);
       isGzip = true;
-    } catch (ZipException e) {
-      inBuffer.reset();
-      inStream = inBuffer;
     } catch (IOException e) {
+      // Not proper gzip content
       inBuffer.reset();
       inStream = inBuffer;
     }
 
     MimeType mimeType = getMimeType();
     UrlRewriteFilterContentDescriptor filterContentConfig =
-        getRewriteFilterConfig( rewriter.getConfig(), bodyFilterName, mimeType );
+        getRewriteFilterConfig(rewriter.getConfig(), bodyFilterName, mimeType);
     if (filterContentConfig != null) {
       String asType = filterContentConfig.asType();
       if ( asType != null && asType.trim().length() > 0 ) {
         mimeType = MimeTypes.create(asType, getCharacterEncoding());
       }
     }
-    InputStream filteredInput = UrlRewriteStreamFilterFactory.create(
-        mimeType, null, inStream, rewriter, this, UrlRewriter.Direction.OUT, filterContentConfig );
-    outStream = (isGzip) ? new GZIPOutputStream(output) : output;
+
+    InputStream filteredInput = UrlRewriteStreamFilterFactory.create(mimeType,
+                                                                     null,
+                                                                     inStream,
+                                                                     rewriter,
+                                                                     this,
+                                                                     UrlRewriter.Direction.OUT,
+                                                                     filterContentConfig);
+    outStream = (isGzip) ? new GZIPOutputStream(output, STREAM_BUFFER_SIZE) : output;
     try {
-      IOUtils.copyLarge( filteredInput, outStream, new byte[STREAM_BUFFER_SIZE] );
+      IOUtils.copyLarge(filteredInput, outStream, new byte[STREAM_BUFFER_SIZE]);
     } finally {
-      //KNOX-685: outStream.flush();
-      outStream.close();
+      //KNOX-685: outStream.flush();
+      outStream.close();
     }
   }
 
@@ -331,4 +337,77 @@ public class UrlRewriteResponse extends GatewayResponseWrapper implements Params
     }
   }
 
+  /**
+   * This InputStream wraps another InputStream to override the available() behavior. This is to fulfill
+   * GZIPInputStream's expectation of the available() method, for which the behavior actually varies across InputStream
+   * implementations; in some cases, it causes GZIPInputStream to terminate prematurely, resulting in partial reads.
+   *
+   * Guaranteeing that the available() method always returns a value greater than zero forces the GZIPInputStream to
+   * continue reading from the underlying InputStream until it actually reaches the end of the stream.
+   */
+  private static class GZIPInputStreamHelperInputStream extends InputStream {
+
+    private InputStream delegate;
+
+    GZIPInputStreamHelperInputStream(InputStream delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return The delegate's available() result if it's > 0, otherwise 1.
+     */
+    @Override
+    public int available() throws IOException {
+      int available = delegate.available();
+      if (available <= 1) {
+        available = 1;
+      }
+      return available;
+    }
+
+    @Override
+    public int read() throws IOException {
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      if (markSupported()) {
+        delegate.mark(readlimit);
+      }
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      if (markSupported()) {
+        delegate.reset();
+      }
+    }
+
+    @Override
+    public boolean markSupported() {
+      return delegate.markSupported();
+    }
+  }
+
 }