Author: tabish
Date: Fri Sep 30 19:35:28 2011
New Revision: 1177797

URL: http://svn.apache.org/viewvc?rev=1177797&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3014

Added:
    
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
   (with props)
Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1177797&r1=1177796&r2=1177797&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
 Fri Sep 30 19:35:28 2011
@@ -81,8 +81,8 @@ import org.slf4j.LoggerFactory;
 
 /**
  * A useful base class for implementing demand forwarding bridges.
- * 
- * 
+ *
+ *
  */
 public abstract class DemandForwardingBridgeSupport implements NetworkBridge, 
BrokerServiceAware {
     private static final Logger LOG = 
LoggerFactory.getLogger(DemandForwardingBridgeSupport.class);
@@ -231,7 +231,7 @@ public abstract class DemandForwardingBr
                 } catch (IOException e) {
                     LOG.warn("Caught exception from remote start", e);
                 }
-           } else {
+            } else {
                 LOG.warn ("Bridge was disposed before the start() method was 
fully executed.");
                 throw new TransportDisposedIOException();
             }
@@ -339,6 +339,7 @@ public abstract class DemandForwardingBr
                     IntrospectionSupport.getProperties(configuration, props, 
null);
                     String str = MarshallingSupport.propertiesToString(props);
                     brokerInfo.setNetworkProperties(str);
+                    localBrokerIdKnownLatch.await();
                     brokerInfo.setBrokerId(this.localBrokerId);
                     remoteBroker.oneway(brokerInfo);
                 }
@@ -487,7 +488,7 @@ public abstract class DemandForwardingBr
                     if (isDuplex()) {
                         if (command.isMessage()) {
                             ActiveMQMessage message = (ActiveMQMessage) 
command;
-                            if 
(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination()) 
+                            if 
(AdvisorySupport.isConsumerAdvisoryTopic(message.getDestination())
                                 || 
AdvisorySupport.isDestinationAdvisoryTopic(message.getDestination())) {
                                 
serviceRemoteConsumerAdvisory(message.getDataStructure());
                             } else {
@@ -708,7 +709,7 @@ public abstract class DemandForwardingBr
                     final MessageDispatch md = (MessageDispatch) command;
                     final DemandSubscription sub = 
subscriptionMapByLocalId.get(md.getConsumerId());
                     if (sub != null && md.getMessage() != null && 
sub.incrementOutstandingResponses()) {
-                        
+
                         if (suppressMessageDispatch(md, sub)) {
                             if (LOG.isDebugEnabled()) {
                                 LOG.debug(configuration.getBrokerName() + " 
message not forwarded to " + remoteBrokerName + " because message came from 
there or fails networkTTL, brokerPath: " + 
Arrays.toString(md.getMessage().getBrokerPath()) + ", message: " + 
md.getMessage());
@@ -721,14 +722,14 @@ public abstract class DemandForwardingBr
                             }
                             return;
                         }
-                        
+
                         Message message = configureMessage(md);
                         if (LOG.isDebugEnabled()) {
                             LOG.debug("bridging (" + 
configuration.getBrokerName() + " -> " + remoteBrokerName + ") " + 
message.getMessageId() + ", consumer: " + md.getConsumerId() + ", destination " 
+ message.getDestination() + ", brokerPath: " + 
Arrays.toString(message.getBrokerPath()) + ", message: " + message);
                         }
-                        
+
                         if (!message.isResponseRequired()) {
-                            
+
                             // If the message was originally sent using async
                             // send, we will preserve that QOS
                             // by bridging it using an async send (small chance
@@ -740,9 +741,9 @@ public abstract class DemandForwardingBr
                             } finally {
                                 sub.decrementOutstandingResponses();
                             }
-                            
+
                         } else {
-                            
+
                             // The message was not sent using async send, so we
                             // should only ack the local
                             // broker when we get confirmation that the remote
@@ -757,7 +758,7 @@ public abstract class DemandForwardingBr
                                         } else {
                                             localBroker.oneway(new 
MessageAck(md, MessageAck.INDIVIDUAL_ACK_TYPE, 1));
                                             dequeueCounter.incrementAndGet();
-                                        }   
+                                        }
                                     } catch (IOException e) {
                                         serviceLocalException(e);
                                     } finally {
@@ -765,9 +766,9 @@ public abstract class DemandForwardingBr
                                     }
                                 }
                             };
-                            
+
                             remoteBroker.asyncRequest(message, callback);
