Author: rajdavies
Date: Thu Sep 10 13:19:52 2009
New Revision: 813422
URL: http://svn.apache.org/viewvc?rev=813422&view=rev
Log:
Fix for https://issues.apache.org/activemq/browse/AMQ-2071 and
https://issues.apache.org/activemq/browse/AMQ-2070
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java
Thu Sep 10 13:19:52 2009
@@ -31,7 +31,6 @@
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import javax.management.MBeanServer;
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;
import org.apache.activemq.ActiveMQConnectionMetaData;
@@ -386,7 +385,8 @@
* @return true if this Broker is a slave to a Master
*/
public boolean isSlave() {
- return masterConnector != null && masterConnector.isSlave();
+ return (masterConnector != null && masterConnector.isSlave()) ||
+ (masterConnector != null &&
masterConnector.isStoppedBeforeStart());
}
public void masterFailed() {
@@ -420,7 +420,7 @@
// Service interface
//
-------------------------------------------------------------------------
public void start() throws Exception {
- if (!started.compareAndSet(false, true)) {
+ if (stopped.get() || !started.compareAndSet(false, true)) {
// lets just ignore redundant start() calls
// as its way too easy to not be completely sure if start() has
been
// called or not with the gazillion of different configuration
@@ -467,8 +467,10 @@
if (!isSlave()) {
startAllConnectors();
}
- if (isUseJmx() && masterConnector != null) {
- registerFTConnectorMBean(masterConnector);
+ if (!stopped.get()) {
+ if (isUseJmx() && masterConnector != null) {
+ registerFTConnectorMBean(masterConnector);
+ }
}
brokerId = broker.getBrokerId();
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", "
+ brokerId + ") started");
@@ -477,7 +479,9 @@
} catch (Exception e) {
LOG.error("Failed to start ActiveMQ JMS Message Broker. Reason: "
+ e, e);
try {
- stop();
+ if (!stopped.get()) {
+ stop();
+ }
} catch (Exception ex) {
LOG.warn("Failed to stop broker after failure in start ", ex);
}
@@ -517,6 +521,24 @@
SelectorParser.clearCache();
stopped.set(true);
stoppedLatch.countDown();
+ if (masterConnectorURI == null) {
+ // master start has not finished yet
+ if (slaveStartSignal.getCount() == 1) {
+ started.set(false);
+ slaveStartSignal.countDown();
+ }
+ } else {
+ for (Service service : services) {
+ if (service instanceof MasterConnector) {
+ MasterConnector mConnector = (MasterConnector) service;
+ if (!mConnector.isSlave()) {
+ // means should be slave but not connected to master
yet
+ started.set(false);
+ mConnector.stopBeforeConnected();
+ }
+ }
+ }
+ }
LOG.info("ActiveMQ JMS Message Broker (" + getBrokerName() + ", " +
brokerId + ") stopped");
synchronized (shutdownHooks) {
for (Runnable hook : shutdownHooks) {
@@ -529,6 +551,77 @@
}
stopper.throwFirstException();
}
+
+ public boolean checkQueueSize(String queueName) {
+ long count = 0;
+ long queueSize = 0;
+ Map<ActiveMQDestination, Destination> destinationMap =
regionBroker.getDestinationMap();
+ for (Map.Entry<ActiveMQDestination, Destination> entry :
destinationMap.entrySet()) {
+ if (entry.getKey().isQueue()) {
+ if (entry.getValue().getName().matches(queueName)) {
+ queueSize =
entry.getValue().getDestinationStatistics().getMessages().getCount();
+ count = queueSize;
+ if (queueSize > 0) {
+ LOG.info("Queue has pending message:" +
entry.getValue().getName() + " queueSize is:"
+ + queueSize);
+ }
+ }
+ }
+ }
+ return count == 0;
+ }
+
+ /**
+ * Stops the broker gracefully. Both connectorName and queueName are regular
+ * expressions. 1. Stops every connector whose name matches connectorName
+ * (expected to be the connector that clients connect to). 2. Polls the
+ * queues whose names match queueName for pending messages. 3. Once the
+ * connector is stopped, clients are expected to fail over to another broker
+ * and pending messages to be forwarded. When no pending messages remain,
+ * stop() is called to stop the broker.
+ *
+ * @param connectorName
+ * @param queueName
+ * @param timeout
+ * @param pollInterval
+ * @throws Exception
+ */
+ public void stopGracefully(String connectorName, String queueName, long
timeout, long pollInterval)
+ throws Exception {
+ if (isUseJmx()) {
+ if (connectorName == null || queueName == null || timeout <= 0) {
+ throw new Exception(
+ "connectorName and queueName cannot be null and
timeout should be >0 for stopGracefully.");
+ }
+ if (pollInterval <= 0) {
+ pollInterval = 30;
+ }
+ LOG.info("Stop gracefully with connectorName:" + connectorName + "
queueName:" + queueName + " timeout:"
+ + timeout + " pollInterval:" + pollInterval);
+ TransportConnector connector;
+ for (int i = 0; i < transportConnectors.size(); i++) {
+ connector = transportConnectors.get(i);
+ if (connector != null && connector.getName() != null &&
connector.getName().matches(connectorName)) {
+ connector.stop();
+ }
+ }
+ long start = System.currentTimeMillis();
+ while (System.currentTimeMillis() - start < timeout * 1000) {
+ // poll the queue size until it reaches zero
+ if (checkQueueSize(queueName)) {
+ stop();
+ break;
+ } else {
+ Thread.sleep(pollInterval * 1000);
+ }
+ }
+ if (stopped.get()) {
+ LOG.info("Successfully stop the broker.");
+ } else {
+ LOG.info("There is still pending message on the queue. Please
check and stop the broker manually.");
+ }
+ }
+ }
/**
* A helper method to block the caller thread until the broker has been
@@ -1828,24 +1921,26 @@
if (isWaitForSlave()) {
waitForSlave();
}
- for (Iterator<NetworkConnector> iter =
getNetworkConnectors().iterator(); iter.hasNext();) {
- NetworkConnector connector = iter.next();
- connector.setLocalUri(uri);
- connector.setBrokerName(getBrokerName());
- connector.setDurableDestinations(durableDestinations);
- connector.start();
- }
- for (Iterator<ProxyConnector> iter =
getProxyConnectors().iterator(); iter.hasNext();) {
- ProxyConnector connector = iter.next();
- connector.start();
- }
- for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
iter.hasNext();) {
- JmsConnector connector = iter.next();
- connector.start();
- }
- for (Service service : services) {
- configureService(service);
- service.start();
+ if (!stopped.get()) {
+ for (Iterator<NetworkConnector> iter =
getNetworkConnectors().iterator(); iter.hasNext();) {
+ NetworkConnector connector = iter.next();
+ connector.setLocalUri(uri);
+ connector.setBrokerName(getBrokerName());
+ connector.setDurableDestinations(durableDestinations);
+ connector.start();
+ }
+ for (Iterator<ProxyConnector> iter =
getProxyConnectors().iterator(); iter.hasNext();) {
+ ProxyConnector connector = iter.next();
+ connector.start();
+ }
+ for (Iterator<JmsConnector> iter = jmsConnectors.iterator();
iter.hasNext();) {
+ JmsConnector connector = iter.next();
+ connector.start();
+ }
+ for (Service service : services) {
+ configureService(service);
+ service.start();
+ }
}
}
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
Thu Sep 10 13:19:52 2009
@@ -218,7 +218,7 @@
if(brokerInfo.isSlaveBroker()){
LOG.error("Slave has exception: " +
e.getMessage()+" shutting down master now.", e);
try {
- broker.stop();
+ doStop();
bService.stop();
}catch(Exception ex){
LOG.warn("Failed to stop the master",ex);
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/ft/MasterConnector.java
Thu Sep 10 13:19:52 2009
@@ -38,6 +38,7 @@
import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.transport.DefaultTransportListener;
import org.apache.activemq.transport.Transport;
+import org.apache.activemq.transport.TransportDisposedIOException;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.IdGenerator;
import org.apache.activemq.util.ServiceStopper;
@@ -63,6 +64,7 @@
private Transport remoteBroker;
private TransportConnector connector;
private AtomicBoolean started = new AtomicBoolean(false);
+ private AtomicBoolean stoppedBeforeStart = new AtomicBoolean(false);
private final IdGenerator idGenerator = new IdGenerator();
private String userName;
private String password;
@@ -70,6 +72,8 @@
private SessionInfo sessionInfo;
private ProducerInfo producerInfo;
private final AtomicBoolean masterActive = new AtomicBoolean();
+ private BrokerInfo brokerInfo;
+ private boolean firstConnection=true;
public MasterConnector() {
}
@@ -95,6 +99,15 @@
return masterActive.get();
}
+ protected void restartBridge() throws Exception {
+ localBroker.oneway(connectionInfo);
+ remoteBroker.oneway(connectionInfo);
+ localBroker.oneway(sessionInfo);
+ remoteBroker.oneway(sessionInfo);
+ remoteBroker.oneway(producerInfo);
+ remoteBroker.oneway(brokerInfo);
+ }
+
public void start() throws Exception {
if (!started.compareAndSet(false, true)) {
return;
@@ -130,6 +143,35 @@
serviceRemoteException(error);
}
}
+
+ public void transportResumed() {
+ try{
+ if(!firstConnection){
+ localBroker =
TransportFactory.connect(localURI);
+ localBroker.setTransportListener(new
DefaultTransportListener() {
+
+ public void onCommand(Object command) {
+ }
+
+ public void onException(IOException error) {
+ if (started.get()) {
+ serviceLocalException(error);
+ }
+ }
+ });
+ localBroker.start();
+ restartBridge();
+ LOG.info("Slave connection between " +
localBroker + " and " + remoteBroker + " has been reestablished.");
+ }else{
+ firstConnection=false;
+ }
+ }catch(IOException e){
+ LOG.error("MasterConnector failed to send BrokerInfo in
transportResumed:", e);
+ }catch(Exception e){
+ LOG.error("MasterConnector failed to restart
localBroker in transportResumed:", e);
+ }
+
+ }
});
try {
localBroker.start();
@@ -138,8 +180,12 @@
masterActive.set(true);
} catch (Exception e) {
masterActive.set(false);
- LOG.error("Failed to start network bridge: " + e, e);
- }
+ if(!stoppedBeforeStart.get()){
+ LOG.error("Failed to start network bridge: " + e, e);
+ }else{
+ LOG.info("Slave stopped before connected to the master.");
+ }
+ }
}
protected void startBridge() throws Exception {
@@ -149,15 +195,9 @@
connectionInfo.setUserName(userName);
connectionInfo.setPassword(password);
connectionInfo.setBrokerMasterConnector(true);
- localBroker.oneway(connectionInfo);
- remoteBroker.oneway(connectionInfo);
sessionInfo = new SessionInfo(connectionInfo, 1);
- localBroker.oneway(sessionInfo);
- remoteBroker.oneway(sessionInfo);
producerInfo = new ProducerInfo(sessionInfo, 1);
producerInfo.setResponseRequired(false);
- remoteBroker.oneway(producerInfo);
- BrokerInfo brokerInfo = null;
if (connector != null) {
brokerInfo = connector.getBrokerInfo();
} else {
@@ -166,12 +206,12 @@
brokerInfo.setBrokerName(broker.getBrokerName());
brokerInfo.setPeerBrokerInfos(broker.getBroker().getPeerBrokerInfos());
brokerInfo.setSlaveBroker(true);
- remoteBroker.oneway(brokerInfo);
+ restartBridge();
LOG.info("Slave connection between " + localBroker + " and " +
remoteBroker + " has been established.");
}
public void stop() throws Exception {
- if (!started.compareAndSet(true, false)) {
+ if (!started.compareAndSet(true, false)||!masterActive.get()) {
return;
}
masterActive.set(false);
@@ -192,6 +232,15 @@
ss.throwFirstException();
}
}
+
+ public void stopBeforeConnected()throws Exception{
+ masterActive.set(false);
+ started.set(false);
+ stoppedBeforeStart.set(true);
+ ServiceStopper ss = new ServiceStopper();
+ ss.stop(localBroker);
+ ss.stop(remoteBroker);
+ }
protected void serviceRemoteException(IOException error) {
LOG.error("Network connection between " + localBroker + " and " +
remoteBroker + " shutdown: " + error.getMessage(), error);
@@ -224,8 +273,12 @@
}
protected void serviceLocalException(Throwable error) {
- LOG.info("Network connection between " + localBroker + " and " +
remoteBroker + " shutdown: " + error.getMessage(), error);
- ServiceSupport.dispose(this);
+ if (!(error instanceof TransportDisposedIOException) ||
localBroker.isDisposed()){
+ LOG.info("Network connection between " + localBroker + " and "
+ remoteBroker + " shutdown: " + error.getMessage(), error);
+ ServiceSupport.dispose(this);
+ }else{
+ LOG.info(error.getMessage());
+ }
}
/**
@@ -289,4 +342,9 @@
broker.masterFailed();
ServiceSupport.dispose(this);
}
+
+ public boolean isStoppedBeforeStart() {
+ return stoppedBeforeStart.get();
+ }
+
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerView.java
Thu Sep 10 13:19:52 2009
@@ -75,7 +75,13 @@
public void stop() throws Exception {
brokerService.stop();
}
+
+ public void stopGracefully(String connectorName, String queueName, long
timeout, long pollInterval)
+ throws Exception {
+ brokerService.stopGracefully(connectorName, queueName, timeout,
pollInterval);
+ }
+
public long getTotalEnqueueCount() {
return broker.getDestinationStatistics().getEnqueues().getCount();
}
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java?rev=813422&r1=813421&r2=813422&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/BrokerViewMBean.java
Thu Sep 10 13:19:52 2009
@@ -114,7 +114,10 @@
/**
* Stop the broker and all it's components.
*/
+ @MBeanInfo("Stop the broker and all its components.")
void stop() throws Exception;
+ @MBeanInfo("Poll for queues matching queueName are empty before stopping")
+ void stopGracefully(String connectorName, String queueName, long timeout,
long pollInterval) throws Exception;
@MBeanInfo("Topics (broadcasted 'queues'); generally system information.")
ObjectName[] getTopics();