Author: rajdavies
Date: Fri May 16 10:44:21 2008
New Revision: 657147

URL: http://svn.apache.org/viewvc?rev=657147&view=rev
Log:
patch for https://issues.apache.org/activemq/browse/AMQ-1661

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=657147&r1=657146&r2=657147&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Fri May 16 10:44:21 2008
@@ -388,7 +388,7 @@
         }
     }
 
-    protected void serviceRemoteCommand(Command command) {
+    protected void serviceRemoteCommand(Command command) {             
         if (!disposed) {
             try {
                 if (command.isMessageDispatch()) {
@@ -580,9 +580,20 @@
                     final MessageDispatch md = (MessageDispatch)command;
                     DemandSubscription sub = 
subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage()!=null) {
+                       
+                         // See if this consumer's brokerPath tells us it came from the broker at the other end
+                         // of the bridge. I think we should be making this decision based on the message's
+                         // broker bread crumbs and not the consumer's? However, the message's broker bread
+                         // crumbs are null, which is another matter.
+                         boolean cameFromRemote = false;
+                        Object consumerInfo = 
md.getMessage().getDataStructure(); 
+                        if( consumerInfo != null && (consumerInfo instanceof 
ConsumerInfo) )                                             
+                           cameFromRemote = contains( 
((ConsumerInfo)consumerInfo).getBrokerPath(),remoteBrokerInfo.getBrokerId());   
                                                    
+                                                                       
                         Message message = configureMessage(md);
                         if (trace) {
                             LOG.trace("bridging " + 
configuration.getBrokerName() + " -> " + remoteBrokerName + ": " + message);
+                            LOG.trace("cameFromRemote = "+cameFromRemote);    
                         }
 
                         if (!message.isResponseRequired() || isDuplex()) {
@@ -591,9 +602,16 @@
                             // send, we will preserve that QOS
                             // by bridging it using an async send (small chance
                             // of message loss).
-                            remoteBroker.oneway(message);
+                            
+                            // Don't send it off to the remote if it originally came from the remote.
+                            if( !cameFromRemote ) {
+                               remoteBroker.oneway(message);
+                              }
+                            else{
+                              LOG.info("Message not forwarded on to remote, 
because message came from remote");                               
+                            }
                             localBroker.oneway(new MessageAck(md, 
MessageAck.STANDARD_ACK_TYPE, 1));
-                            dequeueCounter.incrementAndGet();
+                            dequeueCounter.incrementAndGet();                  
        
 
                         } else {
 


Reply via email to