Author: chirino Date: Fri Feb 29 13:59:33 2008 New Revision: 632455 URL: http://svn.apache.org/viewvc?rev=632455&view=rev Log: The Producer MemoryLimit can lead to network deadlock when spooling is disabled. So we now disable using it when spooling is used on a queue.
see: https://issues.apache.org/activemq/browse/AMQ-1606 Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/BrokerService.java Fri Feb 29 13:59:33 2008 @@ -160,6 +160,7 @@ private CountDownLatch stoppedLatch = new CountDownLatch(1); private boolean supportFailOver; private boolean clustered; + private Broker regionBroker; static { @@ -1430,7 +1431,7 @@ * @throws */ protected Broker createBroker() throws Exception { - Broker regionBroker = createRegionBroker(); + regionBroker = createRegionBroker(); Broker broker = addInterceptors(regionBroker); // Add a filter that will stop access to the broker once stopped @@ -1488,7 +1489,7 @@ 
DestinationInterceptor destinationInterceptor = new CompositeDestinationInterceptor(destinationInterceptors); RegionBroker regionBroker = null; if (destinationFactory == null) { - destinationFactory = new DestinationFactoryImpl(getProducerSystemUsage(), getTaskRunnerFactory(), getPersistenceAdapter()); + destinationFactory = new DestinationFactoryImpl(this, getTaskRunnerFactory(), getPersistenceAdapter()); } if (isUseJmx()) { MBeanServer mbeanServer = getManagementContext().getMBeanServer(); @@ -1796,5 +1797,13 @@ broker.addDestination(adminConnectionContext, destination); } } + } + + public Broker getRegionBroker() { + return regionBroker; + } + + public void setRegionBroker(Broker regionBroker) { + this.regionBroker = regionBroker; } } Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedRegionBroker.java Fri Feb 29 13:59:33 2008 @@ -126,7 +126,7 @@ } protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new ManagedTempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new ManagedTempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } protected Region createTempTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/jmx/ManagedTempQueueRegion.java Fri Feb 29 13:59:33 2008 @@ -19,6 +19,7 @@ import javax.jms.InvalidSelectorException; import javax.management.ObjectName; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.Destination; import org.apache.activemq.broker.region.DestinationFactory; @@ -34,10 +35,10 @@ private final ManagedRegionBroker regionBroker; - public ManagedTempQueueRegion(ManagedRegionBroker regionBroker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + public ManagedTempQueueRegion(ManagedRegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - super(regionBroker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); - this.regionBroker = regionBroker; + super(broker, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + this.regionBroker = broker; } protected Subscription createSubscription(ConnectionContext context, ConsumerInfo info) throws InvalidSelectorException { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java Fri Feb 29 13:59:33 2008 @@ -16,7 +16,10 @@ */ package org.apache.activemq.broker.region; +import java.io.IOException; + import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ProducerInfo; @@ -33,8 +36,8 @@ protected final ActiveMQDestination destination; protected final Broker broker; protected final MessageStore store; - protected final SystemUsage systemUsage; - protected final MemoryUsage memoryUsage; + protected SystemUsage systemUsage; + protected MemoryUsage memoryUsage; private boolean producerFlowControl = true; private int maxProducersToAudit=1024; private int maxAuditDepth=2048; @@ -43,36 +46,41 @@ private boolean useCache=true; private int minimumMessageSize=1024; protected final DestinationStatistics destinationStatistics = new DestinationStatistics(); + protected final BrokerService brokerService; /** * @param broker * @param store * @param destination - * @param systemUsage * @param parentStats + * @throws Exception */ - public BaseDestination(Broker broker,MessageStore store,ActiveMQDestination destination, SystemUsage systemUsage,DestinationStatistics parentStats) { - this.broker=broker; + public BaseDestination(BrokerService brokerService,MessageStore store, ActiveMQDestination destination, DestinationStatistics parentStats) throws Exception { + this.brokerService = brokerService; + 
this.broker=brokerService.getBroker(); this.store=store; this.destination=destination; - this.systemUsage=systemUsage; - this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); - this.memoryUsage.setUsagePortion(1.0f); - // Let the store know what usage manager we are using so that he can - // flush messages to disk when usage gets high. - if (store != null) { - store.setMemoryUsage(this.memoryUsage); - } // let's copy the enabled property from the parent DestinationStatistics this.destinationStatistics.setEnabled(parentStats.isEnabled()); this.destinationStatistics.setParent(parentStats); + + this.systemUsage = brokerService.getProducerSystemUsage(); + this.memoryUsage = new MemoryUsage(systemUsage.getMemoryUsage(), destination.toString()); + this.memoryUsage.setUsagePortion(1.0f); } /** * initialize the destination * @throws Exception */ - public abstract void initialize() throws Exception; + public void initialize() throws Exception { + // Let the store know what usage manager we are using so that he can + // flush messages to disk when usage gets high. 
+ if (store != null) { + store.setMemoryUsage(this.memoryUsage); + } + } + /** * @return the producerFlowControl */ Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFactoryImpl.java Fri Feb 29 13:59:33 2008 @@ -22,6 +22,7 @@ import javax.jms.JMSException; import org.apache.activemq.advisory.AdvisorySupport; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.region.policy.PolicyEntry; import org.apache.activemq.command.ActiveMQDestination; @@ -44,13 +45,13 @@ */ public class DestinationFactoryImpl extends DestinationFactory { - protected final SystemUsage memoryManager; protected final TaskRunnerFactory taskRunnerFactory; protected final PersistenceAdapter persistenceAdapter; protected RegionBroker broker; + private final BrokerService brokerService; - public DestinationFactoryImpl(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { - this.memoryManager = memoryManager; + public DestinationFactoryImpl(BrokerService brokerService, TaskRunnerFactory taskRunnerFactory, PersistenceAdapter persistenceAdapter) { + this.brokerService = brokerService; this.taskRunnerFactory = taskRunnerFactory; if (persistenceAdapter == null) { throw new IllegalArgumentException("null persistenceAdapter"); @@ -76,7 +77,7 @@ if (destination.isQueue()) { if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = 
(ActiveMQTempDestination)destination; - return new Queue(broker.getRoot(), destination, memoryManager, null, destinationStatistics, taskRunnerFactory) { + return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume @@ -90,14 +91,14 @@ }; } else { MessageStore store = persistenceAdapter.createQueueMessageStore((ActiveMQQueue)destination); - Queue queue = new Queue(broker.getRoot(), destination, memoryManager, store, destinationStatistics, taskRunnerFactory); + Queue queue = new Queue(brokerService, destination, store, destinationStatistics, taskRunnerFactory); configureQueue(queue, destination); queue.initialize(); return queue; } } else if (destination.isTemporary()) { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; - return new Topic(broker.getRoot(), destination, null, memoryManager, destinationStatistics, taskRunnerFactory) { + return new Topic(brokerService, destination, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume from @@ -113,7 +114,7 @@ if (!AdvisorySupport.isAdvisoryTopic(destination)) { store = persistenceAdapter.createTopicMessageStore((ActiveMQTopic)destination); } - Topic topic = new Topic(broker.getRoot(), destination, store, memoryManager, destinationStatistics, taskRunnerFactory); + Topic topic = new Topic(brokerService, destination, store, destinationStatistics, taskRunnerFactory); configureTopic(topic, destination); topic.initialize(); return topic; Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java Fri Feb 29 13:59:33 2008 @@ -32,7 +32,7 @@ import javax.jms.InvalidSelectorException; import javax.jms.JMSException; -import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.cursors.PendingMessageCursor; @@ -65,7 +65,6 @@ import org.apache.activemq.thread.TaskRunner; import org.apache.activemq.thread.TaskRunnerFactory; import org.apache.activemq.transaction.Synchronization; -import org.apache.activemq.usage.SystemUsage; import org.apache.activemq.util.BrokerSupport; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -106,9 +105,9 @@ } }; - public Queue(Broker broker, final ActiveMQDestination destination, final SystemUsage systemUsage,MessageStore store,DestinationStatistics parentStats, + public Queue(BrokerService brokerService, final ActiveMQDestination destination, MessageStore store,DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { - super(broker, store, destination,systemUsage, parentStats); + super(brokerService, store, destination, parentStats); if (destination.isTemporary() || broker == null || store==null ) { this.messages = new VMPendingMessageCursor(); @@ -130,8 +129,17 @@ this.dispatchSelector=new QueueDispatchSelector(destination); } - + public void initialize() throws Exception { + // If a VMPendingMessageCursor don't use the default Producer System Usage + // since it 
turns into a shared blocking queue which can lead to a network deadlock. + // If we are ccursoring to disk..it's not and issue because it does not block due + // to large disk sizes. + if( messages instanceof VMPendingMessageCursor ) { + this.systemUsage = brokerService.getSystemUsage(); + memoryUsage.setParent(systemUsage.getMemoryUsage()); + } + super.initialize(); if (store != null) { // Restore the persistent messages. messages.setSystemUsage(systemUsage); Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Fri Feb 29 13:59:33 2008 @@ -82,7 +82,7 @@ private final Region topicRegion; private final Region tempQueueRegion; private final Region tempTopicRegion; - private BrokerService brokerService; + protected BrokerService brokerService; private boolean started; private boolean keepDurableSubsActive; @@ -161,7 +161,7 @@ } protected Region createTempQueueRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { - return new TempQueueRegion(this, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); + return new TempQueueRegion(this, brokerService, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); } protected Region createTopicRegion(SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/TempQueueRegion.java Fri Feb 29 13:59:33 2008 @@ -17,9 +17,8 @@ package org.apache.activemq.broker.region; import javax.jms.InvalidSelectorException; -import javax.jms.JMSException; -import org.apache.activemq.broker.Connection; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.command.ActiveMQDestination; import org.apache.activemq.command.ActiveMQTempDestination; @@ -34,18 +33,20 @@ */ public class TempQueueRegion extends AbstractTempRegion { private static final Log LOG = LogFactory.getLog(TempQueueRegion.class); + private final BrokerService brokerService; - public TempQueueRegion(RegionBroker broker, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, + public TempQueueRegion(RegionBroker broker, BrokerService brokerService, DestinationStatistics destinationStatistics, SystemUsage memoryManager, TaskRunnerFactory taskRunnerFactory, DestinationFactory destinationFactory) { super(broker, destinationStatistics, memoryManager, taskRunnerFactory, destinationFactory); // We should allow the following to be configurable via a Destination // Policy // setAutoCreateDestinations(false); + this.brokerService = brokerService; } protected Destination doCreateDestination(ConnectionContext context, ActiveMQDestination destination) throws Exception { final ActiveMQTempDestination tempDest = (ActiveMQTempDestination)destination; - return new Queue(broker.getRoot(), destination, usageManager, null, 
destinationStatistics, taskRunnerFactory) { + return new Queue(brokerService, destination, null, destinationStatistics, taskRunnerFactory) { public void addSubscription(ConnectionContext context, Subscription sub) throws Exception { // Only consumers on the same connection can consume from Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=632455&r1=632454&r2=632455&view=diff ============================================================================== --- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java (original) +++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java Fri Feb 29 13:59:33 2008 @@ -25,6 +25,7 @@ import org.apache.activemq.advisory.AdvisorySupport; import org.apache.activemq.broker.Broker; +import org.apache.activemq.broker.BrokerService; import org.apache.activemq.broker.ConnectionContext; import org.apache.activemq.broker.ProducerBrokerExchange; import org.apache.activemq.broker.region.policy.DeadLetterStrategy; @@ -87,9 +88,9 @@ }; - public Topic(Broker broker, ActiveMQDestination destination, TopicMessageStore store, SystemUsage systemUsage, DestinationStatistics parentStats, + public Topic(BrokerService brokerService, ActiveMQDestination destination, TopicMessageStore store, DestinationStatistics parentStats, TaskRunnerFactory taskFactory) throws Exception { - super(broker, store, destination,systemUsage, parentStats); + super(brokerService, store, destination, parentStats); this.topicStore=store; //set default subscription recovery policy if (destination.isTemporary() || AdvisorySupport.isAdvisoryTopic(destination) ){ @@ -102,6 +103,7 @@ } public void initialize() throws Exception{ + super.initialize(); if (store != null) { int messageCount = store.getMessageCount(); 
destinationStatistics.getMessages().setCount(messageCount);
