Author: rajdavies
Date: Tue Jan 24 05:34:59 2006
New Revision: 371910
URL: http://svn.apache.org/viewcvs?rev=371910&view=rev
Log:
send back responses
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=371910&r1=371909&r2=371910&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Tue Jan 24 05:34:59 2006
@@ -26,8 +26,10 @@
import org.apache.activemq.command.CommandTypes;
import org.apache.activemq.command.ConnectionId;
import org.apache.activemq.command.ConnectionInfo;
+import org.apache.activemq.command.ExceptionResponse;
import org.apache.activemq.command.MessageDispatch;
import org.apache.activemq.command.ProducerInfo;
+import org.apache.activemq.command.Response;
import org.apache.activemq.command.SessionInfo;
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.Transport;
@@ -56,7 +58,7 @@
private Transport localBroker;
private Transport remoteBroker;
private TransportConnector connector;
- private AtomicBoolean masterActive = new AtomicBoolean(false);
+ private AtomicBoolean masterActive=new AtomicBoolean(false);
IdGenerator idGenerator=new IdGenerator();
ConnectionInfo connectionInfo;
@@ -64,8 +66,8 @@
ProducerInfo producerInfo;
public MasterConnector(BrokerService broker,TransportConnector connector){
- this.broker = broker;
- this.connector = connector;
+ this.broker=broker;
+ this.connector=connector;
}
public boolean isSlave(){
@@ -127,13 +129,13 @@
producerInfo=new ProducerInfo(sessionInfo,1);
producerInfo.setResponseRequired(false);
remoteBroker.oneway(producerInfo);
-
- BrokerInfo brokerInfo = null;
- if (connector != null){
-
- brokerInfo = connector.getBrokerInfo();
+
+ BrokerInfo brokerInfo=null;
+ if (connector!=null){
+
+ brokerInfo=connector.getBrokerInfo();
}else{
- brokerInfo = new BrokerInfo();
+ brokerInfo=new BrokerInfo();
}
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
@@ -177,9 +179,16 @@
if (command.getDataStructureType()==CommandTypes.SHUTDOWN_INFO){
log.warn("The Master has shutdown");
shutDown();
-
- }else {
+
+ }else{
+ boolean responseRequired = command.isResponseRequired();
+ short commandId = command.getCommandId();
localBroker.oneway(command);
+ if (responseRequired){
+ Response response=new Response();
+ response.setCorrelationId(commandId);
+ remoteBroker.oneway(response);
+ }
}
}catch(IOException e){
serviceRemoteException(e);
@@ -220,10 +229,10 @@
public void setRemoteURI(URI remoteURI){
this.remoteURI=remoteURI;
}
-
+
private void shutDown(){
masterActive.set(false);
broker.masterFailed();
- ServiceSupport.dispose(this);
+ //ServiceSupport.dispose(this);
}
}