-                            
+
                         }
                     } else {
                         if (LOG.isDebugEnabled()) {
@@ -1042,7 +1043,7 @@ public abstract class DemandForwardingBr
         }
 
         List<ConsumerId> candidateConsumers = 
consumerInfo.getNetworkConsumerIds();
-        Collection<Subscription> currentSubs = 
+        Collection<Subscription> currentSubs =
             getRegionSubscriptions(consumerInfo.getDestination().isTopic());
         for (Subscription sub : currentSubs) {
             List<ConsumerId> networkConsumers = 
sub.getConsumerInfo().getNetworkConsumerIds();
@@ -1070,7 +1071,7 @@ public abstract class DemandForwardingBr
         if (existingSub.getConsumerInfo().getPriority() >= 
candidateInfo.getPriority()) {
             if (LOG.isDebugEnabled()) {
                 LOG.debug(configuration.getBrokerName() + " Ignoring duplicate 
subscription from " + remoteBrokerName
-                        + ", sub: " + candidateInfo + " is duplicated by 
network subscription with equal or higher network priority: " 
+                        + ", sub: " + candidateInfo + " is duplicated by 
network subscription with equal or higher network priority: "
                         + existingSub  + ", networkConsumerIds: " + 
existingSub.getConsumerInfo().getNetworkConsumerIds());
             }
             suppress = true;
@@ -1082,7 +1083,7 @@ public abstract class DemandForwardingBr
                 if (LOG.isDebugEnabled()) {
                     LOG.debug(configuration.getBrokerName() + " Replacing 
duplicate subscription " + existingSub.getConsumerInfo()
                             + " with sub from " + remoteBrokerName
-                            + ", which has a higher priority, new sub: " + 
candidateInfo + ", networkComsumerIds: " 
+                            + ", which has a higher priority, new sub: " + 
candidateInfo + ", networkComsumerIds: "
                             + candidateInfo.getNetworkConsumerIds());
                 }
             } catch (IOException e) {
@@ -1113,7 +1114,7 @@ public abstract class DemandForwardingBr
 
     private final Collection<Subscription> getRegionSubscriptions(boolean 
isTopic) {
         RegionBroker region = (RegionBroker) brokerService.getRegionBroker();
-        AbstractRegion abstractRegion = (AbstractRegion) 
+        AbstractRegion abstractRegion = (AbstractRegion)
             (isTopic ? region.getTopicRegion() : region.getQueueRegion());
         return abstractRegion.getSubscriptions().values();
     }

Added: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java?rev=1177797&view=auto
==============================================================================
--- 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
 (added)
+++ 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
 Fri Sep 30 19:35:28 2011
@@ -0,0 +1,195 @@
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.Connection;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.thread.Task;
+import org.apache.activemq.thread.TaskRunner;
+import org.apache.activemq.thread.TaskRunnerFactory;
+import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportFactory;
+import org.apache.activemq.transport.TransportListener;
+import org.apache.activemq.transport.TransportServer;
+import org.apache.activemq.transport.discovery.simple.SimpleDiscoveryAgent;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * This test involves the creation of a local and remote broker, both of which
+ * communicate over VM and TCP. The local broker establishes a bridge to the
+ * remote broker for the purposes of verifying that broker info is only
+ * transferred once the local broker's ID is known to the bridge support.
+ */
+public class AMQ3014Test {
+       // Change the port in this URL if it is already in use.
+       private static final String REMOTE_BROKER_URL = "tcp://localhost:50000";
+
+       private List<BrokerInfo> remoteBrokerInfos = Collections
+                       .synchronizedList(new ArrayList<BrokerInfo>());
+
+       private BrokerService localBroker = new BrokerService();
+
+       // Override the "remote" broker so that it records all (remote) 
BrokerInfos
+       // that it receives.
+       private BrokerService remoteBroker = new BrokerService() {
+               @Override
+               protected TransportConnector createTransportConnector(URI 
brokerURI)
+                               throws Exception {
+                       TransportServer transport = TransportFactory.bind(this, 
brokerURI);
+                       return new TransportConnector(transport) {
+                               @Override
+                               protected Connection createConnection(Transport 
transport)
+                                               throws IOException {
+                                       Connection connection = 
super.createConnection(transport);
+                                       final TransportListener proxiedListener 
= transport
+                                                       .getTransportListener();
+                                       transport.setTransportListener(new 
TransportListener() {
+
+                                               @Override
+                                               public void onCommand(Object 
command) {
+                                                       if (command instanceof 
BrokerInfo) {
+                                                               
remoteBrokerInfos.add((BrokerInfo) command);
+                                                       }
+                                                       
proxiedListener.onCommand(command);
+                                               }
+
+                                               @Override
+                                               public void 
onException(IOException error) {
+                                                       
proxiedListener.onException(error);
+                                               }
+
+                                               @Override
+                                               public void 
transportInterupted() {
+                                                       
proxiedListener.transportInterupted();
+                                               }
+
+                                               @Override
+                                               public void transportResumed() {
+                                                       
proxiedListener.transportResumed();
+                                               }
+                                       });
+                                       return connection;
+                               }
+
+                       };
+               }
+       };
+
+       @Before
+       public void init() throws Exception {
+               localBroker.setBrokerName("localBroker");
+               localBroker.setPersistent(false);
+               localBroker.setUseJmx(false);
+               localBroker.setSchedulerSupport(false);
+
+               remoteBroker.setBrokerName("remoteBroker");
+               remoteBroker.setPersistent(false);
+               remoteBroker.setUseJmx(false);
+               remoteBroker.addConnector(REMOTE_BROKER_URL);
+               remoteBroker.setSchedulerSupport(false);
+       }
+
+       @After
+       public void cleanup() throws Exception {
+               try {
+                       localBroker.stop();
+               } finally {
+                       remoteBroker.stop();
+               }
+       }
+
+       /**
+        * This test verifies that the local broker's ID is typically known by 
the
+        * bridge support before the local broker's BrokerInfo is sent to the 
remote
+        * broker.
+        */
+       @Test
+       public void NormalCaseTest() throws Exception {
+               runTest(0, 3000);
+       }
+
+       /**
+        * This test delays the local connection dispatcher to provoke timing
+        * under which the local broker's ID may not yet be known by the bridge
+        * support, and verifies the BrokerInfo received remotely still has an ID.
+        */
+       @Test
+       public void DelayedCaseTest() throws Exception {
+               runTest(500, 3000);
+       }
+
+       private void runTest(final long taskRunnerDelay, long timeout)
+                       throws Exception {
+               // Add a network connector to the local broker that will create 
a bridge
+               // to the remote broker.
+               DiscoveryNetworkConnector dnc = new DiscoveryNetworkConnector();
+               SimpleDiscoveryAgent da = new SimpleDiscoveryAgent();
+               da.setServices(REMOTE_BROKER_URL);
+               dnc.setDiscoveryAgent(da);
+               localBroker.addNetworkConnector(dnc);
+
+               // Before starting the local broker, intercept the task runner 
factory
+               // so that the
+               // local VMTransport dispatcher is artificially delayed.
+               final TaskRunnerFactory realTaskRunnerFactory = localBroker
+                               .getTaskRunnerFactory();
+               localBroker.setTaskRunnerFactory(new TaskRunnerFactory() {
+                       public TaskRunner createTaskRunner(Task task, String 
name) {
+                               final TaskRunner realTaskRunner = 
realTaskRunnerFactory
+                                               .createTaskRunner(task, name);
+                               if (name.startsWith("ActiveMQ Connection 
Dispatcher: ")) {
+                                       return new TaskRunner() {
+                                               @Override
+                                               public void shutdown() throws 
InterruptedException {
+                                                       
realTaskRunner.shutdown();
+                                               }
+
+                                               @Override
+                                               public void shutdown(long 
timeout)
+                                                               throws 
InterruptedException {
+                                                       
realTaskRunner.shutdown(timeout);
+                                               }
+
+                                               @Override
+                                               public void wakeup() throws 
InterruptedException {
+                                                       
Thread.sleep(taskRunnerDelay);
+                                                       realTaskRunner.wakeup();
+                                               }
+                                       };
+                               } else {
+                                       return 
realTaskRunnerFactory.createTaskRunner(task, name);
+                               }
+                       }
+               });
+
+               // Start the brokers and wait for the bridge to be created; the 
remote
+               // broker is started first to ensure it is available for the 
local
+               // broker to connect to.
+               remoteBroker.start();
+               localBroker.start();
+
+               // Wait for the remote broker to receive the local broker's 
BrokerInfo
+               // and then verify the local broker's ID is known.
+               long startTimeMillis = System.currentTimeMillis();
+               while (remoteBrokerInfos.isEmpty()
+                               && (System.currentTimeMillis() - 
startTimeMillis) < timeout) {
+                       Thread.sleep(100);
+               }
+
+               Assert.assertFalse("Timed out waiting for bridge to form.",
+                               remoteBrokerInfos.isEmpty());
+               ;
+               Assert.assertNotNull("Local broker ID is null.", 
remoteBrokerInfos.get(
+                               0).getBrokerId());
+       }
+}

Propchange: 
activemq/trunk/activemq-core/src/test/java/org/apache/activemq/bugs/AMQ3014Test.java
------------------------------------------------------------------------------
    svn:eol-style = native


Reply via email to