Author: rajdavies
Date: Fri Feb 17 23:20:16 2006
New Revision: 378700
URL: http://svn.apache.org/viewcvs?rev=378700&view=rev
Log:
Add more options for networks:
included, excluded destination filters
durable destinations as well as dynamic
conduit subscriptions (multiple subs on the same matching destination are treated
as one)
networkTTL = number of hops messages/subs can pass through - default = 1
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java?rev=378700&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ConduitBridge.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,94 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.filter.DestinationFilter;
+import org.apache.activemq.transport.Transport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A demand forwarding bridge that consolidates subscriptions: a remote
+ * consumer whose destination is already covered by an existing
+ * DemandSubscription is registered as additional interest in that
+ * subscription instead of getting a local subscription of its own.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class ConduitBridge extends DemandForwardingBridge{
+    static final private Log log=LogFactory.getLog(ConduitBridge.class);
+
+    /**
+     * Constructor
+     * @param localBroker transport to the local broker
+     * @param remoteBroker transport to the remote broker
+     */
+    public ConduitBridge(Transport localBroker,Transport remoteBroker){
+        super(localBroker,remoteBroker);
+    }
+
+    /**
+     * Creates a demand subscription for a remote consumer, unless an existing
+     * subscription already covers the consumer's destination.
+     *
+     * @param info the remote consumer
+     * @return the new subscription, or null when the consumer was folded into
+     *         one or more existing subscriptions and nothing must be added
+     */
+    protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+        //search through existing subscriptions and see if we have a match
+        boolean matched = false;
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription)i.next();
+            //the filter is built from the EXISTING subscription: the new consumer
+            //may only be folded into a subscription whose destination covers its
+            //own. Testing the other way round would let a new wildcard consumer be
+            //swallowed by a narrower existing subscription.
+            DestinationFilter filter=DestinationFilter.parseFilter(ds.getLocalInfo().getDestination());
+            if (filter.matches(info.getDestination())){
+                //add the interest of the NEW consumer in the subscription. The
+                //subscription's own remote consumer id is already in its set, so
+                //re-adding it would record nothing.
+                ds.add(info.getConsumerId());
+                matched = true;
+                //continue - we want interest to any existing DemandSubscriptions
+            }
+        }
+        if (matched){
+            return null; //don't want this subscription added
+        }
+        //not matched so create a new one
+        //but first, if it's durable - changed set the
+        //ConsumerId here - so it won't be removed if the
+        //durable subscriber goes away on the other end
+        if (info.isDurable() || (info.getDestination().isQueue() && !info.getDestination().isTemporary())){
+            info.setConsumerId(new ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+                            .getNextSequenceId()));
+        }
+        return super.createDemandSubscription(info);
+    }
+
+    /**
+     * Withdraws a remote consumer's interest from every subscription and
+     * removes the subscriptions that are left without any interested consumer.
+     *
+     * @param id the remote consumer that went away
+     * @throws IOException if the remove command cannot be sent to the local broker
+     */
+    protected void removeDemandSubscription(ConsumerId id) throws IOException{
+        //collect first, remove afterwards
+        List tmpList = new ArrayList();
+
+        for (Iterator i = subscriptionMapByLocalId.values().iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription)i.next();
+            ds.remove(id);
+            if (ds.isEmpty()){
+                tmpList.add(ds);
+            }
+        }
+        for (Iterator i = tmpList.iterator(); i.hasNext();){
+            DemandSubscription ds = (DemandSubscription) i.next();
+            //the remote consumer id is the key of the remote map; the local map
+            //is keyed by the local consumer id and is cleaned by removeSubscription
+            subscriptionMapByRemoteId.remove(ds.getRemoteInfo().getConsumerId());
+            removeSubscription(ds);
+            if(log.isTraceEnabled()){
+                log.trace("removing sub on "+localBroker+" from "+remoteBrokerName+" : "+ds.getRemoteInfo());
+            }
+        }
+
+    }
+
+}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Fri Feb 17 23:20:16 2006
@@ -16,6 +16,7 @@
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -37,9 +38,10 @@
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.BooleanExpression;
+import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
+import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
-import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -59,46 +61,41 @@
*/
public class DemandForwardingBridge implements Bridge{
static final private Log
log=LogFactory.getLog(DemandForwardingBridge.class);
- private final Transport localBroker;
- private final Transport remoteBroker;
- private IdGenerator idGenerator=new IdGenerator();
- private LongSequenceGenerator consumerIdGenerator=new
LongSequenceGenerator();
- private ConnectionInfo localConnectionInfo;
- private ConnectionInfo remoteConnectionInfo;
- private SessionInfo localSessionInfo;
- private ProducerInfo producerInfo;
- private String localBrokerName;
- private String remoteBrokerName;
- private String localClientId;
- private int prefetchSize=1000;
- private boolean dispatchAsync;
- private String destinationFilter=">";
- private ConsumerInfo demandConsumerInfo;
- private int demandConsumerDispatched;
- private AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
- private AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
- private boolean disposed=false;
- BrokerId localBrokerId;
- BrokerId remoteBrokerId;
- private Object brokerInfoMutex = new Object();
-
- private static class DemandSubscription{
- ConsumerInfo remoteInfo;
- ConsumerInfo localInfo;
- int dispatched;
-
- public DemandSubscription(ConsumerInfo info){
- remoteInfo=info;
- localInfo=info.copy();
- }
- }
-
- ConcurrentHashMap subscriptionMapByLocalId=new ConcurrentHashMap();
- ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
+ protected final Transport localBroker;
+ protected final Transport remoteBroker;
+ protected IdGenerator idGenerator=new IdGenerator();
+ protected LongSequenceGenerator consumerIdGenerator=new
LongSequenceGenerator();
+ protected ConnectionInfo localConnectionInfo;
+ protected ConnectionInfo remoteConnectionInfo;
+ protected SessionInfo localSessionInfo;
+ protected ProducerInfo producerInfo;
+ protected String localBrokerName;
+ protected String remoteBrokerName;
+ protected String localClientId;
+ protected int prefetchSize=1000;
+ protected boolean dispatchAsync;
+ protected String destinationFilter=">";
+ protected ConsumerInfo demandConsumerInfo;
+ protected int demandConsumerDispatched;
+ protected AtomicBoolean localBridgeStarted=new AtomicBoolean(false);
+ protected AtomicBoolean remoteBridgeStarted=new AtomicBoolean(false);
+ protected boolean disposed=false;
+ protected BrokerId localBrokerId;
+ protected BrokerId remoteBrokerId;
+ protected ActiveMQDestination[] excludedDestinations;
+ protected ActiveMQDestination[] dynamicallyIncludedDestinations;
+ protected ActiveMQDestination[] staticallyIncludedDestinations;
+ protected ActiveMQDestination[] durableDestinations;
+ protected ConcurrentHashMap subscriptionMapByLocalId=new
ConcurrentHashMap();
+ protected ConcurrentHashMap subscriptionMapByRemoteId=new
ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
- private CountDownLatch startedLatch = new CountDownLatch(2);
- private boolean decreaseNetowrkConsumerPriority;
+ protected CountDownLatch startedLatch = new CountDownLatch(2);
+ protected Object brokerInfoMutex = new Object();
+ protected boolean decreaseNetworkConsumerPriority;
+ protected int networkTTL = 1;
+
+
public DemandForwardingBridge(Transport localBroker,Transport
remoteBroker){
this.localBroker=localBroker;
@@ -107,7 +104,7 @@
public void start() throws Exception{
log.info("Starting a network connection between "+localBroker+" and
"+remoteBroker+" has been established.");
- localBroker.setTransportListener(new TransportListener(){
+ localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
serviceLocalCommand(command);
}
@@ -116,7 +113,7 @@
serviceLocalException(error);
}
});
- remoteBroker.setTransportListener(new TransportListener(){
+ remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command){
serviceRemoteCommand(command);
}
@@ -168,6 +165,7 @@
log.info("Network connection between "+localBroker+" and
"+remoteBroker+"("+remoteBrokerName
+") has been established.");
startedLatch.countDown();
+ setupStaticDestinations();
}
}
@@ -195,7 +193,13 @@
startedLatch.countDown();
}
}
+
+
+ /**
+ * stop the bridge
+ * @throws Exception
+ */
public void stop() throws Exception{
if(!disposed){
try{
@@ -274,62 +278,39 @@
// Create a new local subscription
ConsumerInfo info=(ConsumerInfo) data;
BrokerId[] path=info.getBrokerPath();
- if((path!=null&&path.length>0)||info.isNetworkSubscription()){
- // Ignore: We only support directly connected brokers for now.
+ if((path!=null&&path.length>= networkTTL)){
+ if(log.isTraceEnabled())
+ log.trace("Ignoring Subscription " + info + " restricted
to " + networkTTL + " network hops only");
return;
}
if(contains(info.getBrokerPath(),localBrokerPath[0])){
// Ignore this consumer as it's a consumer we locally sent to
the broker.
+ if(log.isTraceEnabled())
+ log.trace("Ignoring sub " + info + " already routed
through this broker once");
+ return;
+ }
+ if (!isPermissableDestination(info.getDestination())){
+ //ignore if not in the permited or in the excluded list
+ if(log.isTraceEnabled())
+ log.trace("Ignoring sub " + info + " destination " +
info.getDestination() + " is not permiited");
return;
}
- if(log.isTraceEnabled())
- log.trace("Forwarding sub on "+localBroker+" from
"+remoteBrokerName+" : "+info);
// Update the packet to show where it came from.
info=info.copy();
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
- DemandSubscription sub=new DemandSubscription(info);
- sub.localInfo.setConsumerId(new
ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
- .getNextSequenceId()));
- sub.localInfo.setDispatchAsync(dispatchAsync);
- sub.localInfo.setPrefetchSize(prefetchSize);
-
- if( decreaseNetowrkConsumerPriority ) {
- byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
-
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
- // The longer the path to the consumer, the less it's
consumer priority.
- priority-=info.getBrokerPath().length+1;
- }
- sub.localInfo.setPriority(priority);
+ DemandSubscription sub=createDemandSubscription(info);
+ if (sub != null){
+ addSubscription(sub);
+ if(log.isTraceEnabled())
+ log.trace("Forwarding sub on "+localBroker+" from
"+remoteBrokerName+" : "+info);
+ }else {
+ if(log.isTraceEnabled())
+ log.trace("Ignoring sub " + info + " already subscribed to
matching destination");
}
-
- subscriptionMapByLocalId.put(sub.localInfo.getConsumerId(),sub);
- subscriptionMapByRemoteId.put(sub.remoteInfo.getConsumerId(),sub);
- sub.localInfo.setBrokerPath(info.getBrokerPath());
- sub.localInfo.setNetworkSubscription(true);
- // This works for now since we use a VM connection to the local
broker.
- // may need to change if we ever subscribe to a remote broker.
- sub.localInfo.setAdditionalPredicate(new BooleanExpression(){
- public boolean matches(MessageEvaluationContext message)
throws JMSException{
- try{
- return matchesForwardingFilter(message.getMessage());
- }catch(IOException e){
- throw JMSExceptionSupport.create(e);
- }
- }
-
- public Object evaluate(MessageEvaluationContext message)
throws JMSException{
- return matches(message)?Boolean.TRUE:Boolean.FALSE;
- }
- });
- localBroker.oneway(sub.localInfo);
}
if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
- DemandSubscription sub=(DemandSubscription)
subscriptionMapByRemoteId.remove(id);
- if(sub!=null){
- subscriptionMapByLocalId.remove(sub.localInfo.getConsumerId());
- localBroker.oneway(sub.localInfo.createRemoveCommand());
- }
+ removeDemandSubscription(id);
}
}
@@ -337,20 +318,39 @@
log.info("Network connection between "+localBroker+" and
"+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
}
-
- boolean matchesForwardingFilter(Message message){
-
if(message.isRecievedByDFBridge()||contains(message.getBrokerPath(),remoteBrokerPath[0]))
- return false;
- // Don't propagate advisory messages about network subscriptions
- if(message.isAdvisory()&&message.getDataStructure()!=null
-
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
- ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
- if(info.isNetworkSubscription()){
- return false;
- }
+
+ protected void addSubscription(DemandSubscription sub) throws IOException{
+ if (sub != null){
+ localBroker.oneway(sub.getLocalInfo());
}
- return true;
}
+
+ protected void removeSubscription(DemandSubscription sub) throws
IOException{
+ if(sub!=null){
+
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
+ localBroker.oneway(sub.getLocalInfo().createRemoveCommand());
+ }
+ }
+
+ protected DemandSubscription getDemandSubscription(MessageDispatch md){
+ return (DemandSubscription)
subscriptionMapByLocalId.get(md.getConsumerId());
+ }
+
+ protected Message configureMessage(MessageDispatch md){
+ Message message=md.getMessage().copy();
+ // Update the packet to show where it came from.
+
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
+ message.setProducerId(producerInfo.getProducerId());
+ message.setDestination(md.getDestination());
+ if(message.getOriginalTransactionId()==null)
+ message.setOriginalTransactionId(message.getTransactionId());
+ message.setTransactionId(null);
+ message.setRecievedByDFBridge(true);
+ message.evictMarshlledForm();
+ return message;
+ }
+
+
protected void serviceLocalCommand(Command command){
if(!disposed){
@@ -359,22 +359,12 @@
if(command.isMessageDispatch()){
waitStarted();
MessageDispatch md=(MessageDispatch) command;
- Message message=md.getMessage();
DemandSubscription sub=(DemandSubscription)
subscriptionMapByLocalId.get(md.getConsumerId());
if(sub!=null){
- message=message.copy();
- // Update the packet to show where it came from.
-
message.setBrokerPath(appendToBrokerPath(message.getBrokerPath(),localBrokerPath));
- message.setProducerId(producerInfo.getProducerId());
- message.setDestination(md.getDestination());
- if(message.getOriginalTransactionId()==null)
-
message.setOriginalTransactionId(message.getTransactionId());
- message.setTransactionId(null);
- message.setRecievedByDFBridge(true);
- message.evictMarshlledForm();
+ Message message= configureMessage(md);
if(trace)
log.trace("bridging "+localBrokerName+" ->
"+remoteBrokerName+": "+message);
-
if(!message.isPersistent()||!sub.remoteInfo.isDurable()){
+
if(!message.isPersistent()||!sub.getRemoteInfo().isDurable()){
remoteBroker.oneway(message);
}else{
Response response=remoteBroker.request(message);
@@ -383,10 +373,10 @@
serviceLocalException(er.getException());
}
}
- sub.dispatched++;
-
if(sub.dispatched>(sub.localInfo.getPrefetchSize()*.75)){
- localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,sub.dispatched));
- sub.dispatched=0;
+ int dispatched = sub.incrementDispatched();
+
if(dispatched>(sub.getLocalInfo().getPrefetchSize()*.75)){
+ localBroker.oneway(new
MessageAck(md,MessageAck.STANDARD_ACK_TYPE,dispatched));
+ sub.setDispatched(0);
}
}
}else if(command.isBrokerInfo()){
@@ -419,28 +409,91 @@
}
}
+ /**
+ * @return prefetch size
+ */
public int getPrefetchSize(){
return prefetchSize;
}
+
+ /**
+ * @param prefetchSize
+ */
public void setPrefetchSize(int prefetchSize){
this.prefetchSize=prefetchSize;
}
+
+ /**
+ * @return true if dispatch async
+ */
public boolean isDispatchAsync(){
return dispatchAsync;
}
+ /**
+ * @param dispatchAsync
+ */
public void setDispatchAsync(boolean dispatchAsync){
this.dispatchAsync=dispatchAsync;
}
+
+ /**
+ * @return Returns the dynamicallyIncludedDestinations.
+ */
+ public ActiveMQDestination[] getDynamicallyIncludedDestinations(){
+ return dynamicallyIncludedDestinations;
+ }
+
+ /**
+ * @param dynamicallyIncludedDestinations The
dynamicallyIncludedDestinations to set.
+ */
+ public void setDynamicallyIncludedDestinations(ActiveMQDestination[]
dynamicallyIncludedDestinations){
+ this.dynamicallyIncludedDestinations=dynamicallyIncludedDestinations;
+ }
+
+ /**
+ * @return Returns the excludedDestinations.
+ */
+ public ActiveMQDestination[] getExcludedDestinations(){
+ return excludedDestinations;
+ }
+
+ /**
+ * @param excludedDestinations The excludedDestinations to set.
+ */
+ public void setExcludedDestinations(ActiveMQDestination[]
excludedDestinations){
+ this.excludedDestinations=excludedDestinations;
+ }
- public String getDestinationFilter(){
- return destinationFilter;
+ /**
+ * @return Returns the staticallyIncludedDestinations.
+ */
+ public ActiveMQDestination[] getStaticallyIncludedDestinations(){
+ return staticallyIncludedDestinations;
}
- public void setDestinationFilter(String destinationFilter){
- this.destinationFilter=destinationFilter;
+ /**
+ * @param staticallyIncludedDestinations The
staticallyIncludedDestinations to set.
+ */
+ public void setStaticallyIncludedDestinations(ActiveMQDestination[]
staticallyIncludedDestinations){
+ this.staticallyIncludedDestinations=staticallyIncludedDestinations;
+ }
+
+
+ /**
+ * @return Returns the durableDestinations.
+ */
+ public ActiveMQDestination[] getDurableDestinations(){
+ return durableDestinations;
+ }
+
+ /**
+ * @param durableDestinations The durableDestinations to set.
+ */
+ public void setDurableDestinations(ActiveMQDestination[]
durableDestinations){
+ this.durableDestinations=durableDestinations;
}
/**
@@ -457,6 +510,64 @@
public void setLocalBrokerName(String localBrokerName){
this.localBrokerName=localBrokerName;
}
+
+ /**
+ * @return Returns the localBroker.
+ */
+ public Transport getLocalBroker(){
+ return localBroker;
+ }
+
+ /**
+ * @return Returns the remoteBroker.
+ */
+ public Transport getRemoteBroker(){
+ return remoteBroker;
+ }
+
+
+ /**
+ * @return Returns the remoteBrokerName.
+ */
+ public String getRemoteBrokerName(){
+ return remoteBrokerName;
+ }
+
+ /**
+ * @param remoteBrokerName The remoteBrokerName to set.
+ */
+ public void setRemoteBrokerName(String remoteBrokerName){
+ this.remoteBrokerName=remoteBrokerName;
+ }
+
+ /**
+ * @return Returns the decreaseNetworkConsumerPriority.
+ */
+ public boolean isDecreaseNetworkConsumerPriority(){
+ return decreaseNetworkConsumerPriority;
+ }
+
+ /**
+ * @param decreaseNetworkConsumerPriority The
decreaseNetworkConsumerPriority to set.
+ */
+ public void setDecreaseNetworkConsumerPriority(boolean
decreaseNetworkConsumerPriority){
+ this.decreaseNetworkConsumerPriority=decreaseNetworkConsumerPriority;
+ }
+
+ /**
+ * @return Returns the networkTTL.
+ */
+ public int getNetworkTTL(){
+ return networkTTL;
+ }
+
+ /**
+ * @param networkTTL The networkTTL to set.
+ */
+ public void setNetworkTTL(int networkTTL){
+ this.networkTTL=networkTTL;
+ }
+
private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
if(brokerPath!=null){
@@ -477,15 +588,143 @@
return rc;
}
- private void waitStarted() throws InterruptedException {
- startedLatch.await();
+
+ protected boolean isPermissableDestination(ActiveMQDestination
destination){
+ DestinationFilter filter=DestinationFilter.parseFilter(destination);
+ ActiveMQDestination[] dests = excludedDestinations;
+ if(dests!=null&&dests.length>0){
+ for(int i=0;i<dests.length;i++){
+ ActiveMQDestination match=dests[i];
+ if(match!=null&&filter.matches(match)){
+ return false;
+ }
+ }
+ }
+ dests = dynamicallyIncludedDestinations;
+ if(dests!=null&&dests.length>0){
+ for(int i=0;i<dests.length;i++){
+ ActiveMQDestination match=dests[i];
+ if(match!=null&&filter.matches(match)){
+ return true;
+ }
+ }
+ return false;
+ }
+
+ return true;
}
-
- public boolean isDecreaseNetowrkConsumerPriority() {
- return decreaseNetowrkConsumerPriority;
+
+ /**
+ * Subscriptions for these desitnations are always created
+ * @throws IOException
+ *
+ */
+ protected void setupStaticDestinations() throws IOException{
+ ActiveMQDestination[] dests = staticallyIncludedDestinations;
+ if (dests != null){
+ for(int i=0;i<dests.length;i++){
+ ActiveMQDestination dest=dests[i];
+ DemandSubscription sub = createDemandSubscription(dest);
+ addSubscription(sub);
+ if(log.isTraceEnabled())
+ log.trace("Forwarding messages for static destination: " +
dest);
+ }
+ }
}
+
+ protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+ DemandSubscription result=new DemandSubscription(info);
+ result.getLocalInfo().setConsumerId(new
ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+ .getNextSequenceId()));
+
+ if( decreaseNetworkConsumerPriority ) {
+ byte priority=ConsumerInfo.NETWORK_CONSUMER_PRIORITY;
+
if(priority>Byte.MIN_VALUE&&info.getBrokerPath()!=null&&info.getBrokerPath().length>1){
+ // The longer the path to the consumer, the less it's consumer
priority.
+ priority-=info.getBrokerPath().length+1;
+ }
+ result.getLocalInfo().setPriority(priority);
+ }
+ configureDemandSubscription(result);
+ return result;
+ }
+
+ protected DemandSubscription createDemandSubscription(ActiveMQDestination
destination){
+ ConsumerInfo info = new ConsumerInfo();
+ info.setDestination(destination);
+ //the remote info held by the DemandSubscription holds the original
consumerId,
+ //the local info get's overwritten
+ info.setConsumerId(new
ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
+ .getNextSequenceId()));
+ DemandSubscription result=new DemandSubscription(info);
+
result.getLocalInfo().setPriority(ConsumerInfo.NETWORK_CONSUMER_PRIORITY);
+
+ return result;
+ }
+
+ protected void configureDemandSubscription(DemandSubscription sub){
+ sub.getLocalInfo().setDispatchAsync(dispatchAsync);
+ sub.getLocalInfo().setPrefetchSize(prefetchSize);
+ subscriptionMapByLocalId.put(sub.getLocalInfo().getConsumerId(),sub);
+ subscriptionMapByRemoteId.put(sub.getRemoteInfo().getConsumerId(),sub);
+
+ // This works for now since we use a VM connection to the local broker.
+ // may need to change if we ever subscribe to a remote broker.
+ sub.getLocalInfo().setAdditionalPredicate(new BooleanExpression(){
+ public boolean matches(MessageEvaluationContext message) throws
JMSException{
+ try{
+ return matchesForwardingFilter(message.getMessage());
+ }catch(IOException e){
+ throw JMSExceptionSupport.create(e);
+ }
+ }
- public void setDecreaseNetowrkConsumerPriority(boolean
decreaseNetowrkConsumerPriority) {
- this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
+ public Object evaluate(MessageEvaluationContext message) throws
JMSException{
+ return matches(message)?Boolean.TRUE:Boolean.FALSE;
+ }
+ });
+ }
+
+ protected void removeDemandSubscription(ConsumerId id) throws IOException{
+ DemandSubscription sub=(DemandSubscription)
subscriptionMapByRemoteId.remove(id);
+ if (sub != null){
+ removeSubscription(sub);
+ if(log.isTraceEnabled())
+ log.trace("removing sub on "+localBroker+" from
"+remoteBrokerName+" : "+sub.getRemoteInfo());
+ }
+ }
+
+ /**
+  * Decides whether a message dispatched by the local broker may be
+  * forwarded across this bridge.
+  * <p>
+  * A message is rejected when its broker path already contains
+  * remoteBrokerPath[0], when the length of its broker path has reached
+  * networkTTL, or when it is an advisory carrying a ConsumerInfo that is
+  * itself a network subscription.
+  *
+  * @param message the message to test
+  * @return true if the message may be forwarded, false if it must be ignored
+  */
+ protected boolean matchesForwardingFilter(Message message){
+     if (contains(message.getBrokerPath(),remoteBrokerPath[0])){
+         if (log.isTraceEnabled()){
+             log.trace("Message all ready routed once through this broker - ignoring: " + message);
+         }
+         // Actually ignore the message, as the trace says: without this
+         // return it would be bounced back to a broker it already visited
+         // whenever networkTTL allows more than one hop.
+         return false;
+     }
+     int hops = message.getBrokerPath() == null ? 0 : message.getBrokerPath().length;
+     if(hops >= networkTTL){
+         if (log.isTraceEnabled()){
+             log.trace("Message restricted to " + networkTTL + " network hops ignoring: " + message);
+         }
+         return false;
+     }
+     // Don't propagate advisory messages about network subscriptions
+     if(message.isAdvisory()&&message.getDataStructure()!=null
+                     &&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
+         ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
+         if(info.isNetworkSubscription()){
+             return false;
+         }
+     }
+     return true;
+ }
+
+ protected void waitStarted() throws InterruptedException {
+ startedLatch.await();
+ }
+
+
+
+
+
+
}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java?rev=378700&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandSubscription.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,122 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.util.Set;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicInteger;
+
+
+
+/**
+ * Pairs the ConsumerInfo received from the remote broker with the copy
+ * that is used for the matching subscription on the local broker, and keeps
+ * track of the remote consumer ids interested in it and of the number of
+ * messages dispatched.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DemandSubscription{
+ private ConsumerInfo remoteInfo;
+ private ConsumerInfo localInfo;
+ private Set remoteSubsIds = new CopyOnWriteArraySet();
+ private AtomicInteger dispatched = new AtomicInteger(0);
+
+ DemandSubscription(ConsumerInfo info){
+ remoteInfo=info;
+ localInfo=info.copy();
+ localInfo.setBrokerPath(info.getBrokerPath());
+ localInfo.setNetworkSubscription(true);
+ remoteSubsIds.add(info.getConsumerId());
+ }
+
+ /**
+ * Register a consumer as interested in this subscription
+ * @param id the consumer id to register
+ * @return true if the id was not registered before and has been added
+ */
+ public boolean add(ConsumerId id){
+ return remoteSubsIds.add(id);
+ }
+
+ /**
+ * Withdraw a consumer's interest in this subscription
+ * @param id the consumer id to withdraw
+ * @return true if the id was registered and has been removed
+ */
+ public boolean remove(ConsumerId id){
+ return remoteSubsIds.remove(id);
+ }
+
+ /**
+ * @return true if there are no interested consumers
+ */
+ public boolean isEmpty(){
+ return remoteSubsIds.isEmpty();
+ }
+
+
+ /**
+ * @return Returns the current dispatched count.
+ */
+ public int getDispatched(){
+ return dispatched.get();
+ }
+
+ /**
+ * @param dispatched The dispatched count to set.
+ */
+ public void setDispatched(int dispatched){
+ this.dispatched.set(dispatched);
+ }
+
+ /**
+ * Adds one to the dispatched count.
+ * @return dispatched count after incremented
+ */
+ public int incrementDispatched(){
+ return dispatched.incrementAndGet();
+ }
+
+ /**
+ * @return Returns the localInfo (initially a copy of the remote info,
+ * flagged as a network subscription by the constructor).
+ */
+ public ConsumerInfo getLocalInfo(){
+ return localInfo;
+ }
+
+ /**
+ * @param localInfo The localInfo to set.
+ */
+ public void setLocalInfo(ConsumerInfo localInfo){
+ this.localInfo=localInfo;
+ }
+
+ /**
+ * @return Returns the remoteInfo (the ConsumerInfo this subscription was
+ * created from).
+ */
+ public ConsumerInfo getRemoteInfo(){
+ return remoteInfo;
+ }
+
+ /**
+ * @param remoteInfo The remoteInfo to set.
+ */
+ public void setRemoteInfo(ConsumerInfo remoteInfo){
+ this.remoteInfo=remoteInfo;
+ }
+
+}
Added:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java?rev=378700&view=auto
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
(added)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DurableConduitBridge.java
Fri Feb 17 23:20:16 2006
@@ -0,0 +1,63 @@
+/**
+ *
+ * Copyright 2005-2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import java.io.IOException;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.transport.Transport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+
+/**
+ * A ConduitBridge that, when the static destinations are set up, also
+ * creates subscriptions for the configured durable destinations.
+ *
+ * @version $Revision: 1.1 $
+ */
+public class DurableConduitBridge extends ConduitBridge{
+ static final private Log log=LogFactory.getLog(DurableConduitBridge.class);
+ /**
+ * Constructor
+ * @param localBroker transport to the local broker
+ * @param remoteBroker transport to the remote broker
+ */
+ public DurableConduitBridge(Transport localBroker,Transport remoteBroker){
+ super(localBroker,remoteBroker);
+ }
+
+ /**
+ * Runs the inherited static destination setup, then creates and adds a
+ * subscription for every entry of durableDestinations that passes
+ * isPermissableDestination. Entries that are not permissible are skipped.
+ * @throws IOException if a subscription cannot be sent to the local broker
+ *
+ */
+ protected void setupStaticDestinations() throws IOException{
+ super.setupStaticDestinations();
+ ActiveMQDestination[] dests=durableDestinations;
+ if(dests!=null){
+ for(int i=0;i<dests.length;i++){
+ ActiveMQDestination dest=dests[i];
+ if(isPermissableDestination(dest)){
+ DemandSubscription sub=createDemandSubscription(dest);
+ addSubscription(sub);
+ if(log.isTraceEnabled())
+ log.trace("Forwarding messages for durable
destination: "+dest);
+ }
+ }
+ }
+ }
+
+}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/ForwardingBridge.java
Fri Feb 17 23:20:16 2006
@@ -32,6 +32,7 @@
import org.apache.activemq.command.ProducerInfo;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
@@ -80,7 +81,7 @@
public void start() throws Exception {
log.info("Starting a network connection between " + localBroker + "
and " + remoteBroker + " has been established.");
- localBroker.setTransportListener(new TransportListener(){
+ localBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) {
serviceLocalCommand(command);
}
@@ -89,7 +90,7 @@
}
});
- remoteBroker.setTransportListener(new TransportListener(){
+ remoteBroker.setTransportListener(new DefaultTransportListener(){
public void onCommand(Command command) {
serviceRemoteCommand(command);
}
@@ -192,7 +193,7 @@
}
}
} else {
- System.out.println("Unexpected remote command: "+command);
+ log.warn("Unexpected remote command: "+command);
}
} catch (IOException e) {
serviceLocalException(e);
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java?rev=378700&r1=378699&r2=378700&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/NetworkConnector.java
Fri Feb 17 23:20:16 2006
@@ -22,6 +22,8 @@
import java.util.Set;
import org.apache.activemq.Service;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.DiscoveryEvent;
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
@@ -49,7 +51,14 @@
private ConcurrentHashMap bridges = new ConcurrentHashMap();
private Set durableDestinations;
private boolean failover=true;
- private boolean decreaseNetowrkConsumerPriority;
+ // Bridge configuration consumed by createBridge.
+ private ActiveMQDestination[] excludedDestinations; // copied onto each bridge via setExcludedDestinations
+ private ActiveMQDestination[] dynamicallyIncludedDestinations; // copied onto each bridge via setDynamicallyIncludedDestinations
+ private ActiveMQDestination[] staticallyIncludedDestinations; // copied onto each bridge via setStaticallyIncludedDestinations
+ private boolean dynamicOnly = false; // with conduitSubscriptions on: true selects ConduitBridge, false DurableConduitBridge
+ private boolean conduitSubscriptions = true; // false makes createBridge build a plain DemandForwardingBridge
+ private boolean decreaseNetworkConsumerPriority; // copied onto each bridge; replaces the misspelled decreaseNetowrkConsumerPriority
+ private int networkTTL = 1; // copied onto each bridge; per the commit log, the number of hops messages/subs can pass through
+
public NetworkConnector(){
@@ -182,24 +191,6 @@
setDiscoveryAgent(DiscoveryAgentFactory.createDiscoveryAgent(discoveryURI));
}
- // Implementation methods
- // -------------------------------------------------------------------------
- protected Bridge createBridge(Transport localTransport, Transport
remoteTransport, final DiscoveryEvent event) {
- DemandForwardingBridge result = new
DemandForwardingBridge(localTransport, remoteTransport) {
- protected void serviceRemoteException(IOException error) {
- super.serviceRemoteException(error);
- try {
- // Notify the discovery agent that the remote broker
failed.
- discoveryAgent.serviceFailed(event);
- } catch (IOException e) {
- }
- }
- };
-
result.setDecreaseNetowrkConsumerPriority(isDecreaseNetowrkConsumerPriority());
- result.setLocalBrokerName(brokerName);
- return result;
- }
-
public boolean isFailover() {
@@ -243,13 +234,167 @@
}
- public boolean isDecreaseNetowrkConsumerPriority() {
- return decreaseNetowrkConsumerPriority;
+    /**
+     * Returns the destinations configured for dynamic inclusion;
+     * <code>createBridge</code> hands this array to every bridge it builds.
+     *
+     * @return the stored array itself (not a copy); may be <code>null</code>
+     */
+    public ActiveMQDestination[] getDynamicallyIncludedDestinations() {
+        return this.dynamicallyIncludedDestinations;
+    }
+
+
+    /**
+     * Stores the destinations to be dynamically included. The array is kept
+     * by reference, not copied.
+     *
+     * @param dynamicallyIncludedDestinations the dynamicallyIncludedDestinations to set
+     */
+    public void setDynamicallyIncludedDestinations(ActiveMQDestination[] dynamicallyIncludedDestinations) {
+        this.dynamicallyIncludedDestinations = dynamicallyIncludedDestinations;
+    }
+
+
+    /**
+     * Tells whether conduit bridges are created without the durable-destination
+     * set-up: when conduit subscriptions are on, <code>createBridge</code> builds a
+     * <code>ConduitBridge</code> if this is true and a
+     * <code>DurableConduitBridge</code> otherwise.
+     *
+     * @return the dynamicOnly flag (initially false)
+     */
+    public boolean isDynamicOnly() {
+        return this.dynamicOnly;
+    }
+
+
+    /**
+     * Sets the dynamicOnly flag consulted by <code>createBridge</code> when
+     * conduit subscriptions are enabled.
+     *
+     * @param dynamicOnly the dynamicOnly value to set
+     */
+    public void setDynamicOnly(boolean dynamicOnly) {
+        this.dynamicOnly = dynamicOnly;
+    }
+
+    /**
+     * Tells whether <code>createBridge</code> builds a conduit bridge
+     * (<code>ConduitBridge</code> or <code>DurableConduitBridge</code>) rather
+     * than a plain <code>DemandForwardingBridge</code>.
+     *
+     * @return the conduitSubscriptions flag (initially true)
+     */
+    public boolean isConduitSubscriptions() {
+        return this.conduitSubscriptions;
+    }
+
+
+    /**
+     * Sets whether <code>createBridge</code> should build conduit bridges.
+     *
+     * @param conduitSubscriptions the conduitSubscriptions value to set
+     */
+    public void setConduitSubscriptions(boolean conduitSubscriptions) {
+        this.conduitSubscriptions = conduitSubscriptions;
+    }
+
+    /**
+     * Returns the flag that <code>createBridge</code> copies onto each bridge
+     * through <code>setDecreaseNetworkConsumerPriority</code>.
+     *
+     * @return the decreaseNetworkConsumerPriority flag
+     */
+    public boolean isDecreaseNetworkConsumerPriority() {
+        return this.decreaseNetworkConsumerPriority;
+    }
+
+    /**
+     * Sets the flag that <code>createBridge</code> copies onto each bridge it
+     * builds. NOTE(review): what the bridge does with it is not visible here.
+     *
+     * @param decreaseNetworkConsumerPriority the decreaseNetworkConsumerPriority value to set
+     */
+    public void setDecreaseNetworkConsumerPriority(boolean decreaseNetworkConsumerPriority) {
+        this.decreaseNetworkConsumerPriority = decreaseNetworkConsumerPriority;
+    }
+
+    /**
+     * Returns the network time-to-live that <code>createBridge</code> copies
+     * onto each bridge. Per the commit log this is the number of hops
+     * messages/subscriptions can pass through.
+     *
+     * @return the networkTTL (initially 1)
+     */
+    public int getNetworkTTL() {
+        return this.networkTTL;
+    }
+
+    /**
+     * Sets the network time-to-live handed to each bridge. The value is stored
+     * as given; no range check is applied here.
+     *
+     * @param networkTTL the networkTTL to set
+     */
+    public void setNetworkTTL(int networkTTL) {
+        this.networkTTL = networkTTL;
+    }
+
+
+    /**
+     * Returns the excluded destinations; <code>createBridge</code> hands this
+     * array to every bridge it builds.
+     *
+     * @return the stored array itself (not a copy); may be <code>null</code>
+     */
+    public ActiveMQDestination[] getExcludedDestinations() {
+        return this.excludedDestinations;
+    }
+
+
+    /**
+     * Stores the destinations to exclude. The array is kept by reference,
+     * not copied.
+     *
+     * @param excludedDestinations the excludedDestinations to set
+     */
+    public void setExcludedDestinations(ActiveMQDestination[] excludedDestinations) {
+        // Parameter was misspelled "exludedDestinations", contradicting the @param tag.
+        this.excludedDestinations = excludedDestinations;
+    }
+
+
+    /**
+     * Returns the statically included destinations; <code>createBridge</code>
+     * hands this array to every bridge it builds.
+     *
+     * @return the stored array itself (not a copy); may be <code>null</code>
+     */
+    public ActiveMQDestination[] getStaticallyIncludedDestinations() {
+        return this.staticallyIncludedDestinations;
}
- public void setDecreaseNetowrkConsumerPriority(boolean
decreaseNetowrkConsumerPriority) {
- this.decreaseNetowrkConsumerPriority = decreaseNetowrkConsumerPriority;
+    /**
+     * Stores the destinations to be statically included. The array is kept
+     * by reference, not copied.
+     *
+     * @param staticallyIncludedDestinations the staticallyIncludedDestinations to set
+     */
+    public void setStaticallyIncludedDestinations(ActiveMQDestination[] staticallyIncludedDestinations) {
+        this.staticallyIncludedDestinations = staticallyIncludedDestinations;
}
+
+
+
+
+ // Implementation methods
+ // -------------------------------------------------------------------------
+    /**
+     * Builds the bridge for a newly discovered remote broker and copies this
+     * connector's settings onto it.
+     * <p>
+     * Bridge type: with conduit subscriptions enabled, a <code>ConduitBridge</code>
+     * when <code>dynamicOnly</code> is set and a <code>DurableConduitBridge</code>
+     * otherwise; with conduit subscriptions disabled, a plain
+     * <code>DemandForwardingBridge</code>. In every case the bridge's
+     * <code>serviceRemoteException</code> is extended to report the failure to
+     * the discovery agent.
+     *
+     * @param localTransport transport passed to the bridge as its local side
+     * @param remoteTransport transport passed to the bridge as its remote side
+     * @param event the discovery event reported back to the discovery agent
+     *        if the remote side fails
+     * @return the configured bridge
+     */
+    protected Bridge createBridge(Transport localTransport, Transport remoteTransport, final DiscoveryEvent event) {
+        DemandForwardingBridge result;
+        if (conduitSubscriptions) {
+            if (dynamicOnly) {
+                result = new ConduitBridge(localTransport, remoteTransport) {
+                    protected void serviceRemoteException(IOException error) {
+                        super.serviceRemoteException(error);
+                        notifyDiscoveryAgentOfFailure(event);
+                    }
+                };
+            } else {
+                result = new DurableConduitBridge(localTransport, remoteTransport) {
+                    protected void serviceRemoteException(IOException error) {
+                        super.serviceRemoteException(error);
+                        notifyDiscoveryAgentOfFailure(event);
+                    }
+                };
+            }
+        } else {
+            result = new DemandForwardingBridge(localTransport, remoteTransport) {
+                protected void serviceRemoteException(IOException error) {
+                    super.serviceRemoteException(error);
+                    notifyDiscoveryAgentOfFailure(event);
+                }
+            };
+        }
+        result.setLocalBrokerName(brokerName);
+        result.setNetworkTTL(getNetworkTTL());
+        result.setDecreaseNetworkConsumerPriority(isDecreaseNetworkConsumerPriority());
+        result.setDynamicallyIncludedDestinations(getDynamicallyIncludedDestinations());
+        result.setExcludedDestinations(getExcludedDestinations());
+        result.setStaticallyIncludedDestinations(getStaticallyIncludedDestinations());
+        if (durableDestinations != null) {
+            // The set is untyped, so the array type comes from the argument passed to toArray.
+            ActiveMQDestination[] dest = new ActiveMQDestination[durableDestinations.size()];
+            dest = (ActiveMQDestination[]) durableDestinations.toArray(dest);
+            result.setDurableDestinations(dest);
+        }
+        return result;
+    }
+
+    /**
+     * Notifies the discovery agent that the remote broker behind
+     * <code>event</code> failed. Best effort: an <code>IOException</code> from
+     * the agent is dropped, exactly as the three inline handlers this replaces
+     * did.
+     *
+     * @param event the discovery event identifying the failed remote broker
+     */
+    private void notifyDiscoveryAgentOfFailure(DiscoveryEvent event) {
+        try {
+            discoveryAgent.serviceFailed(event);
+        } catch (IOException ignored) {
+            // NOTE(review): deliberately not propagated, since this runs inside
+            // the bridge's own error path. Consider logging it.
+        }
+    }
}