Author: dejanb
Date: Tue Oct 12 13:29:37 2010
New Revision: 1021768
URL: http://svn.apache.org/viewvc?rev=1021768&view=rev
Log:
https://issues.apache.org/activemq/browse/AMQ-2973 - removing composite
subscription
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ListenerTest.java
activemq/trunk/activemq-spring/src/test/resources/spring/spring.xml
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java?rev=1021768&r1=1021767&r2=1021768&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/PrefetchSubscription.java
Tue Oct 12 13:29:37 2010
@@ -547,14 +547,16 @@ public abstract class PrefetchSubscripti
// Synchronized to DispatchLock
synchronized(dispatchLock) {
+ ArrayList<MessageReference> references = new ArrayList<MessageReference>();
for (MessageReference r : dispatched) {
if( r.getRegionDestination() == destination) {
- rc.add(r);
+ references.add(r);
}
}
- destination.getDestinationStatistics().getDispatched().subtract(dispatched.size());
- destination.getDestinationStatistics().getInflight().subtract(dispatched.size());
- dispatched.clear();
+ rc.addAll(references);
+ destination.getDestinationStatistics().getDispatched().subtract(references.size());
+ destination.getDestinationStatistics().getInflight().subtract(references.size());
+ dispatched.removeAll(references);
}
}
return rc;
Modified:
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ListenerTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ListenerTest.java?rev=1021768&r1=1021767&r2=1021768&view=diff
==============================================================================
---
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ListenerTest.java
(original)
+++
activemq/trunk/activemq-spring/src/test/java/org/apache/activemq/spring/ListenerTest.java
Tue Oct 12 13:29:37 2010
@@ -51,7 +51,7 @@ public class ListenerTest {
Thread.sleep(3000);
LOG.info("messages received= " + listener.messages.size());
- Assert.assertEquals(listener.messages.size(), msgNum);
+ Assert.assertEquals(msgNum, listener.messages.size());
}
@@ -63,7 +63,7 @@ public class ListenerTest {
Thread.sleep(3000);
LOG.info("messages received= " + listener.messages.size());
- Assert.assertEquals(listener.messages.size(), 6 * msgNum);
+ Assert.assertEquals(6 * msgNum, listener.messages.size());
}
public void sendMessages(String destName, int msgNum) throws Exception {
Modified: activemq/trunk/activemq-spring/src/test/resources/spring/spring.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-spring/src/test/resources/spring/spring.xml?rev=1021768&r1=1021767&r2=1021768&view=diff
==============================================================================
--- activemq/trunk/activemq-spring/src/test/resources/spring/spring.xml
(original)
+++ activemq/trunk/activemq-spring/src/test/resources/spring/spring.xml Tue Oct 12 13:29:37 2010
@@ -58,9 +58,8 @@
<bean id="compositeContainer"
class="org.springframework.jms.listener.DefaultMessageListenerContainer">
<property name="connectionFactory" ref="connectionFactory"/>
<property name="messageListener" ref="messageListener"/>
- <property name="destinationName" value="TEST.>"/>
+ <property name="destinationName" value="TEST.>?consumer.prefetchSize=1"/>
<property name="transactionManager" ref="transactionManager"/>
- <property name="cacheLevelName" value="CACHE_CONSUMER"/>
</bean>
</beans>
\ No newline at end of file