Author: gtully
Date: Thu Oct 16 09:48:52 2008
New Revision: 705281
URL: http://svn.apache.org/viewvc?rev=705281&view=rev
Log:
AMQ-1976, individual messages passing through a network bridge need an
individual ack; for an individual ack, it is ok to have more than one message
dispatched
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Thu Oct 16 09:48:52 2008
@@ -411,40 +411,44 @@
* @param lastAckedMsg
* @throws JMSException if it does not match
*/
- protected void assertAckMatchesDispatched(MessageAck ack)
- throws JMSException {
+ protected void assertAckMatchesDispatched(MessageAck ack) throws
JMSException {
MessageId firstAckedMsg = ack.getFirstMessageId();
- MessageId lastAckedMsg = ack.getLastMessageId();
+ MessageId lastAckedMsg = ack.getLastMessageId();
+ int checkCount = 0;
+ boolean checkFoundStart = false;
+ boolean checkFoundEnd = false;
+ for (MessageReference node : dispatched) {
+
+ if (firstAckedMsg == null) {
+ checkFoundStart = true;
+ } else if (!checkFoundStart &&
firstAckedMsg.equals(node.getMessageId())) {
+ checkFoundStart = true;
+ }
+
+ if (checkFoundStart) {
+ checkCount++;
+ }
- int checkCount = 0;
- boolean checkFoundStart = false;
- boolean checkFoundEnd = false;
- for (MessageReference node : dispatched) {
-
- if( firstAckedMsg == null ) {
- checkFoundStart=true;
- } else if (!checkFoundStart &&
firstAckedMsg.equals(node.getMessageId())) {
- checkFoundStart = true;
- }
-
- if (checkFoundStart) {
- checkCount++;
- }
-
- if (lastAckedMsg != null &&
lastAckedMsg.equals(node.getMessageId())) {
- checkFoundEnd = true;
- break;
- }
- }
- if (!checkFoundStart && firstAckedMsg != null)
- throw new JMSException("Unmatched acknowledege: Could
not find Message-ID "+firstAckedMsg+" in dispatched-list (start of ack)");
- if (!checkFoundEnd && lastAckedMsg != null)
- throw new JMSException("Unmatched acknowledege: Could
not find Message-ID "+lastAckedMsg+" in dispatched-list (end of ack)");
- if (ack.getMessageCount() != checkCount) {
- throw new JMSException("Unmatched acknowledege:
Expected message count ("+ack.getMessageCount()+
- ") differs from count in
dispatched-list ("+checkCount+")");
- }
- }
+ if (lastAckedMsg != null &&
lastAckedMsg.equals(node.getMessageId())) {
+ checkFoundEnd = true;
+ break;
+ }
+ }
+ if (!checkFoundStart && firstAckedMsg != null)
+ throw new JMSException("Unmatched acknowledege: " + ack
+ + "; Could not find Message-ID " + firstAckedMsg
+ + " in dispatched-list (start of ack)");
+ if (!checkFoundEnd && lastAckedMsg != null)
+ throw new JMSException("Unmatched acknowledege: " + ack
+ + "; Could not find Message-ID " + lastAckedMsg
+ + " in dispatched-list (end of ack)");
+ if (ack.getMessageCount() != checkCount && ack.isStandardAck()) {
+ throw new JMSException("Unmatched acknowledege: " + ack
+ + "; Expected message count (" + ack.getMessageCount()
+ + ") differs from count in dispatched-list (" + checkCount
+ + ")");
+ }
+ }
/**
* @param context
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Thu Oct 16 09:48:52 2008
@@ -647,7 +647,7 @@
else{
LOG.info("Message not forwarded on to remote,
because message came from remote");
}
- localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, 1));
+ localBroker.oneway(new MessageAck(md,
MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
} else {
@@ -664,7 +664,7 @@
ExceptionResponse er =
(ExceptionResponse)response;
serviceLocalException(er.getException());
} else {
- localBroker.oneway(new
MessageAck(md, MessageAck.STANDARD_ACK_TYPE, 1));
+ localBroker.oneway(new
MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
dequeueCounter.incrementAndGet();
}
} catch (IOException e) {
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java?rev=705281&r1=705280&r2=705281&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/QueueMemoryFullMultiBrokersTest.java
Thu Oct 16 09:48:52 2008
@@ -39,7 +39,6 @@
public static final int MESSAGE_COUNT = 2000;
public void testQueueNetworkWithConsumerFull() throws Exception {
- if (true) return;
bridgeAllBrokers();
startAllBrokers();
@@ -67,8 +66,6 @@
assertTrue("All messages are consumed and acked from source:" +
internalQueue, internalQueue.getMessages().isEmpty());
assertEquals("messages source:" + internalQueue, 0,
internalQueue.getDestinationStatistics().getMessages().getCount());
assertEquals("inflight source:" + internalQueue, 0,
internalQueue.getDestinationStatistics().getInflight().getCount());
-
-
}
public void setUp() throws Exception {
@@ -82,7 +79,6 @@
}
BrokerService broker2 = brokers.get("Broker2").broker;
applyMemoryLimitPolicy(broker2);
-
}
private void applyMemoryLimitPolicy(BrokerService broker) {