Author: tross
Date: Wed Jun 13 14:13:27 2012
New Revision: 1349865
URL: http://svn.apache.org/viewvc?rev=1349865&view=rev
Log:
QPID-4061 - Added statistic: Number of unacknowledged messages in a session
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
qpid/trunk/qpid/specs/management-schema.xml
qpid/trunk/qpid/tools/src/py/qpid-stat
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SemanticState.cpp Wed Jun 13 14:13:27
2012
@@ -142,6 +142,7 @@ bool SemanticState::cancel(const string&
DeliveryRecords::iterator removed =
remove_if(unacked.begin(), unacked.end(),
bind(&DeliveryRecord::isRedundant, _1));
unacked.erase(removed, unacked.end());
+ getSession().setUnackedCount(unacked.size());
return true;
} else {
return false;
@@ -270,6 +271,7 @@ void SemanticState::checkDtxTimeout()
void SemanticState::record(const DeliveryRecord& delivery)
{
unacked.push_back(delivery);
+ getSession().setUnackedCount(unacked.size());
}
const std::string QPID_SYNC_FREQUENCY("qpid.sync_frequency");
@@ -555,6 +557,7 @@ void SemanticState::recover(bool requeue
//w.r.t id is lost
sort(unacked.begin(), unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::deliver(DeliveryRecord& msg, bool sync)
@@ -712,6 +715,7 @@ void SemanticState::release(DeliveryId f
DeliveryRecords::iterator removed =
remove_if(range.start, range.end, bind(&DeliveryRecord::isRedundant,
_1));
unacked.erase(removed, range.end);
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::reject(DeliveryId first, DeliveryId last)
@@ -723,6 +727,7 @@ void SemanticState::reject(DeliveryId fi
if (i->isRedundant()) i = unacked.erase(i);
else i++;
}
+ getSession().setUnackedCount(unacked.size());
}
bool SemanticState::ConsumerImpl::doOutput()
@@ -810,6 +815,7 @@ void SemanticState::accepted(const Seque
(TransactionContext*) 0)));
unacked.erase(removed, unacked.end());
}
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::completed(const SequenceSet& commands) {
@@ -819,6 +825,7 @@ void SemanticState::completed(const Sequ
bind(&SemanticState::complete, this,
_1)));
unacked.erase(removed, unacked.end());
requestDispatch();
+ getSession().setUnackedCount(unacked.size());
}
void SemanticState::attached()
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionContext.h Wed Jun 13 14:13:27
2012
@@ -47,6 +47,7 @@ class SessionContext : public OwnershipT
virtual uint16_t getChannel() const = 0;
virtual const SessionId& getSessionId() const = 0;
virtual void addPendingExecutionSync() = 0;
+ virtual void setUnackedCount(uint64_t) {}
};
}} // namespace qpid::broker
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Wed Jun 13 14:13:27 2012
@@ -126,6 +126,11 @@ class SessionState : public qpid::Sessio
// the SessionState of a received Execution.Sync command.
void addPendingExecutionSync();
+ void setUnackedCount(uint64_t count) {
+ if (mgmtObject)
+ mgmtObject->set_unackedMessages(count);
+ }
+
// Used to delay creation of management object for sessions
// belonging to inter-broker bridges
void addManagementObject();
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
---
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
(original)
+++
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/qmf/QMFService.java
Wed Jun 13 14:13:27 2012
@@ -1741,6 +1741,12 @@ public class QMFService implements Confi
return 0l;
}
+ public Long getUnackedMessages()
+ {
+ // TODO
+ return 0l;
+ }
+
public Long getTxnStarts()
{
return _obj.getTxnStarts();
Modified: qpid/trunk/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Wed Jun 13 14:13:27 2012
@@ -443,7 +443,7 @@
<property name="expireTime" type="absTime" access="RO" optional="y"/>
<property name="maxClientRate" type="uint32" access="RO"
unit="msgs/sec" optional="y"/>
- <statistic name="framesOutstanding" type="count32"/>
+ <statistic name="unackedMessages" type="uint64" unit="message"
desc="Unacknowledged messages in the session"/>
<statistic name="TxnStarts" type="count64" unit="transaction"
desc="Total transactions started "/>
<statistic name="TxnCommits" type="count64" unit="transaction"
desc="Total transactions committed"/>
Modified: qpid/trunk/qpid/tools/src/py/qpid-stat
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/tools/src/py/qpid-stat?rev=1349865&r1=1349864&r2=1349865&view=diff
==============================================================================
--- qpid/trunk/qpid/tools/src/py/qpid-stat (original)
+++ qpid/trunk/qpid/tools/src/py/qpid-stat Wed Jun 13 14:13:27 2012
@@ -427,7 +427,8 @@ class BrokerManager:
heads.append(Header("acked", Header.Y))
heads.append(Header("excl", Header.Y))
heads.append(Header("creditMode"))
- heads.append(Header("delivered", Header.KMG))
+ heads.append(Header("delivered", Header.COMMAS))
+ heads.append(Header("sessUnacked", Header.COMMAS))
rows = []
subscriptions = self.broker.getAllSubscriptions()
sessions = self.getSessionMap()
@@ -447,6 +448,7 @@ class BrokerManager:
row.append(s.exclusive)
row.append(s.creditMode)
row.append(s.delivered)
+ row.append(session.unackedMessages)
rows.append(row)
except:
pass
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]