Author: chirino
Date: Wed Mar 15 11:04:32 2006
New Revision: 386132
URL: http://svn.apache.org/viewcvs?rev=386132&view=rev
Log:
disable the use of range acks with network connectors since that could cause
the broker to block waiting for messages to be consumed (in the case of big
messages being sent).
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Mar 15 11:04:32 2006
@@ -375,11 +375,18 @@
serviceLocalException(er.getException());
}
}
- int dispatched = sub.incrementDispatched();
-
if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
- localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
- sub.setDispatched(0);
- }
+
+ // Ack on every message since we don't know if the
broker is blocked due to memory
+ // usage and is waiting for an Ack to un-block him.
+ localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+ // Acking a range is more efficient, but also more prone
to locking up a server
+ // Perhaps doing something like the following should be
policy based.
+// int dispatched = sub.incrementDispatched();
+//
if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
+// localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
+// sub.setDispatched(0);
+// }
}
}else if(command.isBrokerInfo()){
serviceLocalBrokerInfo(command);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=386132&r1=386131&r2=386132&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
Wed Mar 15 11:04:32 2006
@@ -218,19 +218,25 @@
remoteBroker.oneway( message );
- if(
md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
- queueDispatched++;
- if( queueDispatched >
(queueConsumerInfo.getPrefetchSize()/2) ) {
- localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, queueDispatched));
- queueDispatched=0;
- }
- } else {
- topicDispatched++;
- if( topicDispatched >
(topicConsumerInfo.getPrefetchSize()/2) ) {
- localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, topicDispatched));
- topicDispatched=0;
- }
- }
+ // Ack on every message since we don't know if the broker is
blocked due to memory
+ // usage and is waiting for an Ack to un-block him.
+ localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,1));
+
+ // Acking a range is more efficient, but also more prone to
locking up a server
+ // Perhaps doing something like the following should be policy
based.
+// if(
md.getConsumerId().equals(queueConsumerInfo.getConsumerId()) ) {
+// queueDispatched++;
+// if( queueDispatched >
(queueConsumerInfo.getPrefetchSize()/2) ) {
+// localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, queueDispatched));
+// queueDispatched=0;
+// }
+// } else {
+// topicDispatched++;
+// if( topicDispatched >
(topicConsumerInfo.getPrefetchSize()/2) ) {
+// localBroker.oneway(new MessageAck(md,
MessageAck.STANDARD_ACK_TYPE, topicDispatched));
+// topicDispatched=0;
+// }
+// }
} else if(command.isBrokerInfo() ) {
synchronized( this ) {
localBrokerId = ((BrokerInfo)command).getBrokerId();