Author: chirino
Date: Wed Mar 14 18:45:49 2007
New Revision: 518437
URL: http://svn.apache.org/viewvc?view=rev&rev=518437
Log:
- Propagate the AlwaysSyncSend setting from the ConnectionFactory to the
Connection
- Got rid of the UseSyncSend property since AlwaysSyncSend was already there
and did the same thing.
- Updated VMPendingMessageCursor so that it updates the reference counters of
the message so that the usage managers are properly updated since the messages
are being kept in memory.
- Updated the region Queue so that it decrements the usage in the case of a
transaction.
- Enabled the ProducerFlowControlTest since it is now working.
Modified:
activemq/trunk/activemq-core/pom.xml
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Modified: activemq/trunk/activemq-core/pom.xml
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Mar 14 18:45:49 2007
@@ -267,7 +267,6 @@
<exclude>**/nio/**</exclude>
<exclude>**/AMQDeadlockTest3.*</exclude>
- <exclude>**/ProducerFlowControlTest.*</exclude>
</excludes>
</configuration>
</plugin>
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
Wed Mar 14 18:45:49 2007
@@ -90,7 +90,6 @@
private boolean nestedMapAndListEnabled = true;
JMSStatsImpl factoryStats = new JMSStatsImpl();
private boolean alwaysSyncSend;
- private boolean useSyncSend=false;
private boolean watchTopicAdvisories=true;
private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE;
@@ -259,6 +258,7 @@
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
connection.setDispatchAsync(isDispatchAsync());
connection.setUseAsyncSend(isUseAsyncSend());
+ connection.setAlwaysSyncSend(isAlwaysSyncSend());
connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
@@ -437,10 +437,6 @@
this.useAsyncSend = useAsyncSend;
}
- public void setUseSyncSend(boolean forceSyncSend) {
- this.useSyncSend = forceSyncSend;
- }
-
public synchronized boolean isWatchTopicAdvisories() {
return watchTopicAdvisories;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Wed Mar 14 18:45:49 2007
@@ -1151,6 +1151,8 @@
}
producerExchanges.put(id,result);
}
+ } else {
+ context = result.getConnectionContext();
}
return result;
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
Wed Mar 14 18:45:49 2007
@@ -431,38 +431,41 @@
if(store!=null&&message.isPersistent()){
store.addMessage(context,message);
}
- message.incrementReferenceCount();
+
if(context.isInTransaction()){
+ // If this is a transacted message.. increase the usage now so
that a big TX does not blow up
+ // our memory. This increment is decremented once the tx
finishes..
+ message.incrementReferenceCount();
context.getTransaction().addSynchronization(new Synchronization(){
-
public void afterCommit() throws Exception{
- //even though the message could be expired - it won't be
from the store
- //and it's important to keep the store/cursor in step
- synchronized(messages){
- messages.addMessageLast(message);
- }
- // It could take while before we receive the commit
- // op, by that time the message could have expired..
- if(message.isExpired()){
- // TODO: remove message from store.
- if (log.isDebugEnabled()) {
- log.debug("Expired message: " + message);
- }
- if(
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
!message.isResponseRequired() ) {
- ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
-
context.getConnection().dispatchAsync(ack);
- }
- return;
- }
- sendMessage(context,message);
+ try {
+ // It could take while before we receive the commit
+ // op, by that time the message could have expired..
+ if(message.isExpired()){
+ // TODO: remove message from store.
+ if (log.isDebugEnabled()) {
+ log.debug("Expired message: " + message);
+ }
+ if(
producerExchange.getProducerState().getInfo().getWindowSize() > 0 ||
!message.isResponseRequired() ) {
+ ProducerAck ack = new
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(),
message.getSize());
+
context.getConnection().dispatchAsync(ack);
+ }
+ return;
+ }
+ sendMessage(context,message);
+ } finally {
+ message.decrementReferenceCount();
+ }
+ }
+
+ @Override
+ public void afterRollback() throws Exception {
+ message.decrementReferenceCount();
}
});
}else{
- synchronized(messages){
- messages.addMessageLast(message);
- }
- sendMessage(context,message);
-
+ // Add to the pending list, this takes care of incrementing the
usage manager.
+ sendMessage(context,message);
}
}
@@ -982,8 +985,9 @@
private void sendMessage(final ConnectionContext context,Message msg)
throws Exception{
-
-
+ synchronized(messages){
+ messages.addMessageLast(msg);
+ }
destinationStatistics.getEnqueues().increment();
destinationStatistics.getMessages().increment();
pageInMessages(false);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
Wed Mar 14 18:45:49 2007
@@ -21,8 +21,10 @@
* @version $Revision$
*/
public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
- private LinkedList list = new LinkedList();
- private Iterator iter = null;
+ private LinkedList<MessageReference> list = new
LinkedList<MessageReference>();
+ private Iterator<MessageReference> iter = null;
+ private MessageReference last;
+
/**
* @return true if there are no pending messages
*/
@@ -36,6 +38,7 @@
*/
public void reset(){
iter = list.listIterator();
+ last=null;
}
/**
@@ -44,6 +47,7 @@
* @param node
*/
public void addMessageLast(MessageReference node){
+ node.incrementReferenceCount();
list.addLast(node);
}
@@ -53,6 +57,7 @@
* @param node
*/
public void addMessageFirst(MessageReference node){
+ node.incrementReferenceCount();
list.addFirst(node);
}
@@ -68,7 +73,8 @@
* @return the next pending message
*/
public MessageReference next(){
- return (MessageReference) iter.next();
+ last = (MessageReference) iter.next();
+ return last;
}
/**
@@ -76,6 +82,9 @@
*
*/
public void remove(){
+ if( last!=null ) {
+ last.decrementReferenceCount();
+ }
iter.remove();
}
@@ -95,9 +104,10 @@
}
public void remove(MessageReference node){
- for(Iterator i=list.iterator();i.hasNext();){
- MessageReference ref=(MessageReference)i.next();
+ for(Iterator<MessageReference> i=list.iterator();i.hasNext();){
+ MessageReference ref=i.next();
if(node.getMessageId().equals(ref.getMessageId())){
+ ref.decrementReferenceCount();
i.remove();
break;
}
@@ -109,7 +119,7 @@
* @param maxItems
* @return a list of paged in messages
*/
- public LinkedList pageInList(int maxItems) {
+ public LinkedList<MessageReference> pageInList(int maxItems) {
return list;
}
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
Wed Mar 14 18:45:49 2007
@@ -285,7 +285,7 @@
acf.setUseCompression(false);
acf.setOptimizeAcknowledge(false);
acf.setOptimizedMessageDispatch(true);
- acf.setUseSyncSend(true);
+ acf.setAlwaysSyncSend(true);
return acf;
}
Modified:
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
---
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
(original)
+++
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
Wed Mar 14 18:45:49 2007
@@ -17,6 +17,7 @@
import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.policy.PolicyEntry;
import org.apache.activemq.broker.region.policy.PolicyMap;
+import
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
import
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.transport.tcp.TcpTransport;
@@ -31,21 +32,19 @@
public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws
Exception {
ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)
createConnectionFactory();
- factory.setUseSyncSend(true);
+ factory.setAlwaysSyncSend(true);
connection = (ActiveMQConnection) factory.createConnection();
connections.add(connection);
connection.start();
- // Test sending to Queue A
- // 1st send should not block.
- fillQueue(queueA);
-
Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
MessageConsumer consumer = session.createConsumer(queueB);
- // Test sending to Queue B it should block.
- // Since even though the it's queue limits have not been reached, the
connection
- // is blocked.
+ // Test sending to Queue A
+ // 1st send should not block. But the rest will.
+ fillQueue(queueA);
+
+ // Test sending to Queue B it should not block.
CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
@@ -61,6 +60,32 @@
msg.acknowledge();
}
+ public void testSimpleSendReceive() throws Exception {
+ ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory)
createConnectionFactory();
+ factory.setAlwaysSyncSend(true);
+ connection = (ActiveMQConnection) factory.createConnection();
+ connections.add(connection);
+ connection.start();
+
+ Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queueA);
+
+ // Test sending to Queue B it should not block.
+ CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+ assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+
+ TextMessage msg = (TextMessage) consumer.receive();
+ assertEquals("Message 1", msg.getText());
+ msg.acknowledge();
+
+ pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+ assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+
+ msg = (TextMessage) consumer.receive();
+ assertEquals("Message 2", msg.getText());
+ msg.acknowledge();
+ }
+
public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws
Exception {
ConnectionFactory factory = createConnectionFactory();
connection = (ActiveMQConnection) factory.createConnection();
@@ -143,6 +168,7 @@
PolicyEntry policy = new PolicyEntry();
policy.setMemoryLimit(1);
policy.setPendingSubscriberPolicy(new
VMPendingSubscriberMessageStoragePolicy());
+ policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
policyMap.setDefaultEntry(policy);
service.setDestinationPolicy(policyMap);