[
https://issues.apache.org/jira/browse/BROOKLYN-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16047813#comment-16047813
]
ASF GitHub Bot commented on BROOKLYN-440:
-----------------------------------------
Github user geomacy commented on a diff in the pull request:
https://github.com/apache/brooklyn-server/pull/731#discussion_r121618034
--- Diff:
utils/common/src/main/java/org/apache/brooklyn/util/stream/LoggingOutputStream.java
---
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.brooklyn.util.stream;
+
+import java.io.FilterOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.slf4j.Logger;
+
+/**
+ * Wraps another output stream, intercepting the writes to log it.
+ */
+public class LoggingOutputStream extends FilterOutputStream {
+
+ private static final OutputStream NOOP_OUTPUT_STREAM = new FilterOutputStream(null) {
+ @Override public void write(int b) throws IOException {
+ }
+ @Override public void flush() throws IOException {
+ }
+ @Override public void close() throws IOException {
+ }
+ };
+
+ public static Builder builder() {
+ return new Builder();
+ }
+
+ public static class Builder {
+ OutputStream out;
+ Logger log;
+ String logPrefix;
+
+ public Builder outputStream(OutputStream val) {
+ this.out = val;
+ return this;
+ }
+ public Builder logger(Logger val) {
+ this.log = val;
+ return this;
+ }
+ public Builder logPrefix(String val) {
+ this.logPrefix = val;
+ return this;
+ }
+ public LoggingOutputStream build() {
+ return new LoggingOutputStream(this);
+ }
+ }
+
+ protected final Logger log;
+ protected final String logPrefix;
+ private final AtomicBoolean running = new AtomicBoolean(true);
+ private final StringBuilder lineSoFar = new StringBuilder(16);
+
+ private LoggingOutputStream(Builder builder) {
+ super(builder.out != null ? builder.out : NOOP_OUTPUT_STREAM);
+ log = builder.log;
+ logPrefix = (builder.logPrefix != null) ? builder.logPrefix : "";
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ if (running.get() && b >= 0) onChar(b);
+ out.write(b);
+ }
+
+ @Override
+ public void flush() throws IOException {
+ try {
+ if (lineSoFar.length() > 0) {
+ onLine(lineSoFar.toString());
+ lineSoFar.setLength(0);
+ }
+ } finally {
+ super.flush();
+ }
+ }
+
+ // Overriding close() because FilterOutputStream's close() method pre-JDK8 has bad behavior:
+ // it silently ignores any exception thrown by flush(). Instead, just close the delegate stream.
+ // It should flush itself if necessary.
+ @Override
+ public void close() throws IOException {
+ try {
+ onLine(lineSoFar.toString());
+ lineSoFar.setLength(0);
+ } finally {
+ out.close();
+ running.set(false);
+ }
+ }
+
+ public void onChar(int c) {
+ if (c=='\n' || c=='\r') {
+ if (lineSoFar.length()>0)
+ //suppress blank lines, so that we can treat either newline char as a line separator
+ //(eg to show curl updates frequently)
+ onLine(lineSoFar.toString());
+ lineSoFar.setLength(0);
+ } else {
+ lineSoFar.append((char)c);
+ }
+ }
+
+ public void onLine(String line) {
+ //right trim, in case there is \r or other funnies
+ while (line.length()>0 && Character.isWhitespace(line.charAt(line.length()-1)))
+ line = line.substring(0, line.length()-1);
+ //right trim, in case there is \r or other funnies
--- End diff --
This second comment should say `left trim`. Also, I'm not sure I see the need
for this check, since `onChar` never appends `\n` or `\r` to `lineSoFar`?
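To make the question above concrete, here is a minimal sketch that mirrors the `onChar` branching and the right-trim loop from the diff. The class name `TrimCheck`, the static `lines` list (a stand-in for the logger) and the sample input are hypothetical, not part of the PR. Under those assumptions, the trim never sees `\r` or `\n`, but it does still strip other trailing whitespace such as spaces and tabs.

```java
import java.util.ArrayList;
import java.util.List;

public class TrimCheck {
    // Hypothetical stand-ins for the stream's line buffer and its logger.
    static final StringBuilder lineSoFar = new StringBuilder();
    static final List<String> lines = new ArrayList<>();

    // Same branching as onChar in the diff: '\n' and '\r' end a line and are never appended.
    static void onChar(int c) {
        if (c == '\n' || c == '\r') {
            if (lineSoFar.length() > 0) lines.add(rightTrim(lineSoFar.toString()));
            lineSoFar.setLength(0);
        } else {
            lineSoFar.append((char) c);
        }
    }

    // Same loop as the right trim in onLine.
    static String rightTrim(String line) {
        while (line.length() > 0 && Character.isWhitespace(line.charAt(line.length() - 1)))
            line = line.substring(0, line.length() - 1);
        return line;
    }

    public static void main(String[] args) {
        // The first line has a trailing space and tab before its "\r\n" terminator.
        for (char c : "progress 10% \t\r\nprogress 20%\n".toCharArray()) onChar(c);
        System.out.println(lines);
    }
}
```

Note that `onLine` is public in the diff, so a direct caller could still pass a string ending in `\r`; whether that case matters is a judgment call for the PR author.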
> More efficient thread usage for ssh execution
> ---------------------------------------------
>
> Key: BROOKLYN-440
> URL: https://issues.apache.org/jira/browse/BROOKLYN-440
> Project: Brooklyn
> Issue Type: Improvement
> Affects Versions: 0.10.0
> Reporter: Aled Sage
> Priority: Minor
>
> For consuming the stdout/stderr from ssh execution, our use of
> {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very
> inefficient (my fault - I wrote it originally!)
> We normally consume 6 threads per ssh execution:
> 1. The calling thread of {{SshMachineLocation.execScript}} is blocked,
> waiting for it to complete.
> 2. Within sshj, there is a "sftp reader" thread that reads the packets
> 3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to
> read the ssh stdout, as it is made available.
> 4. Same for stderr.
> 5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a
> {{StreamGobbler}} thread to read + log the ssh stdout, as it is made available.
> 6. Same for stderr.
> I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4
> though.
> Here is the chain of actions:
> * We call something like {{sshMachineLocation.execScript}}. This can include
> "out" and "err" config, to obtain the exec stdout/stderr.
> * {{SshMachineLocation}} wraps the command execution in
> {{ExecWithLoggingHelpers.execWithLogging}}.
> * In order to log the stdout (and same for stderr):
> * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the
> {{PipedOutputStream}} as the stdout to use (i.e. config passed to
> {{SshjTool}})
> * It creates a {{StreamGobbler}}, which is a thread that consumes the
> {{PipedInputStream}} - this logs each line, and also writes each line to the
> original "out".
> * Within {{SshjTool.ExecAction}}, it has an sshj
> {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading
> the stdout and stderr.
> It creates a {{StreamGobbler}}, which is a thread to consume these input
> streams; it writes the bytes received from that input stream to the {{out}}
> and {{err}} streams passed in.
> For the logging, a simpler and more efficient approach would be to wrap the
> OutputStream. See {{com.google.common.io.CountingOutputStream}} for
> inspiration. It should be as simple as extending
> {{java.io.FilterOutputStream}}, and overriding the {{write}} and {{close}}
> methods. The implementation of these methods would call the wrapped
> outputStream as well as doing something very similar to
> {{StreamGobbler.onChar()}}.
> I don't see a way to do the same trick inside {{SshjTool.ExecAction}},
> unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which
> is created inside {{net.schmizz.sshj.connection.channel.AbstractChannel}}'s
> constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of
> those seem worth it.
> Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
> {noformat}
> 2017-02-13 11:09:35
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
> "main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition
> [0x0000700006b21000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007f4069868> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
> at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
> at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
> at net.schmizz.concurrent.Event.await(Event.java:103)
> at
> net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
> at
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
> at
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
> at
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
> at
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
> at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
> at
> org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
> at
> org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
> at
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
> at
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
> at
> org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
> at
> org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
> at
> org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
> "sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait()
> [0x0000700008571000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007f3f15690> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at java.lang.Object.wait(Object.java:503)
> at
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
> - locked <0x00000007f3f15690> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
> at
> net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
> at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
> at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
> "Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait()
> [0x0000700008777000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007f4069948> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at java.lang.Object.wait(Object.java:503)
> at
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
> - locked <0x00000007f4069948> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
> - locked <0x00000007f4069930> (a [B)
> at
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait()
> [0x0000700008674000]
> java.lang.Thread.State: WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007f4061698> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at java.lang.Object.wait(Object.java:503)
> at
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
> - locked <0x00000007f4061698> (a
> net.schmizz.sshj.common.Buffer$PlainBuffer)
> at
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
> - locked <0x00000007f4061680> (a [B)
> at
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable
> [0x000070000846e000]
> java.lang.Thread.State: RUNNABLE
> at java.net.SocketInputStream.socketRead0(Native Method)
> at java.net.SocketInputStream.read(SocketInputStream.java:152)
> at java.net.SocketInputStream.read(SocketInputStream.java:122)
> at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
> "brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000
> nid=0x6103 waiting on condition [0x000070000836b000]
> java.lang.Thread.State: TIMED_WAITING (parking)
> at sun.misc.Unsafe.park(Native Method)
> - parking to wait for <0x00000007fc0088e0> (a
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
> at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
> at
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
> at
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
> at
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
> at
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
> at
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> at java.lang.Thread.run(Thread.java:745)
> "Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait()
> [0x0000700008268000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
> at java.io.PipedInputStream.read(PipedInputStream.java:327)
> - locked <0x00000007fc018088> (a java.io.PipedInputStream)
> at
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait()
> [0x0000700008165000]
> java.lang.Thread.State: TIMED_WAITING (on object monitor)
> at java.lang.Object.wait(Native Method)
> - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
> at java.io.PipedInputStream.read(PipedInputStream.java:327)
> - locked <0x00000007fc020088> (a java.io.PipedInputStream)
> at
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> {noformat}
> ([~svet] I spoke to you about this previously, and believe you have some
> comments?)
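As a rough illustration of the thread cost described in the issue, the sketch below contrasts the two approaches using only `java.io`. The class `PipeVersusWrapper` and its methods are hypothetical and are not Brooklyn code: `viaPipe` needs a dedicated thread to drain a `PipedInputStream`, while `viaWrapper` records lines on the writer's own thread through a `FilterOutputStream`. Both are simplified: they split only on `\n` and drop any trailing partial line.

```java
import java.io.ByteArrayOutputStream;
import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.PipedInputStream;
import java.io.PipedOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

public class PipeVersusWrapper {

    // Piped approach: an extra thread drains the pipe, records lines, and forwards bytes.
    static List<String> viaPipe(byte[] data, OutputStream target) throws Exception {
        List<String> logged = Collections.synchronizedList(new ArrayList<>());
        PipedInputStream in = new PipedInputStream();
        PipedOutputStream pipe = new PipedOutputStream(in);
        Thread gobbler = new Thread(() -> {
            StringBuilder line = new StringBuilder();
            try {
                int b;
                while ((b = in.read()) >= 0) {
                    target.write(b);
                    if (b == '\n') { logged.add(line.toString()); line.setLength(0); }
                    else line.append((char) b);
                }
            } catch (IOException e) {
                throw new RuntimeException(e);
            }
        });
        gobbler.start();
        pipe.write(data);
        pipe.close();   // signals end of stream to the reader thread
        gobbler.join();
        return logged;
    }

    // Wrapper approach: lines are recorded on the calling thread as bytes pass through.
    static List<String> viaWrapper(byte[] data, OutputStream target) throws IOException {
        List<String> logged = new ArrayList<>();
        StringBuilder line = new StringBuilder();
        OutputStream wrapped = new FilterOutputStream(target) {
            @Override public void write(int b) throws IOException {
                if (b == '\n') { logged.add(line.toString()); line.setLength(0); }
                else line.append((char) b);
                out.write(b);
            }
        };
        wrapped.write(data);
        wrapped.close();
        return logged;
    }

    public static void main(String[] args) throws Exception {
        byte[] data = "one\ntwo\n".getBytes(StandardCharsets.US_ASCII);
        System.out.println(viaPipe(data, new ByteArrayOutputStream()));
        System.out.println(viaWrapper(data, new ByteArrayOutputStream()));
    }
}
```

The wrapper relies on `FilterOutputStream` routing array writes through `write(int)`, which keeps the sketch short but is slow for large payloads; a fuller implementation would likely override the array variant of `write` as well.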
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)