Author: rajdavies
Date: Wed Jan 16 11:03:02 2008
New Revision: 612542

URL: http://svn.apache.org/viewvc?rev=612542&view=rev
Log:
Add producers to DestinationStatistics

Modified:
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/advisory/AdvisoryBroker.java
 Wed Jan 16 11:03:02 2008
@@ -277,7 +277,17 @@
 
     protected void fireProducerAdvisory(ConnectionContext context, 
ActiveMQDestination producerDestination,ActiveMQTopic topic, Command command, 
ConsumerId targetConsumerId) throws Exception {
         ActiveMQMessage advisoryMessage = new ActiveMQMessage();
-        advisoryMessage.setIntProperty("producerCount", producers.size());
+        int count = 0; // producers summed over every destination matching producerDestination
+        if (producerDestination != null) {
+            Set<Destination> set = getDestinations(producerDestination);
+            if (set != null) {
+                for (Destination dest : set) {
+                    count += dest.getDestinationStatistics().getProducers()
+                            .getCount();
+                }
+            }
+        }
+        advisoryMessage.setIntProperty("producerCount", count);
         fireAdvisory(context, topic, command, targetConsumerId, 
advisoryMessage);
     }
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/AbstractRegion.java
 Wed Jan 16 11:03:02 2008
@@ -37,6 +37,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 import org.apache.activemq.filter.DestinationFilter;
@@ -408,5 +409,26 @@
     public void setAutoCreateDestinations(boolean autoCreateDestinations) {
         this.autoCreateDestinations = autoCreateDestinations;
     }
+    
+    public void addProducer(ConnectionContext context, ProducerInfo info) 
throws Exception{
+        for (Iterator iter = 
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+            Destination dest = (Destination)iter.next();
+            dest.addProducer(context, info);
+        }
+    }
+
+    /**
+     * Removes a Producer from every destination mapped to info.getDestination().
+     * @param context the environment the operation is being executed under.
+     * @throws Exception if any destination's removeProducer call fails.
+     */
+    public void removeProducer(ConnectionContext context, ProducerInfo info) 
throws Exception{
+        for (Iterator iter = 
destinationMap.get(info.getDestination()).iterator(); iter.hasNext();) {
+            Destination dest = (Destination)iter.next();
+            dest.removeProducer(context, info);
+        }
+    }
+
+
 
 }

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/BaseDestination.java
 Wed Jan 16 11:03:02 2008
@@ -16,6 +16,9 @@
  */
 package org.apache.activemq.broker.region;
 
+import org.apache.activemq.broker.ConnectionContext;
+import org.apache.activemq.command.ProducerInfo;
+
 
 /**
  * @version $Revision: 1.12 $
@@ -26,6 +29,7 @@
     private int maxProducersToAudit=1024;
     private int maxAuditDepth=1;
     private boolean enableAudit=true;
+    protected final DestinationStatistics destinationStatistics = new 
DestinationStatistics();
     /**
      * @return the producerFlowControl
      */
@@ -73,6 +77,14 @@
      */
     public void setEnableAudit(boolean enableAudit) {
         this.enableAudit = enableAudit;
+    }
+    
+    public void addProducer(ConnectionContext context, ProducerInfo info) 
throws Exception{
+        destinationStatistics.getProducers().increment();
+    }
+
+    public void removeProducer(ConnectionContext context, ProducerInfo info) 
throws Exception{
+        destinationStatistics.getProducers().decrement();
     }
 
     

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Destination.java
 Wed Jan 16 11:03:02 2008
@@ -25,6 +25,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -37,6 +38,10 @@
     void addSubscription(ConnectionContext context, Subscription sub) throws 
Exception;
 
     void removeSubscription(ConnectionContext context, Subscription sub) 
throws Exception;
+    
+    void addProducer(ConnectionContext context, ProducerInfo info) throws 
Exception;
+
+    void removeProducer(ConnectionContext context, ProducerInfo info) throws 
Exception;
 
     void send(ProducerBrokerExchange producerExchange, Message messageSend) 
throws Exception;
 

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationFilter.java
 Wed Jan 16 11:03:02 2008
@@ -27,6 +27,7 @@
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageAck;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.store.MessageStore;
 import org.apache.activemq.usage.MemoryUsage;
 import org.apache.activemq.usage.SystemUsage;
@@ -128,6 +129,17 @@
     
     public void setProducerFlowControl(boolean value){
         next.setProducerFlowControl(value);
+    }
+
+    public void addProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+        next.addProducer(context, info);
+        
+    }
+
+    public void removeProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+       next.removeProducer(context, info);
     }
     
     

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/DestinationStatistics.java
 Wed Jan 16 11:03:02 2008
@@ -32,6 +32,7 @@
     protected CountStatisticImpl enqueues;
     protected CountStatisticImpl dequeues;
     protected CountStatisticImpl consumers;
+    protected CountStatisticImpl producers;
     protected CountStatisticImpl messages;
     protected PollCountStatisticImpl messagesCached;
     protected CountStatisticImpl dispatched;
