First thing to do is switch to the new 3.6.0 release.
http://activemq.apache.org/cms/activemq-cpp-360-release.html

I noticed you are using Stomp, which I don't recommend; switching to OpenWire will allow you to use the failover transport.
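
As a rough illustration of what the broker URI could look like after such a switch, here is a minimal sketch. The helper name makeFailoverUri is hypothetical, and it assumes a single broker reached over plain tcp://, where OpenWire is the default wire format so no wireFormat option is given. Check the URI options against the documentation for your release before relying on it.

```cpp
#include <cassert>
#include <sstream>
#include <string>

// Hypothetical helper (not part of ActiveMQ-CPP): wraps one tcp:// broker
// address in a failover URI. No wireFormat option is appended, on the
// assumption that OpenWire is the default for tcp:// connections.
std::string makeFailoverUri(const std::string& host, unsigned short port)
{
    assert(!host.empty());  // a broker host name is required

    std::ostringstream uri;
    uri << "failover:(tcp://" << host << ":" << port << ")";
    return uri.str();
}
```

The resulting string would be passed to the ActiveMQConnectionFactory constructor in place of the current tcp://...?wireFormat=stomp URI.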

You can call close() on the connection from the exception callback, and it should kick any blocked threads waiting on sends and receives. You can't delete your connection from that callback, though; that'd be a bit of badness. Once the onException method is called, your application has to tear down its CMS resources and recreate them: you can keep your ConnectionFactory, but the other stuff (Session, Connection) has to go. Handling all that is tricky, so using the failover transport and OpenWire can save you a lot of grief.
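
The pattern described above (close in the callback, delete and recreate elsewhere) might be sketched roughly as follows. This is only an illustration under stated assumptions: FakeConnection is a stand-in for cms::Connection so the sketch compiles without the library, and Reporter, ensureConnected and isClosed are hypothetical names. A real version would also drop and recreate the Session, Destination and Producer, and may need a mutex around the connection pointer.

```cpp
#include <atomic>
#include <cassert>
#include <memory>

// Stand-in for cms::Connection (assumption, so the sketch is self-contained).
struct FakeConnection {
    bool closed = false;
    void close() { closed = true; }
};

class Reporter {
public:
    // Called on the owning (sending) thread only.
    void connect() {
        assert(!mConn);                  // previous connection must be gone
        mConn.reset(new FakeConnection);
        mBroken.store(false);
    }

    // Models the exception callback, which runs on a library thread:
    // close to wake blocked senders, then flag the failure. No delete here.
    void onException() {
        if (mConn) mConn->close();
        mBroken.store(true);             // set last, after close() returned
    }

    // Called on the owning thread before each send. Returns true if the
    // connection had failed and was torn down and recreated.
    bool ensureConnected() {
        if (!mBroken.load()) return false;
        mConn.reset();                   // delete outside the callback
        connect();
        return true;
    }

    bool isClosed() const { return mConn && mConn->closed; }

private:
    std::unique_ptr<FakeConnection> mConn;
    std::atomic<bool> mBroken{false};
};
```

The flag is set only after close() has returned, so the owning thread never deletes the connection while the callback is still using it.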

On 03/07/2013 09:22 PM, Matevz Tadel wrote:
Hi,

I'm using a single connection to an AMQ topic in a multi-threaded program that does many other things beyond sending data over AMQ. So, when an AMQ onException() happens I really do not want to exit the application (all demos seem to have "// exit(1);" in this handler :) ) and would like to reinitialize my single connection even if this means leaking some resources.

I'm getting exceptions like this since switching to 3.5.0 (but the load on application is getting higher so it doesn't have to be a change introduced in 3.5.0):
2013-03-06 01:51:51 ERR XrdFileCloseReporterAmq::onException Exception callback invoked:
    DataInputStream::readLong - Reached EOF
and the application now crashes when I close the connection (before this used to trigger throwing of a true exception in my application thread that does AMQ calls).

Please see my connect and onException() implementations below. Full class is available here:
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.h?revision=2860&view=markup
http://www.gled.org/viewvc/gled/trunk/libsets/XrdMon/Glasses/XrdFileCloseReporterAmq.cxx?revision=2862&view=markup

I guess the question is ... is there anything sane one can do after onException() is invoked, or does this actually mean the internal state of amq-cpp is corrupted beyond repair? If this is indeed the case, the only option for me is to separate sending of AMQ messages from the main application.

Best,
Matevz


void XrdFileCloseReporterAmq::amq_connect()
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::amq_connect ");

  TString uri;

  // 2012-07 Failover and Stomp don't splice ... or, they splice too well, making as
  // many threads as ulimit lets them on first error ... thrashing the machine.
  // uri.Form("failover://(tcp://%s:%hu?wireFormat=stomp)", mAmqHost.Data(), mAmqPort);

  uri.Form("tcp://%s:%hu?wireFormat=stomp", mAmqHost.Data(), mAmqPort);

  try
  {
    auto_ptr<cms::ConnectionFactory> conn_fac
      (new activemq::core::ActiveMQConnectionFactory(uri.Data(), mAmqUser.Data(), mAmqPswd.Data()));

    mConn = conn_fac->createConnection();
    mConn->setExceptionListener(this);
    mConn->start();
  }
  catch (cms::CMSException& e)
  {
    throw _eh + "Exception during connection creation: " + e.getStackTraceString();
  }

  try
  {
    mSess = mConn->createSession(); // Default is AUTO_ACKNOWLEDGE
    mDest = mSess->createTopic(mAmqTopic.Data());
    mProd = mSess->createProducer(mDest);
    mProd->setDeliveryMode(cms::DeliveryMode::NON_PERSISTENT); // Copied from examples, NFI.
  }
  catch (cms::InvalidDestinationException& e)
  {
    throw _eh + "Invalid destination exception during producer creation: " + e.getStackTraceString();
  }
  catch (cms::CMSException& e)
  {
    throw _eh + "Exception during session, topic or message producer initialization: " + e.getStackTraceString();
  }
}


void XrdFileCloseReporterAmq::onException(const cms::CMSException &e)
{
  static const Exc_t _eh("XrdFileCloseReporterAmq::onException ");

  if (*mLog)
    mLog->Form(ZLog::L_Error, _eh, "Exception callback invoked:\n %s",
               e.getStackTraceString().c_str());

  cms::Connection *conn = mConn;
  mConn = 0;

  // This somehow results in proper exception getting thrown in the amq thread.
  // Go figure ... zen engineering or just a bug?
  //
  // 2013-03-07: Argh ... and this apparently crashes with activemq-cpp-3.5.0!
  // What to do?
  // Use mConn == 0 in main thread as a signal things should be reopened.
  // But do not call close :)
  if (conn)
  {
    conn->close();
    delete conn;
  }
}





--
Tim Bish
Sr Software Engineer | RedHat Inc.
[email protected] | www.fusesource.com | www.redhat.com
skype: tabish121 | twitter: @tabish121
blog: http://timbish.blogspot.com/
