Repository: qpid-jms
Updated Branches:
  refs/heads/master e3e0452cb -> 76f372510


QPIDJMS-412: stop further [re]connection attempts after a non-temporary SASL 
failure


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/76f37251
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/76f37251
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/76f37251

Branch: refs/heads/master
Commit: 76f372510e154a0a963f4b2ba11025189e53193a
Parents: e3e0452
Author: Robbie Gemmell <rob...@apache.org>
Authored: Fri Aug 31 18:34:47 2018 +0100
Committer: Robbie Gemmell <rob...@apache.org>
Committed: Fri Aug 31 18:34:47 2018 +0100

----------------------------------------------------------------------
 .../exceptions/JMSSecuritySaslException.java    |  44 +++
 .../provider/amqp/AmqpSaslAuthenticator.java    |  20 +-
 .../jms/provider/failover/FailoverProvider.java |  23 +-
 .../integration/SaslGssApiIntegrationTest.java  |   2 +-
 .../jms/integration/SaslIntegrationTest.java    |  38 ++-
 .../amqp/AmqpSaslAuthenticatorTest.java         |  24 ++
 .../failover/FailoverIntegrationTest.java       | 323 ++++++++++++++++++-
 .../qpid/jms/test/testpeer/TestAmqpPeer.java    |  18 +-
 8 files changed, 475 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java
