[ 
https://issues.apache.org/jira/browse/BROOKLYN-440?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16050097#comment-16050097
 ] 

ASF GitHub Bot commented on BROOKLYN-440:
-----------------------------------------

Github user neykov commented on a diff in the pull request:

    https://github.com/apache/brooklyn-server/pull/731#discussion_r122135363
  
    --- Diff: 
utils/common/src/main/java/org/apache/brooklyn/util/stream/LoggingOutputStream.java
 ---
    @@ -0,0 +1,134 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing,
    + * software distributed under the License is distributed on an
    + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
    + * KIND, either express or implied.  See the License for the
    + * specific language governing permissions and limitations
    + * under the License.
    + */
    +package org.apache.brooklyn.util.stream;
    +
    +import java.io.FilterOutputStream;
    +import java.io.IOException;
    +import java.io.OutputStream;
    +import java.util.concurrent.atomic.AtomicBoolean;
    +
    +import org.slf4j.Logger;
    +
    +/**
    + * Wraps another output stream, intercepting the writes to log it.
    + */
    +public class LoggingOutputStream extends FilterOutputStream {
    +
    +    private static final OutputStream NOOP_OUTPUT_STREAM = new 
FilterOutputStream(null) {
    +        @Override public void write(int b) throws IOException {
    +        }
    +        @Override public void flush() throws IOException {
    +        }
    +        @Override public void close() throws IOException {
    +        }        
    +    };
    +    
    +    public static Builder builder() {
    +        return new Builder();
    +    }
    +    
    +    public static class Builder {
    +        OutputStream out;
    +        Logger log;
    +        String logPrefix;
    +        
    +        public Builder outputStream(OutputStream val) {
    +            this.out = val;
    +            return this;
    +        }
    +        public Builder logger(Logger val) {
    +            this.log = val;
    +            return this;
    +        }
    +        public Builder logPrefix(String val) {
    +            this.logPrefix = val;
    +            return this;
    +        }
    +        public LoggingOutputStream build() {
    +            return new LoggingOutputStream(this);
    +        }
    +    }
    +    
    +    protected final Logger log;
    +    protected final String logPrefix;
    +    private final AtomicBoolean running = new AtomicBoolean(true);
    +    private final StringBuilder lineSoFar = new StringBuilder(16);
    +
    +    private LoggingOutputStream(Builder builder) {
    +        super(builder.out != null ? builder.out : NOOP_OUTPUT_STREAM);
    +        log = builder.log;
    +        logPrefix = (builder.logPrefix != null) ? builder.logPrefix : "";
    +      }
    +
    +    @Override
    +    public void write(int b) throws IOException {
    +        if (running.get() && b >= 0) onChar(b);
    --- End diff --
    
    Nice catch


> More efficient thread usage for ssh execution
> ---------------------------------------------
>
>                 Key: BROOKLYN-440
>                 URL: https://issues.apache.org/jira/browse/BROOKLYN-440
>             Project: Brooklyn
>          Issue Type: Improvement
>    Affects Versions: 0.10.0
>            Reporter: Aled Sage
>            Priority: Minor
>
> For consuming the stdout/stderr from ssh execution, our use of 
> {{PipedInputStream}}/{{PipedOutputStream}} and {{StreamGobbler}} looks very 
> inefficient (my fault - I wrote it originally!)
> We normally consume 6 threads per ssh execution:
> 1. The calling thread of {{SshMachineLocation.execScript}} is blocker, 
> waiting for it to complete.
> 2. Within sshj, there is a "sftp reader" thread that reads the packets
> 3. Within {{SshjTool.ExecAction}}, we create a {{StreamGobbler}} thread to 
> read the ssh stdout, as it is made available.
> 4. Same for stderr.
> 5. Within {{ExecWithLoggingHelpers.execWithLogging}}, we create a 
> {{StreamGobber}} thread to read + log the ssh stdout, as it is made available.
> 6. Same for stderr.
> I'm pretty sure we can get rid of threads 5 and 6; not sure about 3 and 4 
> though.
> Here is the chain of actions:
> * We call something like {{sshMachineLocation.execScript}}. This can include 
> "out" and "err" config, to obtain the exec stdout/stderr.
> * {{SshMachineLocation}} wraps the command execution in 
> {{ExecWithLoggingHelpers.execWithLogging}}.
> * In order to log the stdout (and same for stderr):
>   * It creates a {{PipedOutputStream}} and {{PipedInputStream}}. It sets the 
> {{PipedOutputStream}} as the stdout to use (i.e. config passed to 
> {{SshjTool}})
>   * It creates a {{StreamGobbler}}, which is a thread that consumes the 
> {{PipedInputStream}} - this logs each line, and also writes each line to the 
> original "out".
> * Within {{SshjTool.ExecAction}}, it has an sshj 
> {{Session.Command.getInputStream()}} and {{.getErrorStream()}} for reading 
> the stdout and stderr.
>   It creates a {{StreamGobbler}}, which is a thread to consume these input 
> streams; it writes the bytes received from that input stream to the {{out}} 
> and {{err}} streams passed in.
> For the logging, a simpler and more efficient approach would be to wrap the 
> OutputStream. See {{com.google.common.io.CountingOutputStream}} for 
> inspiration. It should be as simple as extending 
> {{java.util.FilterOutputStream}}, and overriding the {{write}} and {{close}} 
> methods. The implementation of these methods would call the wrapped 
> outputStream as well as doing some thing very similar to 
> {{StreamGobbler.onChar()}}.
> I don't see a way to do the same trick inside {{SshjTool.ExecAction}}, 
> unfortunately, without changes to sshj to wrap {{ChannelInputStream}} which 
> is created inside {{ net.schmizz.sshj.connection.channel.AbstractChannel}} 's 
> constructor (or to override {{AbstractChannel.receiveInto}} perhaps). None of 
> those seem worth it.
> Below is a trimmed down jstack from executing {{sleep 100}} over ssh:
> {noformat}
> 2017-02-13 11:09:35
> Full thread dump Java HotSpot(TM) 64-Bit Server VM (24.80-b11 mixed mode):
> "main" prio=5 tid=0x00007f8ba180a000 nid=0x1c03 waiting on condition 
> [0x0000700006b21000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007f4069868> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:2176)
>     at net.schmizz.concurrent.Promise.tryRetrieve(Promise.java:170)
>     at net.schmizz.concurrent.Promise.retrieve(Promise.java:137)
>     at net.schmizz.concurrent.Event.await(Event.java:103)
>     at 
> net.schmizz.sshj.connection.channel.AbstractChannel.join(AbstractChannel.java:259)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:1009)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$ShellAction.create(SshjTool.java:926)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:627)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.acquire(SshjTool.java:613)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool$1.run(SshjTool.java:327)
>     at 
> org.apache.brooklyn.util.core.internal.ssh.sshj.SshjTool.execScript(SshjTool.java:329)
>     at 
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$1.exec(ExecWithLoggingHelpers.java:83)
>     at 
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:168)
>     at 
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers$3.apply(ExecWithLoggingHelpers.java:165)
>     at org.apache.brooklyn.util.pool.BasicPool.exec(BasicPool.java:146)
>     at 
> org.apache.brooklyn.location.ssh.SshMachineLocation.execSsh(SshMachineLocation.java:601)
>     at 
> org.apache.brooklyn.location.ssh.SshMachineLocation$13.execWithTool(SshMachineLocation.java:780)
>     at 
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execWithLogging(ExecWithLoggingHelpers.java:165)
>     at 
> org.apache.brooklyn.util.core.task.system.internal.ExecWithLoggingHelpers.execScript(ExecWithLoggingHelpers.java:81)
>     at 
> org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:764)
>     at 
> org.apache.brooklyn.location.ssh.SshMachineLocation.execScript(SshMachineLocation.java:758)
>     at 
> org.apache.brooklyn.location.ssh.SshMachineLocationIntegrationTest.testSlowForVisualInspection(SshMachineLocationIntegrationTest.java:98)
> "sftp reader" prio=5 tid=0x00007f8ba43a6800 nid=0x6503 in Object.wait() 
> [0x0000700008571000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f3f15690> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at 
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f3f15690> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at net.schmizz.sshj.sftp.PacketReader.readIntoBuffer(PacketReader.java:51)
>     at 
> net.schmizz.sshj.sftp.PacketReader.getPacketLength(PacketReader.java:59)
>     at net.schmizz.sshj.sftp.PacketReader.readPacket(PacketReader.java:75)
>     at net.schmizz.sshj.sftp.PacketReader.run(PacketReader.java:87)
> "Thread-7" prio=5 tid=0x00007f8ba1a9f800 nid=0x6903 in Object.wait() 
> [0x0000700008777000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4069948> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at 
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4069948> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at 
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4069930> (a [B)
>     at 
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-6" prio=5 tid=0x00007f8ba1b2b000 nid=0x6703 in Object.wait() 
> [0x0000700008674000]
>    java.lang.Thread.State: WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007f4061698> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at java.lang.Object.wait(Object.java:503)
>     at 
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:107)
>     - locked <0x00000007f4061698> (a 
> net.schmizz.sshj.common.Buffer$PlainBuffer)
>     at 
> net.schmizz.sshj.connection.channel.ChannelInputStream.read(ChannelInputStream.java:90)
>     - locked <0x00000007f4061680> (a [B)
>     at 
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "reader" prio=5 tid=0x00007f8ba385c000 nid=0x6303 runnable 
> [0x000070000846e000]
>    java.lang.Thread.State: RUNNABLE
>     at java.net.SocketInputStream.socketRead0(Native Method)
>     at java.net.SocketInputStream.read(SocketInputStream.java:152)
>     at java.net.SocketInputStream.read(SocketInputStream.java:122)
>     at net.schmizz.sshj.transport.Reader.run(Reader.java:50)
> "brooklyn-execmanager-SrVbReTz-3" daemon prio=5 tid=0x00007f8ba21eb000 
> nid=0x6103 waiting on condition [0x000070000836b000]
>    java.lang.Thread.State: TIMED_WAITING (parking)
>     at sun.misc.Unsafe.park(Native Method)
>     - parking to wait for  <0x00000007fc0088e0> (a 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject)
>     at java.util.concurrent.locks.LockSupport.parkNanos(LockSupport.java:226)
>     at 
> java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.awaitNanos(AbstractQueuedSynchronizer.java:2082)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:1090)
>     at 
> java.util.concurrent.ScheduledThreadPoolExecutor$DelayedWorkQueue.take(ScheduledThreadPoolExecutor.java:807)
>     at 
> java.util.concurrent.ThreadPoolExecutor.getTask(ThreadPoolExecutor.java:1068)
>     at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1130)
>     at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>     at java.lang.Thread.run(Thread.java:745)
> "Thread-2" prio=5 tid=0x00007f8ba21ea000 nid=0x5f03 in Object.wait() 
> [0x0000700008268000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc018088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc018088> (a java.io.PipedInputStream)
>     at 
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> "Thread-1" prio=5 tid=0x00007f8ba20e7800 nid=0x5d03 in Object.wait() 
> [0x0000700008165000]
>    java.lang.Thread.State: TIMED_WAITING (on object monitor)
>     at java.lang.Object.wait(Native Method)
>     - waiting on <0x00000007fc020088> (a java.io.PipedInputStream)
>     at java.io.PipedInputStream.read(PipedInputStream.java:327)
>     - locked <0x00000007fc020088> (a java.io.PipedInputStream)
>     at 
> org.apache.brooklyn.util.stream.StreamGobbler.run(StreamGobbler.java:81)
> {noformat}
> ([~svet] I spoke to you about this previously, and believe you have some 
> comments?)



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to