Author: tabish
Date: Wed Jun 20 14:56:38 2012
New Revision: 1352137
URL: http://svn.apache.org/viewvc?rev=1352137&view=rev
Log:
fix for: https://issues.apache.org/jira/browse/AMQ-3887
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1352137&r1=1352136&r2=1352137&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
Wed Jun 20 14:56:38 2012
@@ -30,6 +30,7 @@ import java.util.concurrent.atomic.Atomi
import java.util.concurrent.atomic.AtomicLong;
import javax.management.ObjectName;
+
import org.apache.activemq.Service;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.BrokerService;
@@ -41,7 +42,32 @@ import org.apache.activemq.broker.region
import org.apache.activemq.broker.region.RegionBroker;
import org.apache.activemq.broker.region.Subscription;
import org.apache.activemq.broker.region.policy.PolicyEntry;
-import org.apache.activemq.command.*;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQTempDestination;
+import org.apache.activemq.command.ActiveMQTopic;
+import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.command.Command;
+import org.apache.activemq.command.ConnectionError;
+import org.apache.activemq.command.ConnectionId;
+import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.DataStructure;
+import org.apache.activemq.command.DestinationInfo;
+import org.apache.activemq.command.ExceptionResponse;
+import org.apache.activemq.command.KeepAliveInfo;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.MessageDispatch;
+import org.apache.activemq.command.NetworkBridgeFilter;
+import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.activemq.command.Response;
+import org.apache.activemq.command.SessionInfo;
+import org.apache.activemq.command.ShutdownInfo;
+import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.filter.DestinationFilter;
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.thread.DefaultThreadPools;
@@ -245,6 +271,9 @@ public abstract class DemandForwardingBr
}
startedLatch.countDown();
localStartedLatch.countDown();
+
+ safeWaitUntilStarted();
+
if (!disposed.get()) {
setupStaticDestinations();
} else {
@@ -1183,6 +1212,20 @@ public abstract class DemandForwardingBr
startedLatch.await();
}
+ /**
+ * Performs a timed wait on the started latch and then checks for disposed
before performing
+ * another wait each time the the started wait times out.
+ *
+ * @throws InterruptedException
+ */
+ protected void safeWaitUntilStarted() throws InterruptedException {
+ while (!disposed.get()) {
+ if (startedLatch.await(1, TimeUnit.SECONDS)) {
+ return;
+ }
+ }
+ }
+
protected void clearDownSubscriptions() {
subscriptionMapByLocalId.clear();
subscriptionMapByRemoteId.clear();