Repository: nifi
Updated Branches:
  refs/heads/master 4fdea680e -> 77a676bf9


NIFI-3894: Call Inflater/Deflater.end to free up memory

This closes #1796.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/77a676bf
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/77a676bf
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/77a676bf

Branch: refs/heads/master
Commit: 77a676bf9225d506de77ebec4cd029c917047321
Parents: 4fdea68
Author: Koji Kawamura <[email protected]>
Authored: Mon May 15 09:45:48 2017 +0900
Committer: Mark Payne <[email protected]>
Committed: Tue May 16 13:49:31 2017 -0400

----------------------------------------------------------------------
 .../main/java/org/apache/nifi/remote/AbstractTransaction.java  | 5 +++++
 .../java/org/apache/nifi/remote/io/CompressionInputStream.java | 5 +++--
 .../org/apache/nifi/remote/io/CompressionOutputStream.java     | 5 +++++
 .../nifi/remote/protocol/AbstractFlowFileServerProtocol.java   | 6 ++++++
 4 files changed, 19 insertions(+), 2 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
index 826cf00..2d6b2e1 100644
--- 
a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
+++ 
b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java
@@ -148,6 +148,11 @@ public abstract class AbstractTransaction implements 
Transaction {
                 final InputStream dataIn = compress ? new 
CompressionInputStream(is) : is;
                 final DataPacket packet = codec.decode(new 
CheckedInputStream(dataIn, crc));
 
+                if (compress) {
+                    // Close CompressionInputStream to free acquired memory, 
without closing underlying stream.
+                    dataIn.close();
+                }
+
                 if (packet == null) {
                     this.dataAvailable = false;
                 } else {

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
index 6434b2d..f4469ed 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java
@@ -174,12 +174,13 @@ public class CompressionInputStream extends InputStream {
     }
 
     /**
-     * Does nothing. Does NOT close underlying InputStream
+     * Calls {@link Inflater#end()} to free acquired memory to prevent 
OutOfMemory error.
+     * However, does NOT close underlying InputStream.
      *
      * @throws java.io.IOException for any issues closing underlying stream
      */
     @Override
     public void close() throws IOException {
-
+        inflater.end();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
----------------------------------------------------------------------
diff --git 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
index 525b5b1..4491234 100644
--- 
a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
+++ 
b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java
@@ -137,10 +137,15 @@ public class CompressionOutputStream extends OutputStream 
{
         super.flush();
     }
 
+    /**
+     * Flushes remaining buffer and calls {@link Deflater#end()} to free 
acquired memory to prevent OutOfMemory error.
+     * @throws IOException for any issues closing underlying stream
+     */
     @Override
     public void close() throws IOException {
         compressAndWrite();
         out.write(0);   // indicate that the stream is finished.
         out.flush();
+        deflater.end();
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
index fe4b1b1..f539808 100644
--- 
a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
+++ 
b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java
@@ -440,6 +440,12 @@ public abstract class AbstractFlowFileServerProtocol 
implements ServerProtocol {
             final CheckedInputStream checkedInputStream = new 
CheckedInputStream(flowFileInputStream, crc);
 
             final DataPacket dataPacket = codec.decode(checkedInputStream);
+
+            if (handshakeProperties.isUseGzip()) {
+                // Close CompressionInputStream to free acquired memory, 
without closing underlying stream.
+                checkedInputStream.close();
+            }
+
             if (dataPacket == null) {
                 logger.debug("{} Received null dataPacket indicating the end 
of transaction from {}", this, peer);
                 break;

Reply via email to