ARTEMIS-820 AMQP: Add frame inspection capability to the test client.
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/490bd31c Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/490bd31c Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/490bd31c Branch: refs/heads/ARTEMIS-780 Commit: 490bd31c4b07df1e3d8b9636afdab9e9f89e81d6 Parents: 1ac69fd Author: Francesco Nigro <[email protected]> Authored: Tue Oct 25 17:15:43 2016 +0200 Committer: Clebert Suconic <[email protected]> Committed: Tue Oct 25 14:15:28 2016 -0400 ---------------------------------------------------------------------- .../transport/amqp/client/AmqpConnection.java | 31 ++++- .../amqp/client/AmqpFrameValidator.java | 103 ++++++++++++++++ .../amqp/client/AmqpProtocolTracer.java | 116 +++++++++++++++++++ .../transport/amqp/client/AmqpSession.java | 57 ++++++++- .../amqp/AmqpDurableReceiverTest.java | 26 +++++ 5 files changed, 328 insertions(+), 5 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java index 53fb9f5..01c60bc 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpConnection.java @@ -50,6 +50,7 @@ import org.apache.qpid.proton.engine.Event.Type; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.impl.CollectorImpl; +import org.apache.qpid.proton.engine.impl.TransportImpl; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -87,6 +88,8 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private List<Symbol> offeredCapabilities = Collections.emptyList(); private Map<Symbol, Object> offeredProperties = Collections.emptyMap(); + private volatile AmqpFrameValidator sentFrameInspector; + private volatile AmqpFrameValidator receivedFrameInspector; private AmqpConnectionListener listener; private SaslAuthenticator authenticator; private String mechanismRestriction; @@ -100,6 +103,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements private long connectTimeout = DEFAULT_CONNECT_TIMEOUT; private long closeTimeout = DEFAULT_CLOSE_TIMEOUT; private long drainTimeout = DEFAULT_DRAIN_TIMEOUT; + private boolean trace; public AmqpConnection(org.apache.activemq.transport.amqp.client.transport.NettyTransport transport, String username, @@ -155,6 +159,7 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements sasl.client(); } authenticator = new SaslAuthenticator(sasl, username, password, authzid, mechanismRestriction); + ((TransportImpl) protonTransport).setProtocolTracer(new AmqpProtocolTracer(AmqpConnection.this)); open(future); pumpToProtonTransport(future); @@ -439,6 +444,30 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements return mechanismRestriction; } + public boolean isTraceFrames() { + return trace; + } + + public void setTraceFrames(boolean trace) { + this.trace = trace; + } + + public AmqpFrameValidator getSentFrameInspector() { + return sentFrameInspector; + } + + public void setSentFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.sentFrameInspector = amqpFrameInspector; + } + + public AmqpFrameValidator getReceivedFrameInspector() { + return receivedFrameInspector; + } + + public void setReceivedFrameInspector(AmqpFrameValidator amqpFrameInspector) { + this.receivedFrameInspector = amqpFrameInspector; + } + //----- Internal getters used from the child AmqpResource classes --------// ScheduledExecutorService getScheduler() { @@ -706,4 +735,4 @@ public class AmqpConnection extends AmqpAbstractResource<Connection> implements public String toString() { return "AmqpConnection { " + connectionId + " }"; } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java new file mode 100644 index 0000000..4796110 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpFrameValidator.java @@ -0,0 +1,103 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; + +/** + * Abstract base for a validation hook that is used in tests to check + * the state of a remote resource after a variety of lifecycle events. + */ +public class AmqpFrameValidator { + + private boolean valid = true; + private String errorMessage; + + public void inspectOpen(Open open, Binary encoded) { + + } + + public void inspectBegin(Begin begin, Binary encoded) { + + } + + public void inspectAttach(Attach attach, Binary encoded) { + + } + + public void inspectFlow(Flow flow, Binary encoded) { + + } + + public void inspectTransfer(Transfer transfer, Binary encoded) { + + } + + public void inspectDisposition(Disposition disposition, Binary encoded) { + + } + + public void inspectDetach(Detach detach, Binary encoded) { + + } + + public void inspectEnd(End end, Binary encoded) { + + } + + public void inspectClose(Close close, Binary encoded) { + + } + + public boolean isValid() { + return valid; + } + + protected void setValid(boolean valid) { + this.valid = valid; + } + + public String getErrorMessage() { + return errorMessage; + } + + protected void setErrorMessage(String errorMessage) { + this.errorMessage = errorMessage; + } + + protected void markAsInvalid(String errorMessage) { + if (valid) { + setValid(false); + setErrorMessage(errorMessage); + } + } + + public void assertValid() { + if (!isValid()) { + throw new AssertionError(errorMessage); + } + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java new file mode 100644 index 0000000..68fcd85 --- /dev/null +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpProtocolTracer.java @@ -0,0 +1,116 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.transport.amqp.client; + +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.Attach; +import org.apache.qpid.proton.amqp.transport.Begin; +import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.Detach; +import org.apache.qpid.proton.amqp.transport.Disposition; +import org.apache.qpid.proton.amqp.transport.End; +import org.apache.qpid.proton.amqp.transport.Flow; +import org.apache.qpid.proton.amqp.transport.FrameBody.FrameBodyHandler; +import org.apache.qpid.proton.amqp.transport.Open; +import org.apache.qpid.proton.amqp.transport.Transfer; +import org.apache.qpid.proton.engine.impl.ProtocolTracer; +import org.apache.qpid.proton.framing.TransportFrame; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Tracer used to spy on AMQP traffic + */ +public class AmqpProtocolTracer implements ProtocolTracer, FrameBodyHandler<AmqpFrameValidator> { + + private static final Logger TRACE_FRAMES = LoggerFactory.getLogger(AmqpProtocolTracer.class.getPackage().getName() + ".FRAMES"); + + private final AmqpConnection connection; + + public AmqpProtocolTracer(AmqpConnection connection) { + this.connection = connection; + } + + @Override + public void receivedFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | RECV: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getReceivedFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void sentFrame(TransportFrame transportFrame) { + if (connection.isTraceFrames()) { + TRACE_FRAMES.trace("{} | SENT: {}", connection.getRemoteURI(), transportFrame.getBody()); + } + + AmqpFrameValidator inspector = connection.getSentFrameInspector(); + if (inspector != null) { + transportFrame.getBody().invoke(this, transportFrame.getPayload(), inspector); + } + } + + @Override + public void handleOpen(Open open, Binary payload, AmqpFrameValidator context) { + context.inspectOpen(open, payload); + } + + @Override + public void handleBegin(Begin begin, Binary payload, AmqpFrameValidator context) { + context.inspectBegin(begin, payload); + } + + @Override + public void handleAttach(Attach attach, Binary payload, AmqpFrameValidator context) { + context.inspectAttach(attach, payload); + } + + @Override + public void handleFlow(Flow flow, Binary payload, AmqpFrameValidator context) { + context.inspectFlow(flow, payload); + } + + @Override + public void handleTransfer(Transfer transfer, Binary payload, AmqpFrameValidator context) { + context.inspectTransfer(transfer, payload); + } + + @Override + public void handleDisposition(Disposition disposition, Binary payload, AmqpFrameValidator context) { + context.inspectDisposition(disposition, payload); + } + + @Override + public void handleDetach(Detach detach, Binary payload, AmqpFrameValidator context) { + context.inspectDetach(detach, payload); + } + + @Override + public void handleEnd(End end, Binary payload, AmqpFrameValidator context) { + context.inspectEnd(end, payload); + } + + @Override + public void handleClose(Close close, Binary payload, AmqpFrameValidator context) { + context.inspectClose(close, payload); + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 936d4ef..fc3fdf7 100644 --- a/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/tests/artemis-test-support/src/main/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -16,7 +16,9 @@ */ package org.apache.activemq.transport.amqp.client; +import java.io.IOException; import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicLong; import org.apache.activemq.transport.amqp.client.util.AsyncResult; @@ -38,6 +40,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> { private final AmqpConnection connection; private final String sessionId; private final AmqpTransactionContext txContext; + private final AtomicBoolean closed = new AtomicBoolean(); /** * Create a new session instance. @@ -52,6 +55,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> { } /** + * Close the receiver, a closed receiver will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while closing the receiver. + */ + public void close() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + close(request); + pumpToProtonTransport(request); + } + }); + + request.sync(); + } + } + + /** + * Create an anonymous sender. + * + * @return a newly created sender that is ready for use. + * + * @throws Exception if an error occurs while creating the sender. + */ + public AmqpSender createSender() throws Exception { + return createSender(null, false); + } + + /** * Create a sender instance using the given address * * @param address the address to which the sender will produce its messages. @@ -101,9 +138,21 @@ public class AmqpSession extends AmqpAbstractResource<Session> { * @throws Exception if an error occurs while creating the receiver. */ public AmqpSender createSender(Target target) throws Exception { + return createSender(target, getNextSenderId()); + } + + /** + * Create a sender instance using the given Target + * + * @param target the caller created and configured Traget used to create the sender link. + * @param senderId the sender ID to assign to the newly created Sender. + * @return a newly created sender that is ready for use. + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpSender createSender(Target target, String senderId) throws Exception { checkClosed(); - final AmqpSender sender = new AmqpSender(AmqpSession.this, target, getNextSenderId()); + final AmqpSender sender = new AmqpSender(AmqpSession.this, target, senderId); final ClientFuture request = new ClientFuture(); connection.getScheduler().execute(new Runnable() { @@ -222,7 +271,7 @@ public class AmqpSession extends AmqpAbstractResource<Session> { checkClosed(); final ClientFuture request = new ClientFuture(); - final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, receiverId); + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, source, getNextReceiverId()); connection.getScheduler().execute(new Runnable() { @@ -465,4 +514,4 @@ public class AmqpSession extends AmqpAbstractResource<Session> { throw new IllegalStateException("Session is already closed"); } } -} +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/490bd31c/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java index e0c6b6c..86a35a2 100644 --- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java +++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/amqp/AmqpDurableReceiverTest.java @@ -26,14 +26,17 @@ import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.core.postoffice.Binding; import org.apache.activemq.transport.amqp.client.AmqpClient; import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpFrameValidator; import org.apache.activemq.transport.amqp.client.AmqpMessage; import org.apache.activemq.transport.amqp.client.AmqpReceiver; import org.apache.activemq.transport.amqp.client.AmqpSender; import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.messaging.Source; import org.apache.qpid.proton.amqp.messaging.TerminusDurability; import org.apache.qpid.proton.amqp.messaging.TerminusExpiryPolicy; +import org.apache.qpid.proton.amqp.transport.Detach; import org.apache.qpid.proton.engine.Receiver; import org.junit.Test; import org.slf4j.Logger; @@ -90,6 +93,26 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { connection.setContainerId(getContainerID()); connection.connect(); + connection.setReceivedFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Remote should have detached but closed instead."); + } + } + }); + + connection.setSentFrameInspector(new AmqpFrameValidator() { + + @Override + public void inspectDetach(Detach detach, Binary encoded) { + if (detach.getClosed()) { + markAsInvalid("Client should have detached but closed instead."); + } + } + }); + AmqpSession session = connection.createSession(); AmqpReceiver receiver = session.createDurableReceiver(getTopicName(), getSubscriptionName()); @@ -99,6 +122,9 @@ public class AmqpDurableReceiverTest extends AmqpClientTestSupport { assertEquals(getTopicName(), lookupSubscription()); + connection.getSentFrameInspector().assertValid(); + connection.getReceivedFrameInspector().assertValid(); + connection.close(); }
