Author: rajdavies
Date: Fri Sep 11 16:16:08 2009
New Revision: 813906
URL: http://svn.apache.org/viewvc?rev=813906&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2387
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Fri Sep 11 16:16:08 2009
@@ -111,6 +111,7 @@
private boolean shutdownOnMasterFailure;
private boolean shutdownOnSlaveFailure;
private boolean waitForSlave;
+ private boolean passiveSlave;
private String brokerName = DEFAULT_BROKER_NAME;
private File dataDirectoryFile;
private File tmpDataDirectory;
@@ -1551,7 +1552,7 @@
getManagementContext().unregisterMBean(objectName);
} catch (Throwable e) {
throw IOExceptionSupport.create(
- "Transport Connector could not be registered in JMX: "
+ e.getMessage(), e);
+ "Transport Connector could not be unregistered in JMX:
" + e.getMessage(), e);
}
}
}
@@ -2069,6 +2070,22 @@
public CountDownLatch getSlaveStartSignal() {
return slaveStartSignal;
}
+
+ /**
+ * Get the passiveSlave
+ * @return the passiveSlave
+ */
+ public boolean isPassiveSlave() {
+ return this.passiveSlave;
+ }
+
+ /**
+ * Set the passiveSlave
+ * @param passiveSlave the passiveSlave to set
+ */
+ public void setPassiveSlave(boolean passiveSlave) {
+ this.passiveSlave = passiveSlave;
+ }
}
\ No newline at end of file
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Fri Sep 11 16:16:08 2009
@@ -91,17 +91,13 @@
import org.apache.activemq.util.URISupport;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-
/**
* @version $Revision: 1.8 $
*/
public class TransportConnection implements Connection, Task, CommandVisitor {
-
private static final Log LOG =
LogFactory.getLog(TransportConnection.class);
- private static final Log TRANSPORTLOG =
LogFactory.getLog(TransportConnection.class.getName()
- + ".Transport");
+ private static final Log TRANSPORTLOG =
LogFactory.getLog(TransportConnection.class.getName() + ".Transport");
private static final Log SERVICELOG =
LogFactory.getLog(TransportConnection.class.getName() + ".Service");
-
// Keeps track of the broker and connector that created this connection.
protected final Broker broker;
protected final TransportConnector connector;
@@ -115,7 +111,6 @@
protected TaskRunner taskRunner;
protected final AtomicReference<IOException> transportException = new
AtomicReference<IOException>();
protected AtomicBoolean dispatchStopped = new AtomicBoolean(false);
-
private MasterBroker masterBroker;
private final Transport transport;
private MessageAuthorizationPolicy messageAuthorizationPolicy;
@@ -147,23 +142,22 @@
private DemandForwardingBridge duplexBridge;
private final TaskRunnerFactory taskRunnerFactory;
private TransportConnectionStateRegister connectionStateRegister = new
SingleTransportConnectionStateRegister();
-
private final ReentrantReadWriteLock serviceLock = new
ReentrantReadWriteLock();
-
-
+
/**
* @param connector
* @param transport
* @param broker
- * @param taskRunnerFactory - can be null if you want direct dispatch to
the
- * transport else commands are sent async.
+ * @param taskRunnerFactory
+ * - can be null if you want direct dispatch to the transport
+ * else commands are sent async.
*/
public TransportConnection(TransportConnector connector, final Transport
transport, Broker broker,
- TaskRunnerFactory taskRunnerFactory) {
+ TaskRunnerFactory taskRunnerFactory) {
this.connector = connector;
this.broker = broker;
this.messageAuthorizationPolicy =
connector.getMessageAuthorizationPolicy();
- RegionBroker rb = (RegionBroker)broker.getAdaptor(RegionBroker.class);
+ RegionBroker rb = (RegionBroker) broker.getAdaptor(RegionBroker.class);
brokerConnectionStates = rb.getConnectionStates();
if (connector != null) {
this.statistics.setParent(connector.getStatistics());
@@ -171,14 +165,13 @@
this.taskRunnerFactory = taskRunnerFactory;
this.transport = transport;
this.transport.setTransportListener(new DefaultTransportListener() {
-
public void onCommand(Object o) {
serviceLock.readLock().lock();
try {
if (!(o instanceof Command)) {
throw new RuntimeException("Protocol violation -
Command corrupted: " + o.toString());
}
- Command command = (Command)o;
+ Command command = (Command) o;
Response response = service(command);
if (response != null) {
dispatchSync(response);
@@ -206,26 +199,26 @@
* @return size of dispatch queue
*/
public int getDispatchQueueSize() {
- synchronized(dispatchQueue) {
+ synchronized (dispatchQueue) {
return dispatchQueue.size();
}
}
public void serviceTransportException(IOException e) {
- BrokerService bService=connector.getBrokerService();
- if(bService.isShutdownOnSlaveFailure()){
- if(brokerInfo!=null){
- if(brokerInfo.isSlaveBroker()){
- LOG.error("Slave has exception: " +
e.getMessage()+" shutting down master now.", e);
- try {
- doStop();
- bService.stop();
- }catch(Exception ex){
- LOG.warn("Failed to stop the master",ex);
- }
- }
- }
- }
+ BrokerService bService = connector.getBrokerService();
+ if (bService.isShutdownOnSlaveFailure()) {
+ if (brokerInfo != null) {
+ if (brokerInfo.isSlaveBroker()) {
+ LOG.error("Slave has exception: " + e.getMessage() + "
shutting down master now.", e);
+ try {
+ doStop();
+ bService.stop();
+ } catch (Exception ex) {
+ LOG.warn("Failed to stop the master", ex);
+ }
+ }
+ }
+ }
if (!stopping.get()) {
transportException.set(e);
if (TRANSPORTLOG.isDebugEnabled()) {
@@ -258,21 +251,17 @@
* error transmitted to the client before stopping it's transport.
*/
public void serviceException(Throwable e) {
-
// are we a transport exception such as not being able to dispatch
// synchronously to a transport
if (e instanceof IOException) {
- serviceTransportException((IOException)e);
+ serviceTransportException((IOException) e);
} else if (e.getClass() == BrokerStoppedException.class) {
// Handle the case where the broker is stopped
// But the client is still connected.
-
if (!stopping.get()) {
if (SERVICELOG.isDebugEnabled()) {
- SERVICELOG
- .debug("Broker has been stopped. Notifying client and
closing his connection.");
+ SERVICELOG.debug("Broker has been stopped. Notifying
client and closing his connection.");
}
-
ConnectionError ce = new ConnectionError();
ce.setException(e);
dispatchSync(ce);
@@ -308,8 +297,7 @@
response = command.visit(this);
} catch (Throwable e) {
if (SERVICELOG.isDebugEnabled() && e.getClass() !=
BrokerStoppedException.class) {
- SERVICELOG.debug("Error occured while processing "
- + (responseRequired ? "sync": "async")
+ SERVICELOG.debug("Error occured while processing " +
(responseRequired ? "sync" : "async")
+ " command: " + command + ", exception: " + e, e);
}
if (responseRequired) {
@@ -396,7 +384,7 @@
TransactionState transactionState =
cs.getTransactionState(info.getTransactionId());
if (transactionState == null) {
throw new IllegalStateException("Cannot prepare a transaction that
had not been started: "
- + info.getTransactionId());
+ + info.getTransactionId());
}
// Avoid dups.
if (!transactionState.isPrepared()) {
@@ -466,8 +454,7 @@
return
broker.messagePull(lookupConnectionState(pull.getConsumerId()).getContext(),
pull);
}
- public Response
processMessageDispatchNotification(MessageDispatchNotification notification)
- throws Exception {
+ public Response
processMessageDispatchNotification(MessageDispatchNotification notification)
throws Exception {
broker.processDispatchNotification(notification);
return null;
}
@@ -496,9 +483,8 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
- throw new IllegalStateException(
- "Cannot add a producer to a
session that had not been registered: "
- + sessionId);
+ throw new IllegalStateException("Cannot add a producer to a
session that had not been registered: "
+ + sessionId);
}
// Avoid replaying dup commands
if (!ss.getProducerIds().contains(info.getProducerId())) {
@@ -518,9 +504,8 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
- throw new IllegalStateException(
- "Cannot remove a producer from a
session that had not been registered: "
- + sessionId);
+ throw new IllegalStateException("Cannot remove a producer from a
session that had not been registered: "
+ + sessionId);
}
ProducerState ps = ss.removeProducer(id);
if (ps == null) {
@@ -537,9 +522,8 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
- throw new IllegalStateException(
- broker.getBrokerName() + " Cannot
add a consumer to a session that had not been registered: "
- + sessionId);
+ throw new IllegalStateException(broker.getBrokerName()
+ + " Cannot add a consumer to a session that had not been
registered: " + sessionId);
}
// Avoid replaying dup commands
if (!ss.getConsumerIds().contains(info.getConsumerId())) {
@@ -559,9 +543,8 @@
TransportConnectionState cs = lookupConnectionState(connectionId);
SessionState ss = cs.getSessionState(sessionId);
if (ss == null) {
- throw new IllegalStateException(
- "Cannot remove a consumer from a
session that had not been registered: "
- + sessionId);
+ throw new IllegalStateException("Cannot remove a consumer from a
session that had not been registered: "
+ + sessionId);
}
ConsumerState consumerState = ss.removeConsumer(id);
if (consumerState == null) {
@@ -583,7 +566,7 @@
try {
cs.addSession(info);
} catch (IllegalStateException e) {
- e.printStackTrace();
+ e.printStackTrace();
broker.removeSession(cs.getContext(), info);
}
}
@@ -602,7 +585,7 @@
session.shutdown();
// Cascade the connection stop to the consumers and producers.
for (Iterator iter = session.getConsumerIds().iterator();
iter.hasNext();) {
- ConsumerId consumerId = (ConsumerId)iter.next();
+ ConsumerId consumerId = (ConsumerId) iter.next();
try {
processRemoveConsumer(consumerId, lastDeliveredSequenceId);
} catch (Throwable e) {
@@ -610,7 +593,7 @@
}
}
for (Iterator iter = session.getProducerIds().iterator();
iter.hasNext();) {
- ProducerId producerId = (ProducerId)iter.next();
+ ProducerId producerId = (ProducerId) iter.next();
try {
processRemoveProducer(producerId);
} catch (Throwable e) {
@@ -623,29 +606,29 @@
}
public Response processAddConnection(ConnectionInfo info) throws Exception
{
- //if the broker service has slave attached, wait for the slave to be
attached to allow client connection. slave connection is fine
-
if(!info.isBrokerMasterConnector()&&connector.getBrokerService().isWaitForSlave()&&connector.getBrokerService().getSlaveStartSignal().getCount()==1){
- ServiceSupport.dispose(transport);
- return new ExceptionResponse(new Exception("Master's
slave not attached yet."));
- }
- // Older clients should have been defaulting this field to true.. but
they were not.
- if( wireFormatInfo!=null && wireFormatInfo.getVersion() <= 2 ) {
+ // if the broker service has slave attached, wait for the slave to be
+ // attached to allow client connection. slave connection is fine
+ if (!info.isBrokerMasterConnector() &&
connector.getBrokerService().isWaitForSlave()
+ &&
connector.getBrokerService().getSlaveStartSignal().getCount() == 1) {
+ ServiceSupport.dispose(transport);
+ return new ExceptionResponse(new Exception("Master's slave not
attached yet."));
+ }
+ // Older clients should have been defaulting this field to true.. but
+ // they were not.
+ if (wireFormatInfo != null && wireFormatInfo.getVersion() <= 2) {
info.setClientMaster(true);
}
-
TransportConnectionState state;
-
// Make sure 2 concurrent connections by the same ID only generate 1
// TransportConnectionState object.
synchronized (brokerConnectionStates) {
- state =
(TransportConnectionState)brokerConnectionStates.get(info.getConnectionId());
+ state = (TransportConnectionState)
brokerConnectionStates.get(info.getConnectionId());
if (state == null) {
state = new TransportConnectionState(info, this);
brokerConnectionStates.put(info.getConnectionId(), state);
}
state.incrementReference();
}
-
// If there are 2 concurrent connections for the same connection id,
// then last one in wins, we need to sync here
// to figure out the winner.
@@ -654,14 +637,12 @@
LOG.debug("Killing previous stale connection: " +
state.getConnection().getRemoteAddress());
state.getConnection().stop();
LOG.debug("Connection " + getRemoteAddress() + " taking over
previous connection: "
- + state.getConnection().getRemoteAddress());
+ + state.getConnection().getRemoteAddress());
state.setConnection(this);
state.reset(info);
}
}
-
registerConnectionState(info.getConnectionId(), state);
-
LOG.debug("Setting up new connection: " + getRemoteAddress());
// Setup the context.
String clientId = info.getClientId();
@@ -681,13 +662,12 @@
this.manageable = info.isManageable();
state.setContext(context);
state.setConnection(this);
-
try {
- broker.addConnection(context, info);
- }catch(Exception e){
- brokerConnectionStates.remove(info);
- LOG.warn("Failed to add Connection",e);
- throw e;
+ broker.addConnection(context, info);
+ } catch (Exception e) {
+ brokerConnectionStates.remove(info);
+ LOG.warn("Failed to add Connection", e);
+ throw e;
}
if (info.isManageable() && broker.isFaultTolerantConfiguration()) {
// send ConnectionCommand
@@ -698,16 +678,17 @@
return null;
}
- public synchronized Response processRemoveConnection(ConnectionId id, long
lastDeliveredSequenceId) throws InterruptedException {
+ public synchronized Response processRemoveConnection(ConnectionId id, long
lastDeliveredSequenceId)
+ throws InterruptedException {
TransportConnectionState cs = lookupConnectionState(id);
if (cs != null) {
- // Don't allow things to be added to the connection state while we
are
+ // Don't allow things to be added to the connection state while we
+ // are
// shutting down.
cs.shutdown();
-
// Cascade the connection stop to the sessions.
for (Iterator iter = cs.getSessionIds().iterator();
iter.hasNext();) {
- SessionId sessionId = (SessionId)iter.next();
+ SessionId sessionId = (SessionId) iter.next();
try {
processRemoveSession(sessionId, lastDeliveredSequenceId);
} catch (Throwable e) {
@@ -716,7 +697,7 @@
}
// Cascade the connection stop to temp destinations.
for (Iterator iter = cs.getTempDesinations().iterator();
iter.hasNext();) {
- DestinationInfo di = (DestinationInfo)iter.next();
+ DestinationInfo di = (DestinationInfo) iter.next();
try {
broker.removeDestination(cs.getContext(),
di.getDestination(), 0);
} catch (Throwable e) {
@@ -729,7 +710,6 @@
} catch (Throwable e) {
SERVICELOG.warn("Failed to remove connection " + cs.getInfo(),
e);
}
-
TransportConnectionState state = unregisterConnectionState(id);
if (state != null) {
synchronized (brokerConnectionStates) {
@@ -754,7 +734,7 @@
}
public void dispatchSync(Command message) {
- //getStatistics().getEnqueues().increment();
+ // getStatistics().getEnqueues().increment();
try {
processDispatch(message);
} catch (IOException e) {
@@ -764,11 +744,11 @@
public void dispatchAsync(Command message) {
if (!stopping.get()) {
- //getStatistics().getEnqueues().increment();
+ // getStatistics().getEnqueues().increment();
if (taskRunner == null) {
dispatchSync(message);
} else {
- synchronized(dispatchQueue) {
+ synchronized (dispatchQueue) {
dispatchQueue.add(message);
}
try {
@@ -779,7 +759,7 @@
}
} else {
if (message.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch)message;
+ MessageDispatch md = (MessageDispatch) message;
Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
@@ -790,8 +770,7 @@
}
protected void processDispatch(Command command) throws IOException {
- final MessageDispatch messageDispatch =
(MessageDispatch)(command.isMessageDispatch()
- ? command : null);
+ final MessageDispatch messageDispatch = (MessageDispatch)
(command.isMessageDispatch() ? command : null);
try {
if (!stopping.get()) {
if (messageDispatch != null) {
@@ -807,7 +786,7 @@
sub.run();
}
}
- //getStatistics().getDequeues().increment();
+ // getStatistics().getDequeues().increment();
}
}
@@ -825,10 +804,9 @@
}
return false;
}
-
if (!dispatchStopped.get()) {
Command command = null;
- synchronized(dispatchQueue) {
+ synchronized (dispatchQueue) {
if (dispatchQueue.isEmpty()) {
return false;
}
@@ -838,7 +816,6 @@
return true;
}
return false;
-
} catch (IOException e) {
if (dispatchStopped.compareAndSet(false, true)) {
dispatchStoppedLatch.countDown();
@@ -870,19 +847,18 @@
public void start() throws Exception {
starting = true;
try {
- synchronized(this) {
- if (taskRunnerFactory != null) {
- taskRunner = taskRunnerFactory.createTaskRunner(this,
"ActiveMQ Connection Dispatcher: "
- +
getRemoteAddress());
- } else {
- taskRunner = null;
- }
- transport.start();
-
- active = true;
- dispatchAsync(connector.getBrokerInfo());
- connector.onStarted(this);
- }
+ synchronized (this) {
+ if (taskRunnerFactory != null) {
+ taskRunner = taskRunnerFactory.createTaskRunner(this,
"ActiveMQ Connection Dispatcher: "
+ + getRemoteAddress());
+ } else {
+ taskRunner = null;
+ }
+ transport.start();
+ active = true;
+ dispatchAsync(connector.getBrokerInfo());
+ connector.onStarted(this);
+ }
} catch (Exception e) {
// Force clean up on an error starting up.
stop();
@@ -898,6 +874,7 @@
}
}
}
+
public void stop() throws Exception {
synchronized (this) {
pendingStop = true;
@@ -907,31 +884,30 @@
}
}
stopAsync();
- while( !stopped.await(5, TimeUnit.SECONDS) ) {
- LOG.info("The connection to '" + transport.getRemoteAddress()+ "'
is taking a long time to shutdown.");
+ while (!stopped.await(5, TimeUnit.SECONDS)) {
+ LOG.info("The connection to '" + transport.getRemoteAddress() + "'
is taking a long time to shutdown.");
}
}
-
+
public void stopAsync() {
// If we're in the middle of starting
// then go no further... for now.
if (stopping.compareAndSet(false, true)) {
-
// Let all the connection contexts know we are shutting down
// so that in progress operations can notice and unblock.
List<TransportConnectionState> connectionStates =
listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().getStopping().set(true);
}
-
- new Thread("ActiveMQ Transport Stopper: "+
transport.getRemoteAddress()) {
+ new Thread("ActiveMQ Transport Stopper: " +
transport.getRemoteAddress()) {
@Override
public void run() {
serviceLock.writeLock().lock();
try {
doStop();
} catch (Throwable e) {
- LOG.debug("Error occured while shutting down a
connection to '" + transport.getRemoteAddress()+ "': ", e);
+ LOG.debug("Error occured while shutting down a
connection to '" + transport.getRemoteAddress()
+ + "': ", e);
} finally {
stopped.countDown();
serviceLock.writeLock().unlock();
@@ -943,9 +919,9 @@
@Override
public String toString() {
- return "Transport Connection to: "+transport.getRemoteAddress();
+ return "Transport Connection to: " + transport.getRemoteAddress();
}
-
+
protected void doStop() throws Exception, InterruptedException {
LOG.debug("Stopping connection: " + transport.getRemoteAddress());
connector.onStopped(this);
@@ -958,31 +934,26 @@
duplexBridge.stop();
}
}
-
} catch (Exception ignore) {
LOG.trace("Exception caught stopping", ignore);
}
-
try {
transport.stop();
LOG.debug("Stopped transport: " + transport.getRemoteAddress());
} catch (Exception e) {
LOG.debug("Could not stop transport: " + e, e);
}
-
if (taskRunner != null) {
taskRunner.shutdown(1);
}
-
active = false;
-
// Run the MessageDispatch callbacks so that message references get
// cleaned up.
- synchronized(dispatchQueue) {
+ synchronized (dispatchQueue) {
for (Iterator<Command> iter = dispatchQueue.iterator();
iter.hasNext();) {
Command command = iter.next();
if (command.isMessageDispatch()) {
- MessageDispatch md = (MessageDispatch)command;
+ MessageDispatch md = (MessageDispatch) command;
Runnable sub = md.getTransmitCallback();
broker.postProcessDispatch(md);
if (sub != null) {
@@ -995,9 +966,7 @@
//
// Remove all logical connection associated with this connection
// from the broker.
-
if (!broker.isStopped()) {
-
List<TransportConnectionState> connectionStates =
listConnectionStates();
connectionStates = listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
@@ -1009,7 +978,6 @@
ignore.printStackTrace();
}
}
-
if (brokerInfo != null) {
broker.removeBroker(this, brokerInfo);
}
@@ -1025,7 +993,8 @@
}
/**
- * @param blockedCandidate The blockedCandidate to set.
+ * @param blockedCandidate
+ * The blockedCandidate to set.
*/
public void setBlockedCandidate(boolean blockedCandidate) {
this.blockedCandidate = blockedCandidate;
@@ -1039,7 +1008,8 @@
}
/**
- * @param markedCandidate The markedCandidate to set.
+ * @param markedCandidate
+ * The markedCandidate to set.
*/
public void setMarkedCandidate(boolean markedCandidate) {
this.markedCandidate = markedCandidate;
@@ -1050,7 +1020,8 @@
}
/**
- * @param slow The slow to set.
+ * @param slow
+ * The slow to set.
*/
public void setSlow(boolean slow) {
this.slow = slow;
@@ -1094,14 +1065,16 @@
}
/**
- * @param blocked The blocked to set.
+ * @param blocked
+ * The blocked to set.
*/
public void setBlocked(boolean blocked) {
this.blocked = blocked;
}
/**
- * @param connected The connected to set.
+ * @param connected
+ * The connected to set.
*/
public void setConnected(boolean connected) {
this.connected = connected;
@@ -1115,7 +1088,8 @@
}
/**
- * @param active The active to set.
+ * @param active
+ * The active to set.
*/
public void setActive(boolean active) {
this.active = active;
@@ -1127,7 +1101,7 @@
public synchronized boolean isStarting() {
return starting;
}
-
+
public synchronized boolean isNetworkConnection() {
return networkConnection;
}
@@ -1149,15 +1123,20 @@
public Response processBrokerInfo(BrokerInfo info) {
if (info.isSlaveBroker()) {
- // stream messages from this broker (the master) to
- // the slave
- MutableBrokerFilter parent =
(MutableBrokerFilter)broker.getAdaptor(MutableBrokerFilter.class);
- masterBroker = new MasterBroker(parent, transport);
- masterBroker.startProcessing();
- LOG.info("Slave Broker " + info.getBrokerName() + " is attached");
- BrokerService bService=connector.getBrokerService();
+ BrokerService bService = connector.getBrokerService();
+ // Do we only support passive slaves - or does the slave want to be
+ // passive ?
+ boolean passive = bService.isPassiveSlave() ||
info.isPassiveSlave();
+ if (passive == false) {
+
+ // stream messages from this broker (the master) to
+ // the slave
+ MutableBrokerFilter parent = (MutableBrokerFilter)
broker.getAdaptor(MutableBrokerFilter.class);
+ masterBroker = new MasterBroker(parent, transport);
+ masterBroker.startProcessing();
+ }
+ LOG.info((passive?"Passive":"Active")+" Slave Broker " +
info.getBrokerName() + " is attached");
bService.slaveConnectionEstablished();
-
} else if (info.isNetworkConnection() && info.isDuplexConnection()) {
// so this TransportConnection is the rear end of a network bridge
// We have been requested to create a two way pipe ...
@@ -1174,14 +1153,12 @@
uri = URISupport.createURIWithQuery(uri,
URISupport.createQueryString(map));
Transport localTransport = TransportFactory.connect(uri);
Transport remoteBridgeTransport = new
ResponseCorrelator(transport);
- duplexBridge = NetworkBridgeFactory.createBridge(config,
localTransport,
-
remoteBridgeTransport);
+ duplexBridge = NetworkBridgeFactory.createBridge(config,
localTransport, remoteBridgeTransport);
duplexBridge.setBrokerService(broker.getBrokerService());
// now turn duplex off this side
info.setDuplexConnection(false);
duplexBridge.setCreatedByDuplex(true);
- duplexBridge.duplexStart(this,brokerInfo, info);
-
+ duplexBridge.duplexStart(this, brokerInfo, info);
LOG.info("Created Duplex Bridge back to " +
info.getBrokerName());
return null;
} catch (Exception e) {
@@ -1195,12 +1172,10 @@
this.brokerInfo = info;
broker.addBroker(this, info);
networkConnection = true;
-
List<TransportConnectionState> connectionStates =
listConnectionStates();
for (TransportConnectionState cs : connectionStates) {
cs.getContext().setNetworkConnection(true);
}
-
return null;
}
@@ -1247,8 +1222,7 @@
ProducerState producerState = ss.getProducerState(id);
if (producerState != null && producerState.getInfo() !=
null) {
ProducerInfo info = producerState.getInfo();
- result.setMutable(info.getDestination() == null
- ||
info.getDestination().isComposite());
+ result.setMutable(info.getDestination() == null ||
info.getDestination().isComposite());
}
}
producerExchanges.put(id, result);
@@ -1314,8 +1288,8 @@
}
public Response processConnectionControl(ConnectionControl control) throws
Exception {
- if(control != null) {
- faultTolerantConnection=control.isFaultTolerant();
+ if (control != null) {
+ faultTolerantConnection = control.isFaultTolerant();
}
return null;
}
@@ -1328,16 +1302,17 @@
return null;
}
- protected synchronized TransportConnectionState
registerConnectionState(ConnectionId connectionId,TransportConnectionState
state) {
+ protected synchronized TransportConnectionState
registerConnectionState(ConnectionId connectionId,
+ TransportConnectionState state) {
TransportConnectionState cs = null;
- if (!connectionStateRegister.isEmpty() &&
!connectionStateRegister.doesHandleMultipleConnectionStates()){
- //swap implementations
- TransportConnectionStateRegister newRegister = new
MapTransportConnectionStateRegister();
- newRegister.intialize(connectionStateRegister);
- connectionStateRegister = newRegister;
+ if (!connectionStateRegister.isEmpty() &&
!connectionStateRegister.doesHandleMultipleConnectionStates()) {
+ // swap implementations
+ TransportConnectionStateRegister newRegister = new
MapTransportConnectionStateRegister();
+ newRegister.intialize(connectionStateRegister);
+ connectionStateRegister = newRegister;
}
- cs= connectionStateRegister.registerConnectionState(connectionId,
state);
- return cs;
+ cs = connectionStateRegister.registerConnectionState(connectionId,
state);
+ return cs;
}
protected synchronized TransportConnectionState
unregisterConnectionState(ConnectionId connectionId) {
@@ -1349,15 +1324,15 @@
}
protected synchronized TransportConnectionState
lookupConnectionState(String connectionId) {
- return connectionStateRegister.lookupConnectionState(connectionId);
+ return connectionStateRegister.lookupConnectionState(connectionId);
}
protected synchronized TransportConnectionState
lookupConnectionState(ConsumerId id) {
- return connectionStateRegister.lookupConnectionState(id);
+ return connectionStateRegister.lookupConnectionState(id);
}
protected synchronized TransportConnectionState
lookupConnectionState(ProducerId id) {
- return connectionStateRegister.lookupConnectionState(id);
+ return connectionStateRegister.lookupConnectionState(id);
}
protected synchronized TransportConnectionState
lookupConnectionState(SessionId id) {
@@ -1367,5 +1342,4 @@
protected synchronized TransportConnectionState
lookupConnectionState(ConnectionId connectionId) {
return connectionStateRegister.lookupConnectionState(connectionId);
}
-
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Fri Sep 11 16:16:08 2009
@@ -139,7 +139,7 @@
}
public void onException(IOException error) {
- if (started.get()) {
+ if (started.get() && remoteBroker.isDisposed()) {
serviceRemoteException(error);
}
}
@@ -206,6 +206,7 @@
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
+ brokerInfo.setPassiveSlave(broker.isPassiveSlave());
restartBridge();
LOG.info("Slave connection between " + localBroker + " and " +
remoteBroker + " has been established.");
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
Fri Sep 11 16:16:08 2009
@@ -16,7 +16,13 @@
*/
package org.apache.activemq.command;
+import org.apache.activemq.plugin.StatisticsBrokerPlugin;
import org.apache.activemq.state.CommandVisitor;
+import org.apache.activemq.util.MarshallingSupport;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import java.io.IOException;
+import java.util.Properties;
/**
* When a client connects to a broker, the broker send the client a BrokerInfo
@@ -28,6 +34,8 @@
* @version $Revision: 1.7 $
*/
public class BrokerInfo extends BaseCommand {
+ private static Log LOG = LogFactory.getLog(BrokerInfo.class);
+ private static final String PASSIVE_SLAVE_KEY = "passiveSlave";
public static final byte DATA_STRUCTURE_TYPE = CommandTypes.BROKER_INFO;
BrokerId brokerId;
String brokerURL;
@@ -209,4 +217,33 @@
public void setNetworkProperties(String networkProperties) {
this.networkProperties = networkProperties;
}
+
+ public boolean isPassiveSlave() {
+ boolean result = false;
+ Properties props = getProperties();
+ if (props != null) {
+ result = Boolean.parseBoolean(props.getProperty(PASSIVE_SLAVE_KEY,
"false"));
+ }
+ return result;
+ }
+
+ public void setPassiveSlave(boolean value) {
+ Properties props = new Properties();
+ props.put(PASSIVE_SLAVE_KEY, Boolean.toString(value));
+ try {
+
this.networkProperties=MarshallingSupport.propertiesToString(props);
+ } catch (IOException e) {
+ LOG.error("Failed to marshall props to a String",e);
+ }
+ }
+
+ public Properties getProperties() {
+ Properties result = null;
+ try {
+ result =
MarshallingSupport.stringToProperties(getNetworkProperties());
+ } catch (IOException e) {
+ LOG.error("Failed to marshall properties", e);
+ }
+ return result;
+ }
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=813906&r1=813905&r2=813906&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
Fri Sep 11 16:16:08 2009
@@ -25,10 +25,8 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
-import java.util.Random;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicReference;
-
import org.apache.activemq.command.BrokerInfo;
import org.apache.activemq.command.Command;
import org.apache.activemq.command.ConnectionControl;