Author: rajdavies
Date: Mon May 19 06:06:37 2008
New Revision: 657817
URL: http://svn.apache.org/viewvc?rev=657817&view=rev
Log:
more for https://issues.apache.org/activemq/browse/AMQ-1736
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
(with props)
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQMessageConsumer.java
Mon May 19 06:06:37 2008
@@ -858,7 +858,7 @@
}
void acknowledge(MessageDispatch md) throws JMSException {
- MessageAck ack = new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,deliveredMessages.size());
+ MessageAck ack = new MessageAck(md,MessageAck.INDIVIDUAL_ACK_TYPE,1);
session.asyncSendPacket(ack);
synchronized(deliveredMessages){
deliveredMessages.remove(md);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQSession.java
Mon May 19 06:06:37 2008
@@ -133,6 +133,12 @@
*/
public class ActiveMQSession implements Session, QueueSession, TopicSession,
StatsCapable, ActiveMQDispatcher {
+ /**
+ * Only acknowledge an individual message - using message.acknowledge()
+ * as opposed to CLIENT_ACKNOWLEDGE which
+ * acknowledges all messages consumed by a session at when acknowledge()
+ * is called
+ */
public static final int INDIVIDUAL_ACKNOWLEDGE=4;
public static interface DeliveryListener {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Mon May 19 06:06:37 2008
@@ -252,7 +252,24 @@
+ ack);
}
}
- } else if (ack.isDeliveredAck()) {
+ } else if (ack.isIndividualAck()) {
+ // Message was delivered and acknowledge - but only delete the
+ // individual message
+ for (final MessageReference node : dispatched) {
+ MessageId messageId = node.getMessageId();
+ if (ack.getLastMessageId().equals(messageId)) {
+ // this should never be within a transaction
+
node.getRegionDestination().getDestinationStatistics().getDequeues().increment();
+
node.getRegionDestination().getDestinationStatistics().getInflight().decrement();
+ destination = node.getRegionDestination();
+ acknowledge(context, ack, node);
+ dispatched.remove(node);
+ prefetchExtension = Math.max(0, prefetchExtension - 1);
+ callDispatchMatched = true;
+ break;
+ }
+ }
+ }else if (ack.isDeliveredAck()) {
// Message was delivered but not acknowledged: update pre-fetch
// counters.
// Acknowledge all dispatched messages up till the message id
of
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TopicSubscription.java
Mon May 19 06:06:37 2008
@@ -182,7 +182,7 @@
public synchronized void acknowledge(final ConnectionContext context,
final MessageAck ack) throws Exception {
// Handle the standard acknowledgment case.
- if (ack.isStandardAck() || ack.isPoisonAck()) {
+ if (ack.isStandardAck() || ack.isPoisonAck() || ack.isIndividualAck())
{
if (context.isInTransaction()) {
context.getTransaction().addSynchronization(new
Synchronization() {
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java?rev=657817&r1=657816&r2=657817&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/MessageAck.java
Mon May 19 06:06:37 2008
@@ -54,6 +54,11 @@
*/
public static final byte REDELIVERED_ACK_TYPE = 3;
+ /**
+ * The ack case where a client wants only an individual message to be
discarded.
+ */
+ public static final byte INDIVIDUAL_ACK_TYPE = 4;
+
protected byte ackType;
protected ConsumerId consumerId;
protected MessageId firstMessageId;
@@ -108,6 +113,10 @@
public boolean isRedeliveredAck() {
return ackType == REDELIVERED_ACK_TYPE;
}
+
+ public boolean isIndividualAck() {
+ return ackType == INDIVIDUAL_ACK_TYPE;
+ }
/**
* @openwire:property version=1 cache=true
Added:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java?rev=657817&view=auto
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
(added)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
Mon May 19 06:06:37 2008
@@ -0,0 +1,144 @@
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * @version $Revision: 1.4 $
+ */
+public class JMSIndividualAckTest extends TestSupport {
+
+ private Connection connection;
+
+ protected void setUp() throws Exception {
+ super.setUp();
+ connection = createConnection();
+ }
+
+ /**
+ * @see junit.framework.TestCase#tearDown()
+ */
+ protected void tearDown() throws Exception {
+ if (connection != null) {
+ connection.close();
+ connection = null;
+ }
+ super.tearDown();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testAckedMessageAreConsumed() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+
+ session.close();
+ }
+
+ /**
+ * Tests if acknowledged messages are being consumed.
+ *
+ * @throws JMSException
+ */
+ public void testLastMessageAcked() throws JMSException {
+ connection.start();
+ Session session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ TextMessage msg1 = session.createTextMessage("msg1");
+ TextMessage msg2 = session.createTextMessage("msg2");
+ TextMessage msg3 = session.createTextMessage("msg3");
+ producer.send(msg1);
+ producer.send(msg2);
+ producer.send(msg3);
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ // Reset the session.
+ session.close();
+ session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(msg1,msg);
+ msg = consumer.receive(1000);
+ assertNotNull(msg);
+ assertEquals(msg2,msg);
+ msg = consumer.receive(1000);
+ assertNull(msg);
+ session.close();
+ }
+
+ /**
+ * Tests if unacknowledged messages are being re-delivered when the
consumer connects again.
+ *
+ * @throws JMSException
+ */
+ public void testUnAckedMessageAreNotConsumedOnSessionClose() throws
JMSException {
+ connection.start();
+ Session session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+ Queue queue = session.createQueue(getQueueName());
+ MessageProducer producer = session.createProducer(queue);
+ producer.send(session.createTextMessage("Hello"));
+
+ // Consume the message...
+ MessageConsumer consumer = session.createConsumer(queue);
+ Message msg = consumer.receive(1000);
+ assertNotNull(msg);
+ // Don't ack the message.
+
+ // Reset the session. This should cause the unacknowledged message to
be re-delivered.
+ session.close();
+ session = connection.createSession(false,
ActiveMQSession.INDIVIDUAL_ACKNOWLEDGE);
+
+ // Attempt to Consume the message...
+ consumer = session.createConsumer(queue);
+ msg = consumer.receive(2000);
+ assertNotNull(msg);
+ msg.acknowledge();
+
+ session.close();
+ }
+
+ protected String getQueueName() {
+ return getClass().getName() + "." + getName();
+ }
+
+}
Propchange:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JMSIndividualAckTest.java
------------------------------------------------------------------------------
svn:eol-style = native