Author: chirino
Date: Wed Mar 15 11:04:32 2006
New Revision: 386132

URL: http://svn.apache.org/viewcvs?rev=386132&view=rev
Log:
Disable the use of range acks with network connectors, since that could cause
the broker to block waiting for messages to be consumed (for example, when big
messages are being sent).

Modified:
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Wed Mar 15 11:04:32 2006
@@ -375,11 +375,18 @@
                                 serviceLocalException(er.getException());
                             }
                         }
-                        int dispatched = sub.incrementDispatched();
-                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
-                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
-                            sub.setDispatched(0);
-                        }
+                        
+                      // Ack on every message since we don't know if the broker is blocked due to memory
+                      // usage and is waiting for an Ack to un-block him. 
+                      localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+                      // Acking a range is more efficient, but also more prone to locking up a server
+                      // Perhaps doing something like the following should be policy based.
+//                        int dispatched = sub.incrementDispatched();
+//                        if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
+//                            localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
+//                            sub.setDispatched(0);
+//                        }
                     }
                 }else if(command.isBrokerInfo()){
                     serviceLocalBrokerInfo(command);

Modified: incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL: http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
--- incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java (original)
+++ incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java Wed Mar 15 11:04:32 2006
@@ -218,19 +218,25 @@
 
                 remoteBroker.oneway( message );
                 
-                if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
-                    queueDispatched++;
-                    if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
-                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
-                        queueDispatched=0;
-                    }
-                } else {
-                    topicDispatched++;
-                    if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
-                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
-                        topicDispatched=0;
-                    }
-                }
+                // Ack on every message since we don't know if the broker is blocked due to memory
+                // usage and is waiting for an Ack to un-block him. 
+                localBroker.oneway(new MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+                // Acking a range is more efficient, but also more prone to locking up a server
+                // Perhaps doing something like the following should be policy based.
+//                if( md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
+//                    queueDispatched++;
+//                    if( queueDispatched > (queueConsumerInfo.getPrefetchSize()/2) ) {
+//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, queueDispatched));
+//                        queueDispatched=0;
+//                    }
+//                } else {
+//                    topicDispatched++;
+//                    if( topicDispatched > (topicConsumerInfo.getPrefetchSize()/2) ) {
+//                        localBroker.oneway(new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, topicDispatched));
+//                        topicDispatched=0;
+//                    }
+//                }
             } else if(command.isBrokerInfo() ) {
                 synchronized( this ) {
                     localBrokerId = ((BrokerInfo)command).getBrokerId();


Reply via email to