new file mode 100644
index 0000000..8e1b6b9
--- /dev/null
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java
@@ -0,0 +1,44 @@
+/*
+*
+* Licensed to the Apache Software Foundation (ASF) under one
+* or more contributor license agreements.  See the NOTICE file
+* distributed with this work for additional information
+* regarding copyright ownership.  The ASF licenses this file
+* to you under the Apache License, Version 2.0 (the
+* "License"); you may not use this file except in compliance
+* with the License.  You may obtain a copy of the License at
+*
+*   http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied.  See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*
+*/
+package org.apache.qpid.jms.exceptions;
+
+import javax.jms.JMSSecurityException;
+
+public class JMSSecuritySaslException extends JMSSecurityException {
+
+    private static final long serialVersionUID = -6181892492517836496L;
+    private static final int SASL_SYS_TEMP = 4;
+
+    private int outcome = -1;
+
+    public JMSSecuritySaslException(String reason) {
+        super(reason);
+    }
+
+    public JMSSecuritySaslException(String reason, int outcome) {
+        super(reason);
+        this.outcome = outcome;
+    }
+
+    public boolean isSysTempFailure() {
+        return outcome == SASL_SYS_TEMP;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
index d7c8d1d..6b9aa7e 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java
@@ -16,8 +16,10 @@
  */
 package org.apache.qpid.jms.provider.amqp;
 
+import org.apache.qpid.jms.exceptions.JMSSecuritySaslException;
 import org.apache.qpid.jms.sasl.Mechanism;
 import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
 import org.apache.qpid.proton.engine.Transport;
 
 import java.util.function.Function;
@@ -104,7 +106,7 @@ public class AmqpSaslAuthenticator {
         try {
             switch (sasl.getState()) {
                 case PN_SASL_FAIL:
-                    handleSaslFail();
+                    handleSaslFail(sasl);
                     break;
                 case PN_SASL_PASS:
                     handleSaslCompletion(sasl);
@@ -119,7 +121,7 @@ public class AmqpSaslAuthenticator {
 
     //----- Internal support methods 
-----------------------------------------//
 
-    private void handleSaslFail() {
+    private void handleSaslFail(Sasl sasl) {
         StringBuilder message = new StringBuilder("Client failed to 
authenticate");
         if (mechanism != null) {
             message.append(" using SASL: ").append(mechanism.getName());
@@ -127,7 +129,13 @@ public class AmqpSaslAuthenticator {
                 message.append(" 
(").append(mechanism.getAdditionalFailureInformation()).append(")");
             }
         }
-        recordFailure(message.toString(), null);
+
+        SaslOutcome outcome = sasl.getOutcome();
+        if(outcome.equals(SaslOutcome.PN_SASL_TEMP)) {
+            message.append(", due to temporary system error.");
+        }
+
+        recordFailure(message.toString(), null, outcome.getCode());
     }
 
     private void handleSaslCompletion(Sasl sasl) {
@@ -145,7 +153,11 @@ public class AmqpSaslAuthenticator {
     }
 
     private void recordFailure(String message, Throwable cause) {
-        failureCause = new JMSSecurityException(message);
+        recordFailure(message, cause, SaslOutcome.PN_SASL_NONE.getCode());
+    }
+
+    private void recordFailure(String message, Throwable cause, int outcome) {
+        failureCause = new JMSSecuritySaslException(message, outcome);
         if (cause instanceof Exception) {
             failureCause.setLinkedException((Exception) cause);
         }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
index ad4752c..320beaf 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java
@@ -39,6 +39,7 @@ import javax.jms.TransactionRolledBackException;
 
 import org.apache.qpid.jms.JmsOperationTimedOutException;
 import org.apache.qpid.jms.JmsSendTimedOutException;
+import org.apache.qpid.jms.exceptions.JMSSecuritySaslException;
 import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
 import org.apache.qpid.jms.message.JmsMessageFactory;
 import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
@@ -738,6 +739,11 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                                 } finally {
                                     provider = null;
                                 }
+
+                                if(reconnectControl.isStoppageCause(e)) {
+                                    LOG.trace("Stopping attempt due to type of 
failure");
+                                    break;
+                                }
                             }
                         }
                     } else {
@@ -751,7 +757,7 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
                 } finally {
                     if (provider == null) {
                         LOG.trace("Connection attempt:[{}] failed error: {}", 
reconnectControl.reconnectAttempts, failure.getMessage());
-                        if (reconnectControl.isLimitExceeded()) {
+                        if (!reconnectControl.isReconnectAllowed(failure)) {
                             reportReconnectFailure(failure);
                         } else {
                             reconnectControl.scheduleReconnect(this);
@@ -1366,18 +1372,29 @@ public class FailoverProvider extends 
DefaultProviderListener implements Provide
             return false;
         }
 
-        public boolean isReconnectAllowed(IOException cause) {
+        public boolean isReconnectAllowed(Throwable cause) {
             // If a connection attempts fail due to Security errors than
             // we abort reconnection as there is a configuration issue and
             // we want to avoid a spinning reconnect cycle that can never
             // complete.
-            if (cause.getCause() instanceof JMSSecurityException) {
+            if(isStoppageCause(cause)) {
                 return false;
             }
 
             return !isLimitExceeded();
         }
 
+        private boolean isStoppageCause(Throwable cause) {
+            if(cause.getCause() instanceof JMSSecuritySaslException) {
+                JMSSecuritySaslException saslFailure = 
(JMSSecuritySaslException) cause.getCause();
+                return !saslFailure.isSysTempFailure();
+            } else if (cause.getCause() instanceof JMSSecurityException ) {
+                return true;
+            }
+
+            return false;
+        }
+
         private int reconnectAttemptLimit() {
             int maxReconnectValue = maxReconnectAttempts;
             if (!recoveryRequired && startupMaxReconnectAttempts != UNDEFINED) 
{

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java
index c4c3b19..0e63b40 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java
@@ -248,7 +248,7 @@ public class SaslGssApiIntegrationTest extends 
QpidJmsTestCase {
     private void doMechanismSelectedTestImpl(String username, String password, 
Symbol clientSelectedMech, Symbol[] serverMechs, boolean 
enableGssapiExplicitly) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            testPeer.expectFailingSaslAuthentication(serverMechs, 
clientSelectedMech);
+            testPeer.expectSaslFailingAuthentication(serverMechs, 
clientSelectedMech);
 
             String uriOptions = "?jms.clientID=myclientid";
             if(enableGssapiExplicitly) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
index 2bcb363..eb0b0cc 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java
@@ -47,6 +47,7 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
 import org.apache.qpid.jms.transports.TransportOptions;
 import org.apache.qpid.jms.transports.TransportSupport;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -63,6 +64,11 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private static final Symbol EXTERNAL = Symbol.valueOf("EXTERNAL");
     private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2");
 
+    private static final UnsignedByte SASL_FAIL_AUTH = 
UnsignedByte.valueOf((byte) 1);
+    private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 
2);
+    private static final UnsignedByte SASL_SYS_PERM = 
UnsignedByte.valueOf((byte) 3);
+    private static final UnsignedByte SASL_SYS_TEMP = 
UnsignedByte.valueOf((byte) 4);
+
     private static final String BROKER_JKS_KEYSTORE = 
"src/test/resources/broker-jks.keystore";
     private static final String BROKER_JKS_TRUSTSTORE = 
"src/test/resources/broker-jks.truststore";
     private static final String CLIENT_JKS_KEYSTORE = 
"src/test/resources/client-jks.keystore";
@@ -225,6 +231,32 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
         }
     }
 
+    @Test(timeout = 20000)
+    public void testSaslFailureCodes() throws Exception {
+        doSaslFailureCodesTestImpl(SASL_FAIL_AUTH);
+        doSaslFailureCodesTestImpl(SASL_SYS);
+        doSaslFailureCodesTestImpl(SASL_SYS_PERM);
+        doSaslFailureCodesTestImpl(SASL_SYS_TEMP);
+    }
+
+    private void doSaslFailureCodesTestImpl(UnsignedByte saslFailureCode) 
throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, 
ANONYMOUS}, PLAIN, saslFailureCode);
+
+            ConnectionFactory factory = new 
JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + 
"?jms.clientID=myClientID");
+
+            try {
+                factory.createConnection("username", "password");
+                fail("Excepted exception to be thrown");
+            }catch (JMSSecurityException jmsse) {
+                LOG.info("Caught expected security exception: {}", 
jmsse.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
     /**
      * Add a small delay after the SASL process fails, test peer will throw if
      * any unexpected frames arrive, such as erroneous open+close.
@@ -270,7 +302,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private void doMechanismSelectedTestImpl(String username, String password, 
Symbol clientSelectedMech, Symbol[] serverMechs, boolean wait) throws Exception 
{
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            testPeer.expectFailingSaslAuthentication(serverMechs, 
clientSelectedMech);
+            testPeer.expectSaslFailingAuthentication(serverMechs, 
clientSelectedMech);
 
             ConnectionFactory factory = new 
JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + 
"?jms.clientID=myclientid");
             try {
@@ -323,7 +355,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
                                "transport.keyStorePassword=" + PASSWORD;
             }
 
-            testPeer.expectFailingSaslAuthentication(serverMechs, 
clientSelectedMech);
+            testPeer.expectSaslFailingAuthentication(serverMechs, 
clientSelectedMech);
 
             JmsConnectionFactory factory = new 
JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + 
connOptions);
             try {
@@ -385,7 +417,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase {
     private void doMechanismSelectionRestrictedTestImpl(String username, 
String password, Symbol clientSelectedMech, Symbol[] serverMechs, String 
mechanismsOptionValue) throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
-            testPeer.expectFailingSaslAuthentication(serverMechs, 
clientSelectedMech);
+            testPeer.expectSaslFailingAuthentication(serverMechs, 
clientSelectedMech);
 
             String uriOptions = "?jms.clientID=myclientid";
             if(mechanismsOptionValue != null) {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java
index 8fd212b..1658561 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java
@@ -36,8 +36,10 @@ import java.util.function.Function;
 import javax.jms.JMSSecurityRuntimeException;
 import javax.security.sasl.SaslException;
 
+import org.apache.qpid.jms.exceptions.JMSSecuritySaslException;
 import org.apache.qpid.jms.sasl.Mechanism;
 import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sasl.SaslOutcome;
 import org.apache.qpid.proton.engine.Sasl.SaslState;
 import org.apache.qpid.proton.engine.Transport;
 import org.junit.Before;
@@ -137,12 +139,34 @@ public class AmqpSaslAuthenticatorTest {
         verifySaslMockReceived(sasl, INITIAL_RESPONSE);
 
         when(sasl.getState()).thenReturn(SaslState.PN_SASL_FAIL);
+        when(sasl.getOutcome()).thenReturn(SaslOutcome.PN_SASL_AUTH);
         authenticator.handleSaslOutcome(sasl, transport);
 
         assertTrue(authenticator.isComplete());
         assertFalse(authenticator.wasSuccessful());
         assertNotNull(authenticator.getFailureCause());
         
assertTrue(authenticator.getFailureCause().getMessage().contains("Client failed 
to authenticate"));
+        assertFalse(authenticator.getFailureCause().getMessage().contains("due 
to temporary system error"));
+    }
+
+    @Test
+    public void testPeerSignalsAuthenticationSysTemp() throws Exception {
+        Mechanism mechanism = new TestSaslMechanism(INITIAL_RESPONSE);
+        AmqpSaslAuthenticator authenticator = new 
AmqpSaslAuthenticator(mechanismName -> mechanism);
+
+        authenticator.handleSaslMechanisms(sasl, transport);
+        verifySaslMockReceived(sasl, INITIAL_RESPONSE);
+
+        when(sasl.getState()).thenReturn(SaslState.PN_SASL_FAIL);
+        when(sasl.getOutcome()).thenReturn(SaslOutcome.PN_SASL_TEMP);
+        authenticator.handleSaslOutcome(sasl, transport);
+
+        assertTrue(authenticator.isComplete());
+        assertFalse(authenticator.wasSuccessful());
+        assertNotNull(authenticator.getFailureCause());
+        assertTrue(authenticator.getFailureCause() instanceof 
JMSSecuritySaslException);
+        
assertTrue(authenticator.getFailureCause().getMessage().contains("Client failed 
to authenticate"));
+        assertTrue(authenticator.getFailureCause().getMessage().contains("due 
to temporary system error"));
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
index 0a80741..7ff43fc 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java
@@ -92,6 +92,7 @@ import org.apache.qpid.jms.util.StopWatch;
 import org.apache.qpid.proton.amqp.Binary;
 import org.apache.qpid.proton.amqp.DescribedType;
 import org.apache.qpid.proton.amqp.Symbol;
+import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -104,9 +105,15 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(FailoverIntegrationTest.class);
 
-    @Test(timeout = 20000)
-    public void testConnectSecurityViolation() throws Exception {
+    private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS");
+    private static final Symbol PLAIN = Symbol.valueOf("PLAIN");
+    private static final UnsignedByte SASL_FAIL_AUTH = 
UnsignedByte.valueOf((byte) 1);
+    private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 
2);
+    private static final UnsignedByte SASL_SYS_PERM = 
UnsignedByte.valueOf((byte) 3);
+    private static final UnsignedByte SASL_SYS_TEMP = 
UnsignedByte.valueOf((byte) 4);
 
+    @Test(timeout = 20000)
+    public void testConnectThrowsSecurityViolationOnFailureFromOpen() throws 
Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
 
             testPeer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous 
connections not allowed", null);
@@ -127,6 +134,318 @@ public class FailoverIntegrationTest extends 
QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void 
testConnectThrowsSecurityViolationOnFailureFromSaslWithClientID() throws 
Exception {
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true,
 SASL_FAIL_AUTH);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true,
 SASL_SYS);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true,
 SASL_SYS_PERM);
+    }
+
+    @Test(timeout = 20000)
+    public void 
testConnectThrowsSecurityViolationOnFailureFromSaslExplicitlyWithoutClientID() 
throws Exception {
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false,
 SASL_FAIL_AUTH);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false,
 SASL_SYS);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false,
 SASL_SYS_PERM);
+    }
+
+    private void 
doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean
 clientID, UnsignedByte saslFailureCode) throws Exception {
+        String optionString;
+        if(clientID) {
+            optionString = "?jms.clientID=myClientID";
+        } else {
+            optionString = "?jms.awaitClientID=false";
+        }
+
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, 
ANONYMOUS}, PLAIN, saslFailureCode);
+
+            ConnectionFactory factory = new 
JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + 
")" + optionString);
+
+            try {
+                factory.createConnection("username", "password");
+                fail("Excepted exception to be thrown");
+            }catch (JMSSecurityException jmsse) {
+                LOG.info("Caught expected security exception: {}", 
jmsse.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void 
testConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientID() 
throws Exception {
+        
doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_FAIL_AUTH);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS);
+        
doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS_PERM);
+    }
+
+    private void 
doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(UnsignedByte
 saslFailureCode) throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+            testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, 
ANONYMOUS}, PLAIN, saslFailureCode);
+
+            ConnectionFactory factory = new 
JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + 
")");
+            Connection connection = factory.createConnection("username", 
"password");
+
+            try {
+                connection.start();
+                fail("Excepted exception to be thrown");
+            }catch (JMSSecurityException jmsse) {
+                LOG.info("Caught expected security exception: {}", 
jmsse.getMessage());
+            }
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testConnectHandlesSaslTempFailure() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+            final String finalURI = createPeerURI(finalPeer);
+
+            originalPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, 
ANONYMOUS, SASL_SYS_TEMP);
+
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, finalPeer);
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+
+            try {
+                connection.start();
+            } catch (Exception ex) {
+                fail("Should not have thrown an Exception: " + ex);
+            }
+
+            assertTrue("Should connect to final peer", finalConnected.await(5, 
TimeUnit.SECONDS));
+
+            String content = "myContent";
+            final DescribedType amqpValueNullContent = new 
AmqpValueDescribedType(content);
+
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, 
null, amqpValueNullContent);
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message message = consumer.receive(2000);
+
+            finalPeer.expectClose();
+            connection.close();
+            finalPeer.waitForAllHandlersToComplete(1000);
+
+            assertNotNull(message);
+            assertTrue(message instanceof TextMessage);
+            assertEquals(content, ((TextMessage) message).getText());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverStopsOnNonTemporarySaslFailure() throws Exception {
+        doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_FAIL_AUTH);
+        doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS);
+        doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS_PERM);
+    }
+
+    private void doFailoverStopsOnNonTemporarySaslFailureTestImpl(UnsignedByte 
saslFailureCode) throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch exceptionListenerFired = new 
CountDownLatch(1);
+            final AtomicReference<Throwable> failure = new AtomicReference<>();
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String rejectingURI = createPeerURI(rejectingPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Rejecting peer is at: {}", rejectingURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Expect connection to the first peer (and have it drop)
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlow();
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of Rejecting Peer--- //
+            rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS 
}, ANONYMOUS, saslFailureCode);
+
+            // --- Post Failover Expectations of FinalPeer --- //
+            // This shouldn't get hit, but if it does accept the connect so we 
don't pass the failed
+            // to connect assertion.
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.trace("JMS ExceptionListener: ", exception);
+                    failure.compareAndSet(null, exception);
+                    exceptionListenerFired.countDown();
+                }
+            });
+
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(3, TimeUnit.SECONDS));
+
+            assertTrue("The ExceptionListener should have been alerted", 
exceptionListenerFired.await(3, TimeUnit.SECONDS));
+            Throwable ex = failure.get();
+            assertTrue("Unexpected failure exception: " + ex, ex instanceof 
JMSSecurityException);
+
+            // Verify the consumer gets marked closed
+            assertTrue("consumer never closed.", Wait.waitFor(new 
Wait.Condition() {
+                @Override
+                public boolean isSatisfied() throws Exception {
+                    try {
+                        consumer.getMessageSelector();
+                    } catch (IllegalStateException jmsise) {
+                        return true;
+                    }
+                    return false;
+                }
+            }, 5000, 5));
+
+            // Shut down last peer and verify no connection made to it
+            finalPeer.purgeExpectations();
+            finalPeer.close();
+            assertNotNull("First peer should have accepted a TCP connection", 
originalPeer.getClientSocket());
+            assertNotNull("Rejecting peer should have accepted a TCP 
connection", rejectingPeer.getClientSocket());
+            assertNull("Final peer should not have accepted any TCP 
connection", finalPeer.getClientSocket());
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testFailoverHandlesTemporarySaslFailure() throws Exception {
+        try (TestAmqpPeer originalPeer = new TestAmqpPeer();
+             TestAmqpPeer rejectingPeer = new TestAmqpPeer();
+             TestAmqpPeer finalPeer = new TestAmqpPeer();) {
+
+            final CountDownLatch originalConnected = new CountDownLatch(1);
+            final CountDownLatch finalConnected = new CountDownLatch(1);
+            final AtomicBoolean exceptionListenerFired = new AtomicBoolean();
+
+            // Create a peer to connect to, then one to reconnect to
+            final String originalURI = createPeerURI(originalPeer);
+            final String rejectingURI = createPeerURI(rejectingPeer);
+            final String finalURI = createPeerURI(finalPeer);
+
+            LOG.info("Original peer is at: {}", originalURI);
+            LOG.info("Rejecting peer is at: {}", rejectingURI);
+            LOG.info("Final peer is at: {}", finalURI);
+
+            // Expect connection to the first peer (and have it drop)
+            originalPeer.expectSaslAnonymous();
+            originalPeer.expectOpen();
+            originalPeer.expectBegin();
+            originalPeer.expectBegin();
+            originalPeer.expectReceiverAttach();
+            originalPeer.expectLinkFlow();
+            originalPeer.dropAfterLastHandler();
+
+            // --- Post Failover Expectations of Rejecting --- //
+            rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS 
}, ANONYMOUS, SASL_SYS_TEMP);
+
+            // --- Post Failover Expectations of FinalPeer --- //
+            finalPeer.expectSaslAnonymous();
+            finalPeer.expectOpen();
+            finalPeer.expectBegin();
+            finalPeer.expectBegin();
+            finalPeer.expectReceiverAttach();
+            final String expectedMessageContent = "myTextMessage";
+            finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, 
null, new AmqpValueDescribedType(expectedMessageContent));
+
+            final JmsConnection connection = 
establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer);
+            connection.setExceptionListener(new ExceptionListener() {
+                @Override
+                public void onException(JMSException exception) {
+                    LOG.trace("JMS ExceptionListener: ", exception);
+                    exceptionListenerFired.set(true);
+                }
+            });
+
+            connection.addConnectionListener(new 
JmsDefaultConnectionListener() {
+                @Override
+                public void onConnectionEstablished(URI remoteURI) {
+                    LOG.info("Connection Established: {}", remoteURI);
+                    if (originalURI.equals(remoteURI.toString())) {
+                        originalConnected.countDown();
+                    }
+                }
+
+                @Override
+                public void onConnectionRestored(URI remoteURI) {
+                    LOG.info("Connection Restored: {}", remoteURI);
+                    if (finalURI.equals(remoteURI.toString())) {
+                        finalConnected.countDown();
+                    }
+                }
+            });
+            connection.start();
+
+            Session session = connection.createSession(false, 
Session.AUTO_ACKNOWLEDGE);
+            Queue queue = session.createQueue("myQueue");
+            final MessageConsumer consumer = session.createConsumer(queue);
+
+            finalPeer.waitForAllHandlersToComplete(2000);
+
+            assertTrue("Should connect to original peer", 
originalConnected.await(3, TimeUnit.SECONDS));
+            assertTrue("Should connect to final peer", finalConnected.await(3, 
TimeUnit.SECONDS));
+
+            // Check message arrives
+            finalPeer.expectDispositionThatIsAcceptedAndSettled();
+
+            Message msg = consumer.receive(5000);
+            assertTrue("Expected an instance of TextMessage, got: " + msg, msg 
instanceof TextMessage);
+            assertEquals("Unexpected msg content", expectedMessageContent, 
((TextMessage) msg).getText());
+
+            assertFalse("The ExceptionListener should not have been alerted", 
exceptionListenerFired.get());
+
+            // Shut it down
+            finalPeer.expectClose();
+            connection.close();
+
+            finalPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testFailoverHandlesConnectErrorInvalidField() throws Exception 
{
         doFailoverHandlesConnectErrorInvalidFieldTestImpl(false);
     }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
index a84406e..5ad37c0 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java
@@ -141,8 +141,9 @@ public class TestAmqpPeer implements AutoCloseable
     private static final Symbol PLAIN = Symbol.valueOf("PLAIN");
     private static final Symbol GSSAPI = Symbol.valueOf("GSSAPI");
     private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2");
-    private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte)0);
-    private static final UnsignedByte SASL_FAIL_AUTH = 
UnsignedByte.valueOf((byte)1);
+    private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte) 0);
+    private static final UnsignedByte SASL_FAIL_AUTH = 
UnsignedByte.valueOf((byte) 1);
+    private static final UnsignedByte SASL_SYS_TEMP = 
UnsignedByte.valueOf((byte) 4);
     private static final int CONNECTION_CHANNEL = 0;
     private static final int DEFAULT_PRODUCER_CREDIT = 100;
     private static final Symbol[] DEFAULT_DESIRED_CAPABILITIES = new Symbol[] 
{ SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY, ANONYMOUS_RELAY, SHARED_SUBS};
@@ -817,7 +818,12 @@ public class TestAmqpPeer implements AutoCloseable
         expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), 
null, true, true);
     }
 
-    public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol 
clientSelectedMech)
+    public void expectSaslFailingAuthentication(Symbol[] serverMechs, Symbol 
clientSelectedMech)
+    {
+        expectSaslFailingExchange(serverMechs, clientSelectedMech, 
SASL_FAIL_AUTH);
+    }
+
+    public void expectSaslFailingExchange(Symbol[] serverMechs, Symbol 
clientSelectedMech, UnsignedByte saslFailureAuthCode)
     {
         SaslMechanismsFrame saslMechanismsFrame = new 
SaslMechanismsFrame().setSaslServerMechanisms(serverMechs);
         addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, 
AmqpHeader.SASL_HEADER,
@@ -825,6 +831,10 @@ public class TestAmqpPeer implements AutoCloseable
                                                     this, FrameType.SASL, 0,
                                                     saslMechanismsFrame, 
null)));
 
+        if(saslFailureAuthCode.compareTo(SASL_FAIL_AUTH) < 0 || 
saslFailureAuthCode.compareTo(SASL_SYS_TEMP) > 0) {
+            throw new IllegalArgumentException("A valid failing SASL code must 
be supplied");
+        }
+
         SaslInitMatcher saslInitMatcher = new 
SaslInitMatcher().withMechanism(equalTo(clientSelectedMech));
         saslInitMatcher.onCompletion(new AmqpPeerRunnable()
         {
@@ -833,7 +843,7 @@ public class TestAmqpPeer implements AutoCloseable
             {
                 TestAmqpPeer.this.sendFrame(
                         FrameType.SASL, 0,
-                        new SaslOutcomeFrame().setCode(SASL_FAIL_AUTH),
+                        new SaslOutcomeFrame().setCode(saslFailureAuthCode),
                         null,
                         false, 0);
                 _driverRunnable.expectHeader();


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org
For additional commands, e-mail: commits-h...@qpid.apache.org

Reply via email to