Author: gnodet
Date: Wed Feb 1 23:47:16 2006
New Revision: 374292
URL: http://svn.apache.org/viewcvs?rev=374292&view=rev
Log:
Fix threading problems in DemandForwardingBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=374292&r1=374291&r2=374292&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Wed Feb 1 23:47:16 2006
@@ -16,7 +16,6 @@
import java.io.IOException;
import javax.jms.JMSException;
import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.BrokerId;
import org.apache.activemq.command.BrokerInfo;
@@ -49,6 +48,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CountDownLatch;
import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Forwards messages from the local broker to the remote broker based on
demand.
@@ -94,6 +94,7 @@
ConcurrentHashMap subscriptionMapByRemoteId=new ConcurrentHashMap();
protected final BrokerId localBrokerPath[]=new BrokerId[] { null };
protected final BrokerId remoteBrokerPath[]=new BrokerId[] { null };
+ private CountDownLatch startedLatch = new CountDownLatch(2);
public DemandForwardingBridge(Transport localBroker,Transport
remoteBroker){
this.localBroker=localBroker;
@@ -162,6 +163,7 @@
localBroker.oneway(localSessionInfo);
log.info("Network connection between "+localBroker+" and
"+remoteBroker+"("+remoteBrokerName
+") has been established.");
+ startedLatch.countDown();
}
}
@@ -186,6 +188,7 @@
+destinationFilter));
demandConsumerInfo.setPrefetchSize(prefetchSize);
remoteBroker.oneway(demandConsumerInfo);
+ startedLatch.countDown();
}
}
@@ -214,7 +217,7 @@
}
}
- protected void serviceRemoteException(IOException error){
+ protected void serviceRemoteException(Exception error){
log.info("Network connection between "+localBroker+" and
"+remoteBroker+" shutdown: "+error.getMessage(),error);
ServiceSupport.dispose(this);
}
@@ -223,6 +226,7 @@
if(!disposed){
try{
if(command.isMessageDispatch()){
+ waitStarted();
MessageDispatch md=(MessageDispatch) command;
serviceRemoteConsumerAdvisory(md.getMessage().getDataStructure());
demandConsumerDispatched++;
@@ -239,6 +243,7 @@
if(localBrokerId!=null){
if(localBrokerId.equals(remoteBrokerId)){
log.info("Disconnecting loop back
connection.");
+ waitStarted();
ServiceSupport.dispose(this);
}else{
triggerLocalStartBridge();
@@ -253,7 +258,7 @@
log.warn("Unexpected remote command: "+command);
}
}
- }catch(IOException e){
+ }catch(Exception e){
serviceRemoteException(e);
}
}
@@ -343,6 +348,7 @@
final boolean trace=log.isTraceEnabled();
try{
if(command.isMessageDispatch()){
+ waitStarted();
MessageDispatch md=(MessageDispatch) command;
Message message=md.getMessage();
DemandSubscription sub=(DemandSubscription)
subscriptionMapByLocalId.get(md.getConsumerId());
@@ -381,6 +387,7 @@
if(remoteBrokerId!=null){
if(remoteBrokerId.equals(localBrokerId)){
log.info("Disconnecting loop back
connection.");
+ waitStarted();
ServiceSupport.dispose(this);
}
}
@@ -459,5 +466,9 @@
System.arraycopy(brokerPath,0,rc,0,brokerPath.length);
System.arraycopy(pathsToAppend,0,rc,brokerPath.length,pathsToAppend.length);
return rc;
+ }
+
+ private void waitStarted() throws InterruptedException {
+ startedLatch.await();
}
}