First thing to do is switch to the new 3.6.0 release.
http://activemq.apache.org/cms/activemq-cpp-360-release.html
I noticed you are using Stomp which I don't recommened, switching to
openwire will allow you to use the failover transport.
You can call close on the connection from the Exception callback and it
should kick any blocked threads waiting on sends and receives. You
can't delete you connection from that callback though that'd be a bit of
badness. Once the onException method is called your application will
have to tear down its CMS resources and recreate them, you can keep your
ConnectionFactory but the other stuff Session, Connection have to go.
Handling all that is tricky that using the failover transport and
openwire can save you a lot of grief.
On 03/07/2013 09:22 PM, Matevz Tadel wrote:
Hi,
I'm using a single connection to an AMQ topic in a multi-threaded
program that does many other things beyond sending data over AMQ. So,
when an AMQ onException() happens I really do not want to exit the
application (all demos seem to have "// exit(1);" in this handler :) )
and would like to reinitialize my single connection even if this means
leaking some resources.
I'm getting exceptions like this since switching to 3.5.0 (but the
load on application is getting higher so it doesn't have to be a
change introduced in 3.5.0):
2013-03-06 01:51:51 ERR XrdFileCloseReporterAmq::onException Exception
callback invoked:
DataInputStream::readLong - Reached EOF
and the application now crashes when I close the connection (before
this used to trigger throwing of a true exception in my application
thread that does AMQ calls).
Please see my connect and onException() implementations below. Full
class is available here:
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.h?revision=2860&view=markup
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.cxx?revision=2862&view=markup
I guess the question is ... is there anything sane one can do after
onException() is invoked or this actually means internal state of
amq-cpp is corrupted beyond repair? If this is indeed the case, the
only option for me is to separate sending of AMQ messages from the
main application.
Best,
Matevz
void XrdFileCloseReporterAmq::amq_connect()
{
static const Exc_t _eh("XrdFileCloseReporterAmq::amq_connect ");
TString uri;
// 2012-07 Failover and Stomp don't splice ... or, they splice too
well, making as
// many threads as ulimit lets them on first error ... thrashing the
machine.
// uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)",
mAmqHost.Data(), mAmqPort);
uri.Form("tcp://%s:%hu?wireFormat=stomp", mAmqHost.Data(), mAmqPort);
try
{
auto_ptr<cms::ConnectionFactory> conn_fac
(new activemq::core::ActiveMQConnectionFactory(uri.Data(),
mAmqUser.Data(), mAmqPswd.Data()));
mConn = conn_fac->createConnection();
mConn->setExceptionListener(this);
mConn->start();
}
catch (cms::CMSException& e)
{
throw _eh + "Exception during connection creation: " +
e.getStackTraceString();
}
try
{
mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
mDest = mSess->createTopic(mAmqTopic.Data());
mProd = mSess->createProducer(mDest);
mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT); //
Copied from examples, NFI.
}
catch (cms::InvalidDestinationException& e)
{
throw _eh + "Invalid destination exception during producer
creation: " + e.getStackTraceString();
}
catch (cms::CMSException& e)
{
throw _eh + "Exception during session, topic or message producer
initialization: " + e.getStackTraceString();
}
}
void XrdFileCloseReporterAmq::onException(const cms::CMSException &e)
{
static const Exc_t _eh("XrdFileCloseReporterAmq::onException ");
if (*mLog)
mLog->Form(ZLog::L_Error, _eh, "Exception callback invoked:\n
%s",
e.getStackTraceString().c_str());
cms::Connection *conn = mConn;
mConn = 0;
// This somehow results in proper exception getting thrown in the
amq thread.
// Go figure ... zen engineering or just a bug?
//
// 2013-03-07: Argh ... and this apparently crashes with
activemq-cpp-3.5.0!
// What to do?
// Use mConn == 0 in main thread as a signal things should be reopened.
// But do no call close :)
if (conn)
{
conn->close();
delete conn;
}
}
--
Tim Bish
Sr Software Engineer | RedHat Inc.
[email protected] | www.fusesource.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/