Author: chirino
Date: Wed Feb 15 18:49:48 2006
New Revision: 378145
URL: http://svn.apache.org/viewcvs?rev=378145&view=rev
Log:
http://jira.activemq.org/jira/browse/AMQ-505
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378145&r1=378144&r2=378145&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Wed Feb 15 18:49:48 2006
@@ -81,6 +81,7 @@
BrokerId localBrokerId;
BrokerId remoteBrokerId;
private Object brokerInfoMutex = new Object();
+
private static class DemandSubscription{
ConsumerInfo remoteInfo;
ConsumerInfo localInfo;
@@ -91,11 +92,13 @@
localInfo=info.copy();
}
}
+
ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
private CountDownLatch startedLatch = new CountDownLatch(2);
+ private boolean decreaseNetowrkConsumerPriority;
public DemandForwardingBridge(Transport localBroker,Transport
remoteBroker){
this.localBroker=localBroker;
@@ -289,12 +292,16 @@
.getNextSequenceId()));
sub.localInfo.setDispatchAsync(dispatchAsync);
sub.localInfo.setPrefetchSize(prefetchSize);
- byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
- // The longer the path to the consumer, the less it's consumer
priority.
- priority-=info.getBrokerPath().length+1;
+
+ if( decreaseNetowrkConsumerPriority ) {
+ byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
+ // The longer the path to the consumer, the less it's
consumer priority.
+ priority-=info.getBrokerPath().length+1;
+ }
+ sub.localInfo.setPriority(priority);
}
- sub.localInfo.setPriority(priority);
+
subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
sub.localInfo.setBrokerPath(info.getBrokerPath());
@@ -472,5 +479,13 @@
private void waitStarted() throws InterruptedException {
startedLatch.await();
+ }
+
+ public boolean isDecreaseNetowrkConsumerPriority() {
+ return decreaseNetowrkConsumerPriority;
+ }
+
+ public void setDecreaseNetowrkConsumerPriority(boolean
decreaseNetowrkConsumerPriority) {
+ this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378145&r1=378144&r2=378145&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Wed Feb 15 18:49:48 2006
@@ -22,7 +22,6 @@
import java.util.Set;
import org.apache.activemq.Service;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@@ -49,8 +48,8 @@
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private Set durableDestinations;
- boolean failover=true;
-
+ private boolean failover=true;
+ private boolean decreaseNetowrkConsumerPriority;
public NetworkConnector(){
@@ -196,6 +195,7 @@
}
}
};
+
result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
result.setLocalBrokerName(brokerName);
return result;
}
@@ -240,6 +240,16 @@
*/
public void setDurableDestinations(Set durableDestinations){
this.durableDestinations=durableDestinations;
+ }
+
+
+ public boolean isDecreaseNetowrkConsumerPriority() {
+ return decreaseNetowrkConsumerPriority;
+ }
+
+
+ public void setDecreaseNetowrkConsumerPriority(boolean
decreaseNetowrkConsumerPriority) {
+ this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
}
}