[
https://issues.apache.org/jira/browse/QPID-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Pavel Moravec resolved QPID-6397.
---------------------------------
Resolution: Fixed
Fix Version/s: Future
Committed revision 1664313.
> [C++ broker] segfault when processing QMF method during periodic processing
> ---------------------------------------------------------------------------
>
> Key: QPID-6397
> URL: https://issues.apache.org/jira/browse/QPID-6397
> Project: Qpid
> Issue Type: Bug
> Components: C++ Broker
> Affects Versions: 0.31
> Reporter: Pavel Moravec
> Assignee: Pavel Moravec
> Fix For: Future
>
>
> There is a race condition causing a segfault when:
> - one thread executes periodic processing with traces enabled (at least for
> qpid::management::ManagementAgent::debugSnapshot)
> - a second thread is concurrently processing a QMF method from a client
> The root cause is that the first thread iterates through the managementObjects
> map in dumpMap (or through newManagementObjects in dumpVector) while the second
> thread is executing moveNewObjects, which moves entries from
> newManagementObjects to managementObjects.
> See the backtraces that were hit (the dumpMap case is shown; the same crash was also hit in dumpVector):
> (gdb) bt # of thread 1
> #0 0x0000003f0c632885 in raise (sig=6) at
> ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> #1 0x0000003f0c634065 in abort () at abort.c:92
> #2 0x0000003f0c62b9fe in __assert_fail_base (fmt=<value optimized out>,
> assertion=0x7ff117bd5c94 "px != 0",
> file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp",
> line=<value optimized out>, function=<value optimized out>)
> at assert.c:96
> #3 0x0000003f0c62bac0 in __assert_fail (assertion=0x7ff117bd5c94 "px != 0",
> file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp",
> line=418,
> function=0x7ff117bd5e80 "T* boost::shared_ptr< <template-parameter-1-1>
> >::operator->() const [with T = qpid::management::ManagementObject]") at
> assert.c:105
> #4 0x00007ff117947139 in
> boost::shared_ptr<qpid::management::ManagementObject>::operator->() const ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5 0x00007ff117bafcd9 in qpid::management::(anonymous
> namespace)::dumpMap(std::map<qpid::management::ObjectId,
> boost::shared_ptr<qpid::management::ManagementObject>,
> std::less<qpid::management::ObjectId>,
> std::allocator<std::pair<qpid::management::ObjectId const,
> boost::shared_ptr<qpid::management::ManagementObject> > > > const&) () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6 0x00007ff117bb06ba in
> qpid::management::ManagementAgent::debugSnapshot(char const*) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7 0x00007ff117b954ed in
> qpid::management::ManagementAgent::periodicProcessing() ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8 0x00007ff117bc5c1f in boost::_mfi::mf0<void,
> qpid::management::ManagementAgent>::operator()(qpid::management::ManagementAgent*)
> const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9 0x00007ff117bc4384 in void
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*>
> >::operator()<boost::_mfi::mf0<void, qpid::management::ManagementAgent>,
> boost::_bi::list0>(boost::_bi::type<void>, boost::_mfi::mf0<void,
> qpid::management::ManagementAgent>&, boost::_bi::list0&, int) () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117bc1269 in boost::_bi::bind_t<void, boost::_mfi::mf0<void,
> qpid::management::ManagementAgent>,
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> >
> >::operator()() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117bbc1e0 in
> boost::detail::function::void_function_obj_invoker0<boost::_bi::bind_t<void,
> boost::_mfi::mf0<void, qpid::management::ManagementAgent>,
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > >,
> void>::invoke(boost::detail::function::function_buffer&) () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117a5e2af in boost::function0<void>::operator()() const () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117b8e400 in qpid::management::(anonymous
> namespace)::Periodic::fire() ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #14 0x00007ff1173b518f in qpid::sys::TimerTask::fireTask() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> ---Type <return> to continue, or q <return> to quit---
> #15 0x00007ff1173b63e9 in
> qpid::sys::Timer::fire(boost::intrusive_ptr<qpid::sys::TimerTask>) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #16 0x00007ff1173b5d1d in qpid::sys::Timer::run() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #17 0x00007ff1173280ef in qpid::sys::(anonymous
> namespace)::runRunnable(void*) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #18 0x0000003f0ce077f1 in start_thread (arg=0x7ff116c8f700) at
> pthread_create.c:301
> #19 0x0000003f0c6e570d in clone () at
> ../sysdeps/unix/sysv/linux/x86_64/clone.S:115
> (gdb) bt # of thread 2
> #0 0x00007ff116ca99c7 in qpid::types::VariantImpl::~VariantImpl() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #1 0x00007ff116caf2e9 in qpid::types::Variant::~Variant() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #2 0x00007ff117967876 in
> qmf::org::apache::qpid::broker::Connection::mapEncodeValues(std::map<std::basic_string<char,
> std::char_traits<char>, std::allocator<char> >, qpid::types::Variant,
> std::less<std::basic_string<char, std::char_traits<char>,
> std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char,
> std::char_traits<char>, std::allocator<char> > const, qpid::types::Variant> >
> >&, bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #3 0x00007ff117bb118d in
> qpid::management::ManagementAgent::DeletedObject::DeletedObject(boost::shared_ptr<qpid::management::ManagementObject>,
> bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #4 0x00007ff117b94ec1 in qpid::management::ManagementAgent::moveNewObjects()
> ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5 0x00007ff117b9e79a in
> qpid::management::ManagementAgent::handleMethodRequest(std::basic_string<char,
> std::char_traits<char>, std::allocator<char> > const&,
> std::basic_string<char, std::char_traits<char>, std::allocator<char> >
> const&, std::basic_string<char, std::char_traits<char>, std::allocator<char>
> > const&, std::basic_string<char, std::char_traits<char>,
> std::allocator<char> > const&, std::basic_string<char,
> std::char_traits<char>, std::allocator<char> > const&, bool) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6 0x00007ff117bacb70 in
> qpid::management::ManagementAgent::dispatchAgentCommand(qpid::broker::Message&,
> bool) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7 0x00007ff117b9ccc3 in
> qpid::management::ManagementAgent::dispatchCommand(qpid::broker::Deliverable&,
> std::basic_string<char, std::char_traits<char>, std::allocator<char> >
> const&, qpid::framing::FieldTable const*, bool, int) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8 0x00007ff117bca08a in
> qpid::broker::ManagementDirectExchange::route(qpid::broker::Deliverable&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9 0x00007ff117b49d65 in
> qpid::broker::SemanticState::route(qpid::broker::Message&,
> qpid::broker::Deliverable&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117b6a8f3 in
> qpid::broker::SessionState::handleContent(qpid::framing::AMQFrame&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117b6af31 in
> qpid::broker::SessionState::handleIn(qpid::framing::AMQFrame&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117b72dc1 in
> qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
>
> &(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
> () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117376ba3 in
> qpid::amqp_0_10::SessionHandler::handleIn(qpid::framing::AMQFrame&) ()
> ---Type <return> to continue, or q <return> to quit---
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #14 0x00007ff117b72dc1 in
> qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
>
> &(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
> () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #15 0x00007ff117ac640a in
> qpid::framing::Handler<qpid::framing::AMQFrame&>::operator()(qpid::framing::AMQFrame&)
> ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #16 0x00007ff117ac1a8c in
> qpid::broker::ConnectionHandler::handle(qpid::framing::AMQFrame&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #17 0x00007ff117ab8444 in
> qpid::broker::amqp_0_10::Connection::received(qpid::framing::AMQFrame&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #18 0x00007ff117a23408 in qpid::amqp_0_10::Connection::decode(char const*,
> unsigned long) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #19 0x00007ff117b34581 in qpid::broker::SecureConnection::decode(char const*,
> unsigned long) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #20 0x00007ff1173abc2f in
> qpid::sys::AsynchIOHandler::readbuff(qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #21 0x00007ff117bd2ce6 in boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler,
> qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIOHandler*,
> qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) const ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #22 0x00007ff117bd1a24 in void
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>,
> boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void,
> qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>, boost::_bi::list2<qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*&> >(boost::_bi::type<void>,
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>&, boost::_bi::list2<qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*&>&, int) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #23 0x00007ff117bd0eb6 in void boost::_bi::bind_t<void,
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>,
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>,
> boost::arg<1>, boost::arg<2> > >::operator()<qpid::sys::AsynchIO,
> qpid::sys::AsynchIOBufferBase*>(qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #24 0x00007ff117bd018f in
> boost::detail::function::void_function_obj_invoker2<boost::_bi::bind_t<void,
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>,
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>,
> boost::arg<1>, boost::arg<2> > >, void, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>::invoke(boost::detail::function::function_buffer&,
> qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> ---Type <return> to continue, or q <return> to quit---
> #25 0x00007ff11730f204 in boost::function2<void, qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIO&,
> qpid::sys::AsynchIOBufferBase*) const () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #26 0x00007ff11730c28b in
> qpid::sys::posix::AsynchIO::readable(qpid::sys::DispatchHandle&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #27 0x00007ff117314202 in boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO,
> qpid::sys::DispatchHandle&>::operator()(qpid::sys::posix::AsynchIO*,
> qpid::sys::DispatchHandle&) const () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #28 0x00007ff117313553 in void
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>,
> boost::arg<1> >::operator()<boost::_mfi::mf1<void,
> qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>,
> boost::_bi::list1<qpid::sys::DispatchHandle&> >(boost::_bi::type<void>,
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO,
> qpid::sys::DispatchHandle&>&, boost::_bi::list1<qpid::sys::DispatchHandle&>&,
> int) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #29 0x00007ff117312844 in void boost::_bi::bind_t<void,
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO,
> qpid::sys::DispatchHandle&>,
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>,
> boost::arg<1> >
> >::operator()<qpid::sys::DispatchHandle>(qpid::sys::DispatchHandle&) () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #30 0x00007ff117311821 in
> boost::detail::function::void_function_obj_invoker1<boost::_bi::bind_t<void,
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO,
> qpid::sys::DispatchHandle&>,
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>,
> boost::arg<1> > >, void,
> qpid::sys::DispatchHandle&>::invoke(boost::detail::function::function_buffer&,
> qpid::sys::DispatchHandle&) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #31 0x00007ff1173afc7a in boost::function1<void,
> qpid::sys::DispatchHandle&>::operator()(qpid::sys::DispatchHandle&) const ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #32 0x00007ff1173af240 in
> qpid::sys::DispatchHandle::processEvent(qpid::sys::Poller::EventType) ()
> from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #33 0x00007ff1173355d6 in qpid::sys::Poller::Event::process() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #34 0x00007ff1173348d4 in qpid::sys::Poller::run() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #35 0x00007ff1173ade21 in qpid::sys::Dispatcher::run() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #36 0x00007ff117a30177 in qpid::broker::Broker::run() () from
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #37 0x000000000040760c in
> qpid::broker::QpiddBroker::execute(qpid::broker::QpiddOptions*) ()
> #38 0x000000000040b461 in qpid::broker::run_broker(int, char**, bool) ()
> #39 0x00000000004076a6 in main ()
> Reproducer: a little bit tricky, but basically:
> 1) create as many objects as possible (and keep creating them so that there is
> always something in the newManagementObjects map)
> 2) invoke some QMF method just when periodic processing is starting
> In particular, I used this reproducer:
> 1) populate_provisioning.sh:
> for i in $(seq 1 50); do qpid-config add exchange fanout ex_${i} & done
> wait
> while true; do
> j=$(uuidgen)
> echo "$(date): starting with queue ${j}_q_1"
> for i in $(seq 1 200); do qpid-receive -a "${j}_q_${i}; {create:always,
> node:{durable:true, x-bindings:[{exchange:'amq.direct', queue:'${j}_q_${i}',
> key:''}, {exchange:'amq.fanout', queue:'${j}_q_${i}'}, {exchange:'ex_1',
> queue:'${j}_q_${i}'}, {exchange:'ex_2', queue:'${j}_q_${i}'},
> {exchange:'ex_3', queue:'${j}_q_${i}'}, {exchange:'ex_4',
> queue:'${j}_q_${i}'}, {exchange:'ex_5', queue:'${j}_q_${i}'},
> {exchange:'ex_6', queue:'${j}_q_${i}'}, {exchange:'ex_7',
> queue:'${j}_q_${i}'}, {exchange:'ex_8', queue:'${j}_q_${i}'},
> {exchange:'ex_9', queue:'${j}_q_${i}'}, {exchange:'ex_10',
> queue:'${j}_q_${i}'}, {exchange:'ex_11', queue:'${j}_q_${i}'},
> {exchange:'ex_12', queue:'${j}_q_${i}'}, {exchange:'ex_13',
> queue:'${j}_q_${i}'}, {exchange:'ex_14', queue:'${j}_q_${i}'},
> {exchange:'ex_15', queue:'${j}_q_${i}'}, {exchange:'ex_16',
> queue:'${j}_q_${i}'}, {exchange:'ex_17', queue:'${j}_q_${i}'},
> {exchange:'ex_18', queue:'${j}_q_${i}'}, {exchange:'ex_19',
> queue:'${j}_q_${i}'}, {exchange:'ex_20', queue:'${j}_q_${i}'},
> {exchange:'ex_21', queue:'${j}_q_${i}'}, {exchange:'ex_22',
> queue:'${j}_q_${i}'}, {exchange:'ex_23', queue:'${j}_q_${i}'},
> {exchange:'ex_24', queue:'${j}_q_${i}'}, {exchange:'ex_25',
> queue:'${j}_q_${i}'}, {exchange:'ex_26', queue:'${j}_q_${i}'},
> {exchange:'ex_27', queue:'${j}_q_${i}'}, {exchange:'ex_28',
> queue:'${j}_q_${i}'}, {exchange:'ex_29', queue:'${j}_q_${i}'},
> {exchange:'ex_30', queue:'${j}_q_${i}'}, {exchange:'ex_31',
> queue:'${j}_q_${i}'}, {exchange:'ex_32', queue:'${j}_q_${i}'},
> {exchange:'ex_33', queue:'${j}_q_${i}'}, {exchange:'ex_34',
> queue:'${j}_q_${i}'}, {exchange:'ex_35', queue:'${j}_q_${i}'},
> {exchange:'ex_36', queue:'${j}_q_${i}'}, {exchange:'ex_37',
> queue:'${j}_q_${i}'}, {exchange:'ex_38', queue:'${j}_q_${i}'},
> {exchange:'ex_39', queue:'${j}_q_${i}'}, {exchange:'ex_40',
> queue:'${j}_q_${i}'}, {exchange:'ex_41', queue:'${j}_q_${i}'},
> {exchange:'ex_42', queue:'${j}_q_${i}'}, {exchange:'ex_43',
> queue:'${j}_q_${i}'}, {exchange:'ex_44', queue:'${j}_q_${i}'},
> {exchange:'ex_45', queue:'${j}_q_${i}'}, {exchange:'ex_46',
> queue:'${j}_q_${i}'}, {exchange:'ex_47', queue:'${j}_q_${i}'},
> {exchange:'ex_48', queue:'${j}_q_${i}'}, {exchange:'ex_49',
> queue:'${j}_q_${i}'}, {exchange:'ex_50', queue:'${j}_q_${i}'}] }}" & sleep
> 0.1; done
> wait
> done
> 2) while true; do date "+%T.%N"; ./purge_queue whateverQueue 0 > /dev/null
> 2>&1; sleep 9.83; done
> purge_queue.cpp:
> /* To the extent possible under law, Red Hat, Inc. has dedicated all copyright
> * to this software to the public domain worldwide, pursuant to the CC0 Public
> * Domain Dedication. This software is distributed without any warranty. See
> * <http://creativecommons.org/publicdomain/zero/1.0/>.
> */
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Session.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Address.h>
> #include <iostream>
> #include <cstdlib>
> using namespace std;
> using namespace qpid::messaging;
> using namespace qpid::types;
> int main(int argc, char** argv) {
> if (argc < 3) {
> cerr << "Invalid number of parameters, expecting: queue_name, quantity."
> << endl;
> return 1;
> }
> string queue_name = argv[1];
> uint32_t qty = atoi(argv[2]);
> Connection connection(argc>3?argv[3]:"localhost:5672");
> connection.open();
> Session session = connection.createSession();
> Sender sender = session.createSender("qmf.default.direct/broker");
> Address responseQueue("#reply-queue; {create:always,
> node:{x-declare:{auto-delete:true}}}");
> Receiver receiver = session.createReceiver(responseQueue);
> Message message;
> Variant::Map content;
> Variant::Map OID;
> Variant::Map arguments;
> string object_name = "org.apache.qpid.broker:queue:" + queue_name;
> OID["_object_name"] = object_name;
> arguments["request"] = qty;
> content["_object_id"] = OID;
> content["_method_name"] = "purge";
> content["_arguments"] = arguments;
> encode(content, message);
> message.setReplyTo(responseQueue);
> message.setProperty("x-amqp-0-10.app-id", "qmf2");
> message.setProperty("qmf.opcode", "_method_request");
> message.setContentType("amqp/map");
> Variant::Map map;
> decode(message, map);
> std::cout << map << std::endl;
> sender.send(message, true);
> Message response;
> if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true) {
> qpid::types::Variant::Map recv_props = response.getProperties();
> if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
> if (recv_props["qmf.opcode"] == "_method_response")
> std::cout << "Response: OK" << std::endl;
> else if (recv_props["qmf.opcode"] == "_exception")
> std::cerr << "Error: " << response.getContent() << std::endl;
> else
> std::cerr << "Invalid response received!" << std::endl;
> else
> std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
> }
> else
> std::cout << "Timeout: No response received within 30 seconds!" <<
> std::endl;
> receiver.close();
> sender.close();
> session.close();
> connection.close();
> return 0;
> }
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]