BROOKLYN-106: Fix sshj's async-exec - on repeated timeouts of longPoll, then keep going. - On long-poll, reduce the sshTriesTimeout. - On timeout/error, don't wait forever for stdout/stderr streams to close. - Potential improvement to disconnect/reconnect. - Adds Duration.isShorterThan(Stopwatch)
Project: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/commit/c06a2297 Tree: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/tree/c06a2297 Diff: http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/diff/c06a2297 Branch: refs/heads/master Commit: c06a2297eadfd5b6322bde303a65cbc0e477c7d1 Parents: 0673146 Author: Aled Sage <[email protected]> Authored: Thu Jan 8 21:12:58 2015 +0000 Committer: Aled Sage <[email protected]> Committed: Wed Jan 14 22:38:08 2015 +0000 ---------------------------------------------------------------------- .../util/internal/ssh/ShellAbstractTool.java | 34 +++-- .../internal/ssh/sshj/SshjClientConnection.java | 2 +- .../util/internal/ssh/sshj/SshjTool.java | 150 ++++++++++++++++--- .../sshj/SshjToolAsyncStubIntegrationTest.java | 14 +- .../ssh/sshj/SshjToolIntegrationTest.java | 80 ++++++---- .../main/java/brooklyn/util/time/Duration.java | 9 +- 6 files changed, 219 insertions(+), 70 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java index 82624d7..e677399 100644 --- a/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java +++ b/core/src/main/java/brooklyn/util/internal/ssh/ShellAbstractTool.java @@ -247,7 +247,7 @@ public abstract class ShellAbstractTool implements ShellTool { } this.scriptNameWithoutExtension = "brooklyn-"+ Time.makeDateStampString()+"-"+Identifiers.makeRandomId(4)+ - (summary==null ? "" : "-"+summary); + (Strings.isBlank(summary) ? 
"" : "-"+summary); this.scriptPath = Os.mergePathsUnix(scriptDir, scriptNameWithoutExtension+".sh"); } @@ -367,14 +367,13 @@ public abstract class ShellAbstractTool implements ShellTool { * An offset can be given, to only retrieve data starting at a particular character (indexed from 0). */ protected List<String> buildLongPollCommand(int stdoutPosition, int stderrPosition, Duration timeout) { - // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0) - // TODO Relies on commands terminating when ssh connection dropped (because not run with `nohup`) - String catStdoutCmd = "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" &"; - String catStderrCmd = "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 &"; long maxTime = Math.max(1, timeout.toSeconds()); + // Note that `tail -c +1` means start at the *first* character (i.e. start counting from 1, not 0) List<String> waitForExitStatusParts = ImmutableList.of( "# Long poll", // comment is to aid testing - see SshjToolAsyncStubIntegrationTest + "tail -c +"+(stdoutPosition+1)+" -f "+stdoutPath+" & export TAIL_STDOUT_PID=$!", + "tail -c +"+(stderrPosition+1)+" -f "+stderrPath+" 1>&2 & export TAIL_STDERR_PID=$!", "EXIT_STATUS_PATH="+exitStatusPath, "PID_PATH="+pidPath, "MAX_TIME="+maxTime, @@ -390,9 +389,11 @@ public abstract class ShellAbstractTool implements ShellTool { " sleep 3", " if test -s $EXIT_STATUS_PATH; then", " EXIT_STATUS=`cat $EXIT_STATUS_PATH`", + " kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID}", " exit $EXIT_STATUS", " else", " echo \"No exit status in $EXIT_STATUS_PATH, and pid in $PID_PATH ($PID) not executing\"", + " kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID}", " exit 126", " fi", " fi", @@ -401,25 +402,32 @@ public abstract class ShellAbstractTool implements ShellTool { " sleep 1", " COUNTER+=1", "done", + "kill ${TAIL_STDERR_PID} ${TAIL_STDOUT_PID}", "exit 125"+"\n"); String waitForExitStatus = Joiner.on("\n").join(waitForExitStatusParts); - 
MutableList.Builder<String> cmds = MutableList.<String>builder() - .add(runAsRoot ? BashCommands.sudo(catStdoutCmd) : catStdoutCmd) - .add(runAsRoot ? BashCommands.sudo(catStderrCmd) : catStderrCmd) - .add(runAsRoot ? BashCommands.sudo(waitForExitStatus) : waitForExitStatus); - return cmds.build(); + return ImmutableList.of(runAsRoot ? BashCommands.sudo(waitForExitStatus) : waitForExitStatus); } protected List<String> deleteTemporaryFilesCommand() { + ImmutableList.Builder<String> cmdParts = ImmutableList.builder(); + if (!Boolean.TRUE.equals(noDeleteAfterExec)) { // use "-f" because some systems have "rm" aliased to "rm -i" // use "< /dev/null" to guarantee doesn't hang - return ImmutableList.of( + cmdParts.add( "rm -f "+scriptPath+" "+stdoutPath+" "+stderrPath+" "+exitStatusPath+" "+pidPath+" < /dev/null"); - } else { - return ImmutableList.<String>of(); } + + // If the buildLongPollCommand didn't complete properly then it might have left tail command running; + // ensure they are killed. + cmdParts.add( + "ps aux | grep \"tail -c\" | grep \""+stdoutPath+"\" | grep -v grep | awk '{ printf $2 }' | xargs kill", + "ps aux | grep \"tail -c\" | grep \""+stderrPath+"\" | grep -v grep | awk '{ printf $2 }' | xargs kill"); + + String cmd = Joiner.on("\n").join(cmdParts.build()); + + return ImmutableList.of(runAsRoot ? 
BashCommands.sudo(cmd) : cmd); } public abstract int run(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java index 52347d2..982022a 100644 --- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java +++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjClientConnection.java @@ -169,8 +169,8 @@ public class SshjClientConnection implements SshAction<SSHClient> { } catch (IOException e) { if (LOG.isDebugEnabled()) LOG.debug("<< exception disconnecting from {}: {}", e, e.getMessage()); } - ssh = null; } + ssh = null; } @Override http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java ---------------------------------------------------------------------- diff --git a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java index e7ecd0c..8fa38ad 100644 --- a/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java +++ b/core/src/main/java/brooklyn/util/internal/ssh/sshj/SshjTool.java @@ -55,7 +55,9 @@ import org.slf4j.LoggerFactory; import brooklyn.internal.BrooklynFeatureEnablement; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.RuntimeTimeoutException; import brooklyn.util.internal.ssh.BackoffLimitedRetryHandler; +import brooklyn.util.internal.ssh.ShellTool; import brooklyn.util.internal.ssh.SshAbstractTool; import brooklyn.util.internal.ssh.SshTool; import brooklyn.util.io.FileUtil; @@ -333,6 +335,24 @@ public class SshjTool extends SshAbstractTool implements SshTool { } } + /** + * Executes the script in the 
background (`nohup ... &`), and then executes other ssh commands to poll for the + * stdout, stderr and exit code of that original process (which will each have been written to separate files). + * + * The polling is a "long poll". That is, it executes a long-running ssh command to retrieve the stdout, etc. + * If that long-poll command fails, then we just execute another one to pick up from where it left off. + * This means we do not need to execute many ssh commands (which are expensive), but can still return promptly + * when the command completes. + * + * Much of this was motivated by https://issues.apache.org/jira/browse/BROOKLYN-106, which is no longer + * an issue. The retries (e.g. in the upload-script) are arguably overkill given that {@link #acquire(SshAction)} + * will already retry. However, leaving this in place as it could prove useful when working with flakey + * networks in the future. + * + * TODO There are (probably) issues with this method when using {@link ShellTool#PROP_RUN_AS_ROOT}. 
+ * I (Aled) saw the .pid file having an owner of root:root, and a failure message in stderr of: + * -bash: line 3: /tmp/brooklyn-20150113-161203056-XMEo-move_install_dir_from_user_to_.pid: Permission denied + */ protected int execScriptAsyncAndPoll(final Map<String,?> props, final List<String> commands, final Map<String,?> env) { return new ToolAbstractAsyncExecScript(props) { private int maxConsecutiveSshFailures = 3; @@ -346,9 +366,37 @@ public class SshjTool extends SshAbstractTool implements SshTool { public int run() { timer = Stopwatch.createStarted(); - String scriptContents = toScript(props, commands, env); + final String scriptContents = toScript(props, commands, env); if (LOG.isTraceEnabled()) LOG.trace("Running shell command at {} as async script: {}", host, scriptContents); - copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath); + + // Upload script; try repeatedly because have seen timeout intermittently on vcloud-director (BROOKLYN-106 related). + boolean uploadSuccess = Repeater.create("async script upload on "+SshjTool.this.toString()+" (for "+getSummary()+")") + .backoffTo(maxDelayBetweenPolls) + .limitIterationsTo(3) + .rethrowException() + .until(new Callable<Boolean>() { + @Override + public Boolean call() throws Exception { + iteration++; + if (LOG.isDebugEnabled()) { + String msg = "Uploading (iteration="+iteration+") for async script on "+SshjTool.this.toString()+" (for "+getSummary()+")"; + if (iteration == 1) { + LOG.trace(msg); + } else { + LOG.debug(msg); + } + } + copyToServer(ImmutableMap.of("permissions", "0700"), scriptContents.getBytes(), scriptPath); + return true; + }}) + .run(); + + if (!uploadSuccess) { + // Unexpected! Should have either returned true or have rethrown the exception; should never get false. 
+ String msg = "Unexpected state: repeated failure for async script upload on "+SshjTool.this.toString()+" ("+getSummary()+")"; + LOG.warn(msg+"; rethrowing"); + throw new IllegalStateException(msg); + } // Execute script asynchronously int execResult = asInt(acquire(new ShellAction(buildRunScriptCommand(), out, err, execTimeout)), -1); @@ -357,7 +405,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { // Long polling to get the status try { final AtomicReference<Integer> result = new AtomicReference<Integer>(); - boolean success = Repeater.create("async script on "+SshjTool.this.toString()+" (for "+getSummary()+")") + boolean success = Repeater.create("async script long-poll on "+SshjTool.this.toString()+" (for "+getSummary()+")") .backoffTo(maxDelayBetweenPolls) .limitTimeTo(execTimeout) .until(new Callable<Boolean>() { @@ -377,13 +425,28 @@ public class SshjTool extends SshAbstractTool implements SshTool { LOG.warn(msg+"; rethrowing"); throw new TimeoutException(msg); } - + return result.get(); } catch (Exception e) { + LOG.debug("Problem polling for async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); rethrowing after deleting temporary files", e); throw Exceptions.propagate(e); + } finally { + // Delete the temporary files created (and the `tail -c` commands that might have been left behind by long-polls). + // Using pollTimeout so doesn't wait forever, but waits for a reasonable (configurable) length of time. + // TODO also execute this if the `buildRunScriptCommand` fails, as that might have left files behind? 
+ try { + int execDeleteResult = asInt(acquire(new ShellAction(deleteTemporaryFilesCommand(), out, err, pollTimeout)), -1); + if (execDeleteResult != 0) { + LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"): exit status "+execDeleteResult); + } + } catch (Exception e) { + Exceptions.propagateIfFatal(e); + LOG.debug("Problem deleting temporary files of async script on "+SshjTool.this.toString()+" (for "+getSummary()+"); continuing", e); + } } } + Integer longPoll() throws IOException { // Long-polling to get stdout, stderr + exit status of async task. // If our long-poll disconnects, we will just re-execute. @@ -391,11 +454,20 @@ public class SshjTool extends SshAbstractTool implements SshTool { // If we disconnect, we will pick up from that char of the stream. // TODO Additional stdout/stderr written by buildLongPollCommand() could interfere, // causing us to miss some characters. - // TODO Want to include timeout for long-polling Duration nextPollTimeout = Duration.min(pollTimeout, Duration.millis(execTimeout.toMilliseconds()-timer.elapsed(TimeUnit.MILLISECONDS))); CountingOutputStream countingOut = (out == null) ? null : new CountingOutputStream(out); CountingOutputStream countingErr = (err == null) ? 
null : new CountingOutputStream(err); - int longPollResult = asInt(acquire(new ShellAction(buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout), countingOut, countingErr, nextPollTimeout)), -1); + List<String> pollCommand = buildLongPollCommand(stdoutCount, stderrCount, nextPollTimeout); + Duration sshJoinTimeout = nextPollTimeout.add(Duration.TEN_SECONDS); + ShellAction action = new ShellAction(pollCommand, countingOut, countingErr, sshJoinTimeout); + + int longPollResult; + try { + longPollResult = asInt(acquire(action, 3, nextPollTimeout), -1); + } catch (RuntimeTimeoutException e) { + if (LOG.isDebugEnabled()) LOG.debug("Long-poll timed out on "+SshjTool.this.toString()+" (for "+getSummary()+"): "+e); + return null; + } stdoutCount += (countingOut == null) ? 0 : countingOut.getCount(); stderrCount += (countingErr == null) ? 0 : countingErr.getCount(); @@ -407,7 +479,14 @@ public class SshjTool extends SshAbstractTool implements SshTool { // probably a connection failure; try again if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status -1; will retry on "+SshjTool.this.toString()+" (for "+getSummary()+")"); return null; - + + } else if (longPollResult == 125) { + // 125 is the special code for timeout in long-poll (see buildLongPollCommand). + // However, there is a tiny chance that the underlying command might have returned that exact exit code! + // Don't treat a timeout as a "consecutiveSshFailure". + if (LOG.isDebugEnabled()) LOG.debug("Long-poll received exit status "+longPollResult+"; most likely timeout; retrieving actual status on "+SshjTool.this.toString()+" (for "+getSummary()+")"); + return retrieveStatusCommand(); + } else { // want to double-check whether this is the exit-code from the async process, or // some unexpected failure in our long-poll command. 
@@ -427,6 +506,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { return null; } } + Integer retrieveStatusCommand() throws IOException { // want to double-check whether this is the exit-code from the async process, or // some unexpected failure in our long-poll command. @@ -469,6 +549,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { } }.run(); } + public int execShellDirect(Map<String,?> props, List<String> commands, Map<String,?> env) { OutputStream out = getOptionalVal(props, PROP_OUT_STREAM); OutputStream err = getOptionalVal(props, PROP_ERR_STREAM); @@ -493,6 +574,13 @@ public class SshjTool extends SshAbstractTool implements SshTool { if (Boolean.FALSE.equals(props.get("blocks"))) { throw new IllegalArgumentException("Cannot exec non-blocking: command="+commands); } + + // If async is set, then do it as execScript + Boolean execAsync = getOptionalVal(props, PROP_EXEC_ASYNC); + if (Boolean.TRUE.equals(execAsync) && BrooklynFeatureEnablement.isEnabled(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC)) { + return execScriptAsyncAndPoll(props, commands, env); + } + OutputStream out = getOptionalVal(props, PROP_OUT_STREAM); OutputStream err = getOptionalVal(props, PROP_ERR_STREAM); String separator = getOptionalVal(props, PROP_SEPARATOR); @@ -526,6 +614,10 @@ public class SshjTool extends SshAbstractTool implements SshTool { } protected <T, C extends SshAction<T>> T acquire(C action) { + return acquire(action, sshTries, sshTriesTimeout == 0 ? 
Duration.PRACTICALLY_FOREVER : Duration.millis(sshTriesTimeout)); + } + + protected <T, C extends SshAction<T>> T acquire(C action, int sshTries, Duration sshTriesTimeout) { Stopwatch stopwatch = Stopwatch.createStarted(); for (int i = 0; i < sshTries; i++) { @@ -561,7 +653,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { String errorMessage = String.format("(%s) error acquiring %s", toString(), action); String fullMessage = String.format("%s (attempt %s/%s, in time %s/%s)", errorMessage, (i+1), sshTries, Time.makeTimeStringRounded(stopwatch.elapsed(TimeUnit.MILLISECONDS)), - (sshTriesTimeout > 0 ? Time.makeTimeStringRounded(sshTriesTimeout) : "unlimited")); + (sshTriesTimeout.equals(Duration.PRACTICALLY_FOREVER) ? "unlimited" : Time.makeTimeStringRounded(sshTriesTimeout))); try { disconnect(); } catch (Exception e2) { @@ -570,9 +662,9 @@ public class SshjTool extends SshAbstractTool implements SshTool { if (i + 1 == sshTries) { LOG.debug("<< {} (rethrowing, out of retries): {}", fullMessage, e.getMessage()); throw propagate(e, fullMessage + "; out of retries"); - } else if (sshTriesTimeout > 0 && stopwatch.elapsed(TimeUnit.MILLISECONDS) > sshTriesTimeout) { - LOG.debug("<< {} (rethrowing, out of time - max {}): {}", new Object[] { fullMessage, Time.makeTimeString(sshTriesTimeout, true), e.getMessage() }); - throw propagate(e, fullMessage + "; out of time"); + } else if (sshTriesTimeout.isShorterThan(stopwatch)) { + LOG.debug("<< {} (rethrowing, out of time - max {}): {}", new Object[] { fullMessage, Time.makeTimeStringRounded(sshTriesTimeout), e.getMessage() }); + throw new RuntimeTimeoutException(fullMessage + "; out of time", e); } else { if (LOG.isDebugEnabled()) LOG.debug("<< {}: {}", fullMessage, e.getMessage()); backoffForAttempt(i + 1, errorMessage + ": " + e.getMessage()); @@ -804,15 +896,16 @@ public class SshjTool extends SshAbstractTool implements SshTool { errgobbler.start(); } try { - 
output.join((int)Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE), TimeUnit.MILLISECONDS); return output; } finally { // wait for all stdout/stderr to have been re-directed try { - if (outgobbler != null) outgobbler.join(); - if (errgobbler != null) errgobbler.join(); + // Don't use forever (i.e. 0) because BROOKLYN-106: ssh hangs + long joinTimeout = 10*1000; + if (outgobbler != null) outgobbler.join(joinTimeout); + if (errgobbler != null) errgobbler.join(joinTimeout); } catch (InterruptedException e) { LOG.warn("Interrupted gobbling streams from ssh: "+command, e); Thread.currentThread().interrupt(); @@ -904,6 +997,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { } closeWhispering(output, this); + boolean timedOut = false; try { long timeoutMillis = Math.min(timeout.toMilliseconds(), Integer.MAX_VALUE); long timeoutEnd = System.currentTimeMillis() + timeoutMillis; @@ -917,19 +1011,25 @@ public class SshjTool extends SshAbstractTool implements SshTool { (!shell.isOpen() || ((SessionChannel)session).getExitStatus()!=null); try { shell.join(1000, TimeUnit.MILLISECONDS); - } catch (ConnectionException e) { last = e; } - if (endBecauseReturned) + } catch (ConnectionException e) { + last = e; + } + if (endBecauseReturned) { // shell is still open, ie some process is running // but we have a result code, so main shell is finished // we waited one second extra to allow any background process // which is nohupped to really be in the background (#162) // now let's bail out break; + } } while (System.currentTimeMillis() < timeoutEnd); if (shell.isOpen() && ((SessionChannel)session).getExitStatus()==null) { - LOG.debug("Timeout ({}) in SSH shell to {}", sshClientConnection.getSessionTimeout(), this); - // we timed out, or other problem -- reproduce the error - throw last; + LOG.debug("Timeout ({}) in SSH shell to {}", timeout, this); + // we timed out, or other problem -- reproduce the error. 
+ // The shell.join should always have thrown ConnectionExceptoin (looking at code of + // AbstractChannel), but javadoc of Channel doesn't explicity say that so play it safe. + timedOut = true; + throw (last != null) ? last : new TimeoutException("Timeout after "+timeout+" executing "+this); } return ((SessionChannel)session).getExitStatus(); } finally { @@ -937,8 +1037,16 @@ public class SshjTool extends SshAbstractTool implements SshTool { closeWhispering(shell, this); shell = null; try { - if (outgobbler != null) outgobbler.join(); - if (errgobbler != null) errgobbler.join(); + // Don't use forever (i.e. 0) because BROOKLYN-106: ssh hangs + long joinTimeout = (timedOut) ? 1000 : 10*1000; + if (outgobbler != null) { + outgobbler.join(joinTimeout); + outgobbler.close(); + } + if (errgobbler != null) { + errgobbler.join(joinTimeout); + errgobbler.close(); + } } catch (InterruptedException e) { LOG.warn("Interrupted gobbling streams from ssh: "+commands, e); Thread.currentThread().interrupt(); http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java index 2acb200..caebac1 100644 --- a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java +++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolAsyncStubIntegrationTest.java @@ -29,7 +29,9 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; +import brooklyn.internal.BrooklynFeatureEnablement; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.internal.ssh.SshAbstractTool.SshAction; import 
brooklyn.util.internal.ssh.sshj.SshjTool.ShellAction; import brooklyn.util.time.Duration; @@ -58,15 +60,17 @@ public class SshjToolAsyncStubIntegrationTest { private SshjTool tool; private List<InjectedResult> sequence; int counter = 0; + private boolean origFeatureEnablement; @BeforeMethod(alwaysRun=true) public void setUp() throws Exception { + origFeatureEnablement = BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC); sequence = Lists.newArrayList(); counter = 0; tool = new SshjTool(ImmutableMap.<String,Object>of("host", "localhost")) { @SuppressWarnings("unchecked") - protected <T, C extends SshAction<T>> T acquire(C action) { + protected <T, C extends SshAction<T>> T acquire(C action, int sshTries, Duration sshTriesTimeout) { if (action instanceof SshjTool.ShellAction) { SshjTool.ShellAction shellAction = (SshjTool.ShellAction) action; InjectedResult injectedResult = sequence.get(counter); @@ -74,14 +78,18 @@ public class SshjToolAsyncStubIntegrationTest { counter++; return (T) injectedResult.result.apply(shellAction); } - return super.acquire(action); + return super.acquire(action, sshTries, sshTriesTimeout); } }; } @AfterMethod(alwaysRun=true) public void tearDown() throws Exception { - if (tool != null) tool.disconnect(); + try { + if (tool != null) tool.disconnect(); + } finally { + BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC, origFeatureEnablement); + } } private Predicate<SshjTool.ShellAction> containsCmd(final String cmd) { http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java ---------------------------------------------------------------------- diff --git a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java index b81712d..c809ad1 100644 --- 
a/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java +++ b/core/src/test/java/brooklyn/util/internal/ssh/sshj/SshjToolIntegrationTest.java @@ -37,8 +37,10 @@ import net.schmizz.sshj.connection.channel.direct.Session; import org.testng.annotations.Test; +import brooklyn.internal.BrooklynFeatureEnablement; import brooklyn.test.Asserts; import brooklyn.util.exceptions.Exceptions; +import brooklyn.util.exceptions.RuntimeTimeoutException; import brooklyn.util.internal.ssh.SshException; import brooklyn.util.internal.ssh.SshTool; import brooklyn.util.internal.ssh.SshToolAbstractIntegrationTest; @@ -129,7 +131,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { try { localtool.execScript(ImmutableMap.<String,Object>of(), ImmutableList.of("true")); fail(); - } catch (SshException e) { + } catch (RuntimeTimeoutException e) { if (!e.toString().contains("out of time")) throw e; assertEquals(callCount.get(), 2); } @@ -165,45 +167,56 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { @Test(groups = {"Integration"}) public void testAsyncExecStdoutAndStderr() throws Exception { - // Include a sleep, to ensure that the contents retrieved in first poll and subsequent polls are appended - List<String> cmds = ImmutableList.of( - "echo mystringToStdout", - "echo mystringToStderr 1>&2", - "sleep 3", - "echo mystringPostSleepToStdout", - "echo mystringPostSleepToStderr 1>&2"); - - ByteArrayOutputStream out = new ByteArrayOutputStream(); - ByteArrayOutputStream err = new ByteArrayOutputStream(); - int exitCode = tool.execScript( - ImmutableMap.of( - "out", out, - "err", err, - SshjTool.PROP_EXEC_ASYNC.getName(), true, - SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true, - SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_SECOND), - cmds, - ImmutableMap.<String,String>of()); - String outStr = new String(out.toByteArray()); - String errStr = new String(err.toByteArray()); - - 
assertEquals(exitCode, 0); - assertEquals(outStr.trim(), "mystringToStdout\nmystringPostSleepToStdout"); - assertEquals(errStr.trim(), "mystringToStderr\nmystringPostSleepToStderr"); + boolean origFeatureEnablement = BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC); + try { + // Include a sleep, to ensure that the contents retrieved in first poll and subsequent polls are appended + List<String> cmds = ImmutableList.of( + "echo mystringToStdout", + "echo mystringToStderr 1>&2", + "sleep 5", + "echo mystringPostSleepToStdout", + "echo mystringPostSleepToStderr 1>&2"); + + ByteArrayOutputStream out = new ByteArrayOutputStream(); + ByteArrayOutputStream err = new ByteArrayOutputStream(); + int exitCode = tool.execScript( + ImmutableMap.of( + "out", out, + "err", err, + SshjTool.PROP_EXEC_ASYNC.getName(), true, + SshjTool.PROP_NO_EXTRA_OUTPUT.getName(), true, + SshjTool.PROP_EXEC_ASYNC_POLLING_TIMEOUT.getName(), Duration.ONE_SECOND), + cmds, + ImmutableMap.<String,String>of()); + String outStr = new String(out.toByteArray()); + String errStr = new String(err.toByteArray()); + + assertEquals(exitCode, 0); + assertEquals(outStr.trim(), "mystringToStdout\nmystringPostSleepToStdout"); + assertEquals(errStr.trim(), "mystringToStderr\nmystringPostSleepToStderr"); + } finally { + BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC, origFeatureEnablement); + } } @Test(groups = {"Integration"}) public void testAsyncExecReturnsExitCode() throws Exception { - int exitCode = tool.execScript( - ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true), - ImmutableList.of("exit 123"), - ImmutableMap.<String,String>of()); - assertEquals(exitCode, 123); + boolean origFeatureEnablement = BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC); + try { + int exitCode = tool.execScript( + ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true), + ImmutableList.of("exit 123"), + 
ImmutableMap.<String,String>of()); + assertEquals(exitCode, 123); + } finally { + BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC, origFeatureEnablement); + } } @Test(groups = {"Integration"}) public void testAsyncExecTimesOut() throws Exception { Stopwatch stopwatch = Stopwatch.createStarted(); + boolean origFeatureEnablement = BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC); try { tool.execScript( ImmutableMap.of(SshjTool.PROP_EXEC_ASYNC.getName(), true, SshjTool.PROP_EXEC_TIMEOUT.getName(), Duration.millis(1)), @@ -213,6 +226,8 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { } catch (Exception e) { TimeoutException te = Exceptions.getFirstThrowableOfType(e, TimeoutException.class); if (te == null) throw e; + } finally { + BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC, origFeatureEnablement); } long seconds = stopwatch.elapsed(TimeUnit.SECONDS); @@ -234,6 +249,8 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { long seconds = stopwatch.elapsed(TimeUnit.SECONDS); assertTrue(seconds < 30, "exec took "+seconds+" seconds"); }}); + + boolean origFeatureEnablement = BrooklynFeatureEnablement.enable(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC); try { thread.start(); @@ -249,6 +266,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { assertFalse(thread.isAlive()); } finally { thread.interrupt(); + BrooklynFeatureEnablement.setEnablement(BrooklynFeatureEnablement.FEATURE_SSH_ASYNC_EXEC, origFeatureEnablement); } } http://git-wip-us.apache.org/repos/asf/incubator-brooklyn/blob/c06a2297/utils/common/src/main/java/brooklyn/util/time/Duration.java ---------------------------------------------------------------------- diff --git a/utils/common/src/main/java/brooklyn/util/time/Duration.java b/utils/common/src/main/java/brooklyn/util/time/Duration.java index 
3950bc7..0f73ef8 100644 --- a/utils/common/src/main/java/brooklyn/util/time/Duration.java +++ b/utils/common/src/main/java/brooklyn/util/time/Duration.java @@ -277,10 +277,18 @@ public class Duration implements Comparable<Duration>, Serializable { return compareTo(x) > 0; } + public boolean isLongerThan(Stopwatch stopwatch) { + return isLongerThan(Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + } + public boolean isShorterThan(Duration x) { return compareTo(x) < 0; } + public boolean isShorterThan(Stopwatch stopwatch) { + return isShorterThan(Duration.millis(stopwatch.elapsed(TimeUnit.MILLISECONDS))); + } + /** returns the larger of this value or the argument */ public Duration lowerBound(Duration alternateMinimumValue) { if (isShorterThan(alternateMinimumValue)) return alternateMinimumValue; @@ -303,5 +311,4 @@ public class Duration implements Comparable<Duration>, Serializable { public Duration maximum(Duration alternateMaximumValue) { return upperBound(alternateMaximumValue); } - }
