[ 
https://issues.apache.org/jira/browse/QPID-6397?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Pavel Moravec closed QPID-6397.
-------------------------------

> [C++ broker] segfault when processing QMF method during periodic processing
> ---------------------------------------------------------------------------
>
>                 Key: QPID-6397
>                 URL: https://issues.apache.org/jira/browse/QPID-6397
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Broker
>    Affects Versions: 0.31
>            Reporter: Pavel Moravec
>            Assignee: Pavel Moravec
>             Fix For: Future
>
>
> There is a race condition that causes a segfault when:
> - one thread executes periodic processing with traces enabled (at least for 
> qpid::management::ManagementAgent::debugSnapshot)
> - a second thread is concurrently processing a QMF method from a client
> The root cause is that the first thread iterates through the managementObjects 
> map in dumpMap (or through newManagementObjects in dumpVector) while the second 
> thread is executing moveNewObjects, which moves entries from 
> newManagementObjects to managementObjects.
> See the backtraces below (the dumpMap case is shown; the same crash was also hit in dumpVector):
> (gdb) bt # of thread 1
> #0  0x0000003f0c632885 in raise (sig=6) at 
> ../nptl/sysdeps/unix/sysv/linux/raise.c:64
> #1  0x0000003f0c634065 in abort () at abort.c:92
> #2  0x0000003f0c62b9fe in __assert_fail_base (fmt=<value optimized out>, 
> assertion=0x7ff117bd5c94 "px != 0", 
>     file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", 
> line=<value optimized out>, function=<value optimized out>)
>     at assert.c:96
> #3  0x0000003f0c62bac0 in __assert_fail (assertion=0x7ff117bd5c94 "px != 0", 
>     file=0x7ff117bd5c68 "/usr/include/boost/smart_ptr/shared_ptr.hpp", 
> line=418, 
>     function=0x7ff117bd5e80 "T* boost::shared_ptr< <template-parameter-1-1> 
> >::operator->() const [with T = qpid::management::ManagementObject]") at 
> assert.c:105
> #4  0x00007ff117947139 in 
> boost::shared_ptr<qpid::management::ManagementObject>::operator->() const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5  0x00007ff117bafcd9 in qpid::management::(anonymous 
> namespace)::dumpMap(std::map<qpid::management::ObjectId, 
> boost::shared_ptr<qpid::management::ManagementObject>, 
> std::less<qpid::management::ObjectId>, 
> std::allocator<std::pair<qpid::management::ObjectId const, 
> boost::shared_ptr<qpid::management::ManagementObject> > > > const&) () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6  0x00007ff117bb06ba in 
> qpid::management::ManagementAgent::debugSnapshot(char const*) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7  0x00007ff117b954ed in 
> qpid::management::ManagementAgent::periodicProcessing() ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8  0x00007ff117bc5c1f in boost::_mfi::mf0<void, 
> qpid::management::ManagementAgent>::operator()(qpid::management::ManagementAgent*)
>  const () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9  0x00007ff117bc4384 in void 
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> 
> >::operator()<boost::_mfi::mf0<void, qpid::management::ManagementAgent>, 
> boost::_bi::list0>(boost::_bi::type<void>, boost::_mfi::mf0<void, 
> qpid::management::ManagementAgent>&, boost::_bi::list0&, int) () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117bc1269 in boost::_bi::bind_t<void, boost::_mfi::mf0<void, 
> qpid::management::ManagementAgent>, 
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > 
> >::operator()() () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117bbc1e0 in 
> boost::detail::function::void_function_obj_invoker0<boost::_bi::bind_t<void, 
> boost::_mfi::mf0<void, qpid::management::ManagementAgent>, 
> boost::_bi::list1<boost::_bi::value<qpid::management::ManagementAgent*> > >, 
> void>::invoke(boost::detail::function::function_buffer&) () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117a5e2af in boost::function0<void>::operator()() const () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117b8e400 in qpid::management::(anonymous 
> namespace)::Periodic::fire() ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #14 0x00007ff1173b518f in qpid::sys::TimerTask::fireTask() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> ---Type <return> to continue, or q <return> to quit---
> #15 0x00007ff1173b63e9 in 
> qpid::sys::Timer::fire(boost::intrusive_ptr<qpid::sys::TimerTask>) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #16 0x00007ff1173b5d1d in qpid::sys::Timer::run() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #17 0x00007ff1173280ef in qpid::sys::(anonymous 
> namespace)::runRunnable(void*) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #18 0x0000003f0ce077f1 in start_thread (arg=0x7ff116c8f700) at 
> pthread_create.c:301
> #19 0x0000003f0c6e570d in clone () at 
> ../sysdeps/unix/sysv/linux/x86_64/clone.S:115
> (gdb) bt  # of thread 2
> #0  0x00007ff116ca99c7 in qpid::types::VariantImpl::~VariantImpl() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #1  0x00007ff116caf2e9 in qpid::types::Variant::~Variant() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidtypes.so.1
> #2  0x00007ff117967876 in 
> qmf::org::apache::qpid::broker::Connection::mapEncodeValues(std::map<std::basic_string<char,
>  std::char_traits<char>, std::allocator<char> >, qpid::types::Variant, 
> std::less<std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > >, std::allocator<std::pair<std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > const, qpid::types::Variant> > 
> >&, bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #3  0x00007ff117bb118d in 
> qpid::management::ManagementAgent::DeletedObject::DeletedObject(boost::shared_ptr<qpid::management::ManagementObject>,
>  bool, bool) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #4  0x00007ff117b94ec1 in qpid::management::ManagementAgent::moveNewObjects() 
> ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #5  0x00007ff117b9e79a in 
> qpid::management::ManagementAgent::handleMethodRequest(std::basic_string<char,
>  std::char_traits<char>, std::allocator<char> > const&, 
> std::basic_string<char, std::char_traits<char>, std::allocator<char> > 
> const&, std::basic_string<char, std::char_traits<char>, std::allocator<char> 
> > const&, std::basic_string<char, std::char_traits<char>, 
> std::allocator<char> > const&, std::basic_string<char, 
> std::char_traits<char>, std::allocator<char> > const&, bool) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #6  0x00007ff117bacb70 in 
> qpid::management::ManagementAgent::dispatchAgentCommand(qpid::broker::Message&,
>  bool) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #7  0x00007ff117b9ccc3 in 
> qpid::management::ManagementAgent::dispatchCommand(qpid::broker::Deliverable&,
>  std::basic_string<char, std::char_traits<char>, std::allocator<char> > 
> const&, qpid::framing::FieldTable const*, bool, int) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #8  0x00007ff117bca08a in 
> qpid::broker::ManagementDirectExchange::route(qpid::broker::Deliverable&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #9  0x00007ff117b49d65 in 
> qpid::broker::SemanticState::route(qpid::broker::Message&, 
> qpid::broker::Deliverable&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #10 0x00007ff117b6a8f3 in 
> qpid::broker::SessionState::handleContent(qpid::framing::AMQFrame&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #11 0x00007ff117b6af31 in 
> qpid::broker::SessionState::handleIn(qpid::framing::AMQFrame&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #12 0x00007ff117b72dc1 in 
> qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
>  
> &(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
>  () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #13 0x00007ff117376ba3 in 
> qpid::amqp_0_10::SessionHandler::handleIn(qpid::framing::AMQFrame&) ()
> ---Type <return> to continue, or q <return> to quit---
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #14 0x00007ff117b72dc1 in 
> qpid::framing::Handler<qpid::framing::AMQFrame&>::MemFunRef<qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface,
>  
> &(qpid::framing::Handler<qpid::framing::AMQFrame&>::InOutHandlerInterface::handleIn(qpid::framing::AMQFrame&))>::handle(qpid::framing::AMQFrame&)
>  () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #15 0x00007ff117ac640a in 
> qpid::framing::Handler<qpid::framing::AMQFrame&>::operator()(qpid::framing::AMQFrame&)
>  ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #16 0x00007ff117ac1a8c in 
> qpid::broker::ConnectionHandler::handle(qpid::framing::AMQFrame&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #17 0x00007ff117ab8444 in 
> qpid::broker::amqp_0_10::Connection::received(qpid::framing::AMQFrame&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #18 0x00007ff117a23408 in qpid::amqp_0_10::Connection::decode(char const*, 
> unsigned long) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #19 0x00007ff117b34581 in qpid::broker::SecureConnection::decode(char const*, 
> unsigned long) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #20 0x00007ff1173abc2f in 
> qpid::sys::AsynchIOHandler::readbuff(qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #21 0x00007ff117bd2ce6 in boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, 
> qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIOHandler*, 
> qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #22 0x00007ff117bd1a24 in void 
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
> boost::arg<1>, boost::arg<2> >::operator()<boost::_mfi::mf2<void, 
> qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>, boost::_bi::list2<qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*&> >(boost::_bi::type<void>, 
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>&, boost::_bi::list2<qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*&>&, int) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #23 0x00007ff117bd0eb6 in void boost::_bi::bind_t<void, 
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>, 
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
> boost::arg<1>, boost::arg<2> > >::operator()<qpid::sys::AsynchIO, 
> qpid::sys::AsynchIOBufferBase*>(qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #24 0x00007ff117bd018f in 
> boost::detail::function::void_function_obj_invoker2<boost::_bi::bind_t<void, 
> boost::_mfi::mf2<void, qpid::sys::AsynchIOHandler, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>, 
> boost::_bi::list3<boost::_bi::value<qpid::sys::AsynchIOHandler*>, 
> boost::arg<1>, boost::arg<2> > >, void, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>::invoke(boost::detail::function::function_buffer&,
>  qpid::sys::AsynchIO&, qpid::sys::AsynchIOBufferBase*) () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> ---Type <return> to continue, or q <return> to quit---
> #25 0x00007ff11730f204 in boost::function2<void, qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*>::operator()(qpid::sys::AsynchIO&, 
> qpid::sys::AsynchIOBufferBase*) const () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #26 0x00007ff11730c28b in 
> qpid::sys::posix::AsynchIO::readable(qpid::sys::DispatchHandle&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #27 0x00007ff117314202 in boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
> qpid::sys::DispatchHandle&>::operator()(qpid::sys::posix::AsynchIO*, 
> qpid::sys::DispatchHandle&) const () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #28 0x00007ff117313553 in void 
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, 
> boost::arg<1> >::operator()<boost::_mfi::mf1<void, 
> qpid::sys::posix::AsynchIO, qpid::sys::DispatchHandle&>, 
> boost::_bi::list1<qpid::sys::DispatchHandle&> >(boost::_bi::type<void>, 
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
> qpid::sys::DispatchHandle&>&, boost::_bi::list1<qpid::sys::DispatchHandle&>&, 
> int) () from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #29 0x00007ff117312844 in void boost::_bi::bind_t<void, 
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
> qpid::sys::DispatchHandle&>, 
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, 
> boost::arg<1> > 
> >::operator()<qpid::sys::DispatchHandle>(qpid::sys::DispatchHandle&) () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #30 0x00007ff117311821 in 
> boost::detail::function::void_function_obj_invoker1<boost::_bi::bind_t<void, 
> boost::_mfi::mf1<void, qpid::sys::posix::AsynchIO, 
> qpid::sys::DispatchHandle&>, 
> boost::_bi::list2<boost::_bi::value<qpid::sys::posix::AsynchIO*>, 
> boost::arg<1> > >, void, 
> qpid::sys::DispatchHandle&>::invoke(boost::detail::function::function_buffer&,
>  qpid::sys::DispatchHandle&) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #31 0x00007ff1173afc7a in boost::function1<void, 
> qpid::sys::DispatchHandle&>::operator()(qpid::sys::DispatchHandle&) const ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #32 0x00007ff1173af240 in 
> qpid::sys::DispatchHandle::processEvent(qpid::sys::Poller::EventType) ()
>    from /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #33 0x00007ff1173355d6 in qpid::sys::Poller::Event::process() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #34 0x00007ff1173348d4 in qpid::sys::Poller::run() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #35 0x00007ff1173ade21 in qpid::sys::Dispatcher::run() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidcommon.so.2
> #36 0x00007ff117a30177 in qpid::broker::Broker::run() () from 
> /root/qpid-trunk/qpid/cpp/BLD/src/libqpidbroker.so.2
> #37 0x000000000040760c in 
> qpid::broker::QpiddBroker::execute(qpid::broker::QpiddOptions*) ()
> #38 0x000000000040b461 in qpid::broker::run_broker(int, char**, bool) ()
> #39 0x00000000004076a6 in main ()
> Reproducer: a little bit tricky, but basically:
> 1) create as many objects as possible (and keep creating them, so that there 
> is always something in newManagementObjects)
> 2) invoke some QMF method just when periodic processing is starting
> In particular, I used this reproducer:
> 1) populate_provisioning.sh:
> for i in $(seq 1 50); do qpid-config add exchange fanout ex_${i} & done
> wait
> while true; do
>       j=$(uuidgen)
>       echo "$(date): starting with queue ${j}_q_1"
>       for i in $(seq 1 200); do qpid-receive -a "${j}_q_${i}; {create:always, 
> node:{durable:true, x-bindings:[{exchange:'amq.direct', queue:'${j}_q_${i}', 
> key:''}, {exchange:'amq.fanout', queue:'${j}_q_${i}'}, {exchange:'ex_1', 
> queue:'${j}_q_${i}'}, {exchange:'ex_2', queue:'${j}_q_${i}'}, 
> {exchange:'ex_3', queue:'${j}_q_${i}'}, {exchange:'ex_4', 
> queue:'${j}_q_${i}'}, {exchange:'ex_5', queue:'${j}_q_${i}'}, 
> {exchange:'ex_6', queue:'${j}_q_${i}'}, {exchange:'ex_7', 
> queue:'${j}_q_${i}'}, {exchange:'ex_8', queue:'${j}_q_${i}'}, 
> {exchange:'ex_9', queue:'${j}_q_${i}'}, {exchange:'ex_10', 
> queue:'${j}_q_${i}'}, {exchange:'ex_11', queue:'${j}_q_${i}'}, 
> {exchange:'ex_12', queue:'${j}_q_${i}'}, {exchange:'ex_13', 
> queue:'${j}_q_${i}'}, {exchange:'ex_14', queue:'${j}_q_${i}'}, 
> {exchange:'ex_15', queue:'${j}_q_${i}'}, {exchange:'ex_16', 
> queue:'${j}_q_${i}'}, {exchange:'ex_17', queue:'${j}_q_${i}'}, 
> {exchange:'ex_18', queue:'${j}_q_${i}'}, {exchange:'ex_19', 
> queue:'${j}_q_${i}'}, {exchange:'ex_20', queue:'${j}_q_${i}'}, 
> {exchange:'ex_21', queue:'${j}_q_${i}'}, {exchange:'ex_22', 
> queue:'${j}_q_${i}'}, {exchange:'ex_23', queue:'${j}_q_${i}'}, 
> {exchange:'ex_24', queue:'${j}_q_${i}'}, {exchange:'ex_25', 
> queue:'${j}_q_${i}'}, {exchange:'ex_26', queue:'${j}_q_${i}'}, 
> {exchange:'ex_27', queue:'${j}_q_${i}'}, {exchange:'ex_28', 
> queue:'${j}_q_${i}'}, {exchange:'ex_29', queue:'${j}_q_${i}'}, 
> {exchange:'ex_30', queue:'${j}_q_${i}'}, {exchange:'ex_31', 
> queue:'${j}_q_${i}'}, {exchange:'ex_32', queue:'${j}_q_${i}'}, 
> {exchange:'ex_33', queue:'${j}_q_${i}'}, {exchange:'ex_34', 
> queue:'${j}_q_${i}'}, {exchange:'ex_35', queue:'${j}_q_${i}'}, 
> {exchange:'ex_36', queue:'${j}_q_${i}'}, {exchange:'ex_37', 
> queue:'${j}_q_${i}'}, {exchange:'ex_38', queue:'${j}_q_${i}'}, 
> {exchange:'ex_39', queue:'${j}_q_${i}'}, {exchange:'ex_40', 
> queue:'${j}_q_${i}'}, {exchange:'ex_41', queue:'${j}_q_${i}'}, 
> {exchange:'ex_42', queue:'${j}_q_${i}'}, {exchange:'ex_43', 
> queue:'${j}_q_${i}'}, {exchange:'ex_44', queue:'${j}_q_${i}'}, 
> {exchange:'ex_45', queue:'${j}_q_${i}'}, {exchange:'ex_46', 
> queue:'${j}_q_${i}'}, {exchange:'ex_47', queue:'${j}_q_${i}'}, 
> {exchange:'ex_48', queue:'${j}_q_${i}'}, {exchange:'ex_49', 
> queue:'${j}_q_${i}'}, {exchange:'ex_50', queue:'${j}_q_${i}'}] }}" & sleep 
> 0.1; done
>       wait
> done
> 2) while true; do date "+%T.%N"; ./purge_queue whateverQueue 0 > /dev/null 
> 2>&1; sleep 9.83; done
> purge_queue.cpp:
> /* To the extent possible under law, Red Hat, Inc. has dedicated all copyright
>  * to this software to the public domain worldwide, pursuant to the CC0 Public
>  * Domain Dedication. This software is distributed without any warranty.  See
>  *  <http://creativecommons.org/publicdomain/zero/1.0/>.
> */
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Session.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Address.h>
> #include <iostream>
> #include <cstdlib>
> using namespace std;
> using namespace qpid::messaging;
> using namespace qpid::types;
> int main(int argc, char** argv) {
>   if (argc < 3) {
>     cerr << "Invalid number of parameters, expecting: queue_name, quantity." 
> << endl;
>     return 1;
>   }
>   string queue_name = argv[1];
>   uint32_t qty = atoi(argv[2]);
>   Connection connection(argc>3?argv[3]:"localhost:5672");
>   connection.open();
>   Session session = connection.createSession();
>   Sender sender = session.createSender("qmf.default.direct/broker");
>   Address responseQueue("#reply-queue; {create:always, 
> node:{x-declare:{auto-delete:true}}}");
>   Receiver receiver = session.createReceiver(responseQueue);
>   Message message;
>   Variant::Map content;
>   Variant::Map OID;
>   Variant::Map arguments;
>   string object_name = "org.apache.qpid.broker:queue:" + queue_name;
>   OID["_object_name"] = object_name;
>   arguments["request"] = qty;
>   content["_object_id"] = OID;
>   content["_method_name"] = "purge";
>   content["_arguments"] = arguments;
>   encode(content, message);
>   message.setReplyTo(responseQueue);
>   message.setProperty("x-amqp-0-10.app-id", "qmf2");
>   message.setProperty("qmf.opcode", "_method_request");
>   message.setContentType("amqp/map");
>   Variant::Map map;
>   decode(message, map);
>   std::cout << map << std::endl;
>   sender.send(message, true);
>   Message response;
>   if (receiver.fetch(response,qpid::messaging::Duration(30000)) == true) {
>     qpid::types::Variant::Map recv_props = response.getProperties();
>     if (recv_props["x-amqp-0-10.app-id"] == "qmf2")
>       if (recv_props["qmf.opcode"] == "_method_response")
>         std::cout << "Response: OK" << std::endl;
>       else if (recv_props["qmf.opcode"] == "_exception")
>         std::cerr << "Error: " << response.getContent() << std::endl;
>       else
>         std::cerr << "Invalid response received!" << std::endl;
>     else
>       std::cerr << "Invalid response not of qmf2 type received!" << std::endl;
>   }
>   else
>     std::cout << "Timeout: No response received within 30 seconds!" << 
> std::endl;
>   receiver.close();
>   sender.close();
>   session.close();
>   connection.close();
>   return 0;
> }



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org
For additional commands, e-mail: dev-h...@qpid.apache.org

Reply via email to