This is an automated email from the ASF dual-hosted git repository.

heneveld pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brooklyn-server.git

commit 105b661ba152f19f12ccafef58ea35174dd381fb
Author: Alex Heneveld <[email protected]>
AuthorDate: Tue Aug 2 11:29:36 2022 +0100

    catch other case where interrupt might be swallowed
---
 .../util/core/internal/ssh/sshj/SshjTool.java      | 35 ++++++++++------
 .../internal/ssh/sshj/SshjToolIntegrationTest.java | 49 +++++++++-------------
 2 files changed, 43 insertions(+), 41 deletions(-)

diff --git 
a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
 
b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
index 9c379cf6ed..ddc96ad611 100644
--- 
a/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
+++ 
b/core/src/main/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjTool.java
@@ -44,7 +44,6 @@ import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
 import org.apache.brooklyn.util.core.internal.ssh.SshAbstractTool;
 import org.apache.brooklyn.util.core.internal.ssh.SshTool;
 import org.apache.brooklyn.util.exceptions.Exceptions;
-import org.apache.brooklyn.util.exceptions.RuntimeInterruptedException;
 import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException;
 import org.apache.brooklyn.util.repeat.Repeater;
 import org.apache.brooklyn.util.stream.KnownSizeInputStream;
@@ -67,7 +66,6 @@ import java.util.concurrent.atomic.AtomicReference;
 
 import static com.google.common.base.Preconditions.checkNotNull;
 import static com.google.common.base.Throwables.getCausalChain;
-import static com.google.common.base.Throwables.getRootCause;
 import static com.google.common.collect.Iterables.any;
 
 /**
@@ -646,7 +644,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
                 } catch (Exception e2) {
                     LOG.debug("<< ("+toString()+") error closing connection: 
"+e+" / "+e2, e);
                 }
-                if (Thread.currentThread().isInterrupted()) {
+                if (checkInterrupted(e)) {
                     LOG.debug("<< {} (rethrowing, interrupted): {}", 
fullMessage, e.getMessage());
                     throw propagate(e, fullMessage + "; interrupted");
                 }
@@ -797,7 +795,7 @@ public class SshjTool extends SshAbstractTool implements 
SshTool {
         }
     }
 
-    // TODO simpler not to use predicates
+    // TODO simpler not to use predicates (this seems not to be used)
     @VisibleForTesting
     Predicate<String> causalChainHasMessageContaining(final Exception from) {
         return new Predicate<String>() {
@@ -1003,12 +1001,16 @@ public class SshjTool extends SshAbstractTool 
implements SshTool {
                         try {
                             shell.join(1000, TimeUnit.MILLISECONDS);
                         } catch (ConnectionException e) {
-                            LOG.debug("SshjTool exception joining shell", e);
-                            if (isNonRetryableException(e)) {
-                                throw e;
+                            if (Throwables.getRootCause(e) instanceof 
TimeoutException) {
+                                // normal, do nothing
+                            } else {
+                                LOG.debug("SshjTool exception joining shell", 
e);
+                                if (isNonRetryableException(e)) {
+                                    throw e;
+                                }
+                                // don't automatically give up here, it might 
be a transient network failure
+                                last = e;
                             }
-                            // don't automatically give up here, it might be a 
transient network failure
-                            last = e;
                         }
                         LOG.info("SshjTool looping waiting for shell; thread 
"+Thread.currentThread()+" interrupted? 
"+Thread.currentThread().isInterrupted());
                         if (endBecauseReturned) {
@@ -1061,12 +1063,21 @@ public class SshjTool extends SshAbstractTool 
implements SshTool {
         }
     }
 
-    protected boolean isNonRetryableException(ConnectionException e) throws 
ConnectionException {
-        if (Exceptions.isRootCauseIsInterruption(e)) {
-            // if we don't check for ^ wrapped in e then the interrupt is 
swallowed; that's how sshj works :(
+    protected boolean checkInterrupted(Throwable t) {
+        if (Thread.currentThread().isInterrupted()) return true;
+        if (t!=null && Exceptions.isRootCauseIsInterruption(t)) {
+            // sshj has an ugly habit of catching & clearing thread 
interrupts, and returning wrapped in ConnectionExceptions
+            // restore the interrupt if this is the case
             Thread.currentThread().interrupt();
             return true;
         }
+        return false;
+    }
+
+    protected boolean isNonRetryableException(ConnectionException e) throws 
ConnectionException {
+        if (checkInterrupted(e)) {
+            return true;
+        }
         // anything else assume transient network failure until something else 
(eg shell) times out
         return false;
     }
diff --git 
a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
 
b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
index 22082c95eb..e5309ab3d4 100644
--- 
a/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
+++ 
b/core/src/test/java/org/apache/brooklyn/util/core/internal/ssh/sshj/SshjToolIntegrationTest.java
@@ -18,34 +18,16 @@
  */
 package org.apache.brooklyn.util.core.internal.ssh.sshj;
 
