Repository: nifi Updated Branches: refs/heads/master 4fdea680e -> 77a676bf9
NIFI-3894: Call Inflater/Deflater.end to free up memory This closes #1796. Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/77a676bf Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/77a676bf Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/77a676bf Branch: refs/heads/master Commit: 77a676bf9225d506de77ebec4cd029c917047321 Parents: 4fdea68 Author: Koji Kawamura <[email protected]> Authored: Mon May 15 09:45:48 2017 +0900 Committer: Mark Payne <[email protected]> Committed: Tue May 16 13:49:31 2017 -0400 ---------------------------------------------------------------------- .../main/java/org/apache/nifi/remote/AbstractTransaction.java | 5 +++++ .../java/org/apache/nifi/remote/io/CompressionInputStream.java | 5 +++-- .../org/apache/nifi/remote/io/CompressionOutputStream.java | 5 +++++ .../nifi/remote/protocol/AbstractFlowFileServerProtocol.java | 6 ++++++ 4 files changed, 19 insertions(+), 2 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java index 826cf00..2d6b2e1 100644 --- a/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java +++ b/nifi-commons/nifi-site-to-site-client/src/main/java/org/apache/nifi/remote/AbstractTransaction.java @@ -148,6 +148,11 @@ public abstract class AbstractTransaction implements Transaction { final InputStream dataIn = compress ? new CompressionInputStream(is) : is; final DataPacket packet = codec.decode(new CheckedInputStream(dataIn, crc)); + if (compress) { + // Close CompressionInputStream to free acquired memory, without closing underlying stream. + dataIn.close(); + } + if (packet == null) { this.dataAvailable = false; } else { http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java index 6434b2d..f4469ed 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionInputStream.java @@ -174,12 +174,13 @@ public class CompressionInputStream extends InputStream { } /** - * Does nothing. Does NOT close underlying InputStream + * Calls {@link Inflater#end()} to free acquired memory to prevent OutOfMemory error. + * However, does NOT close underlying InputStream. * * @throws java.io.IOException for any issues closing underlying stream */ @Override public void close() throws IOException { - + inflater.end(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java ---------------------------------------------------------------------- diff --git a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java index 525b5b1..4491234 100644 --- a/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java +++ b/nifi-commons/nifi-utils/src/main/java/org/apache/nifi/remote/io/CompressionOutputStream.java @@ -137,10 +137,15 @@ public class CompressionOutputStream extends OutputStream { super.flush(); } + /** + * Flushes remaining buffer and calls {@link Deflater#end()} to free acquired memory to prevent OutOfMemory error. + * @throws IOException for any issues closing underlying stream + */ @Override public void close() throws IOException { compressAndWrite(); out.write(0); // indicate that the stream is finished. out.flush(); + deflater.end(); } } http://git-wip-us.apache.org/repos/asf/nifi/blob/77a676bf/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java index fe4b1b1..f539808 100644 --- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java +++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-site-to-site/src/main/java/org/apache/nifi/remote/protocol/AbstractFlowFileServerProtocol.java @@ -440,6 +440,12 @@ public abstract class AbstractFlowFileServerProtocol implements ServerProtocol { final CheckedInputStream checkedInputStream = new CheckedInputStream(flowFileInputStream, crc); final DataPacket dataPacket = codec.decode(checkedInputStream); + + if (handshakeProperties.isUseGzip()) { + // Close CompressionInputStream to free acquired memory, without closing underlying stream. + checkedInputStream.close(); + } + if (dataPacket == null) { logger.debug("{} Received null dataPacket indicating the end of transaction from {}", this, peer); break;
