Repository: activemq Updated Branches: refs/heads/trunk d60022ec6 -> a88e19e7c
https://issues.apache.org/jira/browse/AMQ-5163 Allow for durable topic consumers to use individual ack mode. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a88e19e7 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a88e19e7 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a88e19e7 Branch: refs/heads/trunk Commit: a88e19e7cda1f21de119db2672d1727453f552d3 Parents: d60022e Author: Timothy Bish <[email protected]> Authored: Mon Apr 28 14:03:43 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon Apr 28 14:03:43 2014 -0400 ---------------------------------------------------------------------- .../org/apache/activemq/ActiveMQSession.java | 47 ++++++++++-- .../apache/activemq/JMSIndividualAckTest.java | 21 +----- .../DurableSubscriptionTestSupport.java | 76 ++++++++++++++++---- 3 files changed, 107 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java ---------------------------------------------------------------------- diff --git a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java index 1dc197f..e327ef1 100755 --- a/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java +++ b/activemq-client/src/main/java/org/apache/activemq/ActiveMQSession.java @@ -90,7 +90,6 @@ import org.apache.activemq.thread.Scheduler; import org.apache.activemq.transaction.Synchronization; import org.apache.activemq.usage.MemoryUsage; import org.apache.activemq.util.Callback; -import org.apache.activemq.util.JMSExceptionSupport; import org.apache.activemq.util.LongSequenceGenerator; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -291,6 +290,7 @@ public class ActiveMQSession implements 
Session, QueueSession, TopicSession, Sta * * @see org.apache.activemq.management.StatsCapable#getStats() */ + @Override public StatsImpl getStats() { return stats; } @@ -313,6 +313,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public BytesMessage createBytesMessage() throws JMSException { ActiveMQBytesMessage message = new ActiveMQBytesMessage(); configureMessage(message); @@ -329,6 +330,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public MapMessage createMapMessage() throws JMSException { ActiveMQMapMessage message = new ActiveMQMapMessage(); configureMessage(message); @@ -346,6 +348,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public Message createMessage() throws JMSException { ActiveMQMessage message = new ActiveMQMessage(); configureMessage(message); @@ -361,6 +364,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public ObjectMessage createObjectMessage() throws JMSException { ActiveMQObjectMessage message = new ActiveMQObjectMessage(); configureMessage(message); @@ -377,6 +381,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. 
*/ + @Override public ObjectMessage createObjectMessage(Serializable object) throws JMSException { ActiveMQObjectMessage message = new ActiveMQObjectMessage(); configureMessage(message); @@ -393,6 +398,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public StreamMessage createStreamMessage() throws JMSException { ActiveMQStreamMessage message = new ActiveMQStreamMessage(); configureMessage(message); @@ -408,6 +414,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public TextMessage createTextMessage() throws JMSException { ActiveMQTextMessage message = new ActiveMQTextMessage(); configureMessage(message); @@ -424,6 +431,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to create this message due * to some internal error. */ + @Override public TextMessage createTextMessage(String text) throws JMSException { ActiveMQTextMessage message = new ActiveMQTextMessage(); message.setText(text); @@ -519,6 +527,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @return true if the session is in transacted mode * @throws JMSException if there is some internal error. */ + @Override public boolean getTransacted() throws JMSException { checkClosed(); return isTransacted(); @@ -536,6 +545,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @see javax.jms.Connection#createSession(boolean,int) * @since 1.1 exception JMSException if there is some internal error. 
*/ + @Override public int getAcknowledgeMode() throws JMSException { checkClosed(); return this.acknowledgementMode; @@ -552,6 +562,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws javax.jms.IllegalStateException if the method is not called by a * transacted session. */ + @Override public void commit() throws JMSException { checkClosed(); if (!getTransacted()) { @@ -572,6 +583,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws javax.jms.IllegalStateException if the method is not called by a * transacted session. */ + @Override public void rollback() throws JMSException { checkClosed(); if (!getTransacted()) { @@ -611,6 +623,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException if the JMS provider fails to close the session due * to some internal error. */ + @Override public void close() throws JMSException { if (!closed) { if (getTransactionContext().isInXATransaction()) { @@ -787,6 +800,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws IllegalStateException if the method is called by a transacted * session. 
*/ + @Override public void recover() throws JMSException { checkClosed(); @@ -811,6 +825,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @see javax.jms.ServerSessionPool * @see javax.jms.ServerSession */ + @Override public MessageListener getMessageListener() throws JMSException { checkClosed(); return this.messageListener; @@ -837,6 +852,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @see javax.jms.ServerSessionPool * @see javax.jms.ServerSession */ + @Override public void setMessageListener(MessageListener listener) throws JMSException { // only check for closed if we set a new listener, as we allow to clear // the listener, such as when an application is shutting down, and is @@ -857,6 +873,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * * @see javax.jms.ServerSession */ + @Override public void run() { MessageDispatch messageDispatch; while ((messageDispatch = executor.dequeueNoWait()) != null) { @@ -885,6 +902,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta if (isClientAcknowledge()||isIndividualAcknowledge()) { message.setAcknowledgeCallback(new Callback() { + @Override public void execute() throws Exception { } }); @@ -905,6 +923,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta getTransactionContext().addSynchronization(new Synchronization() { final int clearRequestCount = (clearRequestsCounter.get() == Integer.MAX_VALUE ? 
clearRequestsCounter.incrementAndGet() : clearRequestsCounter.get()); + @Override public void beforeEnd() throws Exception { // validate our consumer so we don't push stale acks that get ignored if (ack.getTransactionId().isXATransaction() && !connection.hasDispatcher(ack.getConsumerId())) { @@ -961,6 +980,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta } connection.getScheduler().executeAfterDelay(new Runnable() { + @Override public void run() { ((ActiveMQDispatcher)md.getConsumer()).dispatch(md); } @@ -1017,6 +1037,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * specified. * @since 1.1 */ + @Override public MessageProducer createProducer(Destination destination) throws JMSException { checkClosed(); if (destination instanceof CustomDestination) { @@ -1041,6 +1062,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * specified. * @since 1.1 */ + @Override public MessageConsumer createConsumer(Destination destination) throws JMSException { return createConsumer(destination, (String) null); } @@ -1068,6 +1090,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidSelectorException if the message selector is invalid. * @since 1.1 */ + @Override public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException { return createConsumer(destination, messageSelector, false); } @@ -1155,6 +1178,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidSelectorException if the message selector is invalid. 
* @since 1.1 */ + @Override public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal) throws JMSException { return createConsumer(destination, messageSelector, noLocal, null); } @@ -1236,6 +1260,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * internal error. * @since 1.1 */ + @Override public Queue createQueue(String queueName) throws JMSException { checkClosed(); if (queueName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { @@ -1264,6 +1289,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * internal error. * @since 1.1 */ + @Override public Topic createTopic(String topicName) throws JMSException { checkClosed(); if (topicName.startsWith(ActiveMQDestination.TEMP_DESTINATION_NAME_PREFIX)) { @@ -1315,6 +1341,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidDestinationException if an invalid topic is specified. * @since 1.1 */ + @Override public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException { checkClosed(); return createDurableSubscriber(topic, name, null, false); @@ -1360,6 +1387,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidSelectorException if the message selector is invalid. 
* @since 1.1 */ + @Override public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); @@ -1367,11 +1395,6 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta throw new InvalidDestinationException("Topic cannot be null"); } - if (isIndividualAcknowledge()) { - throw JMSExceptionSupport.create("Cannot create a durable consumer for a Session in "+ - "INDIVIDUAL_ACKNOWLEDGE mode.", null); - } - if (topic instanceof CustomDestination) { CustomDestination customDestination = (CustomDestination)topic; return customDestination.createDurableSubscriber(this, name, messageSelector, noLocal); @@ -1397,6 +1420,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * specified * @since 1.1 */ + @Override public QueueBrowser createBrowser(Queue queue) throws JMSException { checkClosed(); return createBrowser(queue, null); @@ -1419,6 +1443,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidSelectorException if the message selector is invalid. * @since 1.1 */ + @Override public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException { checkClosed(); return new ActiveMQQueueBrowser(this, getNextConsumerId(), ActiveMQMessageTransformation.transformDestination(queue), messageSelector, asyncDispatch); @@ -1433,6 +1458,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * to some internal error. * @since 1.1 */ + @Override public TemporaryQueue createTemporaryQueue() throws JMSException { checkClosed(); return (TemporaryQueue)connection.createTempDestination(false); @@ -1447,6 +1473,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * to some internal error. 
* @since 1.1 */ + @Override public TemporaryTopic createTemporaryTopic() throws JMSException { checkClosed(); return (TemporaryTopic)connection.createTempDestination(true); @@ -1463,6 +1490,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws JMSException * @throws InvalidDestinationException if an invalid queue is specified. */ + @Override public QueueReceiver createReceiver(Queue queue) throws JMSException { checkClosed(); return createReceiver(queue, null); @@ -1483,6 +1511,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidDestinationException if an invalid queue is specified. * @throws InvalidSelectorException if the message selector is invalid. */ + @Override public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException { checkClosed(); @@ -1507,6 +1536,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * internal error. * @throws InvalidDestinationException if an invalid queue is specified. */ + @Override public QueueSender createSender(Queue queue) throws JMSException { checkClosed(); if (queue instanceof CustomDestination) { @@ -1537,6 +1567,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * some internal error. * @throws InvalidDestinationException if an invalid topic is specified. */ + @Override public TopicSubscriber createSubscriber(Topic topic) throws JMSException { checkClosed(); return createSubscriber(topic, null, false); @@ -1575,6 +1606,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * @throws InvalidDestinationException if an invalid topic is specified. * @throws InvalidSelectorException if the message selector is invalid. 
*/ + @Override public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException { checkClosed(); @@ -1603,6 +1635,7 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * some internal error. * @throws InvalidDestinationException if an invalid topic is specified. */ + @Override public TopicPublisher createPublisher(Topic topic) throws JMSException { checkClosed(); @@ -1633,11 +1666,13 @@ public class ActiveMQSession implements Session, QueueSession, TopicSession, Sta * specified. * @since 1.1 */ + @Override public void unsubscribe(String name) throws JMSException { checkClosed(); connection.unsubscribe(name); } + @Override public void dispatch(MessageDispatch messageDispatch) { try { executor.execute(messageDispatch); http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java index 90a7dee..ec3c153 100644 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/JMSIndividualAckTest.java @@ -24,7 +24,6 @@ import javax.jms.MessageProducer; import javax.jms.Queue; import javax.jms.Session; import javax.jms.TextMessage; -import javax.jms.Topic; /** * @@ -33,6 +32,7 @@ public class JMSIndividualAckTest extends TestSupport { private Connection connection; + @Override protected void setUp() throws Exception { super.setUp(); connection = createConnection(); @@ -41,6 +41,7 @@ public class JMSIndividualAckTest extends TestSupport { /** * @see junit.framework.TestCase#tearDown() */ + @Override protected void tearDown() throws Exception { if (connection != null) { connection.close(); @@ 
-154,25 +155,7 @@ public class JMSIndividualAckTest extends TestSupport { session.close(); } - /** - * Tests that a durable consumer cannot be created for Individual Ack mode. - * - * @throws JMSException - */ - public void testCreateDurableConsumerFails() throws JMSException { - connection.start(); - Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); - Topic dest = session.createTopic(getName()); - - try { - session.createDurableSubscriber(dest, getName()); - fail("Should not be able to create duable subscriber."); - } catch(Exception e) { - } - } - protected String getQueueName() { return getClass().getName() + "." + getName(); } - } http://git-wip-us.apache.org/repos/asf/activemq/blob/a88e19e7/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java ---------------------------------------------------------------------- diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java index 8017ffe..b610f8c 100755 --- a/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java +++ b/activemq-unit-tests/src/test/java/org/apache/activemq/usecases/DurableSubscriptionTestSupport.java @@ -29,12 +29,13 @@ import javax.jms.Topic; import javax.jms.TopicSubscriber; import org.apache.activemq.ActiveMQConnectionFactory; +import org.apache.activemq.ActiveMQSession; import org.apache.activemq.TestSupport; import org.apache.activemq.broker.BrokerService; import org.apache.activemq.store.PersistenceAdapter; /** - * + * */ public abstract class DurableSubscriptionTestSupport extends TestSupport { @@ -44,21 +45,25 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { private MessageProducer producer; private BrokerService broker; + @Override protected ActiveMQConnectionFactory 
createConnectionFactory() throws Exception { return new ActiveMQConnectionFactory("vm://durable-broker"); } + @Override protected Connection createConnection() throws Exception { Connection rc = super.createConnection(); rc.setClientID(getName()); return rc; } + @Override protected void setUp() throws Exception { createBroker(); super.setUp(); } + @Override protected void tearDown() throws Exception { super.tearDown(); destroyBroker(); @@ -104,7 +109,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { } protected abstract PersistenceAdapter createPersistenceAdapter() throws Exception; - + public void testMessageExpire() throws Exception { session = connection.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE); Topic topic = session.createTopic("TestTopic"); @@ -117,12 +122,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { // Make sure it works when the durable sub is active. producer.send(session.createTextMessage("Msg:1")); assertTextMessageEquals("Msg:1", consumer.receive(1000)); - + consumer.close(); - + producer.send(session.createTextMessage("Msg:2")); producer.send(session.createTextMessage("Msg:3")); - + consumer = session.createDurableSubscriber(topic, "sub1"); // Try to get the message. @@ -225,7 +230,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { assertTextMessageEquals("Msg:2", consumer.receive(5000)); assertNull(consumer.receive(5000)); } - + public void testDurableSubscriptionBrokerRestart() throws Exception { // Create the durable sub. 
@@ -235,12 +240,12 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { // Ensure that consumer will receive messages sent before it was created Topic topic = session.createTopic("TestTopic?consumer.retroactive=true"); consumer = session.createDurableSubscriber(topic, "sub1"); - + producer = session.createProducer(topic); producer.setDeliveryMode(DeliveryMode.PERSISTENT); producer.send(session.createTextMessage("Msg:1")); assertTextMessageEquals("Msg:1", consumer.receive(5000)); - + // Make sure cleanup kicks in Thread.sleep(1000); @@ -428,8 +433,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { consumer = session.createDurableSubscriber(topic, "sub1"); Message msg = consumer.receive(1000); assertNotNull(msg); - assertEquals("Message 1", ((TextMessage)msg).getText()); - + assertEquals("Message 1", ((TextMessage) msg).getText()); } public void testDurableSubWorksInNewConnection() throws Exception { @@ -459,8 +463,56 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { consumer = session.createDurableSubscriber(topic, "sub1"); Message msg = consumer.receive(1000); assertNotNull(msg); - assertEquals("Message 1", ((TextMessage)msg).getText()); + assertEquals("Message 1", ((TextMessage) msg).getText()); + } + + public void testIndividualAckWithDurableSubs() throws Exception { + // Create the consumer. 
+ connection.start(); + + Session session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + Topic topic = session.createTopic("topic-" + getName()); + MessageConsumer consumer = session.createDurableSubscriber(topic, "sub1"); + // Drain any messages that may already be in the sub + while (consumer.receive(1000) != null) { + } + consumer.close(); + MessageProducer producer = session.createProducer(topic); + producer.setDeliveryMode(DeliveryMode.PERSISTENT); + producer.send(session.createTextMessage("Message 1")); + producer.send(session.createTextMessage("Message 2")); + producer.send(session.createTextMessage("Message 3")); + producer.close(); + + connection.close(); + connection = createConnection(); + connection.start(); + + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "sub1"); + + Message message = null; + for (int i = 0; i < 3; ++i) { + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("Message " + (i + 1), ((TextMessage) message).getText()); + } + + message.acknowledge(); + + connection.close(); + connection = createConnection(); + connection.start(); + + session = connection.createSession(false, ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE); + consumer = session.createDurableSubscriber(topic, "sub1"); + + for (int i = 0; i < 2; ++i) { + message = consumer.receive(5000); + assertNotNull(message); + assertEquals("Message " + (i + 1), ((TextMessage) message).getText()); + } } private MessageProducer createProducer(Session session, Destination queue) throws JMSException { @@ -476,7 +528,7 @@ public abstract class DurableSubscriptionTestSupport extends TestSupport { private void assertTextMessageEquals(String string, Message message) throws JMSException { assertNotNull("Message was null", message); assertTrue("Message is not a TextMessage", message instanceof TextMessage); - assertEquals(string, ((TextMessage)message).getText()); 
+ assertEquals(string, ((TextMessage) message).getText()); } }
