Author: tross
Date: Wed Jun 13 14:13:27 2012
New Revision: 1349865

URL: http://svn.apache.org/viewvc?rev=1349865&view=rev
Log:
QPID-4061 - Added statistic: Number of unacknowledged messages in a session

Modified:
    qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
    qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
    qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
    qpid/trunk/qpid/specs/management-schema.xml
    qpid/trunk/qpid/tools/src/py/qpid-stat

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun 13 14:13:27 2012
@@ -142,6 +142,7 @@ bool SemanticState::cancel(const string&
         DeliveryRecords::iterator removed =
             remove_if(unacked.begin(), unacked.end(), bind(&DeliveryRecord::isRedundant, _1));
         unacked.erase(removed, unacked.end());
+        getSession().setUnackedCount(unacked.size());
         return true;
     } else {
         return false;
@@ -270,6 +271,7 @@ void SemanticState::checkDtxTimeout()
 void SemanticState::record(const DeliveryRecord& delivery)
 {
     unacked.push_back(delivery);
+    getSession().setUnackedCount(unacked.size());
 }
 
 const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -555,6 +557,7 @@ void SemanticState::recover(bool requeue
         //w.r.t id is lost
         sort(unacked.begin(), unacked.end());
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +715,7 @@ void SemanticState::release(DeliveryId f
     DeliveryRecords::iterator removed =
         remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant, _1));
     unacked.erase(removed, range.end);
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +727,7 @@ void SemanticState::reject(DeliveryId fi
         if (i->isRedundant()) i = unacked.erase(i);
         else i++;
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +815,7 @@ void SemanticState::accepted(const Seque
                                               (TransactionContext*) 0)));
         unacked.erase(removed, unacked.end());
     }
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +825,7 @@ void SemanticState::completed(const Sequ
                                      bind(&SemanticState::complete, this, _1)));
     unacked.erase(removed, unacked.end());
     requestDispatch();
+    getSession().setUnackedCount(unacked.size());
 }
 
 void SemanticState::attached()

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Wed Jun 13 14:13:27 2012
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipT
     virtual uint16_t getChannel() const = 0;
     virtual const SessionId& getSessionId() const = 0;
     virtual void addPendingExecutionSync() = 0;
+    virtual void setUnackedCount(uint64_t) {}
 };
 
 }} // namespace qpid::broker

Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jun 13 14:13:27 2012
@@ -126,6 +126,11 @@ class SessionState : public qpid::Sessio
     // the SessionState of a received Execution.Sync command.
     void addPendingExecutionSync();
 
+    void setUnackedCount(uint64_t count) {
+        if (mgmtObject)
+            mgmtObject->set_unackedMessages(count);
+    }
+
     // Used to delay creation of management object for sessions
     // belonging to inter-broker bridges
     void addManagementObject();

Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java Wed Jun 13 14:13:27 2012
@@ -1741,6 +1741,12 @@ public class QMFService implements Confi
             return 0l;
         }
 
+        public Long getUnackedMessages()
+        {
+            // TODO
+            return 0l;
+        }
+
         public Long getTxnStarts()
         {
             return _obj.getTxnStarts();

Modified: qpid/trunk/qpid/specs/management-schema.xml
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Wed Jun 13 14:13:27 2012
@@ -443,7 +443,7 @@
     <property name="expireTime"       type="absTime" access="RO" optional="y"/>
     <property name="maxClientRate"    type="uint32"  access="RO" unit="msgs/sec" optional="y"/>
 
-    <statistic name="framesOutstanding" type="count32"/>
+    <statistic name="unackedMessages" type="uint64" unit="message" desc="Unacknowledged messages in the session"/>
 
     <statistic name="TxnStarts"    type="count64"  unit="transaction" desc="Total transactions started "/>
     <statistic name="TxnCommits"   type="count64"  unit="transaction" desc="Total transactions committed"/>

Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Jun 13 14:13:27 2012
@@ -427,7 +427,8 @@ class BrokerManager:
         heads.append(Header("acked", Header.Y))
         heads.append(Header("excl", Header.Y))
         heads.append(Header("creditMode"))
-        heads.append(Header("delivered", Header.KMG))
+        heads.append(Header("delivered", Header.COMMAS))
+        heads.append(Header("sessUnacked", Header.COMMAS))
         rows = []
         subscriptions = self.broker.getAllSubscriptions()
         sessions = self.getSessionMap()
@@ -447,6 +448,7 @@ class BrokerManager:
                 row.append(s.exclusive)
                 row.append(s.creditMode)
                 row.append(s.delivered)
+                row.append(session.unackedMessages)
                 rows.append(row)
             except:
                 pass



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to