Repository: qpid-broker-j Updated Branches: refs/heads/master 061fb1b03 -> 32733fc22
QPID-7782: [AMQP1.0] Add supporting AMQP 1.0 SASL protocol tests Also: * prevented NPE if client selected a SASL mechanism which was not one of those offered by SaslMechanisms. * grouped sasl methods together within the Connection_1_0 implementation. Project: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/commit/32733fc2 Tree: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/tree/32733fc2 Diff: http://git-wip-us.apache.org/repos/asf/qpid-broker-j/diff/32733fc2 Branch: refs/heads/master Commit: 32733fc224a5f374bfb34cbdfe04f715ef9e59af Parents: 061fb1b Author: Keith Wall <[email protected]> Authored: Fri Jun 9 16:38:49 2017 +0100 Committer: Keith Wall <[email protected]> Committed: Fri Jun 9 16:43:26 2017 +0100 ---------------------------------------------------------------------- .../protocol/v1_0/AMQPConnection_1_0Impl.java | 256 ++++++++-------- .../protocol/v1_0/framing/FrameHandler.java | 2 +- .../tests/protocol/v1_0/FrameTransport.java | 124 +++++++- .../qpid/tests/protocol/v1_0/InputHandler.java | 49 +++- .../qpid/tests/protocol/v1_0/OutputHandler.java | 8 +- .../protocol/v1_0/SaslPerformativeResponse.java | 55 ++++ .../main/resources/config-protocol-tests.json | 1 + .../v1_0/transport/ProtocolHeaderTest.java | 20 ++ .../v1_0/transport/security/sasl/SaslTest.java | 289 +++++++++++++++++++ 9 files changed, 652 insertions(+), 152 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java 
b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java index 977dc8f..870a961 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0Impl.java @@ -274,6 +274,138 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio _frameWriter = new FrameWriter(getDescribedTypeRegistry(), getSender()); } + @Override + public void receiveSaslInit(final SaslInit saslInit) + { + assertState(ConnectionState.AWAIT_SASL_INIT); + if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim())) + { + _localHostname = saslInit.getHostname(); + } + else if(getNetwork().getSelectedHost() != null) + { + _localHostname = getNetwork().getSelectedHost(); + } + String mechanism = saslInit.getMechanism().toString(); + final Binary initialResponse = saslInit.getInitialResponse(); + byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray(); + + List<String> availableMechanisms = + _subjectCreator.getAuthenticationProvider().getAvailableMechanisms(getTransport().isSecure()); + if (!availableMechanisms.contains(mechanism)) + { + handleSaslError(); + } + else + { + _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this); + processSaslResponse(response); + } + } + + @Override + public void receiveSaslResponse(final SaslResponse saslResponse) + { + assertState(ConnectionState.AWAIT_SASL_RESPONSE); + final Binary responseBinary = saslResponse.getResponse(); + byte[] response = responseBinary == null ? 
new byte[0] : responseBinary.getArray(); + + processSaslResponse(response); + } + + @Override + public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms) + { + LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject()); + closeSaslWithFailure(); + } + + @Override + public void receiveSaslChallenge(final SaslChallenge saslChallenge) + { + LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject()); + closeSaslWithFailure(); + } + + @Override + public void receiveSaslOutcome(final SaslOutcome saslOutcome) + { + LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject()); + closeSaslWithFailure(); + } + + private void processSaslResponse(final byte[] response) + { + byte[] challenge = null; + SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult; + if (authenticationResult == null) + { + authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]); + challenge = authenticationResult.getChallenge(); + } + + if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS) + { + _successfulAuthenticationResult = authenticationResult; + if (challenge == null || challenge.length == 0) + { + setSubject(_successfulAuthenticationResult.getSubject()); + SaslOutcome outcome = new SaslOutcome(); + outcome.setCode(SaslCode.OK); + send(new SASLFrame(outcome), null); + _saslComplete = true; + _connectionState = ConnectionState.AWAIT_AMQP_HEADER; + disposeSaslNegotiator(); + } + else + { + continueSaslNegotiation(challenge); + } + } + else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE) + { + continueSaslNegotiation(challenge); + } + else + { + handleSaslError(); + } + } + + private void continueSaslNegotiation(final byte[] challenge) + { + SaslChallenge challengeBody = new SaslChallenge(); + challengeBody.setChallenge(new Binary(challenge)); + send(new SASLFrame(challengeBody), null); + + 
_connectionState = ConnectionState.AWAIT_SASL_RESPONSE; + } + + private void handleSaslError() + { + SaslOutcome outcome = new SaslOutcome(); + outcome.setCode(SaslCode.AUTH); + send(new SASLFrame(outcome), null); + _saslComplete = true; + closeSaslWithFailure(); + } + + private void closeSaslWithFailure() + { + _saslComplete = true; + disposeSaslNegotiator(); + _connectionState = ConnectionState.CLOSED; + addCloseTicker(); + } + + private void disposeSaslNegotiator() + { + if (_saslNegotiator != null) + { + _saslNegotiator.dispose(); + } + _saslNegotiator = null; + } private void setUserPrincipal(final Principal user) { @@ -359,26 +491,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } } - private void closeSaslWithFailure() - { - _saslComplete = true; - disposeSaslNegotiator(); - _connectionState = ConnectionState.CLOSED; - addCloseTicker(); - } - - private void disposeSaslNegotiator() - { - _saslNegotiator.dispose(); - _saslNegotiator = null; - } - - @Override - public void receiveSaslChallenge(final SaslChallenge saslChallenge) - { - LOGGER.info("{} : Unexpected frame sasl-challenge", getLogSubject()); - closeSaslWithFailure(); - } @Override public void receiveClose(final int channel, final Close close) @@ -438,23 +550,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } } - @Override - public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms) - { - LOGGER.info("{} : Unexpected frame sasl-mechanisms", getLogSubject()); - closeSaslWithFailure(); - } - - @Override - public void receiveSaslResponse(final SaslResponse saslResponse) - { - final Binary responseBinary = saslResponse.getResponse(); - byte[] response = responseBinary == null ? 
new byte[0] : responseBinary.getArray(); - - assertState(ConnectionState.AWAIT_SASL_RESPONSE); - - processSaslResponse(response); - } @Override public AMQPDescribedTypeRegistry getDescribedTypeRegistry() @@ -538,13 +633,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } @Override - public void receiveSaslOutcome(final SaslOutcome saslOutcome) - { - LOGGER.info("{} : Unexpected frame sasl-outcome", getLogSubject()); - closeSaslWithFailure(); - } - - @Override public void receiveEnd(final int channel, final End end) { assertState(ConnectionState.OPENED); @@ -877,7 +965,8 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio // already sent our close - probably due to an error break; default: - throw new ConnectionScopedRuntimeException("Connection Open failed under mysterious circumstances."); + throw new ConnectionScopedRuntimeException(String.format( + "Unexpected state %s during connection open.", _connectionState)); } } } @@ -992,87 +1081,11 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } @Override - public void receiveSaslInit(final SaslInit saslInit) - { - assertState(ConnectionState.AWAIT_SASL_INIT); - if(saslInit.getHostname() != null && !"".equals(saslInit.getHostname().trim())) - { - _localHostname = saslInit.getHostname(); - } - else if(getNetwork().getSelectedHost() != null) - { - _localHostname = getNetwork().getSelectedHost(); - } - String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString(); - final Binary initialResponse = saslInit.getInitialResponse(); - byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray(); - - _saslNegotiator = _subjectCreator.createSaslNegotiator(mechanism, this); - processSaslResponse(response); - } - - @Override public String getLocalFQDN() { return _localHostname != null ? 
_localHostname : super.getLocalFQDN(); } - private void processSaslResponse(final byte[] response) - { - byte[] challenge = null; - SubjectAuthenticationResult authenticationResult = _successfulAuthenticationResult; - if (authenticationResult == null) - { - authenticationResult = _subjectCreator.authenticate(_saslNegotiator, response != null ? response : new byte[0]); - challenge = authenticationResult.getChallenge(); - } - - if (authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.SUCCESS) - { - _successfulAuthenticationResult = authenticationResult; - if (challenge == null || challenge.length == 0) - { - setSubject(_successfulAuthenticationResult.getSubject()); - SaslOutcome outcome = new SaslOutcome(); - outcome.setCode(SaslCode.OK); - send(new SASLFrame(outcome), null); - _saslComplete = true; - _connectionState = ConnectionState.AWAIT_AMQP_HEADER; - disposeSaslNegotiator(); - } - else - { - continueSaslNegotiation(challenge); - } - } - else if(authenticationResult.getStatus() == AuthenticationResult.AuthenticationStatus.CONTINUE) - { - continueSaslNegotiation(challenge); - } - else - { - handleSaslError(); - } - } - - private void continueSaslNegotiation(final byte[] challenge) - { - SaslChallenge challengeBody = new SaslChallenge(); - challengeBody.setChallenge(new Binary(challenge)); - send(new SASLFrame(challengeBody), null); - - _connectionState = ConnectionState.AWAIT_SASL_RESPONSE; - } - - private void handleSaslError() - { - SaslOutcome outcome = new SaslOutcome(); - outcome.setCode(SaslCode.AUTH); - send(new SASLFrame(outcome), null); - _saslComplete = true; - closeSaslWithFailure(); - } - @Override public int getMaxFrameSize() { @@ -1419,7 +1432,6 @@ public class AMQPConnection_1_0Impl extends AbstractAMQPConnection<AMQPConnectio } - @Override public void send(final AMQFrame amqFrame, ByteBuffer buf) { 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java ---------------------------------------------------------------------- diff --git a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java index a4dc4af..c5e0334 100644 --- a/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java +++ b/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/framing/FrameHandler.java @@ -41,7 +41,7 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class FrameHandler implements ProtocolHandler { - public static Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class); + public static final Logger LOGGER = LoggerFactory.getLogger(FrameHandler.class); private final boolean _isSasl; private final ConnectionHandler _connectionHandler; http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java index 47872f0..eda903a 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/FrameTransport.java @@ -20,12 +20,14 @@ package org.apache.qpid.tests.protocol.v1_0; import static org.hamcrest.MatcherAssert.assertThat; +import static 
org.hamcrest.Matchers.anyOf; import static org.hamcrest.Matchers.instanceOf; import static org.hamcrest.Matchers.is; import static org.hamcrest.Matchers.notNullValue; -import static org.junit.Assert.assertNull; +import static org.hamcrest.Matchers.nullValue; import java.net.InetSocketAddress; +import java.nio.channels.ClosedChannelException; import java.nio.charset.StandardCharsets; import java.util.Arrays; import java.util.Collections; @@ -54,8 +56,10 @@ import org.hamcrest.CoreMatchers; import org.hamcrest.core.Is; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; +import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame; import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; @@ -70,18 +74,25 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class FrameTransport implements AutoCloseable { - private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); public static final long RESPONSE_TIMEOUT = 6000; + private static final Set<Integer> AMQP_CONNECTION_IDS = Collections.newSetFromMap(new ConcurrentHashMap<>()); + private static final Response CHANNEL_CLOSED_RESPONSE = new ChannelClosedResponse(); private final Channel _channel; private final BlockingQueue<Response> _queue = new ArrayBlockingQueue<>(100); private final EventLoopGroup _workerGroup; + private volatile boolean _channelClosedSeen = false; private int _amqpConnectionId; private short _amqpChannelId; public FrameTransport(final InetSocketAddress brokerAddress) { + this(brokerAddress, false); + } + + public FrameTransport(final InetSocketAddress brokerAddress, boolean isSasl) + { 
_workerGroup = new NioEventLoopGroup(); try @@ -95,12 +106,16 @@ public class FrameTransport implements AutoCloseable @Override public void initChannel(SocketChannel ch) throws Exception { - ch.pipeline().addLast(new InputHandler(_queue)) - .addLast(new OutputHandler()); + ch.pipeline().addLast(new InputHandler(_queue, isSasl)).addLast(new OutputHandler()); } }); _channel = b.connect(brokerAddress).sync().channel(); + _channel.closeFuture().addListener(future -> + { + _channelClosedSeen = true; + _queue.add(CHANNEL_CLOSED_RESPONSE); + }); } catch (InterruptedException e) { @@ -134,13 +149,21 @@ public class FrameTransport implements AutoCloseable public ListenableFuture<Void> sendPerformative(final FrameBody frameBody, UnsignedShort channel) throws Exception { - final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer)frameBody).getPayload() : null; + final List<QpidByteBuffer> payload = frameBody instanceof Transfer ? ((Transfer) frameBody).getPayload() : null; TransportFrame transportFrame = new TransportFrame(channel.shortValue(), frameBody, payload); ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame); channelFuture.sync(); return JdkFutureAdapters.listenInPoolThread(channelFuture); } + public ListenableFuture<Void> sendPerformative(final SaslFrameBody frameBody) throws Exception + { + SASLFrame transportFrame = new SASLFrame(frameBody); + ChannelFuture channelFuture = _channel.writeAndFlush(transportFrame); + channelFuture.sync(); + return JdkFutureAdapters.listenInPoolThread(channelFuture); + } + public ListenableFuture<Void> sendPerformative(final FrameBody frameBody) throws Exception { return sendPerformative(frameBody, UnsignedShort.valueOf(_amqpChannelId)); @@ -174,6 +197,52 @@ public class FrameTransport implements AutoCloseable return _queue.poll(RESPONSE_TIMEOUT, TimeUnit.MILLISECONDS); } + public <R extends Response> R getNextResponse(Class<? 
extends Response> expectedResponseClass) throws Exception + { + R actualResponse = (R) getNextResponse(); + if (actualResponse == null) + { + throw new IllegalStateException(String.format("No response received within timeout %d - expecting %s", + RESPONSE_TIMEOUT, expectedResponseClass.getName())); + } + else if (!expectedResponseClass.isAssignableFrom(actualResponse.getClass())) + { + throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s", + expectedResponseClass.getName(), + actualResponse.getClass().getName())); + } + + return actualResponse; + } + + public <P> P getNextPerformativeResponse(Class<?> expectedFrameBodyClass) throws Exception + { + final P actualFrameBody; + if (SaslFrameBody.class.isAssignableFrom(expectedFrameBodyClass)) + { + SaslPerformativeResponse response = getNextResponse(SaslPerformativeResponse.class); + actualFrameBody = (P) response.getFrameBody(); + } + else if (FrameBody.class.isAssignableFrom(expectedFrameBodyClass)) + { + PerformativeResponse response = getNextResponse(PerformativeResponse.class); + actualFrameBody = (P) response.getFrameBody(); + } + else + { + throw new IllegalArgumentException(String.format("Unexpected class %s", expectedFrameBodyClass.getName())); + } + + if (!expectedFrameBodyClass.isAssignableFrom(actualFrameBody.getClass())) + { + throw new IllegalStateException(String.format("Unexpected response - expecting %s - received - %s", + expectedFrameBodyClass.getName(), + actualFrameBody.getClass().getName())); + } + + return actualFrameBody; + } + public void doProtocolNegotiation() throws Exception { byte[] bytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); @@ -286,7 +355,6 @@ public class FrameTransport implements AutoCloseable Attach responseAttach = (Attach) response.getFrameBody(); assertThat(responseAttach.getTarget(), is(notNullValue())); - PerformativeResponse flowResponse = (PerformativeResponse) getNextResponse(); assertThat(flowResponse, 
Is.is(CoreMatchers.notNullValue())); assertThat(flowResponse.getFrameBody(), Is.is(CoreMatchers.instanceOf(Flow.class))); @@ -295,9 +363,16 @@ public class FrameTransport implements AutoCloseable public void assertNoMoreResponses() throws Exception { Response response = getNextResponse(); - assertNull("Unexpected response.", response); + assertThat(response, anyOf(nullValue(), instanceOf(ChannelClosedResponse.class))); + } + + public void assertNoMoreResponsesAndChannelClosed() throws Exception + { + assertNoMoreResponses(); + assertThat(_channelClosedSeen, is(true)); } + private int getConnectionId() { if (_amqpConnectionId == 0) @@ -310,4 +385,39 @@ public class FrameTransport implements AutoCloseable } return _amqpConnectionId; } + + public void assertChannelClosed() + { + try + { + ChannelFuture channelFuture = _channel.write(new byte[]{0}); + channelFuture.sync(); + throw new IllegalStateException( + "Expecting the channel to be already closed by, but it was able to take more input."); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + } + catch (Exception e) + { + if (e instanceof ClosedChannelException) + { + // PASS + } + else + { + throw new IllegalStateException("Unexpected exception", e); + } + } + } + + private static class ChannelClosedResponse implements Response + { + @Override + public String toString() + { + return "ChannelClosed"; + } + } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java index 8468f5a..10fde24 100644 --- 
a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/InputHandler.java @@ -38,6 +38,7 @@ import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode; import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms; import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome; @@ -56,12 +57,6 @@ import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class InputHandler extends ChannelInboundHandlerAdapter { - private enum ParsingState - { - HEADER, - PERFORMATIVES - }; - private static final Logger LOGGER = LoggerFactory.getLogger(InputHandler.class); private static final AMQPDescribedTypeRegistry TYPE_REGISTRY = AMQPDescribedTypeRegistry.newInstance() .registerTransportLayer() @@ -69,18 +64,27 @@ public class InputHandler extends ChannelInboundHandlerAdapter .registerTransactionLayer() .registerSecurityLayer() .registerExtensionSoleconnLayer(); + + private enum ParsingState + { + HEADER, + PERFORMATIVES + }; + + private final MyConnectionHandler _connectionHandler; private final ValueHandler _valueHandler; - private final FrameHandler _frameHandler; + private final BlockingQueue<Response> _responseQueue; private QpidByteBuffer _inputBuffer = QpidByteBuffer.allocate(0); - private BlockingQueue<Response> _responseQueue; - private ParsingState _state = ParsingState.HEADER; + private volatile FrameHandler _frameHandler; + private volatile ParsingState _state = ParsingState.HEADER; - public InputHandler(final BlockingQueue<Response> queue) + 
public InputHandler(final BlockingQueue<Response> queue, final boolean isSasl) { _valueHandler = new ValueHandler(TYPE_REGISTRY); - _frameHandler = new FrameHandler(_valueHandler, new MyConnectionHandler(), false); + _connectionHandler = new MyConnectionHandler(); + _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, isSasl); _responseQueue = queue; } @@ -140,12 +144,17 @@ public class InputHandler extends ChannelInboundHandlerAdapter } } + private void resetInputHandlerAfterSaslOutcome() + { + _state = ParsingState.HEADER; + _frameHandler = new FrameHandler(_valueHandler, _connectionHandler, false); + } + private class MyConnectionHandler implements ConnectionHandler { @Override public void receiveOpen(final int channel, final Open close) { - System.out.println(); } @Override @@ -211,7 +220,7 @@ public class InputHandler extends ChannelInboundHandlerAdapter @Override public void handleError(final Error parsingError) { - + LOGGER.error("Unexpected error {}", parsingError); } @Override @@ -230,16 +239,24 @@ public class InputHandler extends ChannelInboundHandlerAdapter int channel = channelFrameBody.getChannel(); if (val instanceof FrameBody) { - response = new PerformativeResponse((short) channel, (FrameBody) val); + FrameBody frameBody = (FrameBody) val; + response = new PerformativeResponse((short) channel, frameBody); } else if (val instanceof SaslFrameBody) { - throw new UnsupportedOperationException("TODO: "); + SaslFrameBody frameBody = (SaslFrameBody) val; + response = new SaslPerformativeResponse((short) channel, frameBody); + + if (frameBody instanceof SaslOutcome && ((SaslOutcome) frameBody).getCode().equals(SaslCode.OK)) + { + resetInputHandlerAfterSaslOutcome(); + } } else { - throw new UnsupportedOperationException("Unexoected frame type : " + val.getClass()); + throw new UnsupportedOperationException("Unexpected frame type : " + val.getClass()); } + _responseQueue.add(response); } } 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java index 414ad90..dbd4cd6 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/OutputHandler.java @@ -28,11 +28,7 @@ import io.netty.channel.ChannelPromise; import org.apache.qpid.server.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter; -import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter; import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame; -import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; -import org.apache.qpid.server.protocol.v1_0.type.FrameBody; -import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.transport.ByteBufferSender; @@ -50,7 +46,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter public void write(final ChannelHandlerContext ctx, final Object msg, final ChannelPromise promise) throws Exception { - if (msg instanceof TransportFrame) + if (msg instanceof AMQFrame) { FrameWriter _frameWriter = new FrameWriter(TYPE_REGISTRY, new ByteBufferSender() { @@ -86,7 +82,7 @@ public class OutputHandler extends ChannelOutboundHandlerAdapter } }); - _frameWriter.send((TransportFrame) msg); + _frameWriter.send(((AMQFrame) msg)); } else { 
http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java new file mode 100644 index 0000000..cd2da99 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/main/java/org/apache/qpid/tests/protocol/v1_0/SaslPerformativeResponse.java @@ -0,0 +1,55 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ * + */ + +package org.apache.qpid.tests.protocol.v1_0; + +import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; + +public class SaslPerformativeResponse implements Response +{ + private final short _channelId; + private final SaslFrameBody _frameBody; + + public SaslPerformativeResponse(final short channelId, + final SaslFrameBody frameBody) + { + _channelId = channelId; + _frameBody = frameBody; + } + + public SaslFrameBody getFrameBody() + { + return _frameBody; + } + + public short getChannelId() + { + return _channelId; + } + + @Override + public String toString() + { + return "SaslPerformativeResponse{" + + "_channelId=" + _channelId + + ", _frameBody=" + _frameBody + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json index 7a5e20d..1aaa210 100644 --- a/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json +++ b/systests/protocol-tests-amqp-1-0/src/main/resources/config-protocol-tests.json @@ -27,6 +27,7 @@ }, { "name" : "plain", "type" : "Plain", + "secureOnlyMechanisms" : [], "users" : [ { "name" : "admin", "type" : "managed", http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java index 91d2c48..ed9c10f 100644 --- 
a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/ProtocolHeaderTest.java @@ -66,6 +66,26 @@ public class ProtocolHeaderTest extends ProtocolTestBase HeaderResponse response = (HeaderResponse) transport.getNextResponse(); assertArrayEquals("Unexpected protocol header response", bytes, response.getHeader()); } + } + + @Test + @SpecificationTest(section = "2.2", + description = " A client might request use of a protocol id that is unacceptable to a server. [...]" + + "In this case, the server MUST send a protocol header with an acceptable protocol id" + + "(and version) and then close the socket.") + public void unacceptableProtocolIdSent_SaslAcceptable() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr)) + { + byte[] rawHeaderBytes = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); + byte[] expectedSaslHeaderBytes = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8); + transport.sendProtocolHeader(rawHeaderBytes); + HeaderResponse response = (HeaderResponse) transport.getNextResponse(); + assertArrayEquals("Unexpected protocol header response", expectedSaslHeaderBytes, response.getHeader()); + + transport.assertNoMoreResponses(); + } } } http://git-wip-us.apache.org/repos/asf/qpid-broker-j/blob/32733fc2/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java ---------------------------------------------------------------------- diff --git a/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java new file mode 100644 index 
0000000..ede03a2 --- /dev/null +++ b/systests/protocol-tests-amqp-1-0/src/test/java/org/apache/qpid/tests/protocol/v1_0/transport/security/sasl/SaslTest.java @@ -0,0 +1,289 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package org.apache.qpid.tests.protocol.v1_0.transport.security.sasl; + +import static org.hamcrest.MatcherAssert.assertThat; +import static org.hamcrest.Matchers.equalTo; +import static org.hamcrest.Matchers.hasItem; +import static org.hamcrest.Matchers.is; +import static org.hamcrest.Matchers.notNullValue; +import static org.hamcrest.Matchers.nullValue; + +import java.net.InetSocketAddress; +import java.nio.charset.StandardCharsets; +import java.util.Arrays; + +import javax.crypto.Mac; +import javax.crypto.spec.SecretKeySpec; +import javax.xml.bind.DatatypeConverter; + +import org.junit.Test; + +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; +import 
org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.tests.protocol.v1_0.BrokerAdmin; +import org.apache.qpid.tests.protocol.v1_0.FrameTransport; +import org.apache.qpid.tests.protocol.v1_0.HeaderResponse; +import org.apache.qpid.tests.protocol.v1_0.ProtocolTestBase; +import org.apache.qpid.tests.protocol.v1_0.SpecificationTest; + +public class SaslTest extends ProtocolTestBase +{ + private static final Symbol CRAM_MD5 = Symbol.getSymbol("CRAM-MD5"); + private static final Symbol PLAIN = Symbol.getSymbol("PLAIN"); + + private static final byte[] SASL_AMQP_HEADER_BYTES = "AMQP\3\1\0\0".getBytes(StandardCharsets.UTF_8); + private static final byte[] AMQP_HEADER_BYTES = "AMQP\0\1\0\0".getBytes(StandardCharsets.UTF_8); + + @Test + @SpecificationTest(section = "5.3.2", + description = "SASL Negotiation [...] 
challenge/response step occurs zero times") + public void saslSuccessfulAuthentication() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class); + assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN)); + + + SaslInit saslInit = new SaslInit(); + saslInit.setMechanism(PLAIN); + saslInit.setInitialResponse(new Binary("\0guest\0guest".getBytes(StandardCharsets.US_ASCII))); + transport.sendPerformative(saslInit); + + SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class); + assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK)); + + transport.sendProtocolHeader(AMQP_HEADER_BYTES); + HeaderResponse headerResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(headerResponse.getHeader(), is(equalTo(AMQP_HEADER_BYTES))); + + transport.assertNoMoreResponses(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", + description = "SASL Negotiation [...] 
challenge/response step occurs once") + public void saslSuccessfulAuthenticationWithChallengeResponse() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class); + assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(CRAM_MD5)); + + SaslInit saslInit = new SaslInit(); + saslInit.setMechanism(CRAM_MD5); + transport.sendPerformative(saslInit); + + SaslChallenge challenge = transport.getNextPerformativeResponse(SaslChallenge.class); + assertThat(challenge.getChallenge(), is(notNullValue())); + + byte[] response = generateCramMD5ClientResponse("guest", "guest", challenge.getChallenge().getArray()); + + SaslResponse saslResponse = new SaslResponse(); + saslResponse.setResponse(new Binary(response)); + transport.sendPerformative(saslResponse); + + SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class); + assertThat(saslOutcome.getCode(), equalTo(SaslCode.OK)); + + transport.sendProtocolHeader(AMQP_HEADER_BYTES); + HeaderResponse headerResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(headerResponse.getHeader(), is(equalTo(AMQP_HEADER_BYTES))); + + transport.assertNoMoreResponses(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", description = "SASL Negotiation") + public void saslUnsuccessfulAuthentication() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + 
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + SaslMechanisms saslMechanismsResponse = transport.getNextPerformativeResponse(SaslMechanisms.class); + assertThat(Arrays.asList(saslMechanismsResponse.getSaslServerMechanisms()), hasItem(PLAIN)); + + SaslInit saslInit = new SaslInit(); + saslInit.setMechanism(PLAIN); + saslInit.setInitialResponse(new Binary("\0guest\0badpassword".getBytes(StandardCharsets.US_ASCII))); + transport.sendPerformative(saslInit); + + SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class); + assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH)); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", + description = "The partner MUST then choose one of the supported mechanisms and initiate a sasl exchange." 
+ + "If the selected mechanism is not supported by the receiving peer, it MUST close the connection " + + "with the authentication-failure close-code.") + public void unsupportedSaslMechanism() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + transport.getNextPerformativeResponse(SaslMechanisms.class); + + SaslInit saslInit = new SaslInit(); + saslInit.setMechanism(Symbol.getSymbol("NOT-A-MECHANISM")); + transport.sendPerformative(saslInit); + + SaslOutcome saslOutcome = transport.getNextPerformativeResponse(SaslOutcome.class); + assertThat(saslOutcome.getCode(), equalTo(SaslCode.AUTH)); + assertThat(saslOutcome.getAdditionalData(), is(nullValue())); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", description = "SASL Negotiation") + public void authenticationBypassDisallowed() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + transport.getNextPerformativeResponse(SaslMechanisms.class); + + Open open = new Open(); + open.setContainerId("testContainerId"); + transport.sendPerformative(open); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", + description = "The peer acting as the SASL server MUST announce supported 
authentication mechanisms using" + + "the sasl-mechanisms frame.") + public void clientSendsSaslMechanisms() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + transport.getNextPerformativeResponse(SaslMechanisms.class); + + SaslMechanisms clientMechs = new SaslMechanisms(); + clientMechs.setSaslServerMechanisms(new Symbol[] {Symbol.valueOf("CLIENT-MECH")}); + transport.sendPerformative(clientMechs); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", description = "SASL Negotiation") + public void clientSendsSaslChallenge() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + transport.getNextPerformativeResponse(SaslMechanisms.class); + + SaslChallenge saslChallenge = new SaslChallenge(); + saslChallenge.setChallenge(new Binary(new byte[] {})); + transport.sendPerformative(saslChallenge); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + @Test + @SpecificationTest(section = "5.3.2", description = "SASL Negotiation") + public void clientSendsSaslOutcome() throws Exception + { + final InetSocketAddress addr = getBrokerAdmin().getBrokerAddress(BrokerAdmin.PortType.AMQP); + try (FrameTransport transport = new FrameTransport(addr, true)) + { + 
transport.sendProtocolHeader(SASL_AMQP_HEADER_BYTES); + HeaderResponse saslHeaderResponse = transport.getNextResponse(HeaderResponse.class); + assertThat(saslHeaderResponse.getHeader(), is(equalTo(SASL_AMQP_HEADER_BYTES))); + + transport.getNextPerformativeResponse(SaslMechanisms.class); + + SaslOutcome saslOutcome = new SaslOutcome(); + saslOutcome.setCode(SaslCode.OK); + transport.sendPerformative(saslOutcome); + + transport.assertNoMoreResponsesAndChannelClosed(); + } + } + + private static byte[] generateCramMD5ClientResponse(String userName, String userPassword, byte[] challengeBytes) + throws Exception + { + String macAlgorithm = "HmacMD5"; + Mac mac = Mac.getInstance(macAlgorithm); + mac.init(new SecretKeySpec(userPassword.getBytes(StandardCharsets.UTF_8), macAlgorithm)); + final byte[] messageAuthenticationCode = mac.doFinal(challengeBytes); + String responseAsString = userName + " " + DatatypeConverter.printHexBinary(messageAuthenticationCode) + .toLowerCase(); + return responseAsString.getBytes(); + } +} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
