Author: rajdavies
Date: Fri Mar 24 22:55:33 2006
New Revision: 388714
URL: http://svn.apache.org/viewcvs?rev=388714&view=rev
Log:
Fix for http://jira.activemq.org/jira/browse/AMQ-487
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
Fri Mar 24 22:55:33 2006
@@ -136,7 +136,8 @@
}
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
- Destination answer = next.addDestination(context, destination);
+ Destination answer = next.addDestination(context, destination);
+
ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(destination);
DestinationInfo info = new DestinationInfo(context.getConnectionId(),
DestinationInfo.ADD_OPERATION_TYPE, destination);
fireAdvisory(context, topic, info);
@@ -152,6 +153,21 @@
info.setOperationType(DestinationInfo.REMOVE_OPERATION_TYPE);
fireAdvisory(context, topic, info);
}
+ }
+
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ ActiveMQDestination destination = info.getDestination();
+ next.addDestinationInfo(context, info);
+
+ ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(destination);
+ fireAdvisory(context, topic, info);
+ destinations.put(destination, info);
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ next.removeDestinationInfo(context, info);
+ ActiveMQTopic topic =
AdvisorySupport.getDestinationAdvisoryTopic(info.getDestination());
+ fireAdvisory(context, topic, info);
}
public void removeConnection(ConnectionContext context, ConnectionInfo
info, Throwable error) throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/AbstractConnection.java
Fri Mar 24 22:55:33 2006
@@ -139,24 +139,24 @@
this.processDispatch(connector.getBrokerInfo());
}
- public void stop() throws Exception {
- if( disposed)
+ public void stop() throws Exception{
+ if(disposed)
return;
-
disposed=true;
//
// Remove all logical connection associated with this connection
// from the broker.
- ArrayList l = new ArrayList(connectionStates.keySet());
- for (Iterator iter = l.iterator(); iter.hasNext();) {
- ConnectionId connectionId = (ConnectionId) iter.next();
- try {
- processRemoveConnection(connectionId);
- } catch (Throwable ignore) {
+ if(!broker.isStopped()){
+ ArrayList l=new ArrayList(connectionStates.keySet());
+ for(Iterator iter=l.iterator();iter.hasNext();){
+ ConnectionId connectionId=(ConnectionId) iter.next();
+ try{
+ processRemoveConnection(connectionId);
+ }catch(Throwable ignore){}
+ }
+ if(brokerInfo!=null){
+ broker.removeBroker(this,brokerInfo);
}
- }
- if (brokerInfo != null){
- broker.removeBroker(this, brokerInfo);
}
}
@@ -364,7 +364,7 @@
public Response processAddDestination(DestinationInfo info) throws
Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
- broker.addDestination(cs.getContext(), info.getDestination());
+ broker.addDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
cs.addTempDestination(info.getDestination());
}
@@ -373,7 +373,7 @@
public Response processRemoveDestination(DestinationInfo info) throws
Exception {
ConnectionState cs = lookupConnectionState(info.getConnectionId());
- broker.removeDestination(cs.getContext(), info.getDestination(),
info.getTimeout());
+ broker.removeDestinationInfo(cs.getContext(), info);
if( info.getDestination().isTemporary() ) {
cs.removeTempDestination(info.getDestination());
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/Broker.java
Fri Mar 24 22:55:33 2006
@@ -18,11 +18,13 @@
import java.util.Set;
import org.apache.activemq.Service;
+import org.apache.activemq.broker.region.Destination;
import org.apache.activemq.broker.region.Region;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.MessageDispatchNotification;
import org.apache.activemq.command.ProducerInfo;
@@ -213,5 +215,23 @@
* @return a Set of all durable destinations
*/
public Set getDurableDestinations();
+
+ /**
+ * Add and process a DestinationInfo object
+ * @param context
+ * @param info
+ * @throws Exception
+ */
+ public void addDestinationInfo(ConnectionContext context, DestinationInfo
info) throws Exception;
+
+
+ /**
+ * Remove and process a DestinationInfo object
+ * @param context
+ * @param info
+ * @throws Exception
+ */
+ public void removeDestinationInfo(ConnectionContext context,
DestinationInfo info) throws Exception;
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerFilter.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -192,6 +193,16 @@
public Set getDurableDestinations(){
return next.getDurableDestinations();
+ }
+
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ next.addDestinationInfo(context, info);
+
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ next.removeDestinationInfo(context, info);
+
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/EmptyBroker.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -192,4 +193,11 @@
return null;
}
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ }
+
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ErrorBroker.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -188,6 +189,16 @@
public Set getDurableDestinations(){
throw new IllegalStateException(this.message);
+ }
+
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ throw new IllegalStateException(this.message);
+
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ throw new IllegalStateException(this.message);
+
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/MutableBrokerFilter.java
Fri Mar 24 22:55:33 2006
@@ -24,6 +24,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -202,6 +203,16 @@
public Set getDurableDestinations(){
return getNext().getDurableDestinations();
+ }
+
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ getNext().addDestinationInfo(context, info);
+
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ getNext().removeDestinationInfo(context, info);
+
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
Fri Mar 24 22:55:33 2006
@@ -29,6 +29,7 @@
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.Message;
import org.apache.activemq.command.MessageAck;
import org.apache.activemq.command.MessageDispatch;
@@ -182,8 +183,10 @@
}
public Destination addDestination(ConnectionContext context,
ActiveMQDestination destination) throws Exception {
- if( destinations.contains(destination) )
+ if( destinations.contains(destination) ){
+ System.err.println(brokerService.getBrokerName() + "
SPLATYTTTT!!!!");
throw new JMSException("Destination already exists: "+destination);
+ }
Destination answer = null;
switch(destination.getDestinationType()) {
@@ -229,6 +232,16 @@
}
destinations.remove(destination);
+ }
+
+ public void addDestinationInfo(ConnectionContext context,DestinationInfo
info) throws Exception{
+ addDestination(context,info.getDestination());
+
+ }
+
+ public void removeDestinationInfo(ConnectionContext
context,DestinationInfo info) throws Exception{
+ removeDestination(context,info.getDestination(), info.getTimeout());
+
}
public ActiveMQDestination[] getDestinations() throws Exception {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/ActiveMQTempDestination.java
Fri Mar 24 22:55:33 2006
@@ -71,6 +71,10 @@
public String getConnectionId() {
return connectionId;
}
+
+ public void setConnectionId(String connectionId) {
+ this.connectionId = connectionId;
+ }
public int getSequenceId() {
return sequenceId;
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/CompositeDemandForwardingBridge.java
Fri Mar 24 22:55:33 2006
@@ -99,5 +99,9 @@
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info)
throws IOException {
return new NetworkBridgeFilter(getFromBrokerId(info), networkTTL);
}
+
+ protected BrokerId[] getRemoteBrokerPath(){
+ return remoteBrokerPath;
+ }
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Fri Mar 24 22:55:33 2006
@@ -60,7 +60,7 @@
}
protected void addRemoteBrokerToBrokerPath(ConsumerInfo info) {
-
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),remoteBrokerPath));
+
info.setBrokerPath(appendToBrokerPath(info.getBrokerPath(),getRemoteBrokerPath()));
}
protected void serviceLocalBrokerInfo(Command command) throws
InterruptedException {
@@ -79,5 +79,9 @@
protected NetworkBridgeFilter createNetworkBridgeFilter(ConsumerInfo info)
throws IOException {
return new NetworkBridgeFilter(remoteBrokerPath[0], networkTTL);
+ }
+
+ protected BrokerId[] getRemoteBrokerPath(){
+ return remoteBrokerPath;
}
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Fri Mar 24 22:55:33 2006
@@ -22,6 +22,7 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQTempDestination;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -31,6 +32,7 @@
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.KeepAliveInfo;
import org.apache.activemq.command.Message;
@@ -55,6 +57,7 @@
import org.apache.commons.logging.LogFactory;
import java.io.IOException;
+import javax.jms.TemporaryTopic;
/**
* A useful base class for implementing demand forwarding bridges.
@@ -211,7 +214,13 @@
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
-
+
+ //we want information about Destinations as well
+ ConsumerInfo destinationInfo = new
ConsumerInfo(remoteSessionInfo,2);
+
destinationInfo.setDestination(AdvisorySupport.TEMP_DESTINATION_COMPOSITE_ADVISORY_TOPIC);
+ destinationInfo.setPrefetchSize(prefetchSize);
+ remoteBroker.oneway(destinationInfo);
+
startedLatch.countDown();
}
}
@@ -322,6 +331,32 @@
if(log.isTraceEnabled())
log.trace("Ignoring sub " + info + " already subscribed to
matching destination");
}
+ }else if (data.getClass()==DestinationInfo.class){
+// It's a destination info - we want to pass up
+ //information about temporary destinations
+ DestinationInfo destInfo = (DestinationInfo) data;
+ BrokerId[] path=destInfo.getBrokerPath();
+ if((path!=null&&path.length>= networkTTL)){
+ if(log.isTraceEnabled())
+ log.trace("Ignoring Subscription " + destInfo + "
restricted to " + networkTTL + " network hops only");
+ return;
+ }
+ if(contains(destInfo.getBrokerPath(),localBrokerPath[0])){
+ // Ignore this destination info as it has already been routed
through this broker.
+ if(log.isTraceEnabled())
+ log.trace("Ignoring sub " + destInfo + " already routed
through this broker once");
+ return;
+ }
+
+ destInfo.setConnectionId(localConnectionInfo.getConnectionId());
+ if (destInfo.getDestination() instanceof ActiveMQTempDestination){
+ //re-set connection id so comes from here
+ ActiveMQTempDestination tempDest = (ActiveMQTempDestination)
destInfo.getDestination();
+
tempDest.setConnectionId(localSessionInfo.getSessionId().getConnectionId());
+ }
+
destInfo.setBrokerPath(appendToBrokerPath(destInfo.getBrokerPath(),getRemoteBrokerPath()));
+ localBroker.oneway(destInfo);
+
}
if(data.getClass()==RemoveInfo.class){
ConsumerId id=(ConsumerId) ((RemoveInfo) data).getObjectId();
@@ -339,7 +374,8 @@
localBroker.oneway(sub.getLocalInfo());
}
}
-
+
+
protected void removeSubscription(DemandSubscription sub) throws
IOException {
if(sub!=null){
subscriptionMapByLocalId.remove(sub.getLocalInfo().getConsumerId());
@@ -732,5 +768,7 @@
protected abstract void addRemoteBrokerToBrokerPath(ConsumerInfo info)
throws IOException;
protected abstract void serviceRemoteBrokerInfo(Command command) throws
IOException;
+
+ protected abstract BrokerId[] getRemoteBrokerPath();
}
Modified:
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
Fri Mar 24 22:55:33 2006
@@ -16,10 +16,16 @@
import java.net.URI;
import javax.jms.Connection;
import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
import javax.jms.MessageProducer;
import javax.jms.Session;
+import javax.jms.TextMessage;
+import javax.jms.TopicRequestor;
+import javax.jms.TopicSession;
import junit.framework.TestCase;
import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
@@ -40,6 +46,37 @@
protected ActiveMQTopic included;
protected ActiveMQTopic excluded;
protected String consumerName="durableSubs";
+
+
+ public void testRequestReply() throws Exception{
+ final MessageProducer
remoteProducer=remoteSession.createProducer(null);
+ MessageConsumer remoteConsumer=remoteSession.createConsumer(included);
+ remoteConsumer.setMessageListener(new MessageListener(){
+ public void onMessage(Message msg){
+ try{
+ TextMessage textMsg=(TextMessage) msg;
+ String payload="REPLY: "+textMsg.getText();
+ Destination replyTo;
+ replyTo=msg.getJMSReplyTo();
+ textMsg.clearBody();
+ textMsg.setText(payload);
+ remoteProducer.send(replyTo,textMsg);
+ }catch(JMSException e){
+ // TODO Auto-generated catch block
+ e.printStackTrace();
+ }
+ }
+ });
+
+ TopicRequestor requestor=new TopicRequestor((TopicSession)
localSession,included);
+ Thread.sleep(2000);//allow for consumer infos to percolate around
+ for (int i =0;i < MESSAGE_COUNT; i++){
+ TextMessage msg = localSession.createTextMessage("test msg: " +i);
+ TextMessage result = (TextMessage) requestor.request(msg);
+ assertNotNull(result);
+ System.out.println(result.getText());
+ }
+ }
public void testFiltering() throws Exception{
MessageConsumer
includedConsumer=remoteSession.createConsumer(included);
@@ -93,6 +130,8 @@
assertNotNull(remoteConsumer.receive(500));
}
}
+
+
protected void setUp() throws Exception{
super.setUp();
@@ -114,16 +153,19 @@
}
protected void doSetUp() throws Exception{
- Resource resource=new ClassPathResource(getLocalBrokerURI());
+ Resource resource=new ClassPathResource(getRemoteBrokerURI());
BrokerFactoryBean factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
- localBroker=factory.getBroker();
- resource=new ClassPathResource(getRemoteBrokerURI());
+ remoteBroker=factory.getBroker();
+ remoteBroker.start();
+
+ resource=new ClassPathResource(getLocalBrokerURI());
factory=new BrokerFactoryBean(resource);
factory.afterPropertiesSet();
- remoteBroker=factory.getBroker();
+ localBroker=factory.getBroker();
+
localBroker.start();
- remoteBroker.start();
+
URI localURI=localBroker.getVmConnectorURI();
ActiveMQConnectionFactory fac=new ActiveMQConnectionFactory(localURI);
localConnection=fac.createConnection();
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/localBroker.xml
Fri Mar 24 22:55:33 2006
@@ -23,7 +23,7 @@
</transportConnectors>
<networkConnectors>
- <networkConnector uri="static://(tcp://localhost:61617)">
+ <networkConnector uri="static:failover:(tcp://localhost:61617)">
dynamicOnly = false
conduitSubscriptions = true
decreaseNetworkConsumerPriority = false
Modified:
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml?rev=388714&r1=388713&r2=388714&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
(original)
+++
incubator/activemq/trunk/activemq-core/src/test/resources/org/apache/activemq/network/remoteBroker.xml
Fri Mar 24 22:55:33 2006
@@ -21,6 +21,9 @@
<transportConnectors>
<transportConnector uri="tcp://localhost:61617"/>
</transportConnectors>
+ <networkConnectors>
+ <networkConnector uri="static:failover:(tcp://localhost:61616)"/>
+ </networkConnectors>
</broker>
</beans>