Repository: knox
Updated Branches:
  refs/heads/master a3614db44 -> bcb2e1f01


KNOX-1518 - Large HDFS file downloads are incomplete when content is gzipped


Project: http://git-wip-us.apache.org/repos/asf/knox/repo
Commit: http://git-wip-us.apache.org/repos/asf/knox/commit/bcb2e1f0
Tree: http://git-wip-us.apache.org/repos/asf/knox/tree/bcb2e1f0
Diff: http://git-wip-us.apache.org/repos/asf/knox/diff/bcb2e1f0

Branch: refs/heads/master
Commit: bcb2e1f0196548d9cc54ca94e8fe79198ef61906
Parents: a3614db
Author: Phil Zampino <pzamp...@apache.org>
Authored: Mon Oct 8 17:35:36 2018 -0400
Committer: Phil Zampino <pzamp...@apache.org>
Committed: Mon Oct 8 17:35:36 2018 -0400

----------------------------------------------------------------------
 .../filter/rewrite/impl/UrlRewriteResponse.java | 107 ++++++++++++++++---
 1 file changed, 93 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/knox/blob/bcb2e1f0/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
----------------------------------------------------------------------
diff --git a/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java b/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
index b59aed3..ce16a9a 100644
--- a/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
+++ b/gateway-provider-rewrite/src/main/java/org/apache/knox/gateway/filter/rewrite/impl/UrlRewriteResponse.java
@@ -53,7 +53,6 @@ import java.util.List;
 import java.util.Set;
 import java.util.zip.GZIPInputStream;
 import java.util.zip.GZIPOutputStream;
-import java.util.zip.ZipException;
 
 import static 
org.apache.knox.gateway.filter.rewrite.impl.UrlRewriteUtil.getRewriteFilterConfig;
 import static 
org.apache.knox.gateway.filter.rewrite.impl.UrlRewriteUtil.pickFirstRuleWithEqualsIgnoreCasePathMatch;
@@ -151,41 +150,48 @@ public class UrlRewriteResponse extends GatewayResponseWrapper implements Params
 
   @Override
   public void streamResponse( InputStream input, OutputStream output ) throws 
IOException {
-    InputStream inStream;
+    InputStream  inStream;
     OutputStream outStream;
+
     boolean isGzip = false;
-    BufferedInputStream inBuffer = new BufferedInputStream(input);
+
+    BufferedInputStream inBuffer = new BufferedInputStream(input, 
STREAM_BUFFER_SIZE);
+
     try {
       // Use this way to check whether the input stream is gzip compressed, in 
case
       // the content encoding header is unknown, as it could be unset in 
inbound response
       inBuffer.mark(STREAM_BUFFER_SIZE);
-      inStream = new GZIPInputStream(inBuffer);
+      inStream = new GZIPInputStream(new 
GZIPInputStreamHelperInputStream(inBuffer), STREAM_BUFFER_SIZE);
       isGzip = true;
-    } catch (ZipException e) {
-      inBuffer.reset();
-      inStream = inBuffer;
     } catch (IOException e) {
+      // Not gzip (or the header could not be read): rewind to the mark and pass the raw bytes through
       inBuffer.reset();
       inStream = inBuffer;
     }
 
     MimeType mimeType = getMimeType();
     UrlRewriteFilterContentDescriptor filterContentConfig =
-        getRewriteFilterConfig( rewriter.getConfig(), bodyFilterName, mimeType 
);
+                                                
getRewriteFilterConfig(rewriter.getConfig(), bodyFilterName, mimeType);
     if (filterContentConfig != null) {
       String asType = filterContentConfig.asType();
       if ( asType != null && asType.trim().length() > 0 ) {
         mimeType = MimeTypes.create(asType, getCharacterEncoding());
       }
     }
-    InputStream filteredInput = UrlRewriteStreamFilterFactory.create(
-        mimeType, null, inStream, rewriter, this, UrlRewriter.Direction.OUT, 
filterContentConfig );
-    outStream = (isGzip) ? new GZIPOutputStream(output) : output;
+
+    InputStream filteredInput = UrlRewriteStreamFilterFactory.create(mimeType,
+                                                                     null,
+                                                                     inStream,
+                                                                     rewriter,
+                                                                     this,
+                                                                     
UrlRewriter.Direction.OUT,
+                                                                     
filterContentConfig);
+    outStream = (isGzip) ? new GZIPOutputStream(output, STREAM_BUFFER_SIZE) : 
output;
     try {
-        IOUtils.copyLarge( filteredInput, outStream, new 
byte[STREAM_BUFFER_SIZE] );
+      IOUtils.copyLarge(filteredInput, outStream, new 
byte[STREAM_BUFFER_SIZE]);
     } finally {
-        //KNOX-685: outStream.flush();
-        outStream.close();
+      //KNOX-685: outStream.flush();
+      outStream.close();
     }
   }
 
@@ -331,4 +337,77 @@ public class UrlRewriteResponse extends GatewayResponseWrapper implements Params
     }
   }
 
+  /**
+   * This InputStream wraps another InputStream to override the available() behavior. This is to fulfill
+   * GZIPInputStream's expectation of the available() method, for which the behavior actually varies across InputStream
+   * implementations; in some cases, it causes GZIPInputStream to terminate prematurely, resulting in partial reads.
+   *
+   * Guaranteeing that the available() method always returns a value greater than zero forces the GZIPInputStream to
+   * continue reading from the underlying InputStream until it actually reaches the end of the stream.
+   *
+   * Every other operation, including mark()/reset(), is passed straight through to the delegate.
+   */
+  private static class GZIPInputStreamHelperInputStream extends InputStream {
+
+    private final InputStream delegate;
+
+    GZIPInputStreamHelperInputStream(InputStream delegate) {
+      this.delegate = delegate;
+    }
+
+    /**
+     * @return The delegate's available() result if it's > 0, otherwise 1.
+     */
+    @Override
+    public int available() throws IOException {
+      int available = delegate.available();
+      if (available <= 0) {
+        available = 1;
+      }
+      return available;
+    }
+
+    @Override
+    public int read() throws IOException {
+      return delegate.read();
+    }
+
+    @Override
+    public int read(byte[] b) throws IOException {
+      return delegate.read(b);
+    }
+
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+      return delegate.read(b, off, len);
+    }
+
+    @Override
+    public long skip(long n) throws IOException {
+      return delegate.skip(n);
+    }
+
+    @Override
+    public void close() throws IOException {
+      delegate.close();
+    }
+
+    @Override
+    public synchronized void mark(int readlimit) {
+      // Delegate unconditionally so the wrapper behaves exactly like the delegate.
+      delegate.mark(readlimit);
+    }
+
+    @Override
+    public synchronized void reset() throws IOException {
+      // Delegate unconditionally so an unsupported/invalid reset raises IOException instead of being ignored.
+      delegate.reset();
+    }
+
+    @Override
+    public boolean markSupported() {
+      return delegate.markSupported();
+    }
+  }
+
 }

Reply via email to