Author: rajdavies
Date: Sun Feb 19 14:13:54 2006
New Revision: 378967
URL: http://svn.apache.org/viewcvs?rev=378967&view=rev
Log:
Remove the await-latch call when closing a loop-back network connector - this
should 'fix' the hanging SpringTest.
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java?rev=378967&r1=378966&r2=378967&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridge.java
Sun Feb 19 14:13:54 2006
@@ -42,6 +42,7 @@
import org.apache.activemq.filter.MessageEvaluationContext;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportListener;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.JMSExceptionSupport;
import org.apache.activemq.util.LongSequenceGenerator;
@@ -93,6 +94,7 @@
protected CountDownLatch startedLatch = new CountDownLatch(2);
protected Object brokerInfoMutex = new Object();
protected boolean decreaseNetworkConsumerPriority;
+ protected boolean shutDown;
protected int networkTTL = 1;
@@ -113,7 +115,7 @@
serviceLocalException(error);
}
});
- remoteBroker.setTransportListener(new DefaultTransportListener(){
+ remoteBroker.setTransportListener(new TransportListener(){
public void onCommand(Command command){
serviceRemoteCommand(command);
}
@@ -121,6 +123,20 @@
public void onException(IOException error){
serviceRemoteException(error);
}
+
+ public void transportInterupted(){
+ //clear any subscriptions - to try and prevent the bridge from
stalling the broker
+ log.warn("Outbound transport to " + remoteBrokerName + "
interrupted ...");
+ clearDownSubscriptions();
+
+ }
+
+ public void transportResumed(){
+ //restart and static subscriptions - the consumer advisories
will be replayed
+ log.info("Outbound transport to " + remoteBrokerName + "
resumed");
+ setupStaticDestinations();
+
+ }
});
localBroker.start();
remoteBroker.start();
@@ -195,25 +211,32 @@
}
+ public void stop() throws Exception{
+ shutDown = true;
+ doStop();
+ }
/**
* stop the bridge
* @throws Exception
*/
- public void stop() throws Exception{
+ protected void doStop() throws Exception{
+ log.debug(" stopping "+localBrokerName+ " bridge to " +
remoteBrokerName + " is disposed already ? "+disposed);
if(!disposed){
try{
disposed=true;
localBridgeStarted.set(false);
remoteBridgeStarted.set(false);
- if(localConnectionInfo!=null){
-
localBroker.request(localConnectionInfo.createRemoveCommand());
-
remoteBroker.request(remoteConnectionInfo.createRemoveCommand());
+ if(!shutDown){
+ remoteBroker.oneway(new ShutdownInfo());
+ if(localConnectionInfo!=null){
+
localBroker.oneway(localConnectionInfo.createRemoveCommand());
+
remoteBroker.oneway(remoteConnectionInfo.createRemoveCommand());
+ }
+ localBroker.oneway(new ShutdownInfo());
}
localBroker.setTransportListener(null);
remoteBroker.setTransportListener(null);
- remoteBroker.oneway(new ShutdownInfo());
- localBroker.oneway(new ShutdownInfo());
}catch(IOException e){
log.debug("Caught exception stopping",e);
}finally{
@@ -223,6 +246,7 @@
ss.throwFirstException();
}
}
+ log.debug(localBrokerName+ " bridge to " + remoteBrokerName + "
stopped");
}
protected void serviceRemoteException(Exception error){
@@ -251,7 +275,7 @@
if(localBrokerId!=null){
if(localBrokerId.equals(remoteBrokerId)){
log.info("Disconnecting loop back
connection.");
- waitStarted();
+ //waitStarted();
ServiceSupport.dispose(this);
}
}
@@ -345,7 +369,6 @@
if(message.getOriginalTransactionId()==null)
message.setOriginalTransactionId(message.getTransactionId());
message.setTransactionId(null);
- message.setRecievedByDFBridge(true);
message.evictMarshlledForm();
return message;
}
@@ -393,8 +416,10 @@
}
}else if(command.isShutdownInfo()){
log.info(localBrokerName+" Shutting down");
- disposed = true;
- stop();
+ shutDown = true;
+ doStop();
+
+
}else{
switch(command.getDataStructureType()){
case WireFormatInfo.DATA_STRUCTURE_TYPE:
@@ -567,6 +592,21 @@
public void setNetworkTTL(int networkTTL){
this.networkTTL=networkTTL;
}
+
+ /**
+ * @return Returns the shutDown.
+ */
+ public boolean isShutDown(){
+ return shutDown;
+ }
+
+ /**
+ * @param shutDown The shutDown to set.
+ */
+ public void setShutDown(boolean shutDown){
+ this.shutDown=shutDown;
+ }
+
private boolean contains(BrokerId[] brokerPath,BrokerId brokerId){
@@ -616,16 +656,19 @@
/**
* Subscriptions for these desitnations are always created
- * @throws IOException
*
*/
- protected void setupStaticDestinations() throws IOException{
+ protected void setupStaticDestinations(){
ActiveMQDestination[] dests = staticallyIncludedDestinations;
if (dests != null){
for(int i=0;i<dests.length;i++){
ActiveMQDestination dest=dests[i];
DemandSubscription sub = createDemandSubscription(dest);
- addSubscription(sub);
+ try{
+ addSubscription(sub);
+ }catch(IOException e){
+ log.error("Failed to add static destination " + dest,e);
+ }
if(log.isTraceEnabled())
log.trace("Forwarding messages for static destination: " +
dest);
}
@@ -633,6 +676,10 @@
}
protected DemandSubscription createDemandSubscription(ConsumerInfo info){
+ return doCreateDemandSubscription(info);
+ }
+
+ protected DemandSubscription doCreateDemandSubscription(ConsumerInfo info){
DemandSubscription result=new DemandSubscription(info);
result.getLocalInfo().setConsumerId(new
ConsumerId(localSessionInfo.getSessionId(),consumerIdGenerator
.getNextSequenceId()));
@@ -711,7 +758,11 @@
if(message.isAdvisory()&&message.getDataStructure()!=null
&&message.getDataStructure().getDataStructureType()==CommandTypes.CONSUMER_INFO){
ConsumerInfo info=(ConsumerInfo) message.getDataStructure();
- if(info.isNetworkSubscription()){
+ hops = info.getBrokerPath() == null ? 0 :
message.getBrokerPath().length;
+ if(hops >= networkTTL){
+ if (log.isTraceEnabled()){
+ log.trace("ConsumerInfo advisory restricted to " +
networkTTL + " network hops ignoring: " + message);
+ }
return false;
}
}
@@ -721,7 +772,12 @@
protected void waitStarted() throws InterruptedException {
startedLatch.await();
}
+
+ protected void clearDownSubscriptions(){
+
+ }
+