-import static 
org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_ERR_STREAM;
-import static 
org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_OUT_STREAM;
-import static org.apache.brooklyn.util.time.Duration.FIVE_SECONDS;
-import static org.apache.brooklyn.util.time.Duration.ONE_SECOND;
-import static org.testng.Assert.assertEquals;
-import static org.testng.Assert.assertFalse;
-import static org.testng.Assert.assertNotNull;
-import static org.testng.Assert.assertTrue;
-import static org.testng.Assert.fail;
-
-import java.io.ByteArrayOutputStream;
-import java.io.File;
-import java.util.Arrays;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.concurrent.atomic.AtomicInteger;
-import java.util.concurrent.atomic.AtomicReference;
-
+import com.google.common.base.Stopwatch;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+import net.schmizz.sshj.connection.channel.direct.Session;
 import org.apache.brooklyn.core.BrooklynFeatureEnablement;
 import org.apache.brooklyn.test.Asserts;
 import org.apache.brooklyn.util.core.internal.ssh.ShellTool;
 import org.apache.brooklyn.util.core.internal.ssh.SshException;
 import org.apache.brooklyn.util.core.internal.ssh.SshTool;
 import 
org.apache.brooklyn.util.core.internal.ssh.SshToolAbstractIntegrationTest;
-import org.apache.brooklyn.util.core.task.ssh.SshPutTaskFactory;
 import org.apache.brooklyn.util.exceptions.Exceptions;
 import org.apache.brooklyn.util.exceptions.RuntimeTimeoutException;
 import org.apache.brooklyn.util.os.Os;
@@ -55,11 +37,21 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.testng.annotations.Test;
 
-import com.google.common.base.Stopwatch;
-import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.util.Arrays;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicReference;
 
-import net.schmizz.sshj.connection.channel.direct.Session;
+import static 
org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_ERR_STREAM;
+import static 
org.apache.brooklyn.util.core.internal.ssh.ShellTool.PROP_OUT_STREAM;
+import static org.apache.brooklyn.util.time.Duration.ONE_SECOND;
+import static org.testng.Assert.*;
 
 /**
  * Test the operation of the {@link SshjTool} utility class.
@@ -345,7 +337,7 @@ public class SshjToolIntegrationTest extends 
SshToolAbstractIntegrationTest {
         return outstr;
     }
 
-    @Test(groups = {"Integration"})
+    @Test(groups = {"Integration"}, invocationCount = 10)
     public void testSshIsInterrupted() {
         log.info("STARTING");
         final SshTool localTool = new SshjTool(ImmutableMap.of(
@@ -375,14 +367,13 @@ public class SshjToolIntegrationTest extends 
SshToolAbstractIntegrationTest {
             });
             log.info("STARTING");
             t.start();
-            Time.sleep(FIVE_SECONDS);
+            if (Math.random()>0.1) 
Time.sleep(Duration.millis(3*Math.random()*Math.random()));  // sleep for a 
small amount of time, up to three seconds, but usually much less, and 10% of 
time not at all
             log.info("INTERRUPTING");
             t.interrupt();
             Time.sleep(ONE_SECOND);
             Arrays.asList(t.getStackTrace()).forEach(traceElement -> 
System.out.println(traceElement));
             log.info("JOINING");
             Stopwatch s = Stopwatch.createStarted();
-            t.join();
             if (Duration.of(s.elapsed()).isLongerThan(ONE_SECOND)) {
                 Asserts.fail("Join should have been immediate as other thread 
was interrupted, but instead took "+Duration.of(s.elapsed()));
             }

Reply via email to