Author: rajdavies
Date: Fri Apr 21 01:02:19 2006
New Revision: 395810
URL: http://svn.apache.org/viewcvs?rev=395810&view=rev
Log:
put back support for request/reply across networks
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=395810&r1=395809&r2=395810&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Apr 21 01:02:19 2006
@@ -22,6 +22,7 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -31,6 +32,7 @@
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
@@ -219,7 +221,11 @@
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
-
+ //we want infomation about Destinations as well
+ ConsumerInfo destinationInfo = new
ConsumerInfo(remoteSessionInfo,2);
+
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+ destinationInfo.setPrefetchSize(prefetchSize);
+ remoteBroker.oneway(destinationInfo);
startedLatch.countDown();
}
}
@@ -331,7 +337,34 @@
log.trace("Ignoring sub " + info + " already subscribed to
matching destination");
}
}
- if(data.getClass()==RemoveInfo.class){
+ else if (data.getClass()==DestinationInfo.class){
+// It's a destination info - we want to pass up
+ //infomation about temporary destinations
+ DestinationInfo destInfo = (DestinationInfo) data;
+ BrokerId[] path=destInfo.getBrokerPath();
+ if((path!=null&&path.length>= networkTTL)){
+ if(log.isTraceEnabled())
+ log.trace("Ignoring Subscription " + destInfo + "
restricted to " + networkTTL + " network hops only");
+ return;
+ }
+ if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
+ // Ignore this consumer as it's a consumer we locally sent to
the broker.
+ if(log.isTraceEnabled())
+ log.trace("Ignoring sub " + destInfo + " already routed
through this broker once");
+ return;
+ }
+
+ destInfo.setConnectionId(localConnectionInfo.getConnectionId());
+ if (destInfo.getDestination() instanceof ActiveMQTempDestination){
+ //re-set connection id so comes from here
+ ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destInfo.getDestination();
+
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
+ }
+
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+ localBroker.oneway(destInfo);
+
+ }
+ else if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
removeDemandSubscription(id);
}
@@ -761,6 +794,8 @@
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info)
throws IOException;
protected abstract void serviceRemoteBrokerInfo(Command command) throws
IOException;
+
+ protected abstract BrokerId[] getRemoteBrokerPath();
public String getPassword() {
return password;