Repository: nifi
Updated Branches:
  refs/heads/master 674c9e468 -> bcac2766b
NIFI-3648 removed message copying when not in debug mode.

This closes #1637.

Signed-off-by: Mark Payne <[email protected]>

Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bcac2766
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bcac2766
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bcac2766

Branch: refs/heads/master
Commit: bcac2766bce63508b7930344b9dc21ceef33bf98
Parents: 674c9e4
Author: Mike Moser <[email protected]>
Authored: Thu Mar 30 14:34:26 2017 +0000
Committer: Mark Payne <[email protected]>
Committed: Thu Jan 11 11:23:04 2018 -0500

----------------------------------------------------------------------
 .../protocol/impl/SocketProtocolListener.java | 24 ++++++++++++--------
 1 file changed, 14 insertions(+), 10 deletions(-)
----------------------------------------------------------------------

http://git-wip-us.apache.org/repos/asf/nifi/blob/bcac2766/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
index 8958988..e31a547 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-cluster-protocol/src/main/java/org/apache/nifi/cluster/protocol/impl/SocketProtocolListener.java
@@ -24,7 +24,6 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.UUID;
 import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.nifi.cluster.protocol.NodeIdentifier;
 import org.apache.nifi.cluster.protocol.ProtocolContext;
@@ -45,6 +44,7 @@ import org.apache.nifi.io.socket.SocketListener;
 import org.apache.nifi.reporting.Bulletin;
 import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.security.util.CertificateUtils;
+import org.apache.nifi.stream.io.ByteCountingInputStream;
 import org.apache.nifi.util.StopWatch;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -121,9 +121,7 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
     @Override
     public void dispatchRequest(final Socket socket) {
-        byte[] receivedMessage = null;
         String hostname = null;
-        final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
         try {
             final StopWatch stopWatch = new StopWatch(true);
             hostname = socket.getInetAddress().getHostName();
@@ -134,15 +132,21 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
 
             // unmarshall message
             final ProtocolMessageUnmarshaller<ProtocolMessage> unmarshaller = protocolContext.createUnmarshaller();
-            final InputStream inStream = socket.getInputStream();
-            final CopyingInputStream copyingInputStream = new CopyingInputStream(inStream, maxMsgBuffer); // don't copy more than 1 MB
+            final ByteCountingInputStream countingIn = new ByteCountingInputStream(socket.getInputStream());
+            InputStream wrappedInStream = countingIn;
+            if (logger.isDebugEnabled()) {
+                final int maxMsgBuffer = 1024 * 1024; // don't buffer more than 1 MB of the message
+                final CopyingInputStream copyingInputStream = new CopyingInputStream(wrappedInStream, maxMsgBuffer);
+                wrappedInStream = copyingInputStream;
+            }
 
             final ProtocolMessage request;
             try {
-                request = unmarshaller.unmarshal(copyingInputStream);
+                request = unmarshaller.unmarshal(wrappedInStream);
             } finally {
-                receivedMessage = copyingInputStream.getBytesRead();
-                if (logger.isDebugEnabled()) {
+                if (logger.isDebugEnabled() && wrappedInStream instanceof CopyingInputStream) {
+                    final CopyingInputStream copyingInputStream = (CopyingInputStream) wrappedInStream;
+                    byte[] receivedMessage = copyingInputStream.getBytesRead();
                     logger.debug("Received message: " + new String(receivedMessage));
                 }
             }
@@ -181,8 +185,8 @@ public class SocketProtocolListener extends SocketListener implements ProtocolLi
             stopWatch.stop();
             final NodeIdentifier nodeId = getNodeIdentifier(request);
             final String from = nodeId == null ? hostname : nodeId.toString();
-            logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {} millis",
-                    requestId, request.getType(), receivedMessage.length, from, stopWatch.getDuration(TimeUnit.MILLISECONDS));
+            logger.info("Finished processing request {} (type={}, length={} bytes) from {} in {}",
+                    requestId, request.getType(), countingIn.getBytesRead(), from, stopWatch.getDuration());
         } catch (final IOException | ProtocolException e) {
             logger.warn("Failed processing protocol message from " + hostname + " due to " + e, e);
