This is an automated email from the ASF dual-hosted git repository. heneveld pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git
commit 105b661ba152f19f12ccafef58ea35174dd381fb Author: Alex Heneveld <[email protected]> AuthorDate: Tue Aug 2 11:29:36 2022 +0100 catch other case where interrupt might be swallowed --- .../util/core/internal/ssh/sshj/SshjTool.java | 35 ++++++++++------ .../internal/ssh/sshj/SshjToolIntegrationTest.java | 49 +++++++++------------- 2 files changed, 43 insertions(+), 41 deletions(-) diff --git a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java index 9c379cf6ed..ddc96ad611 100644 --- a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java +++ b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java @@ -44,7 +44,6 @@ import org.apache.brooklyn.util.core.internal.ssh.ShellTool; import org.apache.brooklyn.util.core.internal.ssh.SshAbstractTool; import org.apache.brooklyn.util.core.internal.ssh.SshTool; import org.apache.brooklyn.util.exceptions.Exceptions; -import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException; import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException; import org.apache.brooklyn.util.repeat.Repeater; import org.apache.brooklyn.util.stream.KnownSizeInputStream; @@ -67,7 +66,6 @@ import java.util.concurrent.atomic.AtomicReference; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Throwables.getCausalChain; -import static com.google.common.base.Throwables.getRootCause; import static com.google.common.collect.Iterables.any; /** @@ -646,7 +644,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { } catch (Exception e2) { LOG.debug("<< ("+toString()+") error closing connection: "+e+" / "+e2, e); } - if (Thread.currentThread().isInterrupted()) { + if (checkInterrupted(e)) { LOG.debug("<< {} (rethrowing, interrupted): {}", fullMessage, e.getMessage()); throw propagate(e, fullMessage + "; interrupted"); } @@ -797,7 +795,7 @@ public class SshjTool extends SshAbstractTool implements SshTool { } } - // TODO simpler not to use predicates + // TODO simpler not to use predicates (this seems not to be used) @VisibleForTesting Predicate<String> causalChainHasMessageContaining(final Exception from) { return new Predicate<String>() { @@ -1003,12 +1001,16 @@ public class SshjTool extends SshAbstractTool implements SshTool { try { shell.join(1000, TimeUnit.MILLISECONDS); } catch (ConnectionException e) { - LOG.debug("SshjTool exception joining shell", e); - if (isNonRetryableException(e)) { - throw e; + if (Throwables.getRootCause(e) instanceof TimeoutException) { + // normal, do nothing + } else { + LOG.debug("SshjTool exception joining shell", e); + if (isNonRetryableException(e)) { + throw e; + } + // don't automatically give up here, it might be a transient network failure + last = e; } - // don't automatically give up here, it might be a transient network failure - last = e; } LOG.info("SshjTool looping waiting for shell; thread "+Thread.currentThread()+" interrupted? "+Thread.currentThread().isInterrupted()); if (endBecauseReturned) { @@ -1061,12 +1063,21 @@ public class SshjTool extends SshAbstractTool implements SshTool { } } - protected boolean isNonRetryableException(ConnectionException e) throws ConnectionException { - if (Exceptions.isRootCauseIsInterruption(e)) { - // if we don't check for ^ wrapped in e then the interrupt is swallowed; that's how sshj works :( + protected boolean checkInterrupted(Throwable t) { + if (Thread.currentThread().isInterrupted()) return true; + if (t!=null && Exceptions.isRootCauseIsInterruption(t)) { + // sshj has an ugly habit of catching & clearing thread interrupts, and returning wrapped in ConnectionExceptions + // restore the interrupt if this is the case Thread.currentThread().interrupt(); return true; } + return false; + } + + protected boolean isNonRetryableException(ConnectionException e) throws ConnectionException { + if (checkInterrupted(e)) { + return true; + } // anything else assume transient network failure until something else (eg shell) times out return false; } diff --git a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java index 22082c95eb..e5309ab3d4 100644 --- a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java +++ b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java @@ -18,34 +18,16 @@ */ package org.apache.brooklyn.util.core.internal.ssh.sshj; -import static org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_ERR_STREAM; -import static org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_OUT_STREAM; -import static org.apache.brooklyn.util.time.Duration.FIVE_SECONDS; -import static org.apache.brooklyn.util.time.Duration.ONE_SECOND; -import static org.testng.Assert.assertEquals; -import static org.testng.Assert.assertFalse; -import static org.testng.Assert.assertNotNull; -import static org.testng.Assert.assertTrue; -import static org.testng.Assert.fail; - -import java.io.ByteArrayOutputStream; -import java.io.File; -import java.util.Arrays; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - +import com.google.common.base.Stopwatch; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import net.schmizz.sshj.connection.channel.direct.Session; import org.apache.brooklyn.core.BrooklynFeatureEnablement; import org.apache.brooklyn.test.Asserts; import org.apache.brooklyn.util.core.internal.ssh.ShellTool; import org.apache.brooklyn.util.core.internal.ssh.SshException; import org.apache.brooklyn.util.core.internal.ssh.SshTool; import org.apache.brooklyn.util.core.internal.ssh.SshToolAbstractIntegrationTest; -import org.apache.brooklyn.util.core.task.ssh.SshPutTaskFactory; import org.apache.brooklyn.util.exceptions.Exceptions; import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException; import org.apache.brooklyn.util.os.Os; @@ -55,11 +37,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.testng.annotations.Test; -import com.google.common.base.Stopwatch; -import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; +import java.io.ByteArrayOutputStream; +import java.io.File; +import java.util.Arrays; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicReference; -import net.schmizz.sshj.connection.channel.direct.Session; +import static org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_ERR_STREAM; +import static org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_OUT_STREAM; +import static org.apache.brooklyn.util.time.Duration.ONE_SECOND; +import static org.testng.Assert.*; /** * Test the operation of the {@link SshjTool} utility class. @@ -345,7 +337,7 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { return outstr; } - @Test(groups = {"Integration"}) + @Test(groups = {"Integration"}, invocationCount = 10) public void testSshIsInterrupted() { log.info("STARTING"); final SshTool localTool = new SshjTool(ImmutableMap.of( @@ -375,14 +367,13 @@ public class SshjToolIntegrationTest extends SshToolAbstractIntegrationTest { }); log.info("STARTING"); t.start(); - Time.sleep(FIVE_SECONDS); + if (Math.random()>0.1) Time.sleep(Duration.millis(3*Math.random()*Math.random())); // sleep for a small amount of time, up to three seconds, but usually much less, and 10% of time not at all log.info("INTERRUPTING"); t.interrupt(); Time.sleep(ONE_SECOND); Arrays.asList(t.getStackTrace()).forEach(traceElement -> System.out.println(traceElement)); log.info("JOINING"); Stopwatch s = Stopwatch.createStarted(); - t.join(); if (Duration.of(s.elapsed()).isLongerThan(ONE_SECOND)) { Asserts.fail("Join should have been immediate as other thread was interrupted, but instead took "+Duration.of(s.elapsed())); }
