Author: chirino
Date: Wed Mar 14 18:45:49 2007
New Revision: 518437

URL: http://svn.apache.org/viewvc?view=rev&rev=518437
Log:
- Propagate the AlwaysSyncSend setting from the ConnectionFactory to the 
Connection
- Got rid of the UseSyncSend property since AlwaysSyncSend was already there 
and did the same thing.
- Updated VMPendingMessageCursor to increment a message's reference count when 
it is added and decrement it when it is removed, so that the usage managers 
stay accurate while the messages are kept in memory.
- Updated the region Queue so that it decrements the usage once a transaction 
commits or rolls back.
- Enabled the ProducerFlowControlTest since it is now working.

Modified:
    activemq/trunk/activemq-core/pom.xml
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java

Modified: activemq/trunk/activemq-core/pom.xml
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/pom.xml?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- activemq/trunk/activemq-core/pom.xml (original)
+++ activemq/trunk/activemq-core/pom.xml Wed Mar 14 18:45:49 2007
@@ -267,7 +267,6 @@
             <exclude>**/nio/**</exclude>
 
             <exclude>**/AMQDeadlockTest3.*</exclude>
-            <exclude>**/ProducerFlowControlTest.*</exclude>
           </excludes>
         </configuration>
       </plugin>

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnectionFactory.java
 Wed Mar 14 18:45:49 2007
@@ -90,7 +90,6 @@
     private boolean nestedMapAndListEnabled = true;
     JMSStatsImpl factoryStats = new JMSStatsImpl();
     private boolean alwaysSyncSend;
-    private boolean useSyncSend=false;
     private boolean watchTopicAdvisories=true;
     private int producerWindowSize=DEFAULT_PRODUCER_WINDOW_SIZE;
 
@@ -259,6 +258,7 @@
             
connection.setObjectMessageSerializationDefered(isObjectMessageSerializationDefered());
             connection.setDispatchAsync(isDispatchAsync());
             connection.setUseAsyncSend(isUseAsyncSend());
+            connection.setAlwaysSyncSend(isAlwaysSyncSend());
             connection.setAlwaysSessionAsync(isAlwaysSessionAsync());
             connection.setOptimizeAcknowledge(isOptimizeAcknowledge());
             connection.setUseRetroactiveConsumer(isUseRetroactiveConsumer());
@@ -437,10 +437,6 @@
         this.useAsyncSend = useAsyncSend;
     }
     
-       public void setUseSyncSend(boolean forceSyncSend) {
-               this.useSyncSend = forceSyncSend;
-       }
-
        public synchronized boolean isWatchTopicAdvisories() {
                return watchTopicAdvisories;
        }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
 Wed Mar 14 18:45:49 2007
@@ -1151,6 +1151,8 @@
                 }
                 producerExchanges.put(id,result);
             }
+        } else {
+               context = result.getConnectionContext();
         }
         return result;
     }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Wed Mar 14 18:45:49 2007
@@ -431,38 +431,41 @@
         if(store!=null&&message.isPersistent()){
             store.addMessage(context,message);
         }