@@ -43,6 +44,7 @@
         dispatched = new CountStatisticImpl("dispatched", "The number of 
messages that have been dispatched from the destination");
         dequeues = new CountStatisticImpl("dequeues", "The number of messages 
that have been acknowledged from the destination");
         consumers = new CountStatisticImpl("consumers", "The number of 
consumers that that are subscribing to messages from the destination");
+        producers = new CountStatisticImpl("producers", "The number of 
producers that that are publishing messages to the destination");
         messages = new CountStatisticImpl("messages", "The number of messages 
that that are being held by the destination");
         messagesCached = new PollCountStatisticImpl("messagesCached", "The 
number of messages that are held in the destination's memory cache");
         processTime = new TimeStatisticImpl("processTime", "information around 
length of time messages are held by a destination");
@@ -50,6 +52,7 @@
         addStatistic("dispatched", dispatched);
         addStatistic("dequeues", dequeues);
         addStatistic("consumers", consumers);
+        addStatistic("producers", producers);
         addStatistic("messages", messages);
         addStatistic("messagesCached", messagesCached);
         addStatistic("processTime", processTime);
@@ -66,6 +69,10 @@
     public CountStatisticImpl getConsumers() {
         return consumers;
     }
+    
+    public CountStatisticImpl getProducers() {
+        return producers;
+    }
 
     public PollCountStatisticImpl getMessagesCached() {
         return messagesCached;
@@ -100,6 +107,7 @@
         dispatched.setEnabled(enabled);
         dequeues.setEnabled(enabled);
         consumers.setEnabled(enabled);
+        producers.setEnabled(enabled);
         messages.setEnabled(enabled);
         messagesCached.setEnabled(enabled);
         processTime.setEnabled(enabled);
@@ -112,6 +120,7 @@
             dispatched.setParent(parent.dispatched);
             dequeues.setParent(parent.dequeues);
             consumers.setParent(parent.consumers);
+            producers.setParent(parent.producers);
             messagesCached.setParent(parent.messagesCached);
             messages.setParent(parent.messages);
             processTime.setParent(parent.processTime);
@@ -120,6 +129,7 @@
             dispatched.setParent(null);
             dequeues.setParent(null);
             consumers.setParent(null);
+            producers.setParent(null);
             messagesCached.setParent(null);
             messages.setParent(null);
             processTime.setParent(null);

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Queue.java
 Wed Jan 16 11:03:02 2008
@@ -79,7 +79,6 @@
     private final List<Subscription> consumers = new 
ArrayList<Subscription>(50);
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
-    private final DestinationStatistics destinationStatistics = new 
DestinationStatistics();
     private PendingMessageCursor messages;
     private final LinkedList<MessageReference> pagedInMessages = new 
LinkedList<MessageReference>();
     private LockOwner exclusiveOwner;

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Region.java
 Wed Jan 16 11:03:02 2008
@@ -29,6 +29,7 @@
 import org.apache.activemq.command.MessageAck;
 import org.apache.activemq.command.MessageDispatchNotification;
 import org.apache.activemq.command.MessagePull;
+import org.apache.activemq.command.ProducerInfo;
 import org.apache.activemq.command.RemoveSubscriptionInfo;
 import org.apache.activemq.command.Response;
 
@@ -86,6 +87,21 @@
      * @throws Exception TODO
      */
     void removeConsumer(ConnectionContext context, ConsumerInfo info) throws 
Exception;
+    
+    /**
+     * Adds the Producer described by info to this region.
+     * @param context the environment the operation is being executed under.
+     * @throws Exception if the implementation fails to add the producer.
+     */
+    void addProducer(ConnectionContext context, ProducerInfo info) throws 
Exception;
+
+    /**
+     * Removes the Producer described by info from this region.
+     * @param context the environment the operation is being executed under.
+     * @throws Exception if the implementation fails to remove the producer.
+     */
+    void removeProducer(ConnectionContext context, ProducerInfo info) throws 
Exception;
+
 
     /**
      * Deletes a durable subscription.

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
 Wed Jan 16 11:03:02 2008
@@ -322,10 +322,45 @@
     public void removeSession(ConnectionContext context, SessionInfo info) 
throws Exception {
     }
 
-    public void addProducer(ConnectionContext context, ProducerInfo info) 
throws Exception {
+    public void addProducer(ConnectionContext context, ProducerInfo info)
+            throws Exception {
+        ActiveMQDestination destination = info.getDestination();
+        if (destination != null) {
+            switch (destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                queueRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                topicRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                tempQueueRegion.addProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                tempTopicRegion.addProducer(context, info);
+                break;
+            }
+        }
     }
 
     public void removeProducer(ConnectionContext context, ProducerInfo info) 
throws Exception {
+        ActiveMQDestination destination = info.getDestination();
+        if (destination != null) {
+            switch (destination.getDestinationType()) {
+            case ActiveMQDestination.QUEUE_TYPE:
+                queueRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TOPIC_TYPE:
+                topicRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_QUEUE_TYPE:
+                tempQueueRegion.removeProducer(context, info);
+                break;
+            case ActiveMQDestination.TEMP_TOPIC_TYPE:
+                tempTopicRegion.removeProducer(context, info);
+                break;
+            }
+        }
     }
 
     public Subscription addConsumer(ConnectionContext context, ConsumerInfo 
info) throws Exception {

Modified: 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
URL: 
http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java?rev=612542&r1=612541&r2=612542&view=diff
==============================================================================
--- 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 (original)
+++ 
activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/Topic.java
 Wed Jan 16 11:03:02 2008
@@ -76,8 +76,7 @@
     protected final TopicMessageStore store;
     private final SystemUsage systemUsage;
     private final MemoryUsage memoryUsage;
-    protected final DestinationStatistics destinationStatistics = new 
DestinationStatistics();
-
+   
     private DispatchPolicy dispatchPolicy = new SimpleDispatchPolicy();
     private SubscriptionRecoveryPolicy subscriptionRecoveryPolicy;
     private boolean sendAdvisoryIfNoConsumers;


Reply via email to