Author: mgoulish
Date: Fri Dec 11 15:29:00 2009
New Revision: 889657
URL: http://svn.apache.org/viewvc?rev=889657&view=rev
Log:
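Add retry capability to several cpg calls (cpg_join, cpg_leave, cpg_finalize).
A call that returns CPG_ERR_TRY_AGAIN is retried after a 10 usec pause,
then after 100 usec, and so on (the pause grows tenfold each time),
for up to 5 attempts in total. The retry pause is capped at 0.1 second.
After the last attempt, give up and report the error.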
The lack of a retry on one of these calls is the likely cause of
several hard-to-reproduce failures seen over the last month
or so.
Modified:
qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp?rev=889657&r1=889656&r2=889657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.cpp Fri Dec 11 15:29:00 2009
@@ -39,6 +39,8 @@
using namespace std;
+
+
Cpg* Cpg::cpgFromHandle(cpg_handle_t handle) {
void* cpg=0;
CPG_CHECK(cpg_context_get(handle, &cpg), "Cannot get CPG instance.");
@@ -46,6 +48,28 @@
return reinterpret_cast<Cpg*>(cpg);
}
+// Applies the same retry policy to every cpg call that needs it.
+//
+// The operation is attempted at most cpgRetries times against this
+// Cpg's handle and group. Only CPG_ERR_TRY_AGAIN triggers another
+// attempt; any other error is reported at once. The pause before the
+// first retry is 10 usec and grows tenfold per retry, capped at
+// maxCpgRetrySleep usec. There is no pause (and no "Retrying" log)
+// after the final attempt, so a persistent failure is reported
+// without an extra, pointless wait.
+// If the op never returns CPG_OK, the failure is reported through
+// CPG_CHECK with the op-specific message from c.msg().
+void Cpg::callCpg ( CpgOp & c ) {
+    // Initialized so that a zero attempt count reports an error instead
+    // of reading an indeterminate value.
+    cpg_error_t result = CPG_ERR_TRY_AGAIN;
+    unsigned int snooze = 10; // microseconds, passed to sys::usleep
+    for ( unsigned int nth_try = 0; nth_try < cpgRetries; ++nth_try ) {
+        result = c.op(handle, &group);
+        if ( result == CPG_OK ) {
+            QPID_LOG(info, c.opName << " successful.");
+            break;
+        }
+        if ( result != CPG_ERR_TRY_AGAIN )
+            break; // Don't retry unless CPG tells us to.
+        if ( nth_try + 1 == cpgRetries )
+            break; // Last attempt failed: report without sleeping first.
+        QPID_LOG(info, "Retrying " << c.opName );
+        sys::usleep ( snooze );
+        snooze *= 10;
+        // Plain compare-and-assign rather than an lvalue ?: expression,
+        // which could bind (ODR-use) the in-class static const member.
+        if ( snooze > maxCpgRetrySleep )
+            snooze = maxCpgRetrySleep;
+    }
+
+    if ( result != CPG_OK )
+        CPG_CHECK(result, c.msg(group));
+}
+
// Global callback functions.
void Cpg::globalDeliver (
cpg_handle_t handle,
@@ -129,11 +153,11 @@
void Cpg::join(const std::string& name) {
group = name;
- CPG_CHECK(cpg_join(handle, &group), cantJoinMsg(group));
+ // Routed through callCpg so a CPG_ERR_TRY_AGAIN from cpg_join is retried
+ // before the failure is reported with cantJoinMsg.
+ callCpg ( cpgJoinOp );
}
void Cpg::leave() {
- CPG_CHECK(cpg_leave(handle, &group), cantLeaveMsg(group));
+ // Routed through callCpg so a CPG_ERR_TRY_AGAIN from cpg_leave is retried
+ // before the failure is reported with cantLeaveMsg.
+ callCpg ( cpgLeaveOp );
}
@@ -158,7 +182,8 @@
if (!isShutdown) {
QPID_LOG(debug,"Shutting down CPG");
isShutdown=true;
- CPG_CHECK(cpg_finalize(handle), "Error in shutdown of CPG");
+
+ callCpg ( cpgFinalizeOp );
}
}
@@ -201,6 +226,10 @@
return "Cannot join CPG group "+group.str();
}
+// Builds the error text reported when cpg_finalize does not succeed;
+// the group name is included for context.
+std::string Cpg::cantFinalizeMsg(const Name& group) {
+    std::string text("Cannot finalize CPG group ");
+    text += group.str();
+    return text;
+}
+
+// Builds the error text reported when leaving the named CPG group fails.
std::string Cpg::cantLeaveMsg(const Name& group) {
return "Cannot leave CPG group "+group.str();
}
Modified: qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h?rev=889657&r1=889656&r2=889657&view=diff
==============================================================================
--- qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h (original)
+++ qpid/trunk/qpid/cpp/src/qpid/cluster/Cpg.h Fri Dec 11 15:29:00 2009
@@ -39,6 +39,7 @@
* On error all functions throw Cpg::Exception.
*
*/
+
class Cpg : public sys::IOHandle {
public:
struct Exception : public ::qpid::Exception {
@@ -114,10 +115,73 @@
int getFd();
private:
+
+ // Maximum number of attempts (the first call plus its retries) for cpg
+ // functions that can tell us to "try again later" (CPG_ERR_TRY_AGAIN).
+ static const unsigned int cpgRetries = 5;
+
+ // Upper bound, in microseconds, on the sleep between cpg retries
+ // (100000 usec = 0.1 second).
+ static const unsigned int maxCpgRetrySleep = 100000;
+
+
+ // Base class for the Cpg operations that need retry capability.
+ // Each subclass wraps one cpg_* call (op) and supplies the error text
+ // to report if that call never succeeds (msg); Cpg::callCpg drives it.
+ struct CpgOp {
+     // Name of the wrapped cpg function, used in log messages.
+     std::string opName;
+
+     // explicit: a bare string should never silently become an op.
+     // The parameter is named differently from the member to avoid shadowing.
+     explicit CpgOp ( const std::string& name )
+         : opName(name) { }
+
+     // Invoke the wrapped cpg function on the given handle and group.
+     virtual cpg_error_t op ( cpg_handle_t handle, struct cpg_name * ) = 0;
+     // Error message used when op() ultimately fails for the given group.
+     virtual std::string msg(const Name&) = 0;
+     virtual ~CpgOp ( ) { }
+ };
+
+
+ // Retryable wrapper around cpg_join(); failures report cantJoinMsg.
+ struct CpgJoinOp : public CpgOp {
+     CpgJoinOp ( ) : CpgOp ( "cpg_join" ) { }
+
+     virtual cpg_error_t op(cpg_handle_t handle, struct cpg_name * cpgGroup) {
+         return cpg_join ( handle, cpgGroup );
+     }
+
+     virtual std::string msg(const Name& name) {
+         return cantJoinMsg(name);
+     }
+ };
+
+ // Retryable wrapper around cpg_leave(); failures report cantLeaveMsg.
+ struct CpgLeaveOp : public CpgOp {
+     CpgLeaveOp ( ) : CpgOp ( "cpg_leave" ) { }
+
+     virtual cpg_error_t op(cpg_handle_t handle, struct cpg_name * cpgGroup) {
+         return cpg_leave ( handle, cpgGroup );
+     }
+
+     virtual std::string msg(const Name& name) {
+         return cantLeaveMsg(name);
+     }
+ };
+
+ // Retryable wrapper around cpg_finalize(); failures report cantFinalizeMsg.
+ // cpg_finalize acts on the handle only, so the group argument is unused.
+ struct CpgFinalizeOp : public CpgOp {
+     CpgFinalizeOp ( ) : CpgOp ( "cpg_finalize" ) { }
+
+     virtual cpg_error_t op(cpg_handle_t handle, struct cpg_name *) {
+         return cpg_finalize ( handle );
+     }
+
+     virtual std::string msg(const Name& name) {
+         return cantFinalizeMsg(name);
+     }
+ };
+
+ // This fn standardizes retry policy across all Cpg ops that need it:
+ // it runs the op against this Cpg's handle and group, retries on
+ // CPG_ERR_TRY_AGAIN, and reports a final failure via CPG_CHECK.
+ void callCpg ( CpgOp & );
+
+ // One op object per retryable cpg call, handed to callCpg by join(),
+ // leave() and the shutdown path.
+ CpgJoinOp cpgJoinOp;
+ CpgLeaveOp cpgLeaveOp;
+ CpgFinalizeOp cpgFinalizeOp;
+
static std::string errorStr(cpg_error_t err, const std::string& msg);
+ // Builders for the error text used when the corresponding operation
+ // on the named CPG group fails.
static std::string cantJoinMsg(const Name&);
static std::string cantLeaveMsg(const Name&);
static std::string cantMcastMsg(const Name&);
+ static std::string cantFinalizeMsg(const Name&);
+ // Recovers the Cpg instance stored as the context of a cpg handle.
static Cpg* cpgFromHandle(cpg_handle_t);
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:[email protected]