Repository: qpid-jms Updated Branches: refs/heads/master e3e0452cb -> 76f372510
QPIDJMS-412: stop further [re]connection attempts after a non-temporary SASL failure Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/76f37251 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/76f37251 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/76f37251 Branch: refs/heads/master Commit: 76f372510e154a0a963f4b2ba11025189e53193a Parents: e3e0452 Author: Robbie Gemmell <rob...@apache.org> Authored: Fri Aug 31 18:34:47 2018 +0100 Committer: Robbie Gemmell <rob...@apache.org> Committed: Fri Aug 31 18:34:47 2018 +0100 ---------------------------------------------------------------------- .../exceptions/JMSSecuritySaslException.java | 44 +++ .../provider/amqp/AmqpSaslAuthenticator.java | 20 +- .../jms/provider/failover/FailoverProvider.java | 23 +- .../integration/SaslGssApiIntegrationTest.java | 2 +- .../jms/integration/SaslIntegrationTest.java | 38 ++- .../amqp/AmqpSaslAuthenticatorTest.java | 24 ++ .../failover/FailoverIntegrationTest.java | 323 ++++++++++++++++++- .../qpid/jms/test/testpeer/TestAmqpPeer.java | 18 +- 8 files changed, 475 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java new file mode 100644 index 0000000..8e1b6b9 --- /dev/null +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JMSSecuritySaslException.java @@ -0,0 +1,44 @@ +/* +* +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, +* software distributed under the License is distributed on an +* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +* KIND, either express or implied. See the License for the +* specific language governing permissions and limitations +* under the License. +* +*/ +package org.apache.qpid.jms.exceptions; + +import javax.jms.JMSSecurityException; + +public class JMSSecuritySaslException extends JMSSecurityException { + + private static final long serialVersionUID = -6181892492517836496L; + private static final int SASL_SYS_TEMP = 4; + + private int outcome = -1; + + public JMSSecuritySaslException(String reason) { + super(reason); + } + + public JMSSecuritySaslException(String reason, int outcome) { + super(reason); + this.outcome = outcome; + } + + public boolean isSysTempFailure() { + return outcome == SASL_SYS_TEMP; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java index d7c8d1d..6b9aa7e 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticator.java @@ -16,8 +16,10 @@ */ package org.apache.qpid.jms.provider.amqp; +import org.apache.qpid.jms.exceptions.JMSSecuritySaslException; import org.apache.qpid.jms.sasl.Mechanism; import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sasl.SaslOutcome; import org.apache.qpid.proton.engine.Transport; import java.util.function.Function; @@ -104,7 +106,7 @@ public class AmqpSaslAuthenticator { try { switch (sasl.getState()) { case PN_SASL_FAIL: - handleSaslFail(); + handleSaslFail(sasl); break; case PN_SASL_PASS: handleSaslCompletion(sasl); @@ -119,7 +121,7 @@ public class AmqpSaslAuthenticator { //----- Internal support methods -----------------------------------------// - private void handleSaslFail() { + private void handleSaslFail(Sasl sasl) { StringBuilder message = new StringBuilder("Client failed to authenticate"); if (mechanism != null) { message.append(" using SASL: ").append(mechanism.getName()); @@ -127,7 +129,13 @@ public class AmqpSaslAuthenticator { message.append(" (").append(mechanism.getAdditionalFailureInformation()).append(")"); } } - recordFailure(message.toString(), null); + + SaslOutcome outcome = sasl.getOutcome(); + if(outcome.equals(SaslOutcome.PN_SASL_TEMP)) { + message.append(", due to temporary system error."); + } + + recordFailure(message.toString(), null, outcome.getCode()); } private void handleSaslCompletion(Sasl sasl) { @@ -145,7 +153,11 @@ public class AmqpSaslAuthenticator { } private void recordFailure(String message, Throwable cause) { - failureCause = new JMSSecurityException(message); + recordFailure(message, cause, SaslOutcome.PN_SASL_NONE.getCode()); + } + + private void recordFailure(String message, Throwable cause, int outcome) { + failureCause = new JMSSecuritySaslException(message, outcome); if (cause instanceof Exception) { failureCause.setLinkedException((Exception) cause); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java index ad4752c..320beaf 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/failover/FailoverProvider.java @@ -39,6 +39,7 @@ import javax.jms.TransactionRolledBackException; import org.apache.qpid.jms.JmsOperationTimedOutException; import org.apache.qpid.jms.JmsSendTimedOutException; +import org.apache.qpid.jms.exceptions.JMSSecuritySaslException; import org.apache.qpid.jms.message.JmsInboundMessageDispatch; import org.apache.qpid.jms.message.JmsMessageFactory; import org.apache.qpid.jms.message.JmsOutboundMessageDispatch; @@ -738,6 +739,11 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } finally { provider = null; } + + if(reconnectControl.isStoppageCause(e)) { + LOG.trace("Stopping attempt due to type of failure"); + break; + } } } } else { @@ -751,7 +757,7 @@ public class FailoverProvider extends DefaultProviderListener implements Provide } finally { if (provider == null) { LOG.trace("Connection attempt:[{}] failed error: {}", reconnectControl.reconnectAttempts, failure.getMessage()); - if (reconnectControl.isLimitExceeded()) { + if (!reconnectControl.isReconnectAllowed(failure)) { reportReconnectFailure(failure); } else { reconnectControl.scheduleReconnect(this); @@ -1366,18 +1372,29 @@ public class FailoverProvider extends DefaultProviderListener implements Provide return false; } - public boolean isReconnectAllowed(IOException cause) { + public boolean isReconnectAllowed(Throwable cause) { // If a connection attempts fail due to Security errors than // we abort reconnection as there is a configuration issue and // we want to avoid a spinning reconnect cycle that can never // complete. - if (cause.getCause() instanceof JMSSecurityException) { + if(isStoppageCause(cause)) { return false; } return !isLimitExceeded(); } + private boolean isStoppageCause(Throwable cause) { + if(cause.getCause() instanceof JMSSecuritySaslException) { + JMSSecuritySaslException saslFailure = (JMSSecuritySaslException) cause.getCause(); + return !saslFailure.isSysTempFailure(); + } else if (cause.getCause() instanceof JMSSecurityException ) { + return true; + } + + return false; + } + private int reconnectAttemptLimit() { int maxReconnectValue = maxReconnectAttempts; if (!recoveryRequired && startupMaxReconnectAttempts != UNDEFINED) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java index c4c3b19..0e63b40 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslGssApiIntegrationTest.java @@ -248,7 +248,7 @@ public class SaslGssApiIntegrationTest extends QpidJmsTestCase { private void doMechanismSelectedTestImpl(String username, String password, Symbol clientSelectedMech, Symbol[] serverMechs, boolean enableGssapiExplicitly) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech); + testPeer.expectSaslFailingAuthentication(serverMechs, clientSelectedMech); String uriOptions = "?jms.clientID=myclientid"; if(enableGssapiExplicitly) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java index 2bcb363..eb0b0cc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/SaslIntegrationTest.java @@ -47,6 +47,7 @@ import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; import org.apache.qpid.jms.transports.TransportOptions; import org.apache.qpid.jms.transports.TransportSupport; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; import org.junit.Test; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -63,6 +64,11 @@ public class SaslIntegrationTest extends QpidJmsTestCase { private static final Symbol EXTERNAL = Symbol.valueOf("EXTERNAL"); private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2"); + private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1); + private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 2); + private static final UnsignedByte SASL_SYS_PERM = UnsignedByte.valueOf((byte) 3); + private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4); + private static final String BROKER_JKS_KEYSTORE = "src/test/resources/broker-jks.keystore"; private static final String BROKER_JKS_TRUSTSTORE = "src/test/resources/broker-jks.truststore"; private static final String CLIENT_JKS_KEYSTORE = "src/test/resources/client-jks.keystore"; @@ -225,6 +231,32 @@ public class SaslIntegrationTest extends QpidJmsTestCase { } } + @Test(timeout = 20000) + public void testSaslFailureCodes() throws Exception { + doSaslFailureCodesTestImpl(SASL_FAIL_AUTH); + doSaslFailureCodesTestImpl(SASL_SYS); + doSaslFailureCodesTestImpl(SASL_SYS_PERM); + doSaslFailureCodesTestImpl(SASL_SYS_TEMP); + } + + private void doSaslFailureCodesTestImpl(UnsignedByte saslFailureCode) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode); + + ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=myClientID"); + + try { + factory.createConnection("username", "password"); + fail("Excepted exception to be thrown"); + }catch (JMSSecurityException jmsse) { + LOG.info("Caught expected security exception: {}", jmsse.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + /** * Add a small delay after the SASL process fails, test peer will throw if * any unexpected frames arrive, such as erroneous open+close. @@ -270,7 +302,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { private void doMechanismSelectedTestImpl(String username, String password, Symbol clientSelectedMech, Symbol[] serverMechs, boolean wait) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech); + testPeer.expectSaslFailingAuthentication(serverMechs, clientSelectedMech); ConnectionFactory factory = new JmsConnectionFactory("amqp://localhost:" + testPeer.getServerPort() + "?jms.clientID=myclientid"); try { @@ -323,7 +355,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { "transport.keyStorePassword=" + PASSWORD; } - testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech); + testPeer.expectSaslFailingAuthentication(serverMechs, clientSelectedMech); JmsConnectionFactory factory = new JmsConnectionFactory("amqps://localhost:" + testPeer.getServerPort() + connOptions); try { @@ -385,7 +417,7 @@ public class SaslIntegrationTest extends QpidJmsTestCase { private void doMechanismSelectionRestrictedTestImpl(String username, String password, Symbol clientSelectedMech, Symbol[] serverMechs, String mechanismsOptionValue) throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { - testPeer.expectFailingSaslAuthentication(serverMechs, clientSelectedMech); + testPeer.expectSaslFailingAuthentication(serverMechs, clientSelectedMech); String uriOptions = "?jms.clientID=myclientid"; if(mechanismsOptionValue != null) { http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java index 8fd212b..1658561 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpSaslAuthenticatorTest.java @@ -36,8 +36,10 @@ import java.util.function.Function; import javax.jms.JMSSecurityRuntimeException; import javax.security.sasl.SaslException; +import org.apache.qpid.jms.exceptions.JMSSecuritySaslException; import org.apache.qpid.jms.sasl.Mechanism; import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Sasl.SaslOutcome; import org.apache.qpid.proton.engine.Sasl.SaslState; import org.apache.qpid.proton.engine.Transport; import org.junit.Before; @@ -137,12 +139,34 @@ public class AmqpSaslAuthenticatorTest { verifySaslMockReceived(sasl, INITIAL_RESPONSE); when(sasl.getState()).thenReturn(SaslState.PN_SASL_FAIL); + when(sasl.getOutcome()).thenReturn(SaslOutcome.PN_SASL_AUTH); authenticator.handleSaslOutcome(sasl, transport); assertTrue(authenticator.isComplete()); assertFalse(authenticator.wasSuccessful()); assertNotNull(authenticator.getFailureCause()); assertTrue(authenticator.getFailureCause().getMessage().contains("Client failed to authenticate")); + assertFalse(authenticator.getFailureCause().getMessage().contains("due to temporary system error")); + } + + @Test + public void testPeerSignalsAuthenticationSysTemp() throws Exception { + Mechanism mechanism = new TestSaslMechanism(INITIAL_RESPONSE); + AmqpSaslAuthenticator authenticator = new AmqpSaslAuthenticator(mechanismName -> mechanism); + + authenticator.handleSaslMechanisms(sasl, transport); + verifySaslMockReceived(sasl, INITIAL_RESPONSE); + + when(sasl.getState()).thenReturn(SaslState.PN_SASL_FAIL); + when(sasl.getOutcome()).thenReturn(SaslOutcome.PN_SASL_TEMP); + authenticator.handleSaslOutcome(sasl, transport); + + assertTrue(authenticator.isComplete()); + assertFalse(authenticator.wasSuccessful()); + assertNotNull(authenticator.getFailureCause()); + assertTrue(authenticator.getFailureCause() instanceof JMSSecuritySaslException); + assertTrue(authenticator.getFailureCause().getMessage().contains("Client failed to authenticate")); + assertTrue(authenticator.getFailureCause().getMessage().contains("due to temporary system error")); } @Test http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java index 0a80741..7ff43fc 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/failover/FailoverIntegrationTest.java @@ -92,6 +92,7 @@ import org.apache.qpid.jms.util.StopWatch; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.junit.Test; import org.junit.runner.RunWith; @@ -104,9 +105,15 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { private static final Logger LOG = LoggerFactory.getLogger(FailoverIntegrationTest.class); - @Test(timeout = 20000) - public void testConnectSecurityViolation() throws Exception { + private static final Symbol ANONYMOUS = Symbol.valueOf("ANONYMOUS"); + private static final Symbol PLAIN = Symbol.valueOf("PLAIN"); + private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1); + private static final UnsignedByte SASL_SYS = UnsignedByte.valueOf((byte) 2); + private static final UnsignedByte SASL_SYS_PERM = UnsignedByte.valueOf((byte) 3); + private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4); + @Test(timeout = 20000) + public void testConnectThrowsSecurityViolationOnFailureFromOpen() throws Exception { try (TestAmqpPeer testPeer = new TestAmqpPeer();) { testPeer.rejectConnect(AmqpError.UNAUTHORIZED_ACCESS, "Anonymous connections not allowed", null); @@ -127,6 +134,318 @@ public class FailoverIntegrationTest extends QpidJmsTestCase { } @Test(timeout = 20000) + public void testConnectThrowsSecurityViolationOnFailureFromSaslWithClientID() throws Exception { + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_FAIL_AUTH); + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_SYS); + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(true, SASL_SYS_PERM); + } + + @Test(timeout = 20000) + public void testConnectThrowsSecurityViolationOnFailureFromSaslExplicitlyWithoutClientID() throws Exception { + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_FAIL_AUTH); + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_SYS); + doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(false, SASL_SYS_PERM); + } + + private void doConnectThrowsSecurityViolationOnFailureFromSaslWithOrExplicitlyWithoutClientIDTestImpl(boolean clientID, UnsignedByte saslFailureCode) throws Exception { + String optionString; + if(clientID) { + optionString = "?jms.clientID=myClientID"; + } else { + optionString = "?jms.awaitClientID=false"; + } + + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + + testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode); + + ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")" + optionString); + + try { + factory.createConnection("username", "password"); + fail("Excepted exception to be thrown"); + }catch (JMSSecurityException jmsse) { + LOG.info("Caught expected security exception: {}", jmsse.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientID() throws Exception { + doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_FAIL_AUTH); + doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS); + doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(SASL_SYS_PERM); + } + + private void doConnectThrowsSecurityViolationOnFailureFromSaslImplicitlyWithoutClientIDTestImpl(UnsignedByte saslFailureCode) throws Exception { + try (TestAmqpPeer testPeer = new TestAmqpPeer();) { + testPeer.expectSaslFailingExchange(new Symbol[] {PLAIN, ANONYMOUS}, PLAIN, saslFailureCode); + + ConnectionFactory factory = new JmsConnectionFactory("failover:(amqp://localhost:" + testPeer.getServerPort() + ")"); + Connection connection = factory.createConnection("username", "password"); + + try { + connection.start(); + fail("Excepted exception to be thrown"); + }catch (JMSSecurityException jmsse) { + LOG.info("Caught expected security exception: {}", jmsse.getMessage()); + } + + testPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) + public void testConnectHandlesSaslTempFailure() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch finalConnected = new CountDownLatch(1); + final String finalURI = createPeerURI(finalPeer); + + originalPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP); + + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, finalPeer); + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + + try { + connection.start(); + } catch (Exception ex) { + fail("Should not have thrown an Exception: " + ex); + } + + assertTrue("Should connect to final peer", finalConnected.await(5, TimeUnit.SECONDS)); + + String content = "myContent"; + final DescribedType amqpValueNullContent = new AmqpValueDescribedType(content); + + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, amqpValueNullContent); + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + MessageConsumer consumer = session.createConsumer(queue); + Message message = consumer.receive(2000); + + finalPeer.expectClose(); + connection.close(); + finalPeer.waitForAllHandlersToComplete(1000); + + assertNotNull(message); + assertTrue(message instanceof TextMessage); + assertEquals(content, ((TextMessage) message).getText()); + } + } + + @Test(timeout = 20000) + public void testFailoverStopsOnNonTemporarySaslFailure() throws Exception { + doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_FAIL_AUTH); + doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS); + doFailoverStopsOnNonTemporarySaslFailureTestImpl(SASL_SYS_PERM); + } + + private void doFailoverStopsOnNonTemporarySaslFailureTestImpl(UnsignedByte saslFailureCode) throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer rejectingPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch exceptionListenerFired = new CountDownLatch(1); + final AtomicReference<Throwable> failure = new AtomicReference<>(); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String rejectingURI = createPeerURI(rejectingPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Rejecting peer is at: {}", rejectingURI); + LOG.info("Final peer is at: {}", finalURI); + + // Expect connection to the first peer (and have it drop) + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlow(); + originalPeer.dropAfterLastHandler(); + + // --- Post Failover Expectations of Rejecting Peer--- // + rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, saslFailureCode); + + // --- Post Failover Expectations of FinalPeer --- // + // This shouldn't get hit, but if it does accept the connect so we don't pass the failed + // to connect assertion. + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + LOG.trace("JMS ExceptionListener: ", exception); + failure.compareAndSet(null, exception); + exceptionListenerFired.countDown(); + } + }); + + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + }); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + final MessageConsumer consumer = session.createConsumer(queue); + + assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS)); + + assertTrue("The ExceptionListener should have been alerted", exceptionListenerFired.await(3, TimeUnit.SECONDS)); + Throwable ex = failure.get(); + assertTrue("Unexpected failure exception: " + ex, ex instanceof JMSSecurityException); + + // Verify the consumer gets marked closed + assertTrue("consumer never closed.", Wait.waitFor(new Wait.Condition() { + @Override + public boolean isSatisfied() throws Exception { + try { + consumer.getMessageSelector(); + } catch (IllegalStateException jmsise) { + return true; + } + return false; + } + }, 5000, 5)); + + // Shut down last peer and verify no connection made to it + finalPeer.purgeExpectations(); + finalPeer.close(); + assertNotNull("First peer should have accepted a TCP connection", originalPeer.getClientSocket()); + assertNotNull("Rejecting peer should have accepted a TCP connection", rejectingPeer.getClientSocket()); + assertNull("Final peer should not have accepted any TCP connection", finalPeer.getClientSocket()); + } + } + + @Test(timeout = 20000) + public void testFailoverHandlesTemporarySaslFailure() throws Exception { + try (TestAmqpPeer originalPeer = new TestAmqpPeer(); + TestAmqpPeer rejectingPeer = new TestAmqpPeer(); + TestAmqpPeer finalPeer = new TestAmqpPeer();) { + + final CountDownLatch originalConnected = new CountDownLatch(1); + final CountDownLatch finalConnected = new CountDownLatch(1); + final AtomicBoolean exceptionListenerFired = new AtomicBoolean(); + + // Create a peer to connect to, then one to reconnect to + final String originalURI = createPeerURI(originalPeer); + final String rejectingURI = createPeerURI(rejectingPeer); + final String finalURI = createPeerURI(finalPeer); + + LOG.info("Original peer is at: {}", originalURI); + LOG.info("Rejecting peer is at: {}", rejectingURI); + LOG.info("Final peer is at: {}", finalURI); + + // Expect connection to the first peer (and have it drop) + originalPeer.expectSaslAnonymous(); + originalPeer.expectOpen(); + originalPeer.expectBegin(); + originalPeer.expectBegin(); + originalPeer.expectReceiverAttach(); + originalPeer.expectLinkFlow(); + originalPeer.dropAfterLastHandler(); + + // --- Post Failover Expectations of Rejecting --- // + rejectingPeer.expectSaslFailingExchange(new Symbol[] { ANONYMOUS }, ANONYMOUS, SASL_SYS_TEMP); + + // --- Post Failover Expectations of FinalPeer --- // + finalPeer.expectSaslAnonymous(); + finalPeer.expectOpen(); + finalPeer.expectBegin(); + finalPeer.expectBegin(); + finalPeer.expectReceiverAttach(); + final String expectedMessageContent = "myTextMessage"; + finalPeer.expectLinkFlowRespondWithTransfer(null, null, null, null, new AmqpValueDescribedType(expectedMessageContent)); + + final JmsConnection connection = establishAnonymousConnecton(originalPeer, rejectingPeer, finalPeer); + connection.setExceptionListener(new ExceptionListener() { + @Override + public void onException(JMSException exception) { + LOG.trace("JMS ExceptionListener: ", exception); + exceptionListenerFired.set(true); + } + }); + + connection.addConnectionListener(new JmsDefaultConnectionListener() { + @Override + public void onConnectionEstablished(URI remoteURI) { + LOG.info("Connection Established: {}", remoteURI); + if (originalURI.equals(remoteURI.toString())) { + originalConnected.countDown(); + } + } + + @Override + public void onConnectionRestored(URI remoteURI) { + LOG.info("Connection Restored: {}", remoteURI); + if (finalURI.equals(remoteURI.toString())) { + finalConnected.countDown(); + } + } + }); + connection.start(); + + Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE); + Queue queue = session.createQueue("myQueue"); + final MessageConsumer consumer = session.createConsumer(queue); + + finalPeer.waitForAllHandlersToComplete(2000); + + assertTrue("Should connect to original peer", originalConnected.await(3, TimeUnit.SECONDS)); + assertTrue("Should connect to final peer", finalConnected.await(3, TimeUnit.SECONDS)); + + // Check message arrives + finalPeer.expectDispositionThatIsAcceptedAndSettled(); + + Message msg = consumer.receive(5000); + assertTrue("Expected an instance of TextMessage, got: " + msg, msg instanceof TextMessage); + assertEquals("Unexpected msg content", expectedMessageContent, ((TextMessage) msg).getText()); + + assertFalse("The ExceptionListener should not have been alerted", exceptionListenerFired.get()); + + // Shut it down + finalPeer.expectClose(); + connection.close(); + + finalPeer.waitForAllHandlersToComplete(1000); + } + } + + @Test(timeout = 20000) public void testFailoverHandlesConnectErrorInvalidField() throws Exception { doFailoverHandlesConnectErrorInvalidFieldTestImpl(false); } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/76f37251/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java index a84406e..5ad37c0 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/test/testpeer/TestAmqpPeer.java @@ -141,8 +141,9 @@ public class TestAmqpPeer implements AutoCloseable private static final Symbol PLAIN = Symbol.valueOf("PLAIN"); private static final Symbol GSSAPI = Symbol.valueOf("GSSAPI"); private static final Symbol XOAUTH2 = Symbol.valueOf("XOAUTH2"); - private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte)0); - private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte)1); + private static final UnsignedByte SASL_OK = UnsignedByte.valueOf((byte) 0); + private static final UnsignedByte SASL_FAIL_AUTH = UnsignedByte.valueOf((byte) 1); + private static final UnsignedByte SASL_SYS_TEMP = UnsignedByte.valueOf((byte) 4); private static final int CONNECTION_CHANNEL = 0; private static final int DEFAULT_PRODUCER_CREDIT = 100; private static final Symbol[] DEFAULT_DESIRED_CAPABILITIES = new Symbol[] { SOLE_CONNECTION_CAPABILITY, DELAYED_DELIVERY, ANONYMOUS_RELAY, SHARED_SUBS}; @@ -817,7 +818,12 @@ public class TestAmqpPeer implements AutoCloseable expectSaslAuthentication(ANONYMOUS, equalTo(new Binary(new byte[0])), null, true, true); } - public void expectFailingSaslAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech) + public void expectSaslFailingAuthentication(Symbol[] serverMechs, Symbol clientSelectedMech) + { + expectSaslFailingExchange(serverMechs, clientSelectedMech, SASL_FAIL_AUTH); + } + + public void expectSaslFailingExchange(Symbol[] serverMechs, Symbol clientSelectedMech, UnsignedByte saslFailureAuthCode) { SaslMechanismsFrame saslMechanismsFrame = new SaslMechanismsFrame().setSaslServerMechanisms(serverMechs); addHandler(new HeaderHandlerImpl(AmqpHeader.SASL_HEADER, AmqpHeader.SASL_HEADER, @@ -825,6 +831,10 @@ public class TestAmqpPeer implements AutoCloseable this, FrameType.SASL, 0, saslMechanismsFrame, null))); + if(saslFailureAuthCode.compareTo(SASL_FAIL_AUTH) < 0 || saslFailureAuthCode.compareTo(SASL_SYS_TEMP) > 0) { + throw new IllegalArgumentException("A valid failing SASL code must be supplied"); + } + SaslInitMatcher saslInitMatcher = new SaslInitMatcher().withMechanism(equalTo(clientSelectedMech)); saslInitMatcher.onCompletion(new AmqpPeerRunnable() { @@ -833,7 +843,7 @@ public class TestAmqpPeer implements AutoCloseable { TestAmqpPeer.this.sendFrame( FrameType.SASL, 0, - new SaslOutcomeFrame().setCode(SASL_FAIL_AUTH), + new SaslOutcomeFrame().setCode(saslFailureAuthCode), null, false, 0); _driverRunnable.expectHeader(); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org