-        message.incrementReferenceCount();
+        
         if(context.isInTransaction()){
+               // If this is a transacted message.. increase the usage now so 
that a big TX does not blow up
+               // our memory.  This increment is decremented once the tx 
finishes..
+            message.incrementReferenceCount();
             context.getTransaction().addSynchronization(new Synchronization(){
-
                 public void afterCommit() throws Exception{
-                    //even though the message could be expired - it won't be 
from the store
-                    //and it's important to keep the store/cursor in step
-                    synchronized(messages){
-                        messages.addMessageLast(message);
-                    }
-                    // It could take while before we receive the commit
-                    // op, by that time the message could have expired..
-                    if(message.isExpired()){
-                        // TODO: remove message from store.
-                        if (log.isDebugEnabled()) {
-                            log.debug("Expired message: " + message);
-                        }
-                        if( 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 || 
!message.isResponseRequired() ) {
-                               ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
-                                       
context.getConnection().dispatchAsync(ack);                                     
                
-                        }
-                        return;
-                    }
-                    sendMessage(context,message);
+                       try { 
+                        // It could take while before we receive the commit
+                        // op, by that time the message could have expired..
+                           if(message.isExpired()){
+                               // TODO: remove message from store.
+                               if (log.isDebugEnabled()) {
+                                   log.debug("Expired message: " + message);
+                               }
+                               if( 
producerExchange.getProducerState().getInfo().getWindowSize() > 0 || 
!message.isResponseRequired() ) {
+                                       ProducerAck ack = new 
ProducerAck(producerExchange.getProducerState().getInfo().getProducerId(), 
message.getSize());
+                                               
context.getConnection().dispatchAsync(ack);                                     
                
+                               }
+                               return;
+                           }
+                           sendMessage(context,message);
+                       } finally {
+                        message.decrementReferenceCount();
+                       }
+                }
+                
+                @Override
+                public void afterRollback() throws Exception {
+                    message.decrementReferenceCount();
                 }
             });
         }else{
-            synchronized(messages){
-                messages.addMessageLast(message);
-            }
-            sendMessage(context,message);
-            
+               // Add to the pending list, this takes care of incrementing the 
usage manager.
+            sendMessage(context,message);            
         }
        }    
 
@@ -982,8 +985,9 @@
     
       
     private void sendMessage(final ConnectionContext context,Message msg) 
