Author: rajdavies
Date: Thu Mar 13 14:18:35 2008
New Revision: 636897
URL: http://svn.apache.org/viewvc?rev=636897&view=rev
Log:
tidied up detection of stale subscriptions across a network
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Modified:
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
URL:
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java?rev=636897&r1=636896&r2=636897&view=diff
==============================================================================
---
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
(original)
+++
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/cluster/ConnectionSplitBroker.java
Thu Mar 13 14:18:35 2008
@@ -16,33 +16,15 @@
*/
package org.apache.activemq.broker.cluster;
+import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import javax.jms.Connection;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.Session;
-
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisoryBroker;
-import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.broker.Broker;
import org.apache.activemq.broker.BrokerFilter;
import org.apache.activemq.broker.ConnectionContext;
-import org.apache.activemq.broker.TransportConnection;
-import org.apache.activemq.broker.TransportConnector;
import org.apache.activemq.broker.region.Subscription;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.BrokerId;
-import org.apache.activemq.command.ConnectionId;
-import org.apache.activemq.command.ConnectionInfo;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.DataStructure;
-import org.apache.activemq.command.Message;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -53,151 +35,52 @@
*
* @version $Revision$
*/
-public class ConnectionSplitBroker extends BrokerFilter implements
- MessageListener {
- private static final Log LOG = LogFactory
- .getLog(ConnectionSplitBroker.class);
-
- private Connection connection;
-
- private Map<ConnectionId, ConnectionContext> clientMap = new
ConcurrentHashMap<ConnectionId, ConnectionContext>();
- private Map<ConsumerId,ConsumerInfo>consumerMap = new
ConcurrentHashMap<ConsumerId,ConsumerInfo>();
+public class ConnectionSplitBroker extends BrokerFilter{
+ private static final Log LOG =
LogFactory.getLog(ConnectionSplitBroker.class);
+ private List<ConsumerInfo>networkConsumerList = new
ArrayList<ConsumerInfo>();
public ConnectionSplitBroker(Broker next) {
super(next);
}
- public void addConnection(ConnectionContext context, ConnectionInfo info)
- throws Exception {
- if (info != null) {
- removeStaleConnection(info);
- clientMap.put(info.getConnectionId(), context);
- }
- super.addConnection(context, info);
- }
-
- public void removeConnection(ConnectionContext context,
- ConnectionInfo info, Throwable error) throws Exception {
- if (info != null) {
- clientMap.remove(info.getConnectionId());
- }
- super.removeConnection(context, info, error);
- }
-
+
public Subscription addConsumer(ConnectionContext context, ConsumerInfo
info) throws Exception{
- if (info.isNetworkSubscription()) {
- List<ConsumerId>list = info.getNetworkConsumerIds();
- for (ConsumerId id:list) {
- consumerMap.put(id,info);
- }
- }else {
- ConsumerInfo networkInfo = consumerMap.get(info.getConsumerId());
- if (networkInfo != null) {
- networkInfo.removeNetworkConsumerId(info.getConsumerId());
- if (networkInfo.isNetworkConsumersEmpty()) {
- consumerMap.remove(info.getConsumerId());
- super.removeConsumer(context,networkInfo);
+ synchronized (networkConsumerList) {
+ if (info.isNetworkSubscription()) {
+ networkConsumerList.add(info);
+ } else {
+ List<ConsumerInfo> gcList = new ArrayList<ConsumerInfo>();
+ for (ConsumerInfo nc : networkConsumerList) {
+ if (!nc.isNetworkConsumersEmpty()) {
+ for (ConsumerId id : nc.getNetworkConsumerIds()) {
+ if (id.equals(info.getConsumerId())) {
+ nc.removeNetworkConsumerId(id);
+ if (nc.isNetworkConsumersEmpty()) {
+ gcList.add(nc);
+ }
+ }
+ }
+ } else {
+ gcList.add(nc);
+ }
+ }
+ for (ConsumerInfo nc : gcList) {
+ networkConsumerList.remove(nc);
+ super.removeConsumer(context, nc);
+ LOG.warn("Removed stale network consumer" + nc);
}
-
}
}
return super.addConsumer(context, info);
}
-
- public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception{
+ public void removeConsumer(ConnectionContext context, ConsumerInfo info)
throws Exception {
if (info.isNetworkSubscription()) {
- List<ConsumerId>list = info.getNetworkConsumerIds();
- for (ConsumerId id:list) {
- consumerMap.remove(id);
- }
- }
- super.removeConsumer(context, info);
- }
-
- public void start() throws Exception {
- super.start();
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(
- getBrokerService().getVmConnectorURI());
- fac.setCloseTimeout(1);
- fac.setWarnAboutUnstartedConnectionTimeout(10000);
- fac.setWatchTopicAdvisories(false);
- fac.setAlwaysSessionAsync(true);
- fac.setClientID(getBrokerId().toString() + ":" + getBrokerName()
- + ":ConnectionSplitBroker");
- connection = fac.createConnection();
- connection.start();
- Session session = connection.createSession(false,
- Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(AdvisorySupport
- .getConnectionAdvisoryTopic());
- consumer.setMessageListener(this);
- }
- public synchronized void stop() throws Exception {
- if (connection != null) {
- connection.stop();
- connection = null;
- }
- super.stop();
- }
-
- public void onMessage(javax.jms.Message m) {
- ActiveMQMessage message = (ActiveMQMessage) m;
-
- DataStructure o = message.getDataStructure();
- if (o != null && o.getClass() == ConnectionInfo.class) {
- ConnectionInfo info = (ConnectionInfo) o;
-
- String brokerId = null;
- try {
- brokerId = message
-
.getStringProperty(AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID);
- if (brokerId != null
- && !brokerId.equals(getBrokerId().getValue())) {
- // see if it already exits
- removeStaleConnection(info);
- }
- } catch (JMSException e) {
- LOG.warn("Failed to get message property "
- + AdvisorySupport.MSG_PROPERTY_ORIGIN_BROKER_ID, e);
- }
-
- }
-
- }
-
- protected boolean contains(BrokerId[] brokerPath, BrokerId brokerId) {
- if (brokerPath != null) {
- for (int i = 0; i < brokerPath.length; i++) {
- if (brokerId.equals(brokerPath[i])) {
- return true;
- }
- }
- }
- return false;
- }
-
- protected void removeStaleConnection(ConnectionInfo info) {
- // see if it already exits
- ConnectionContext old = clientMap.remove(info
- .getConnectionId());
- if (old != null && old.getConnection() != null) {
- String str = "connectionId=" + old.getConnectionId()
- + ",clientId=" + old.getClientId();
- LOG.warn("Removing stale connection: " + str);
- try {
- // remove connection states
- TransportConnection connection = (TransportConnection) old
- .getConnection();
- connection.processRemoveConnection(old
- .getConnectionId());
- connection.stopAsync();
- } catch (Exception e) {
- LOG.error("Failed to remove stale connection: "
- + str, e);
+ synchronized (networkConsumerList) {
+ networkConsumerList.remove(info);
}
}
+ super.removeConsumer(context, info);
}
-
}