BROOKLYN-106: Fix sshj’s async-exec

- On repeated timeouts of longPoll, keep
  going.
- On long-poll, reduce the sshTriesTimeout.
- On timeout/error, don’t wait forever for
  stdout/stderr streams to close.
- Potential improvement to disconnect/reconnect.
- Add Duration.isShorterThan(Stopwatch).


Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo
Commit: 
http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c06a2297
Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c06a2297
Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c06a2297

Branch: refs/heads/master
Commit: c06a2297eadfd5b6322bde303a65cbc0e477c7d1
Parents: 0673146
Author: Aled Sage <[email protected]>
Authored: Thu Jan 8 21:12:58 2015 +0000
Committer: Aled Sage <[email protected]>
Committed: Wed Jan 14 22:38:08 2015 +0000

----------------------------------------------------------------------
 .../util/internal/ssh/ShellAbstractTool.java    |  34 +++--
 .../internal/ssh/sshj/SshjClientConnection.java |   2 +-
 .../util/internal/ssh/sshj/SshjTool.java        | 150 ++++++++++++++++---
 .../sshj/SshjToolAsyncStubIntegrationTest.java  |  14 +-
 .../ssh/sshj/SshjToolIntegrationTest.java       |  80 ++++++----
 .../main/java/brooklyn/util/time/Duration.java  |   9 +-
 6 files changed, 219 insertions(+), 70 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java 
b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
index 82624d7..e677399 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java
@@ -247,7 +247,7 @@ public abstract class ShellAbstractTool implements 
ShellTool {
             }
             this.scriptNameWithoutExtension = "brooklyn-"+
                     Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+
-                    (summary==null ? "" : "-"+summary);
+                    (Strings.isBlank(summary) ? "" : "-"+summary);
             this.scriptPath = Os.mergePathsUnix(scriptDir, 
scriptNameWithoutExtension+".sh");
         }
 