throws Exception{
-        
-        
+        synchronized(messages){
+            messages.addMessageLast(msg);
+        }
         destinationStatistics.getEnqueues().increment();
         destinationStatistics.getMessages().increment();
         pageInMessages(false);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/cursors/VMPendingMessageCursor.java
 Wed Mar 14 18:45:49 2007
@@ -21,8 +21,10 @@
  * @version $Revision$
  */
 public class VMPendingMessageCursor extends AbstractPendingMessageCursor{
-    private LinkedList list = new LinkedList();
-    private Iterator iter = null;
+    private LinkedList<MessageReference> list = new 
LinkedList<MessageReference>();
+    private Iterator<MessageReference> iter = null;
+    private MessageReference last;
+    
     /**
      * @return true if there are no pending messages
      */
@@ -36,6 +38,7 @@
      */
     public void reset(){
         iter = list.listIterator();
+        last=null;
     }
     
     /**
@@ -44,6 +47,7 @@
      * @param node
      */
     public void addMessageLast(MessageReference node){
+       node.incrementReferenceCount();
         list.addLast(node);
     }
     
@@ -53,6 +57,7 @@
      * @param node
      */
     public void addMessageFirst(MessageReference node){
+       node.incrementReferenceCount();
         list.addFirst(node);
     }
 
@@ -68,7 +73,8 @@
      * @return the next pending message
      */
     public MessageReference next(){
-        return (MessageReference) iter.next();
+       last = (MessageReference) iter.next();
+       return last;
     }
 
     /**
@@ -76,6 +82,9 @@
      * 
      */
     public void remove(){
+       if( last!=null ) {
+               last.decrementReferenceCount();
+       }
         iter.remove();
     }
 
@@ -95,9 +104,10 @@
     }
     
     public void remove(MessageReference node){
-        for(Iterator i=list.iterator();i.hasNext();){
-            MessageReference ref=(MessageReference)i.next();
+        for(Iterator<MessageReference> i=list.iterator();i.hasNext();){
+            MessageReference ref=i.next();
             if(node.getMessageId().equals(ref.getMessageId())){
+               ref.decrementReferenceCount();
                 i.remove();
                 break;
             }
@@ -109,7 +119,7 @@
      * @param maxItems
      * @return a list of paged in messages
      */
-    public LinkedList pageInList(int maxItems) {
+    public LinkedList<MessageReference> pageInList(int maxItems) {
         return list;
     }
 }

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/AMQDeadlockTest3.java
 Wed Mar 14 18:45:49 2007
@@ -285,7 +285,7 @@
                acf.setUseCompression(false);
                acf.setOptimizeAcknowledge(false);
                acf.setOptimizedMessageDispatch(true);
-               acf.setUseSyncSend(true);
+               acf.setAlwaysSyncSend(true);
                return acf;
        }
 

Modified: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java?view=diff&rev=518437&r1=518436&r2=518437
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
 (original)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/ProducerFlowControlTest.java
 Wed Mar 14 18:45:49 2007
@@ -17,6 +17,7 @@
 import org.apache.activemq.broker.TransportConnector;
 import org.apache.activemq.broker.region.policy.PolicyEntry;
 import org.apache.activemq.broker.region.policy.PolicyMap;
+import 
org.apache.activemq.broker.region.policy.VMPendingQueueMessageStoragePolicy;
 import 
org.apache.activemq.broker.region.policy.VMPendingSubscriberMessageStoragePolicy;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.transport.tcp.TcpTransport;
@@ -31,21 +32,19 @@
 
     public void test2ndPubisherWithSyncSendConnectionThatIsBlocked() throws 
Exception {
         ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) 
createConnectionFactory();
-        factory.setUseSyncSend(true);
+        factory.setAlwaysSyncSend(true);
         connection = (ActiveMQConnection) factory.createConnection();
         connections.add(connection);
        connection.start();
 
-       // Test sending to Queue A
-       // 1st send should not block.
-       fillQueue(queueA);
-       
        Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
        MessageConsumer consumer = session.createConsumer(queueB);
 
-       // Test sending to Queue B it should block. 
-       // Since even though  the it's queue limits have not been reached, the 
connection
-       // is blocked.
+       // Test sending to Queue A
+       // 1st send should not block.  But the rest will.
+       fillQueue(queueA);
+
+       // Test sending to Queue B it should not block. 
        CountDownLatch pubishDoneToQeueuB = asyncSendTo(queueB, "Message 1");
        assertTrue( pubishDoneToQeueuB.await(2, TimeUnit.SECONDS) );
        
@@ -61,6 +60,32 @@
        msg.acknowledge();
     }
 
+    public void testSimpleSendReceive() throws Exception {
+        ActiveMQConnectionFactory factory = (ActiveMQConnectionFactory) 
createConnectionFactory();
+        factory.setAlwaysSyncSend(true);
+        connection = (ActiveMQConnection) factory.createConnection();
+        connections.add(connection);
+       connection.start();
+
+       Session session = connection.createSession(false, 
Session.CLIENT_ACKNOWLEDGE);
+       MessageConsumer consumer = session.createConsumer(queueA);
+
+       // Test sending to Queue B it should not block. 
+       CountDownLatch pubishDoneToQeueuA = asyncSendTo(queueA, "Message 1");
+       assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+       
+       TextMessage msg = (TextMessage) consumer.receive();
+       assertEquals("Message 1", msg.getText());
+       msg.acknowledge();
+       
+       pubishDoneToQeueuA = asyncSendTo(queueA, "Message 2");
+       assertTrue( pubishDoneToQeueuA.await(2, TimeUnit.SECONDS) );
+       
+       msg = (TextMessage) consumer.receive();
+       assertEquals("Message 2", msg.getText());
+       msg.acknowledge();
+    }
+
     public void test2ndPubisherWithStandardConnectionThatIsBlocked() throws 
Exception {
         ConnectionFactory factory = createConnectionFactory();
         connection = (ActiveMQConnection) factory.createConnection();
@@ -143,6 +168,7 @@
         PolicyEntry policy = new PolicyEntry();
         policy.setMemoryLimit(1);
         policy.setPendingSubscriberPolicy(new 
VMPendingSubscriberMessageStoragePolicy());
+        policy.setPendingQueuePolicy(new VMPendingQueueMessageStoragePolicy());
         policyMap.setDefaultEntry(policy);        
         service.setDestinationPolicy(policyMap);
         


Reply via email to