Repository: activemq Updated Branches: refs/heads/master 559f52a2a -> 934ad44ad
https://issues.apache.org/jira/browse/AMQ-5666 Add some initial tests for durable subscription handling. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/934ad44a Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/934ad44a Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/934ad44a Branch: refs/heads/master Commit: 934ad44add2a73b9bc3b5ac4e9ce2925f747f8a1 Parents: 559f52a Author: Timothy Bish <[email protected]> Authored: Mon Mar 16 17:51:45 2015 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Mar 16 17:51:45 2015 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpTestSupport.java | 7 +- .../amqp/client/AmqpAbstractResource.java | 60 ++++++-- .../transport/amqp/client/AmqpReceiver.java | 38 ++++- .../transport/amqp/client/AmqpResource.java | 9 ++ .../transport/amqp/client/AmqpSession.java | 34 +++++ .../amqp/interop/AmqpDurableReceiverTest.java | 139 +++++++++++++++++++ 6 files changed, 268 insertions(+), 19 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java index 33ae799..cf4fa95 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java @@ -42,6 +42,7 @@ import org.apache.activemq.broker.TransportConnector; import org.apache.activemq.broker.jmx.BrokerViewMBean; import org.apache.activemq.broker.jmx.ConnectorViewMBean; import 
org.apache.activemq.broker.jmx.QueueViewMBean; +import org.apache.activemq.broker.jmx.TopicViewMBean; import org.apache.activemq.openwire.OpenWireFormat; import org.apache.activemq.spring.SpringSslContext; import org.apache.activemq.store.kahadb.KahaDBStore; @@ -320,10 +321,10 @@ public class AmqpTestSupport { return proxy; } - protected QueueViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { + protected TopicViewMBean getProxyToTopic(String name) throws MalformedObjectNameException, JMSException { ObjectName queueViewMBeanName = new ObjectName("org.apache.activemq:type=Broker,brokerName=localhost,destinationType=Topic,destinationName="+name); - QueueViewMBean proxy = (QueueViewMBean) brokerService.getManagementContext() - .newProxyInstance(queueViewMBeanName, QueueViewMBean.class, true); + TopicViewMBean proxy = (TopicViewMBean) brokerService.getManagementContext() + .newProxyInstance(queueViewMBeanName, TopicViewMBean.class, true); return proxy; } } \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java index 8a5a587..b5a6324 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpAbstractResource.java @@ -66,25 +66,57 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe } @Override - public void close(AsyncResult request) { + public void detach(AsyncResult request) { // If already closed signal success or else the caller might never get notified. 
if (getEndpoint().getLocalState() == EndpointState.CLOSED || getEndpoint().getRemoteState() == EndpointState.CLOSED) { if (getEndpoint().getLocalState() != EndpointState.CLOSED) { - // Remote already closed this resource, close locally and free. - if (getEndpoint().getLocalState() != EndpointState.CLOSED) { - doClose(); - getEndpoint().free(); - } + doDetach(); + getEndpoint().free(); } request.onSuccess(); - return; + } else { + this.closeRequest = request; + doDetach(); } + } + + @Override + public void close(AsyncResult request) { + // If already closed signal success or else the caller might never get notified. + if (getEndpoint().getLocalState() == EndpointState.CLOSED || + getEndpoint().getRemoteState() == EndpointState.CLOSED) { - this.closeRequest = request; - doClose(); + if (getEndpoint().getLocalState() != EndpointState.CLOSED) { + doClose(); + getEndpoint().free(); + } + + request.onSuccess(); + } else { + this.closeRequest = request; + doClose(); + } +// // If already closed signal success or else the caller might never get notified. +// if (getEndpoint().getLocalState() == EndpointState.CLOSED || +// getEndpoint().getRemoteState() == EndpointState.CLOSED) { +// +// if (getEndpoint().getLocalState() != EndpointState.CLOSED) { +// // Remote already closed this resource, close locally and free. +// if (getEndpoint().getLocalState() != EndpointState.CLOSED) { +// doClose(); +// getEndpoint().free(); +// } +// } +// +// request.onSuccess(); +// return; +// } +// +// this.closeRequest = request; +// doClose(); } @Override @@ -278,6 +310,16 @@ public abstract class AmqpAbstractResource<E extends Endpoint> implements AmqpRe } /** + * Perform the detach operation on the managed endpoint. + * + * By default this method throws an UnsupportedOperationException, a subclass + * must implement this and do a detach if its resource supports that. 
+ */ + protected void doDetach() { + throw new UnsupportedOperationException("Endpoint cannot be detached."); + } + + /** + * Complete the open operation on the managed endpoint. A subclass may * override this method to provide additional verification actions or configuration * updates. http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java index ec37710..ff530b9 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpReceiver.java @@ -94,10 +94,10 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } /** - * Close the sender, a closed sender will throw exceptions if any further send + * Close the receiver, a closed receiver will throw exceptions if any further send * calls are made. * - * @throws IOException if an error occurs while closing the sender. + * @throws IOException if an error occurs while closing the receiver. */ public void close() throws IOException { if (closed.compareAndSet(false, true)) { @@ -117,6 +117,29 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { } /** + * Detach the receiver, a closed receiver will throw exceptions if any further send + * calls are made. + * + * @throws IOException if an error occurs while detaching the receiver.
+ */ + public void detach() throws IOException { + if (closed.compareAndSet(false, true)) { + final ClientFuture request = new ClientFuture(); + session.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + detach(request); + session.pumpToProtonTransport(); + } + }); + + request.sync(); + } + } + + /** * @return this session's parent AmqpSession. */ public AmqpSession getSession() { @@ -442,11 +465,12 @@ public class AmqpReceiver extends AmqpAbstractResource<Receiver> { @Override protected void doClose() { - if (isDurable()) { - getEndpoint().detach(); - } else { - getEndpoint().close(); - } + getEndpoint().close(); + } + + @Override + protected void doDetach() { + getEndpoint().detach(); } @Override http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java index b4e6215..f20fd7c 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpResource.java @@ -58,6 +58,15 @@ public interface AmqpResource { void close(AsyncResult request); /** + * Perform all work needed to detach this resource and store the request + * until such time as the remote peer indicates the resource has been detached. + * + * @param request + * The initiating request that triggered this detach call. + */ + void detach(AsyncResult request); + + /** * @return if the resource has moved to the closed state on the remote. 
*/ boolean isClosed(); http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java index 9368b26..b2fc2f1 100644 --- a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/client/AmqpSession.java @@ -111,6 +111,40 @@ public class AmqpSession extends AmqpAbstractResource<Session> { } /** + * Create a receiver instance using the given address that creates a durable subscription. + * + * @param address + * the address to which the receiver will subscribe for its messages. + * @param subscriptionName + * the name of the subscription that is being created. + * + * @return a newly created receiver that is ready for use. + * + * @throws Exception if an error occurs while creating the receiver. + */ + public AmqpReceiver createDurableReceiver(String address, String subscriptionName) throws Exception { + checkClosed(); + + final AmqpReceiver receiver = new AmqpReceiver(AmqpSession.this, address, getNextReceiverId()); + receiver.setSubscriptionName(subscriptionName); + final ClientFuture request = new ClientFuture(); + + connection.getScheduler().execute(new Runnable() { + + @Override + public void run() { + checkClosed(); + receiver.open(request); + pumpToProtonTransport(); + } + }); + + request.sync(); + + return receiver; + } + + /** * @return this session's parent AmqpConnection. 
*/ public AmqpConnection getConnection() { http://git-wip-us.apache.org/repos/asf/activemq/blob/934ad44a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java new file mode 100644 index 0000000..7fd6080 --- /dev/null +++ b/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/interop/AmqpDurableReceiverTest.java @@ -0,0 +1,139 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.activemq.transport.amqp.interop; + +import static org.junit.Assert.assertTrue; + +import java.util.concurrent.TimeUnit; + +import org.apache.activemq.broker.jmx.BrokerViewMBean; +import org.apache.activemq.transport.amqp.client.AmqpClient; +import org.apache.activemq.transport.amqp.client.AmqpClientTestSupport; +import org.apache.activemq.transport.amqp.client.AmqpConnection; +import org.apache.activemq.transport.amqp.client.AmqpReceiver; +import org.apache.activemq.transport.amqp.client.AmqpSession; +import org.apache.activemq.util.Wait; +import org.junit.Test; + +/** + * Tests for broker side support of the Durable Subscription mapping for JMS. + */ +public class AmqpDurableReceiverTest extends AmqpClientTestSupport { + + @Override + protected boolean isUseOpenWireConnector() { + return true; + } + + @Test(timeout = 60000) + public void testCreateDurableReceiver() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + session.createDurableReceiver("topic://" + getTestName(), getTestName()); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getDurableTopicSubscribers().length == 1; + } + + }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + + connection.close(); + } + + @Test(timeout = 60000) + public void testDetachedDurableReceiverRemainsActive() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + 
getTestName(), getTestName()); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getDurableTopicSubscribers().length == 1; + } + + }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + + receiver.detach(); + + assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getInactiveDurableTopicSubscribers().length == 1; + } + + }, TimeUnit.SECONDS.toMillis(5000), TimeUnit.MILLISECONDS.toMillis(10))); + + connection.close(); + } + + @Test(timeout = 60000) + public void testCloseDurableReceiverRemovesSubscription() throws Exception { + + AmqpClient client = createAmqpClient(); + AmqpConnection connection = client.createConnection(); + connection.setContainerId(getTestName()); + connection.connect(); + + AmqpSession session = connection.createSession(); + AmqpReceiver receiver = session.createDurableReceiver("topic://" + getTestName(), getTestName()); + + final BrokerViewMBean brokerView = getProxyToBroker(); + + assertTrue("Should be a durable sub", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getDurableTopicSubscribers().length == 1; + } + + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10))); + + receiver.close(); + + assertTrue("Should be an inactive durable sub", Wait.waitFor(new Wait.Condition() { + + @Override + public boolean isSatisified() throws Exception { + return brokerView.getDurableTopicSubscribers().length == 0 && + brokerView.getInactiveDurableTopicSubscribers().length == 0; + } + + }, TimeUnit.SECONDS.toMillis(5), TimeUnit.MILLISECONDS.toMillis(10))); + + connection.close(); + } +} \ No newline at end of file