@@ -367,14 +367,13 @@ public abstract class ShellAbstractTool implements 
ShellTool {
          * An offset can be given, to only retrieve data starting at a 
particular character (indexed from 0).
          */
         protected List<String> buildLongPollCommand(int stdoutPosition, int 
stderrPosition, Duration timeout) {
-            // Note that `tail -c +1` means start at the *first* character 
(i.e. start counting from 1, not 0)
-            // TODO Relies on commands terminating when ssh connection dropped 
(because not run with `nohup`)
-            String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" -f 
"+stdoutPath+" &";
-            String catStderrCmd = "tail -c +"+(stderrPosition+1)+" -f 
"+stderrPath+" 1>&2 &";
             long maxTime = Math.max(1, timeout.toSeconds());
             
+            // Note that `tail -c +1` means start at the *first* character 
(i.e. start counting from 1, not 0)
             List<String> waitForExitStatusParts = ImmutableList.of(
                     "# Long poll", // comment is to aid testing - see 
SshjToolAsyncStubIntegrationTest
+                    "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" & 
export TAIL_STDOUT_PID=$!",
+                    "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 & 
export TAIL_STDERR_PID=$!",
                     "EXIT_STATUS_PATH="+exitStatusPath,
                     "PID_PATH="+pidPath,
                     "MAX_TIME="+maxTime,
@@ -390,9 +389,11 @@ public abstract class ShellAbstractTool implements 
ShellTool {
                     "            sleep 3",
                     "            if test -s $EXIT_STATUS_PATH; then",
                     "                EXIT_STATUS=`cat $EXIT_STATUS_PATH`",
+                    "                kill ${TAIL_STDERR_PID} 
${TAIL_STDOUT_PID}",
                     "                exit $EXIT_STATUS",
                     "            else",
                     "                echo \"No exit status in 
$EXIT_STATUS_PATH, and pid in $PID_PATH ($PID) not executing\"",
+                    "                kill ${TAIL_STDERR_PID} 
${TAIL_STDOUT_PID}",
                     "                exit 126",
                     "            fi",
                     "        fi",
@@ -401,25 +402,32 @@ public abstract class ShellAbstractTool implements 
ShellTool {
                     "    sleep 1",
                     "    COUNTER+=1",
                     "done",
+                    "kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID}",
                     "exit 125"+"\n");
             String waitForExitStatus = 
Joiner.on("\n").join(waitForExitStatusParts);
 
-            MutableList.Builder<String> cmds = MutableList.<String>builder()
-                    .add(runAsRoot ? BashCommands.sudo(catStdoutCmd) : 
catStdoutCmd)
-                    .add(runAsRoot ? BashCommands.sudo(catStderrCmd) : 
catStderrCmd)
-                    .add(runAsRoot ? BashCommands.sudo(waitForExitStatus) : 
waitForExitStatus);
-            return cmds.build();
+            return ImmutableList.of(runAsRoot ? 
BashCommands.sudo(waitForExitStatus) : waitForExitStatus);
         }
 
         protected List<String> deleteTemporaryFilesCommand() {
+            ImmutableList.Builder<String> cmdParts = ImmutableList.builder();
+            
             if (!Boolean.TRUE.equals(noDeleteAfterExec)) {
                 // use "-f" because some systems have "rm" aliased to "rm -i"
                 // use "< /dev/null" to guarantee doesn't hang
-                return ImmutableList.of(
+                cmdParts.add(
                         "rm -f "+scriptPath+" "+stdoutPath+" "+stderrPath+" 
"+exitStatusPath+" "+pidPath+" < /dev/null");
-            } else {
-                return ImmutableList.<String>of();
             }
+            
+            // If the buildLongPollCommand didn't complete properly then it 
might have left tail command running;
+            // ensure they are killed.
+            cmdParts.add(
+                    "ps aux | grep \"tail -c\" | grep \""+stdoutPath+"\" | 
grep -v grep | awk '{ printf $2 }' | xargs kill",
+                    "ps aux | grep \"tail -c\" | grep \""+stderrPath+"\" | 
grep -v grep | awk '{ printf $2 }' | xargs kill");
+
+            String cmd = Joiner.on("\n").join(cmdParts.build());
+            
+            return ImmutableList.of(runAsRoot ? BashCommands.sudo(cmd) : cmd);
         }
 
         public abstract int run();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java
----------------------------------------------------------------------
diff --git 
a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java 
b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java
index 52347d2..982022a 100644
--- 
a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java
+++ 
b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java
@@ -169,8 +169,8 @@ public class SshjClientConnection implements 
SshAction<SSHClient> {
             } catch (IOException e) {
                 if (LOG.isDebugEnabled()) LOG.debug("<< exception 
disconnecting from {}: {}", e, e.getMessage());
             }
-            ssh = null;
         }
+        ssh = null;
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
----------------------------------------------------------------------
diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java 
b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
index e7ecd0c..8fa38ad 100644
--- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
+++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java
@@ -55,7 +55,9 @@ import org.slf4j.LoggerFactory;
 
 import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeTimeoutException;
 import brooklyn.util.internal.ssh.BackoffLimitedRetryHandler;
+import brooklyn.util.internal.ssh.ShellTool;
 import brooklyn.util.internal.ssh.SshAbstractTool;
 import brooklyn.util.internal.ssh.SshTool;
 import brooklyn.util.io.FileUtil;
@@ -333,6 +335,24 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
         }
     }
 
+    /**
+     * Executes the script in the background (`nohup ... &`), and then 
executes other ssh commands to poll for the
+     * stdout, stderr and exit code of that original process (which will each 
have been written to separate files).
+     * 
+     * The polling is a "long poll". That is, it executes a long-running ssh 
command to retrieve the stdout, etc.
+     * If that long-poll command fails, then we just execute another one to 
pick up from where it left off.
+     * This means we do not need to execute many ssh commands (which are 
expensive), but can still return promptly
+     * when the command completes.
+     * 
+     * Much of this was motivated by 
https://issues.apache.org/jira/browse/BROOKLYN-106, which is no longer
+     * an issue. The retries (e.g. in the upload-script) are arguably overkill 
given that {@link #acquire(SshAction)}
+     * will already retry. However, leaving this in place as it could prove 
useful when working with flakey
+     * networks in the future.
+     * 
+     * TODO There are (probably) issues with this method when using {@link 
ShellTool#PROP_RUN_AS_ROOT}.
+     * I (Aled) saw the .pid file having an owner of root:root, and a failure 
message in stderr of:
+     *   -bash: line 3: 
/tmp/brooklyn-20150113-161203056-XMEo-move_install_dir_from_user_to_.pid: 
Permission denied
+     */
     protected int execScriptAsyncAndPoll(final Map<String,?> props, final 
List<String> commands, final Map<String,?> env) {
         return new ToolAbstractAsyncExecScript(props) {
             private int maxConsecutiveSshFailures = 3;
@@ -346,9 +366,37 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
             
             public int run() {
                 timer = Stopwatch.createStarted();
-                String scriptContents = toScript(props, commands, env);
+                final String scriptContents = toScript(props, commands, env);
                 if (LOG.isTraceEnabled()) LOG.trace("Running shell command at 
{} as async script: {}", host, scriptContents);
-                copyToServer(ImmutableMap.of("permissions", "0700"), 
scriptContents.getBytes(), scriptPath);
+                
+                // Upload script; try repeatedly because have seen timeout 
intermittently on vcloud-director (BROOKLYN-106 related).
+                boolean uploadSuccess = Repeater.create("async script upload 
on "+SshjTool.this.toString()+" (for "+getSummary()+")")
+                        .backoffTo(maxDelayBetweenPolls)
+                        .limitIterationsTo(3)
+                        .rethrowException()
+                        .until(new Callable<Boolean>() {
+                            @Override
+                            public Boolean call() throws Exception {
+                                iteration++;
+                                if (LOG.isDebugEnabled()) {
+                                    String msg = "Uploading 
(iteration="+iteration+") for async script on "+SshjTool.this.toString()+" (for 
"+getSummary()+")";
+                                    if (iteration == 1) {
+                                        LOG.trace(msg);
+                                    } else {
+                                        LOG.debug(msg);
+                                    }
+                                }
+                                copyToServer(ImmutableMap.of("permissions", 
"0700"), scriptContents.getBytes(), scriptPath);
+                                return true;
+                            }})
+                        .run();
+                
+                if (!uploadSuccess) {
+                    // Unexpected! Should have either returned true or have 
rethrown the exception; should never get false.
+                    String msg = "Unexpected state: repeated failure for async 
script upload on "+SshjTool.this.toString()+" ("+getSummary()+")";
+                    LOG.warn(msg+"; rethrowing");
+                    throw new IllegalStateException(msg);
+                }
                 
                 // Execute script asynchronously
                 int execResult = asInt(acquire(new 
ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1);
@@ -357,7 +405,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 // Long polling to get the status
                 try {
                     final AtomicReference<Integer> result = new 
AtomicReference<Integer>();
-                    boolean success = Repeater.create("async script on 
"+SshjTool.this.toString()+" (for "+getSummary()+")")
+                    boolean success = Repeater.create("async script long-poll 
on "+SshjTool.this.toString()+" (for "+getSummary()+")")
                             .backoffTo(maxDelayBetweenPolls)
                             .limitTimeTo(execTimeout)
                             .until(new Callable<Boolean>() {
@@ -377,13 +425,28 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                         LOG.warn(msg+"; rethrowing");
                         throw new TimeoutException(msg);
                     }
-
+                    
                     return result.get();
                     
                 } catch (Exception e) {
+                    LOG.debug("Problem polling for async script on 
"+SshjTool.this.toString()+" (for "+getSummary()+"); rethrowing after deleting 
temporary files", e);
                     throw Exceptions.propagate(e);
+                } finally {
+                    // Delete the temporary files created (and the `tail -c` 
commands that might have been left behind by long-polls).
+                    // Using pollTimeout so doesn't wait forever, but waits 
for a reasonable (configurable) length of time.
+                    // TODO also execute this if the `buildRunScriptCommand` 
fails, as that might have left files behind?
+                    try {
+                        int execDeleteResult = asInt(acquire(new 
ShellAction(deleteTemporaryFilesCommand(), out, err, pollTimeout)), -1);
+                        if (execDeleteResult != 0) {
+                            LOG.debug("Problem deleting temporary files of 
async script on "+SshjTool.this.toString()+" (for "+getSummary()+"): exit 
status "+execDeleteResult);
+                        }
+                    } catch (Exception e) {
+                        Exceptions.propagateIfFatal(e);
+                        LOG.debug("Problem deleting temporary files of async 
script on "+SshjTool.this.toString()+" (for "+getSummary()+"); continuing", e);
+                    }
                 }
             }
+            
             Integer longPoll() throws IOException {
                 // Long-polling to get stdout, stderr + exit status of async 
task.
                 // If our long-poll disconnects, we will just re-execute.
@@ -391,11 +454,20 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 // If we disconnect, we will pick up from that char of the 
stream.
                 // TODO Additional stdout/stderr written by 
buildLongPollCommand() could interfere, 
                 //      causing us to miss some characters.
-                // TODO Want to include timeout for long-polling
                 Duration nextPollTimeout = Duration.min(pollTimeout, 
Duration.millis(execTimeout.toMilliseconds()-timer.elapsed(TimeUnit.MILLISECONDS)));
                 CountingOutputStream countingOut = (out == null) ? null : new 
CountingOutputStream(out);
                 CountingOutputStream countingErr = (err == null) ? null : new 
CountingOutputStream(err);
-                int longPollResult = asInt(acquire(new 
ShellAction(buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout), 
countingOut, countingErr, nextPollTimeout)), -1);
+                List<String> pollCommand = buildLongPollCommand(stdoutCount, 
stderrCount, nextPollTimeout);
+                Duration sshJoinTimeout = 
nextPollTimeout.add(Duration.TEN_SECONDS);
+                ShellAction action = new ShellAction(pollCommand, countingOut, 
countingErr, sshJoinTimeout);
+                
+                int longPollResult;
+                try {
+                    longPollResult = asInt(acquire(action, 3, 
nextPollTimeout), -1);
+                } catch (RuntimeTimeoutException e) {
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll timed out 
on "+SshjTool.this.toString()+" (for "+getSummary()+"): "+e);
+                    return null;
+                }
                 stdoutCount += (countingOut == null) ? 0 : 
countingOut.getCount();
                 stderrCount += (countingErr == null) ? 0 : 
countingErr.getCount();
                 
@@ -407,7 +479,14 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                     // probably a connection failure; try again
                     if (LOG.isDebugEnabled()) LOG.debug("Long-poll received 
exit status -1; will retry on "+SshjTool.this.toString()+" (for 
"+getSummary()+")");
                     return null;
-                    
+
+                } else if (longPollResult == 125) {
+                    // 125 is the special code for timeout in long-poll (see 
buildLongPollCommand).
+                    // However, there is a tiny chance that the underlying 
command might have returned that exact exit code!
+                    // Don't treat a timeout as a "consecutiveSshFailure".
+                    if (LOG.isDebugEnabled()) LOG.debug("Long-poll received 
exit status "+longPollResult+"; most likely timeout; retrieving actual status 
on "+SshjTool.this.toString()+" (for "+getSummary()+")");
+                    return retrieveStatusCommand();
+
                 } else {
                     // want to double-check whether this is the exit-code from 
the async process, or
                     // some unexpected failure in our long-poll command.
@@ -427,6 +506,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                     return null;
                 }
             }
+            
             Integer retrieveStatusCommand() throws IOException {
                 // want to double-check whether this is the exit-code from the 
async process, or
                 // some unexpected failure in our long-poll command.
@@ -469,6 +549,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
             }
         }.run();
     }
+    
     public int execShellDirect(Map<String,?> props, List<String> commands, 
Map<String,?> env) {
         OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
         OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
@@ -493,6 +574,13 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
         if (Boolean.FALSE.equals(props.get("blocks"))) {
             throw new IllegalArgumentException("Cannot exec non-blocking: 
command="+commands);
         }
+        
+        // If async is set, then do it as execScript
+        Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC);
+        if (Boolean.TRUE.equals(execAsync) && 
BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC))
 {
+            return execScriptAsyncAndPoll(props, commands, env);
+        }
+
         OutputStream out = getOptionalVal(props, PROP_OUT_STREAM);
         OutputStream err = getOptionalVal(props, PROP_ERR_STREAM);
         String separator = getOptionalVal(props, PROP_SEPARATOR);
@@ -526,6 +614,10 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
     }
 
     protected <T, C extends SshAction<T>> T acquire(C action) {
+        return acquire(action, sshTries, sshTriesTimeout == 0 ? 
Duration.PRACTICALLY_FOREVER : Duration.millis(sshTriesTimeout));
+    }
+    
+    protected <T, C extends SshAction<T>> T acquire(C action, int sshTries, 
Duration sshTriesTimeout) {
         Stopwatch stopwatch = Stopwatch.createStarted();
         
         for (int i = 0; i < sshTries; i++) {
@@ -561,7 +653,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 String errorMessage = String.format("(%s) error acquiring %s", 
toString(), action);
                 String fullMessage = String.format("%s (attempt %s/%s, in time 
%s/%s)", 
                         errorMessage, (i+1), sshTries, 
Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), 
-                        (sshTriesTimeout > 0 ? 
Time.makeTimeStringRounded(sshTriesTimeout) : "unlimited"));
+                        (sshTriesTimeout.equals(Duration.PRACTICALLY_FOREVER) 
? "unlimited" : Time.makeTimeStringRounded(sshTriesTimeout)));
                 try {
                     disconnect();
                 } catch (Exception e2) {
@@ -570,9 +662,9 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 if (i + 1 == sshTries) {
                     LOG.debug("<< {} (rethrowing, out of retries): {}", 
fullMessage, e.getMessage());
                     throw propagate(e, fullMessage + "; out of retries");
-                } else if (sshTriesTimeout > 0 && 
stopwatch.elapsed(TimeUnit.MILLISECONDS) > sshTriesTimeout) {
-                    LOG.debug("<< {} (rethrowing, out of time - max {}): {}", 
new Object[] { fullMessage, Time.makeTimeString(sshTriesTimeout, true), 
e.getMessage() });
-                    throw propagate(e, fullMessage + "; out of time");
+                } else if (sshTriesTimeout.isShorterThan(stopwatch)) {
+                    LOG.debug("<< {} (rethrowing, out of time - max {}): {}", 
new Object[] { fullMessage, Time.makeTimeStringRounded(sshTriesTimeout), 
e.getMessage() });
+                    throw new RuntimeTimeoutException(fullMessage + "; out of 
time", e);
                 } else {
                     if (LOG.isDebugEnabled()) LOG.debug("<< {}: {}", 
fullMessage, e.getMessage());
                     backoffForAttempt(i + 1, errorMessage + ": " + 
e.getMessage());
@@ -804,15 +896,16 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                     errgobbler.start();
                 }
                 try {
-                    
                     output.join((int)Math.min(timeout.toMilliseconds(), 
Integer.MAX_VALUE), TimeUnit.MILLISECONDS);
                     return output;
                     
                 } finally {
                     // wait for all stdout/stderr to have been re-directed
                     try {
-                        if (outgobbler != null) outgobbler.join();
-                        if (errgobbler != null) errgobbler.join();
+                        // Don't use forever (i.e. 0) because BROOKLYN-106: 
ssh hangs
+                        long joinTimeout = 10*1000;
+                        if (outgobbler != null) outgobbler.join(joinTimeout);
+                        if (errgobbler != null) errgobbler.join(joinTimeout);
                     } catch (InterruptedException e) {
                         LOG.warn("Interrupted gobbling streams from ssh: 
"+command, e);
                         Thread.currentThread().interrupt();
@@ -904,6 +997,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 }
                 closeWhispering(output, this);
                 
+                boolean timedOut = false;
                 try {
                     long timeoutMillis = Math.min(timeout.toMilliseconds(), 
Integer.MAX_VALUE);
                     long timeoutEnd = System.currentTimeMillis() + 
timeoutMillis;
@@ -917,19 +1011,25 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                             (!shell.isOpen() || 
((SessionChannel)session).getExitStatus()!=null);
                         try {
                             shell.join(1000, TimeUnit.MILLISECONDS);
-                        } catch (ConnectionException e) { last = e; }
-                        if (endBecauseReturned)
+                        } catch (ConnectionException e) {
+                            last = e;
+                        }
+                        if (endBecauseReturned) {
                             // shell is still open, ie some process is running
                             // but we have a result code, so main shell is 
finished
                             // we waited one second extra to allow any 
background process 
                             // which is nohupped to really be in the 
background (#162)
                             // now let's bail out
                             break;
+                        }
                     } while (System.currentTimeMillis() < timeoutEnd);
                     if (shell.isOpen() && 
((SessionChannel)session).getExitStatus()==null) {
-                        LOG.debug("Timeout ({}) in SSH shell to {}", 
sshClientConnection.getSessionTimeout(), this);
-                        // we timed out, or other problem -- reproduce the 
error
-                        throw last;
+                        LOG.debug("Timeout ({}) in SSH shell to {}", timeout, 
this);
+                        // we timed out, or other problem -- reproduce the 
error.
+                        // The shell.join should always have thrown 
ConnectionException (looking at code of
+                        // AbstractChannel), but javadoc of Channel doesn't 
explicitly say that so play it safe.
+                        timedOut = true;
+                        throw (last != null) ? last : new 
TimeoutException("Timeout after "+timeout+" executing "+this);
                     }
                     return ((SessionChannel)session).getExitStatus();
                 } finally {
@@ -937,8 +1037,16 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                     closeWhispering(shell, this);
                     shell = null;
                     try {
-                        if (outgobbler != null) outgobbler.join();
-                        if (errgobbler != null) errgobbler.join();
+                        // Don't use forever (i.e. 0) because BROOKLYN-106: 
ssh hangs
+                        long joinTimeout = (timedOut) ? 1000 : 10*1000;
+                        if (outgobbler != null) {
+                            outgobbler.join(joinTimeout);
+                            outgobbler.close();
+                        }
+                        if (errgobbler != null) {
+                            errgobbler.join(joinTimeout);
+                            errgobbler.close();
+                        }
                     } catch (InterruptedException e) {
                         LOG.warn("Interrupted gobbling streams from ssh: 
"+commands, e);
                         Thread.currentThread().interrupt();

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
 
b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
index 2acb200..caebac1 100644
--- 
a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
+++ 
b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java
@@ -29,7 +29,9 @@ import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
 import org.testng.annotations.Test;
 
+import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.internal.ssh.SshAbstractTool.SshAction;
 import brooklyn.util.internal.ssh.sshj.SshjTool.ShellAction;
 import brooklyn.util.time.Duration;
 
@@ -58,15 +60,17 @@ public class SshjToolAsyncStubIntegrationTest {
     private SshjTool tool;
     private List<InjectedResult> sequence;
     int counter = 0;
+    private boolean origFeatureEnablement;
     
     @BeforeMethod(alwaysRun=true)
     public void setUp() throws Exception {
+        origFeatureEnablement = 
BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC);
         sequence = Lists.newArrayList();
         counter = 0;
         
         tool = new SshjTool(ImmutableMap.<String,Object>of("host", 
"localhost")) {
             @SuppressWarnings("unchecked")
-            protected <T, C extends SshAction<T>> T acquire(C action) {
+            protected <T, C extends SshAction<T>> T acquire(C action, int 
sshTries, Duration sshTriesTimeout) {
                 if (action instanceof SshjTool.ShellAction) {
                     SshjTool.ShellAction shellAction = (SshjTool.ShellAction) 
action;
                     InjectedResult injectedResult = sequence.get(counter);
@@ -74,14 +78,18 @@ public class SshjToolAsyncStubIntegrationTest {
                     counter++;
                     return (T) injectedResult.result.apply(shellAction);
                 }
-                return super.acquire(action);
+                return super.acquire(action, sshTries, sshTriesTimeout);
             }
         };
     }
 
     @AfterMethod(alwaysRun=true)
     public void tearDown() throws Exception {
-        if (tool != null) tool.disconnect();
+        try {
+            if (tool != null) tool.disconnect();
+        } finally {
+            
BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC,
 origFeatureEnablement);
+        }
     }
     
     private Predicate<SshjTool.ShellAction> containsCmd(final String cmd) {

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
 
b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
index b81712d..c809ad1 100644
--- 
a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ 
b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -37,8 +37,10 @@ import net.schmizz.sshj.connection.channel.direct.Session;
 
 import org.testng.annotations.Test;
 
+import brooklyn.internal.BrooklynFeatureEnablement;
 import brooklyn.test.Asserts;
 import brooklyn.util.exceptions.Exceptions;
+import brooklyn.util.exceptions.RuntimeTimeoutException;
 import brooklyn.util.internal.ssh.SshException;
 import brooklyn.util.internal.ssh.SshTool;
 import brooklyn.util.internal.ssh.SshToolAbstractIntegrationTest;
@@ -129,7 +131,7 @@ public class SshjToolIntegrationTest extends 
SshToolAbstractIntegrationTest {
         try {
             localtool.execScript(ImmutableMap.<String,Object>of(), 
ImmutableList.of("true"));
             fail();
-        } catch (SshException e) {
+        } catch (RuntimeTimeoutException e) {
             if (!e.toString().contains("out of time")) throw e;
             assertEquals(callCount.get(), 2);
         }
@@ -165,45 +167,56 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
 
     @Test(groups = {"Integration"})
     public void testAsyncExecStdoutAndStderr() throws Exception {
-        // Include a sleep, to ensure that the contents retrieved in first 
poll and subsequent polls are appended
-        List<String> cmds = ImmutableList.of(
-                "echo mystringToStdout",
-                "echo mystringToStderr 1>&2",
-                "sleep 3",
-                "echo mystringPostSleepToStdout",
-                "echo mystringPostSleepToStderr 1>&2");
-        
-        ByteArrayOutputStream out = new ByteArrayOutputStream();
-        ByteArrayOutputStream err = new ByteArrayOutputStream();
-        int exitCode = tool.execScript(
-                ImmutableMap.of(
-                        "out", out, 
-                        "err", err, 
-                        SshjTool.PROP_EXEC_ASYNC.getName(), true, 
-                        SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
-                        SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), 
Duration.ONE_SECOND), 
-                cmds, 
-                ImmutableMap.<String,String>of());
-        String outStr = new String(out.toByteArray());
-        String errStr = new String(err.toByteArray());
-
-        assertEquals(exitCode, 0);
-        assertEquals(outStr.trim(), 
"mystringToStdout\nmystringPostSleepToStdout");
-        assertEquals(errStr.trim(), 
"mystringToStderr\nmystringPostSleepToStderr");
+        boolean origFeatureEnablement = 
BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC);
+        try {
+            // Include a sleep, to ensure that the contents retrieved in first 
poll and subsequent polls are appended
+            List<String> cmds = ImmutableList.of(
+                    "echo mystringToStdout",
+                    "echo mystringToStderr 1>&2",
+                    "sleep 5",
+                    "echo mystringPostSleepToStdout",
+                    "echo mystringPostSleepToStderr 1>&2");
+            
+            ByteArrayOutputStream out = new ByteArrayOutputStream();
+            ByteArrayOutputStream err = new ByteArrayOutputStream();
+            int exitCode = tool.execScript(
+                    ImmutableMap.of(
+                            "out", out, 
+                            "err", err, 
+                            SshjTool.PROP_EXEC_ASYNC.getName(), true, 
+                            SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true,
+                            
SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_SECOND), 
+                    cmds, 
+                    ImmutableMap.<String,String>of());
+            String outStr = new String(out.toByteArray());
+            String errStr = new String(err.toByteArray());
+    
+            assertEquals(exitCode, 0);
+            assertEquals(outStr.trim(), 
"mystringToStdout\nmystringPostSleepToStdout");
+            assertEquals(errStr.trim(), 
"mystringToStderr\nmystringPostSleepToStderr");
+        } finally {
+            
BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC,
 origFeatureEnablement);
+        }
     }
 
     @Test(groups = {"Integration"})
     public void testAsyncExecReturnsExitCode() throws Exception {
-        int exitCode = tool.execScript(
-                ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true), 
-                ImmutableList.of("exit 123"), 
-                ImmutableMap.<String,String>of());
-        assertEquals(exitCode, 123);
+        boolean origFeatureEnablement = 
BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC);
+        try {
+            int exitCode = tool.execScript(
+                    ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true), 
+                    ImmutableList.of("exit 123"), 
+                    ImmutableMap.<String,String>of());
+            assertEquals(exitCode, 123);
+        } finally {
+            
BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC,
 origFeatureEnablement);
+        }
     }
 
     @Test(groups = {"Integration"})
     public void testAsyncExecTimesOut() throws Exception {
         Stopwatch stopwatch = Stopwatch.createStarted();
+        boolean origFeatureEnablement = 
BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC);
         try {
             tool.execScript(
                 ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, 
SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)), 
@@ -213,6 +226,8 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
         } catch (Exception e) {
             TimeoutException te = Exceptions.getFirstThrowableOfType(e, 
TimeoutException.class);
             if (te == null) throw e;
+        } finally {
+            
BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC,
 origFeatureEnablement);
         }
         
         long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
@@ -234,6 +249,8 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
                 long seconds = stopwatch.elapsed(TimeUnit.SECONDS);
                 assertTrue(seconds < 30, "exec took "+seconds+" seconds");
             }});
+        
+        boolean origFeatureEnablement = 
BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC);
         try {
             thread.start();
             
@@ -249,6 +266,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest {
             assertFalse(thread.isAlive());
         } finally {
             thread.interrupt();
+            
BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC,
 origFeatureEnablement);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/utils/common/src/main/java/brooklyn/util/time/Duration.java
----------------------------------------------------------------------
diff --git a/utils/common/src/main/java/brooklyn/util/time/Duration.java b/utils/common/src/main/java/brooklyn/util/time/Duration.java
index 3950bc7..0f73ef8 100644
--- a/utils/common/src/main/java/brooklyn/util/time/Duration.java
+++ b/utils/common/src/main/java/brooklyn/util/time/Duration.java
@@ -277,10 +277,18 @@ public class Duration implements Comparable<Duration>, Serializable {
         return compareTo(x) > 0;
     }
 
+    public boolean isLongerThan(Stopwatch stopwatch) {
+        return 
isLongerThan(Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+    }
+
     public boolean isShorterThan(Duration x) {
         return compareTo(x) < 0;
     }
 
+    public boolean isShorterThan(Stopwatch stopwatch) {
+        return 
isShorterThan(Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS)));
+    }
+
     /** returns the larger of this value or the argument */
     public Duration lowerBound(Duration alternateMinimumValue) {
         if (isShorterThan(alternateMinimumValue)) return alternateMinimumValue;
@@ -303,5 +311,4 @@ public class Duration implements Comparable<Duration>, Serializable {
     public Duration maximum(Duration alternateMaximumValue) {
         return upperBound(alternateMaximumValue);
     }
-    
 }

Reply via email to