Author: astitcher
Date: Tue Feb 3 20:41:04 2009
New Revision: 740433
URL: http://svn.apache.org/viewvc?rev=740433&view=rev
Log:
Add in management statistics for client flow control.
Really fixed Client library to count credit the same way the broker does.
Modified:
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
qpid/trunk/qpid/specs/management-schema.xml
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.cpp Tue Feb 3 20:41:04
2009
@@ -63,6 +63,14 @@
mgmtObject(0),
rateFlowcontrol(0)
{
+ uint32_t maxRate = broker.getOptions().maxSessionRate;
+ if (maxRate) {
+ if (handler->getConnection().getClientThrottling()) {
+ rateFlowcontrol = new RateFlowcontrol(maxRate);
+ } else {
+ QPID_LOG(warning, getId() << ": Unable to flow control client -
client doesn't support");
+ }
+ }
Manageable* parent = broker.GetVhostObject ();
if (parent != 0) {
ManagementAgent* agent = ManagementAgent::Singleton::getInstance();
@@ -72,25 +80,18 @@
mgmtObject->set_attached (0);
mgmtObject->set_detachedLifespan (0);
mgmtObject->clr_expireTime();
+ if (rateFlowcontrol) mgmtObject->set_maxClientRate(maxRate);
ManagementBroker* mb = dynamic_cast<ManagementBroker*>(agent);
agent->addObject (mgmtObject, mb ? mb->allocateId(this) : 0);
}
}
- uint32_t maxRate = broker.getOptions().maxSessionRate;
- if (maxRate) {
- if (handler->getConnection().getClientThrottling()) {
- rateFlowcontrol = new RateFlowcontrol(maxRate);
- } else {
- QPID_LOG(warning, getId() << ": Unable to flow control client -
client doesn't support");
- }
- }
attach(h);
}
SessionState::~SessionState() {
if (mgmtObject != 0)
mgmtObject->resourceDestroy ();
-
+
if (flowControlTimer)
flowControlTimer->cancel();
}
@@ -213,14 +214,7 @@
void fire() {
// This is the best we can currently do to avoid a destruction/fire
race
if (!isCancelled()) {
- // Send credit
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = flowControl.receivedMessage(now, 0);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, sessionState.getId() << ": send producer
credit " << sendCredit);
- sessionState.getProxy().getMessage().flow("", 0, sendCredit);
- flowControl.sentCredit(now, sendCredit);
- } else if ( flowControl.flowStopped() ) {
+ if ( !sessionState.processSendCredit(0) &&
flowControl.flowStopped() ) {
QPID_LOG(warning, sessionState.getId() << ": Reschedule
sending credit");
reset();
timer.add(this);
@@ -270,13 +264,7 @@
// TODO: Probably do message.stop("") first time then disconnect
getProxy().getMessage().stop("");
} else {
- AbsTime now = AbsTime::now();
- uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, 1);
- if ( sendCredit>0 ) {
- QPID_LOG(debug, getId() << ": send producer credit " <<
sendCredit);
- getProxy().getMessage().flow("", 0, sendCredit);
- rateFlowcontrol->sentCredit(now, sendCredit);
- } else if ( rateFlowcontrol->flowStopped() ) {
+ if ( !processSendCredit(1) && rateFlowcontrol->flowStopped() ) {
QPID_LOG(debug, getId() << ": Schedule sending credit");
Timer& timer = getBroker().getTimer();
// Use heuristic for scheduled credit of time for 50 messages,
but not longer than 500ms
@@ -288,6 +276,22 @@
}
}
+bool SessionState::processSendCredit(uint32_t msgs)
+{
+ AbsTime now = AbsTime::now();
+ uint32_t sendCredit = rateFlowcontrol->receivedMessage(now, msgs);
+ if (mgmtObject) mgmtObject->dec_clientCredit(msgs);
+ if ( sendCredit>0 ) {
+ QPID_LOG(debug, getId() << ": send producer credit " << sendCredit);
+ getProxy().getMessage().flow("", 0, sendCredit);
+ rateFlowcontrol->sentCredit(now, sendCredit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(sendCredit);
+ return true;
+ } else {
+ return false;
+ }
+}
+
void SessionState::sendAcceptAndCompletion()
{
if (!accepted.empty()) {
@@ -357,10 +361,12 @@
if (rateFlowcontrol) {
// Issue initial credit - use a heuristic here issue min of 100
messages or 1 secs worth
- QPID_LOG(debug, getId() << ": Issuing producer message credit " <<
std::min(rateFlowcontrol->getRate(), 100U));
+ uint32_t credit = std::min(rateFlowcontrol->getRate(), 100U);
+ QPID_LOG(debug, getId() << ": Issuing producer message credit " <<
credit);
getProxy().getMessage().setFlowMode("", 0);
- getProxy().getMessage().flow("", 0,
std::min(rateFlowcontrol->getRate(), 100U));
- rateFlowcontrol->sentCredit(AbsTime::now(),
std::min(rateFlowcontrol->getRate(), 100U));
+ getProxy().getMessage().flow("", 0, credit);
+ rateFlowcontrol->sentCredit(AbsTime::now(), credit);
+ if (mgmtObject) mgmtObject->inc_clientCredit(credit);
}
}
Modified: qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/broker/SessionState.h Tue Feb 3 20:41:04 2009
@@ -62,7 +62,7 @@
* Broker-side session state includes session's handler chains, which
* may themselves have state.
*/
-class SessionState : public qpid::SessionState,
+class SessionState : public qpid::SessionState,
public SessionContext,
public DeliveryAdapter,
public management::Manageable,
@@ -79,7 +79,7 @@
/** @pre isAttached() */
framing::AMQP_ClientProxy& getProxy();
-
+
/** @pre isAttached() */
ConnectionState& getConnection();
bool isLocal(const ConnectionToken* t) const;
@@ -91,7 +91,7 @@
void giveReadCredit(int32_t);
void senderCompleted(const framing::SequenceSet& ranges);
-
+
void sendCompletion();
//delivery adapter methods:
@@ -108,6 +108,8 @@
SemanticState& getSemanticState() { return semanticState; }
boost::intrusive_ptr<Message> getMessageInProgress() { return
msgBuilder.getMessage(); }
+ bool processSendCredit(uint32_t msgs);
+
private:
void handleCommand(framing::AMQMethodBody* method, const
framing::SequenceNumber& id);
@@ -124,7 +126,7 @@
void sendAcceptAndCompletion();
Broker& broker;
- SessionHandler* handler;
+ SessionHandler* handler;
sys::AbsTime expiry; // Used by SessionManager.
SemanticState semanticState;
SessionAdapter adapter;
@@ -133,7 +135,7 @@
IncompleteMessageList::CompletionListener enqueuedOp;
qmf::org::apache::qpid::broker::Session* mgmtObject;
qpid::framing::SequenceSet accepted;
-
+
// State used for producer flow control (rate limited)
RateFlowcontrol* rateFlowcontrol;
boost::intrusive_ptr<TimerTask> flowControlTimer;
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.cpp Tue Feb 3 20:41:04 2009
@@ -329,6 +329,10 @@
Future SessionImpl::sendCommand(const AMQBody& command, const MethodContent*
content)
{
+ // Only message transfers have content
+ if (content && sendMsgCredit) {
+ sendMsgCredit->acquire();
+ }
Acquire a(sendLock);
SequenceNumber id = nextOut++;
{
@@ -366,7 +370,7 @@
uint64_t data_length = content.getData().length();
if(data_length > 0){
header.setLastSegment(false);
- handleContentOut(header);
+ handleOut(header);
/*Note: end of frame marker included in overhead but not in size*/
const uint32_t frag_size = maxFrameSize - AMQFrame::frameOverhead();
@@ -395,7 +399,7 @@
}
}
} else {
- handleContentOut(header);
+ handleOut(header);
}
}
@@ -448,14 +452,6 @@
sendFrame(frame, true);
}
-void SessionImpl::handleContentOut(AMQFrame& frame) // user thread
-{
- if (sendMsgCredit) {
- sendMsgCredit->acquire();
- }
- sendFrame(frame, true);
-}
-
void SessionImpl::proxyOut(AMQFrame& frame) // network thread
{
//Note: this case is treated slightly differently that command
Modified: qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/client/SessionImpl.h Tue Feb 3 20:41:04 2009
@@ -146,7 +146,6 @@
void handleIn(framing::AMQFrame& frame);
void handleOut(framing::AMQFrame& frame);
- void handleContentOut(framing::AMQFrame& frame);
/**
* Sends session controls. This case is treated slightly
* differently than command frames sent by the application via
Modified: qpid/trunk/qpid/specs/management-schema.xml
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/specs/management-schema.xml?rev=740433&r1=740432&r2=740433&view=diff
==============================================================================
--- qpid/trunk/qpid/specs/management-schema.xml (original)
+++ qpid/trunk/qpid/specs/management-schema.xml Tue Feb 3 20:41:04 2009
@@ -285,6 +285,7 @@
<property name="detachedLifespan" type="uint32" access="RO"
unit="second"/>
<property name="attached" type="bool" access="RO"/>
<property name="expireTime" type="absTime" access="RO" optional="y"/>
+ <property name="maxClientRate" type="uint32" access="RO"
unit="msgs/sec" optional="y"/>
<statistic name="framesOutstanding" type="count32"/>
@@ -293,6 +294,8 @@
<statistic name="TxnRejects" type="count64" unit="transaction"
desc="Total transactions rejected"/>
<statistic name="TxnCount" type="count32" unit="transaction"
desc="Current pending transactions"/>
+ <statistic name="clientCredit" type="count32" unit="message" desc="Client
message credit"/>
+
<method name="solicitAck"/>
<method name="detach"/>
<method name="resetLifespan"/>
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]