[ https://issues.apache.org/jira/browse/QPID-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Seth Zegelstein updated QPID-8347: ---------------------------------- Description: Hello All, I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call. I believe that this is a bug in the QPID C++ API because it should always respect the client provided timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP 1.0 plugin: [https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90] Update: After further testing, it seems like Qpid Proton handles the timeout case correctly with RabbitMQ Broker, isolating this issue to the C++ API. helloworld_blocking.py: from __future__ import print_function from proton import Message from proton.utils import BlockingConnection from proton.handlers import IncomingMessageHandler conn = BlockingConnection("localhost:5672") receiver = conn.create_receiver("examples") sender = conn.create_sender("examples") #sender.send(Message(body="Hello World!")); msg = receiver.receive(timeout=0) print(msg.body) receiver.accept() conn.close() Result: python helloworld_blocking.py Traceback (most recent call last): File "helloworld_blocking.py", line 30, in <module> msg = receiver.receive(timeout=0) File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 171, in receive timeout=timeout) File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", line 314, in wait raise Timeout(txt) proton._exceptions.Timeout: Connection amqp://localhost:5672 timed out: Receiving on receiver 5aa78b24-27c6-4b76-a92a-d5410aa0a6ef-examples QPID C++ Example Code (Modified HelloWorld example): #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <iostream> using namespace qpid::messaging; int main(int argc, char** argv) { std::string broker = argc > 1 ? argv[1] : "localhost:5672"; std::cout << "broker: " << broker << std::endl; std::string address = argc > 2 ? argv[2] : "topic.hello.world"; std::cout << "address: " << address << std::endl; std::string connectionOptions = argc > 3 ? argv[3] : ""; std::cout << "connectionOptions: " << connectionOptions << std::endl; try { Connection connection(broker, connectionOptions); connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); Message message; std::cout << "Pre Receive" << std::endl; message = receiver.fetch(Duration::SECOND * 10); std::cout << "Post Receive" << std::endl; session.acknowledge(); connection.close(); return 0; } catch(const std::exception& error) { std::cerr << error.what() << std::endl; return 1; } } } Server Error (Occurs after 10 second timeout): =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:49:54 === closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672): {bad_version,\{1,1,0,10}} =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:50:04 === * ** Generic server <0.675.0> terminating * ** Last message in was {send_command, {'basic.credit_drained', <<99,116,97,103,45,0,0,0,0>>, 1}} * ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct, {[],[]}, false,<0.678.0>,none,none,0,true,none, \{0,nil}, \{0,nil}, true,false} ** Reason for termination == ** \{{badmatch,{empty,{[],[]} }}, [ {amqp_channel,rpc_bottom_half,2, [ {file,"src/amqp_channel.erl"},\{line,623}]}, \{amqp_channel,handle_method_from_server1,3, [{file,"src/amqp_channel.erl"} , {line,800} ]}, {gen_server,try_dispatch,4, {line,616} ]}, {gen_server,handle_msg,6,[ {file,"gen_server.erl"},//\{line,686}" class="external-link" rel="nofollow">\\\\\\\{file,"gen_server.erl"} ,\{line,616}]}, {gen_server,handle_msg,6,[ \\{file,"gen_server.erl"} ,\\\\\\\{line,686}}, {proc_lib,init_p_do_apply,3,[\\\\ \{file,"proc_lib.erl"},\\\\\\\{line,247}|file://\{file,/]}]} =WARNING REPORT==== 23-Jul-2019::17:50:04 === Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch, \{empty, {[], []}}}, [\{amqp_channel, rpc_bottom_half, 2, [{file, "src/amqp_channel.erl"}, \{line, 623}]}, \{amqp_channel, handle_method_from_server1, 3, [{file, "src/amqp_channel.erl"}, \{line, 800}]}, \{gen_server, try_dispatch, 4, [{file, "gen_server.erl"}, \{line, 616}]}, \{gen_server, handle_msg, 6, [{file, "gen_server.erl"}, \{line, 686}]}, \{proc_lib, init_p_do_apply, 3, [{file, "proc_lib.erl"}, \{line, 247}]}]} =ERROR REPORT==== 23-Jul-2019::17:50:04 === ** Generic server <0.678.0> terminating ** Last message in was {'EXIT',<0.675.0>, \{{badmatch,{empty,{[],[]}}},}} \{{ [{amqp_channel,rpc_bottom_half,2, [ {file,"src/amqp_channel.erl"},\\\\\\\\{line,623}]},}} \{{ {amqp_channel,handle_method_from_server1,3, [{file,"src/amqp_channel.erl"},\{line,800} ]},}} \{{ {gen_server,try_dispatch,4, [ \{file,"gen_server.erl"} , \{line,616}]},}} \{{ {gen_server,handle_msg,6, [ \{file,"gen_server.erl"} , \{line,686} ]},}} \{{ {proc_lib,init_p_do_apply,3, [ \{file,"proc_lib.erl"} , \{line,247} ]}]}}}} {{ ** When Server state == \{ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>, <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>, \{lstate,<0.677.0>,false} ,}} \{{ none,1,}} {{ \{[],[]},}} \{{ {user,<<"guest">>, [administrator], [\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}} \{{ <<"/">>,<<>>,}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} \{{ [[<0.583.0>|}} \{{ {resource,<<"/">>,queue, <<"topic.hello.world">>}]],}} \{{ [],[],[],[],[],[]}}},}} \{{ {state,}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} {{ [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}} \{{ [],[],[],[],[],[]}}},}} \{{ erlang},}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],[],[],}} \{{ [[<<99,116,97,103,45,0,0,0,0>>|}} {{ {{amqqueue,}} \{{ {resource,<<"/">>,queue, <<"topic.hello.world">>},}} \{{ false,false,none,[],<0.583.0>,[],[],[],}} \{{ undefined,[],[],live,0},}} \{{ {false,65535,false, [{<<"x-credit">>,table, [{<<"credit">>,long,0},}} \{{ {<<"drain">>,boolean,false}]}]}}]],}} \{{ [],[],[],[]}}},}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} \{{ [[<0.583.0>|}} \{{ {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}} \{{ [],[],[],[],[],[]}}},}} \{{ {set,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} {{ [<0.583.0>],}} \{{ [],[],[],[],[],[]}}},}} \{{ <0.672.0>,}} \{{ {state,fine,5000, #Ref<0.4039704202.3895984129.147347>},}} \{{ false,1,}} \{{ }}{{0,nil},{0,nil}}{{,}} \{{ [],}} {{ {{0,nil},{0,nil}},}} \{{ [{<<"publisher_confirms">>,bool,true},}} \{{ {<<"exchange_exchange_bindings">>,bool,true},}} \{{ {<<"basic.nack">>,bool,true},}} \{{ {<<"consumer_cancel_notify">>,bool,true},}} \{{ {<<"connection.blocked">>,bool,true},}} \{{ {<<"authentication_failure_close">>,bool,true}],}} \{{ none,65535,none,flow,[]}}} \{{ ** Reason for termination == }} \{{ ** {{badmatch,{empty,{[],[]} }}, [\{amqp_channel,rpc_bottom_half,2, [ {file,"src/amqp_channel.erl"} ,\{line,623}]}, \{amqp_channel,handle_method_from_server1,3, [ {file,"src/amqp_channel.erl"} ,\{line,800}]}, \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"} ,\{line,616}]}, \{gen_server,handle_msg,6,[ {file,"gen_server.erl"} ,\{line,686}]}, {proc_lib,init_p_do_apply,3,[ \\{file,"proc_lib.erl"} ,\{line,247}]}]} gdb backtrace: (gdb) r Starting program: /mnt/user/dev/rabbit_mq_test/hello_world [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". broker: localhost:5672 address: topic.hello.world connectionOptions: [New Thread 0x7ffff3d53700 (LWP 26650)] [New Thread 0x7ffff3340700 (LWP 26651)] [New Thread 0x7ffff2b3f700 (LWP 26652)] Pre Receive ^C Thread 1 "hello_world" received signal SIGINT, Interrupt. 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory. (gdb) bt #0 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 #1 __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502 #2 __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655 #3 0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59 #4 0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41 #5 0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706 #6 0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721 #7 0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271 #8 0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55 #9 0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61 #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52 #11 0x000055555555589c in main () (gdb) was: Hello All, I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call. I believe that this is a bug in the QPID C++ API because it should always respect the client provided timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP 1.0 plugin: [https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90] Example Code (Modified HelloWorld example): #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <iostream> using namespace qpid::messaging; int main(int argc, char** argv) { std::string broker = argc > 1 ? argv[1] : "localhost:5672"; std::cout << "broker: " << broker << std::endl; std::string address = argc > 2 ? argv[2] : "topic.hello.world"; std::cout << "address: " << address << std::endl; std::string connectionOptions = argc > 3 ? argv[3] : ""; std::cout << "connectionOptions: " << connectionOptions << std::endl; try { Connection connection(broker, connectionOptions); connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); Message message; std::cout << "Pre Receive" << std::endl; message = receiver.fetch(Duration::SECOND * 10); std::cout << "Post Receive" << std::endl; session.acknowledge(); connection.close(); return 0; } catch(const std::exception& error) { std::cerr << error.what() << std::endl; return 1; } } } Server Error (Occurs after 10 second timeout): =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:49:54 === closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672): {bad_version,\{1,1,0,10}} =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:50:04 === * ** Generic server <0.675.0> terminating * ** Last message in was {send_command, {'basic.credit_drained', <<99,116,97,103,45,0,0,0,0>>, 1}} * ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct, {[],[]}, false,<0.678.0>,none,none,0,true,none, \{0,nil}, \{0,nil}, true,false} ** Reason for termination == ** \{{badmatch,{empty,{[],[]} }}, [ {amqp_channel,rpc_bottom_half,2, [ {file,"src/amqp_channel.erl"},\{line,623}]}, \{amqp_channel,handle_method_from_server1,3, [{file,"src/amqp_channel.erl"} , {line,800} ]}, {gen_server,try_dispatch,4, {line,616} ]}, {gen_server,handle_msg,6,[ {file,"gen_server.erl"},//\{line,686}" class="external-link" rel="nofollow">\\\\\\{file,"gen_server.erl"} ,\{line,616}]}, {gen_server,handle_msg,6,[ \{file,"gen_server.erl"} ,\\\\\\{line,686}}, {proc_lib,init_p_do_apply,3,[\\\\ {file,"proc_lib.erl"},\\\\\\{line,247}|file://\{file,/]}]} =WARNING REPORT==== 23-Jul-2019::17:50:04 === Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch, \{empty, {[], []}}}, [\{amqp_channel, rpc_bottom_half, 2, [{file, "src/amqp_channel.erl"}, \{line, 623}]}, \{amqp_channel, handle_method_from_server1, 3, [{file, "src/amqp_channel.erl"}, \{line, 800}]}, \{gen_server, try_dispatch, 4, [{file, "gen_server.erl"}, \{line, 616}]}, \{gen_server, handle_msg, 6, [{file, "gen_server.erl"}, \{line, 686}]}, \{proc_lib, init_p_do_apply, 3, [{file, "proc_lib.erl"}, \{line, 247}]}]} =ERROR REPORT==== 23-Jul-2019::17:50:04 === ** Generic server <0.678.0> terminating ** Last message in was {'EXIT',<0.675.0>, \{{badmatch,{empty,{[],[]}}},}} \{{ [{amqp_channel,rpc_bottom_half,2, [ {file,"src/amqp_channel.erl"},\\\\\\\{line,623}]},}} \{{ {amqp_channel,handle_method_from_server1,3, [{file,"src/amqp_channel.erl"},\{line,800} ]},}} \{{ {gen_server,try_dispatch,4, [ \{file,"gen_server.erl"} , \{line,616}]},}} {{ {gen_server,handle_msg,6, [ \{file,"gen_server.erl"} , \{line,686} ]},}} \{{ {proc_lib,init_p_do_apply,3, [ \{file,"proc_lib.erl"} , \{line,247} ]}]}}}} {{ ** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>, <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>, \{lstate,<0.677.0>,false} ,}} \{{ none,1,}} {{ \{[],[]},}} \{{ {user,<<"guest">>, [administrator], [\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}} \{{ <<"/">>,<<>>,}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} \{{ [[<0.583.0>|}} \{{ {resource,<<"/">>,queue, <<"topic.hello.world">>}]],}} \{{ [],[],[],[],[],[]}}},}} \{{ {state,}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} {{ [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}} \{{ [],[],[],[],[],[]}}},}} \{{ erlang},}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],[],[],}} \{{ [[<<99,116,97,103,45,0,0,0,0>>|}} {{ {{amqqueue,}} \{{ {resource,<<"/">>,queue, <<"topic.hello.world">>},}} \{{ false,false,none,[],<0.583.0>,[],[],[],}} \{{ undefined,[],[],live,0},}} \{{ {false,65535,false, [{<<"x-credit">>,table, [{<<"credit">>,long,0},}} \{{ {<<"drain">>,boolean,false}]}]}}]],}} \{{ [],[],[],[]}}},}} \{{ {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} \{{ [[<0.583.0>|}} \{{ {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}} \{{ [],[],[],[],[],[]}}},}} \{{ {set,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} {{ {{[],[],[],[],[],[],[],[],[],}} {{ [<0.583.0>],}} \{{ [],[],[],[],[],[]}}},}} \{{ <0.672.0>,}} \{{ {state,fine,5000, #Ref<0.4039704202.3895984129.147347>},}} \{{ false,1,}} \{{ }}{{0,nil},{0,nil}}{{,}} \{{ [],}} {{ {{0,nil},{0,nil}},}} \{{ [{<<"publisher_confirms">>,bool,true},}} \{{ {<<"exchange_exchange_bindings">>,bool,true},}} \{{ {<<"basic.nack">>,bool,true},}} \{{ {<<"consumer_cancel_notify">>,bool,true},}} \{{ {<<"connection.blocked">>,bool,true},}} \{{ {<<"authentication_failure_close">>,bool,true}],}} \{{ none,65535,none,flow,[]}}} \{{ ** Reason for termination == }} \{{ ** {{badmatch,{empty,{[],[]} }}, [\{amqp_channel,rpc_bottom_half,2, [ \{file,"src/amqp_channel.erl"} ,\{line,623}]}, \{amqp_channel,handle_method_from_server1,3, [ {file,"src/amqp_channel.erl"} ,\{line,800}]}, \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"} ,\{line,616}]}, {gen_server,handle_msg,6,[ \{file,"gen_server.erl"} ,\{line,686}]}, {proc_lib,init_p_do_apply,3,[ \{file,"proc_lib.erl"} ,\{line,247}]}]} gdb backtrace: (gdb) r Starting program: /mnt/user/dev/rabbit_mq_test/hello_world [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". broker: localhost:5672 address: topic.hello.world connectionOptions: [New Thread 0x7ffff3d53700 (LWP 26650)] [New Thread 0x7ffff3340700 (LWP 26651)] [New Thread 0x7ffff2b3f700 (LWP 26652)] Pre Receive ^C Thread 1 "hello_world" received signal SIGINT, Interrupt. 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory. (gdb) bt #0 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 #1 __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502 #2 __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655 #3 0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59 #4 0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41 #5 0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706 #6 0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721 #7 0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271 #8 0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55 #9 0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61 #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) at /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52 #11 0x000055555555589c in main () (gdb) > Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker > ------------------------------------------------------------------------------ > > Key: QPID-8347 > URL: https://issues.apache.org/jira/browse/QPID-8347 > Project: Qpid > Issue Type: Bug > Components: C++ Client > Affects Versions: qpid-cpp-1.39.0 > Environment: OS: Ubuntu Bionic > QPID C++ version 1.39.0 > QPID Proton version 0.28.0 > RabbitMQ Broker version 3.6.10 > Erlang version 20.2.2 > RabbimtMQ AMQP 1.0 plugin Version 3.6.10 > > Reporter: Seth Zegelstein > Priority: Blocker > > Hello All, > I am trying to use QPID C++ messaging API with a RabbitMQ Broker > communicating over AMQP 1.0 protocol. When the receive() call times out, the > RabbitMQ Broker throws an error and closes the session. The QPID C++ client > then hangs indefinitely in the receive call. I believe that this is a bug in > the QPID C++ API because it should always respect the client provided > timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP > 1.0 plugin: [https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90] > > Update: After further testing, it seems like Qpid Proton handles the timeout > case correctly with RabbitMQ Broker, isolating this issue to the C++ API. > helloworld_blocking.py: > from __future__ import print_function > from proton import Message > from proton.utils import BlockingConnection > from proton.handlers import IncomingMessageHandler > > conn = BlockingConnection("localhost:5672") > receiver = conn.create_receiver("examples") > sender = conn.create_sender("examples") > #sender.send(Message(body="Hello World!")); > msg = receiver.receive(timeout=0) > print(msg.body) > receiver.accept() > conn.close() > > Result: > python helloworld_blocking.py > Traceback (most recent call last): > File "helloworld_blocking.py", line 30, in <module> > msg = receiver.receive(timeout=0) > File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", > line 171, in receive > timeout=timeout) > File "/home/user/qpid-proton-0.28.0/build/python/dist/proton/_utils.py", > line 314, in wait > raise Timeout(txt) > proton._exceptions.Timeout: Connection amqp://localhost:5672 timed out: > Receiving on receiver 5aa78b24-27c6-4b76-a92a-d5410aa0a6ef-examples > > QPID C++ Example Code (Modified HelloWorld example): > #include <qpid/messaging/Connection.h> > #include <qpid/messaging/Message.h> > #include <qpid/messaging/Receiver.h> > #include <qpid/messaging/Sender.h> > #include <qpid/messaging/Session.h> > #include <iostream> > > using namespace qpid::messaging; > > int main(int argc, char** argv) { > std::string broker = argc > 1 ? argv[1] : "localhost:5672"; > std::cout << "broker: " << broker << std::endl; > std::string address = argc > 2 ? argv[2] : "topic.hello.world"; > std::cout << "address: " << address << std::endl; > std::string connectionOptions = argc > 3 ? argv[3] : ""; > std::cout << "connectionOptions: " << connectionOptions << std::endl; > > try > { Connection connection(broker, connectionOptions); > connection.open(); Session session = connection.createSession(); > Receiver receiver = session.createReceiver(address); Message > message; std::cout << "Pre Receive" << std::endl; message = > receiver.fetch(Duration::SECOND * 10); std::cout << "Post Receive" << > std::endl; session.acknowledge(); connection.close(); > return 0; } > catch(const std::exception& error) > { std::cerr << error.what() << std::endl; return 1; } > } > } > > Server Error (Occurs after 10 second timeout): > =INFO REPORT==== 23-Jul-2019::17:49:54 === > accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672) > > =ERROR REPORT==== 23-Jul-2019::17:49:54 === > closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672): > {bad_version,\{1,1,0,10}} > > =INFO REPORT==== 23-Jul-2019::17:49:54 === > accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672) > > =ERROR REPORT==== 23-Jul-2019::17:50:04 === > * > ** Generic server <0.675.0> terminating > * > ** Last message in was {send_command, > {'basic.credit_drained', > <<99,116,97,103,45,0,0,0,0>>, > 1}} > * > ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct, > > {[],[]}, > > false,<0.678.0>,none,none,0,true,none, > > \{0,nil}, > > \{0,nil}, > > true,false} > > ** Reason for termination == > > ** \{{badmatch,{empty,{[],[]} > }}, > [ > {amqp_channel,rpc_bottom_half,2, [ > {file,"src/amqp_channel.erl"},\{line,623}]}, > > \{amqp_channel,handle_method_from_server1,3, > [{file,"src/amqp_channel.erl"} > , > {line,800} > ]}, > {gen_server,try_dispatch,4, > {line,616} > ]}, > {gen_server,handle_msg,6,[ > {file,"gen_server.erl"},//\{line,686}" class="external-link" > rel="nofollow">\\\\\\\{file,"gen_server.erl"} > ,\{line,616}]}, > > {gen_server,handle_msg,6,[ \\{file,"gen_server.erl"} > > ,\\\\\\\{line,686}}, > > {proc_lib,init_p_do_apply,3,[\\\\ > \{file,"proc_lib.erl"},\\\\\\\{line,247}|file://\{file,/]}]} > > > > =WARNING REPORT==== 23-Jul-2019::17:50:04 === > > Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): > {{badmatch, > > > \{empty, > {[], > []}}}, > > > [\{amqp_channel, > rpc_bottom_half, > 2, > [{file, > "src/amqp_channel.erl"}, > > > \{line, > 623}]}, > > > \{amqp_channel, > handle_method_from_server1, > 3, > [{file, > "src/amqp_channel.erl"}, > > > \{line, > 800}]}, > > > \{gen_server, > try_dispatch, > 4, > [{file, > "gen_server.erl"}, > > > \{line, > 616}]}, > > > \{gen_server, > handle_msg, > 6, > [{file, > "gen_server.erl"}, > > > \{line, > 686}]}, > > > \{proc_lib, > init_p_do_apply, > 3, > [{file, > "proc_lib.erl"}, > > > \{line, > 247}]}]} > > > > =ERROR REPORT==== 23-Jul-2019::17:50:04 === > > ** Generic server <0.678.0> terminating > > ** Last message in was {'EXIT',<0.675.0>, > > \{{badmatch,{empty,{[],[]}}},}} > > \{{ [{amqp_channel,rpc_bottom_half,2, > [ {file,"src/amqp_channel.erl"},\\\\\\\\{line,623}]},}} > > \{{ {amqp_channel,handle_method_from_server1,3, > [{file,"src/amqp_channel.erl"},\{line,800} > > ]},}} > > \{{ {gen_server,try_dispatch,4, > [ \{file,"gen_server.erl"} > > , \{line,616}]},}} > > \{{ {gen_server,handle_msg,6, > [ \{file,"gen_server.erl"} > > , > \{line,686} > > ]},}} > > \{{ {proc_lib,init_p_do_apply,3, > [ \{file,"proc_lib.erl"} > > , > \{line,247} > > ]}]}}}} > > {{ ** When Server state == > > \{ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>, > <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>, > \{lstate,<0.677.0>,false} > > ,}} > > \{{ none,1,}} > > {{ > \{[],[]},}} > > \{{ {user,<<"guest">>, > [administrator], > [\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}} > > \{{ <<"/">>,<<>>,}} > > \{{ {dict,1,16,16,8,80,48, > {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} > > {{ {{[],[],[],[],[],[],[],[],[],}} > > \{{ [[<0.583.0>|}} > > \{{ {resource,<<"/">>,queue, > <<"topic.hello.world">>}]],}} > > \{{ [],[],[],[],[],[]}}},}} > > \{{ {state,}} > > \{{ {dict,1,16,16,8,80,48, > {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} > > {{ {{[],[],[],[],[],[],[],[],[],}} > > {{ > [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}} > > \{{ [],[],[],[],[],[]}}},}} > > \{{ erlang},}} > > \{{ {dict,1,16,16,8,80,48, > {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} > > {{ {{[],[],[],[],[],[],[],[],[],[],[],}} > > \{{ [[<<99,116,97,103,45,0,0,0,0>>|}} > > {{ {{amqqueue,}} > > \{{ {resource,<<"/">>,queue, > <<"topic.hello.world">>},}} > > \{{ false,false,none,[],<0.583.0>,[],[],[],}} > > \{{ undefined,[],[],live,0},}} > > \{{ {false,65535,false, > [{<<"x-credit">>,table, > [{<<"credit">>,long,0},}} > > \{{ {<<"drain">>,boolean,false}]}]}}]],}} > > \{{ [],[],[],[]}}},}} > > \{{ {dict,1,16,16,8,80,48, > {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} > > {{ {{[],[],[],[],[],[],[],[],[],}} > > \{{ [[<0.583.0>|}} > > \{{ > {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}} > > \{{ [],[],[],[],[],[]}}},}} > > \{{ {set,1,16,16,8,80,48, > {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}} > > {{ {{[],[],[],[],[],[],[],[],[],}} > > {{ [<0.583.0>],}} > > \{{ [],[],[],[],[],[]}}},}} > > \{{ <0.672.0>,}} > > \{{ {state,fine,5000, > #Ref<0.4039704202.3895984129.147347>},}} > > \{{ false,1,}} > > \{{ }}{{0,nil},{0,nil}}{{,}} > > \{{ [],}} > > {{ {{0,nil},{0,nil}},}} > > \{{ [{<<"publisher_confirms">>,bool,true},}} > > \{{ {<<"exchange_exchange_bindings">>,bool,true},}} > > \{{ {<<"basic.nack">>,bool,true},}} > > \{{ {<<"consumer_cancel_notify">>,bool,true},}} > > \{{ {<<"connection.blocked">>,bool,true},}} > > \{{ > {<<"authentication_failure_close">>,bool,true}],}} > > \{{ none,65535,none,flow,[]}}} > > \{{ ** Reason for termination == }} > > \{{ ** {{badmatch,{empty,{[],[]} > > }}, > > [\{amqp_channel,rpc_bottom_half,2, [ > {file,"src/amqp_channel.erl"} > > ,\{line,623}]}, > > > > \{amqp_channel,handle_method_from_server1,3, [ > {file,"src/amqp_channel.erl"} > > ,\{line,800}]}, > > > > \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"} > > ,\{line,616}]}, > > > > \{gen_server,handle_msg,6,[ {file,"gen_server.erl"} > ,\{line,686}]}, > > {proc_lib,init_p_do_apply,3,[ \\{file,"proc_lib.erl"} > ,\{line,247}]}]} > > > gdb backtrace: > (gdb) r > Starting program: /mnt/user/dev/rabbit_mq_test/hello_world > [Thread debugging using libthread_db enabled] > Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". > broker: localhost:5672 > address: topic.hello.world > connectionOptions: > [New Thread 0x7ffff3d53700 (LWP 26650)] > [New Thread 0x7ffff3340700 (LWP 26651)] > [New Thread 0x7ffff2b3f700 (LWP 26652)] > Pre Receive > ^C > Thread 1 "hello_world" received signal SIGINT, Interrupt. > 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, > expected=0, futex_word=0x555555785d04) at > ../sysdeps/unix/sysv/linux/futex-internal.h:88 > 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory. > (gdb) bt > #0 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, > expected=0, futex_word=0x555555785d04) at > ../sysdeps/unix/sysv/linux/futex-internal.h:88 > #1 __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, > cond=0x555555785cd8) at pthread_cond_wait.c:502 > #2 __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at > pthread_cond_wait.c:655 > #3 0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, > mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59 > #4 0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at > /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41 > #5 0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait > (this=0x5555557858e0) at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706 > #6 0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait > (this=0x5555557858e0, ssn=..., lnk=...) at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721 > #7 0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch > (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...) > at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271 > #8 0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch > (this=0x55555579b200, message=..., timeout=...) at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55 > #9 0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch > (this=0x55555579b200, timeout=...) at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61 > #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch > (this=0x7fffffffe780, timeout=...) at > /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52 > #11 0x000055555555589c in main () > (gdb) > -- This message was sent by Atlassian JIRA (v7.6.14#76016) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org