Seth Zegelstein created QPID-8347: ------------------------------------- Summary: Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker Key: QPID-8347 URL: https://issues.apache.org/jira/browse/QPID-8347 Project: Qpid Issue Type: Bug Components: C++ Client Affects Versions: qpid-cpp-1.39.0 Environment: OS: Ubuntu Bionic
QPID C++ version 1.39.0 QPID Proton version 0.28.0 RabbitMQ Broker version 3.6.10 Erlang version 20.2.2 RabbitMQ AMQP 1.0 plugin Version 3.6.10 Reporter: Seth Zegelstein Hello All, I am trying to use the QPID C++ messaging API with a RabbitMQ Broker communicating over the AMQP 1.0 protocol. When the receive() call (receiver.fetch() in the example below) times out, the RabbitMQ Broker throws an error and closes the session. The QPID C++ client then hangs indefinitely in the receive call. Example Code (Modified HelloWorld example): #include <qpid/messaging/Connection.h> #include <qpid/messaging/Message.h> #include <qpid/messaging/Receiver.h> #include <qpid/messaging/Sender.h> #include <qpid/messaging/Session.h> #include <iostream> using namespace qpid::messaging; int main(int argc, char** argv) { std::string broker = argc > 1 ? argv[1] : "localhost:5672"; std::cout << "broker: " << broker << std::endl; std::string address = argc > 2 ? argv[2] : "topic.hello.world"; std::cout << "address: " << address << std::endl; std::string connectionOptions = argc > 3 ? 
argv[3] : ""; std::cout << "connectionOptions: " << connectionOptions << std::endl; try { Connection connection(broker, connectionOptions); connection.open(); Session session = connection.createSession(); Receiver receiver = session.createReceiver(address); Message message; std::cout << "Pre Receive" << std::endl; message = receiver.fetch(Duration::SECOND * 10); std::cout << "Post Receive" << std::endl; session.acknowledge(); connection.close(); return 0; } catch(const std::exception& error) { std::cerr << error.what() << std::endl; return 1; } } Server Error (Occurs after 10 second timeout): =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:49:54 === closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672): {bad_version,\{1,1,0,10}} =INFO REPORT==== 23-Jul-2019::17:49:54 === accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672) =ERROR REPORT==== 23-Jul-2019::17:50:04 === ** Generic server <0.675.0> terminating ** Last message in was {send_command, {'basic.credit_drained', <<99,116,97,103,45,0,0,0,0>>, 1}} ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct, {[],[]}, false,<0.678.0>,none,none,0,true,none, {0,nil}, {0,nil}, true,false} ** Reason for termination == ** \{{badmatch,{empty,{[],[]}}}, [{amqp_channel,rpc_bottom_half,2, [\{file,"src/amqp_channel.erl"},\{line,623}]}, {amqp_channel,handle_method_from_server1,3, [\{file,"src/amqp_channel.erl"},\{line,800}]}, {gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]}, {gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]}, {proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]} =WARNING REPORT==== 23-Jul-2019::17:50:04 === Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): {{badmatch, {empty, {[], []}}}, [{amqp_channel, rpc_bottom_half, 2, [{file, "src/amqp_channel.erl"}, {line, 623}]}, {amqp_channel, handle_method_from_server1, 3, [{file, 
"src/amqp_channel.erl"}, {line, 800}]}, {gen_server, try_dispatch, 4, [{file, "gen_server.erl"}, {line, 616}]}, {gen_server, handle_msg, 6, [{file, "gen_server.erl"}, {line, 686}]}, {proc_lib, init_p_do_apply, 3, [{file, "proc_lib.erl"}, {line, 247}]}]} =ERROR REPORT==== 23-Jul-2019::17:50:04 === ** Generic server <0.678.0> terminating ** Last message in was {'EXIT',<0.675.0>, {\{badmatch,{empty,{[],[]}}}, [{amqp_channel,rpc_bottom_half,2, [\{file,"src/amqp_channel.erl"},\{line,623}]}, {amqp_channel,handle_method_from_server1,3, [\{file,"src/amqp_channel.erl"},\{line,800}]}, {gen_server,try_dispatch,4, [\{file,"gen_server.erl"},\{line,616}]}, {gen_server,handle_msg,6, [\{file,"gen_server.erl"},\{line,686}]}, {proc_lib,init_p_do_apply,3, [\{file,"proc_lib.erl"},\{line,247}]}]}} ** When Server state == {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>, <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>, {lstate,<0.677.0>,false}, none,1, {[],[]}, {user,<<"guest">>, [administrator], [\{rabbit_auth_backend_internal,none}]}, <<"/">>,<<>>, {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, {{[],[],[],[],[],[],[],[],[], [[<0.583.0>| {resource,<<"/">>,queue, <<"topic.hello.world">>}]], [],[],[],[],[],[]}}}, {state, {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, {{[],[],[],[],[],[],[],[],[], [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]], [],[],[],[],[],[]}}}, erlang}, {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, {{[],[],[],[],[],[],[],[],[],[],[], [[<<99,116,97,103,45,0,0,0,0>>| {{amqqueue, {resource,<<"/">>,queue, <<"topic.hello.world">>}, false,false,none,[],<0.583.0>,[],[],[], undefined,[],[],live,0}, {false,65535,false, [{<<"x-credit">>,table, [\{<<"credit">>,long,0}, {<<"drain">>,boolean,false}]}]}}]], [],[],[],[]}}}, {dict,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, {{[],[],[],[],[],[],[],[],[], [[<0.583.0>| {1,\{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]], 
[],[],[],[],[],[]}}}, {set,1,16,16,8,80,48, {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]}, {{[],[],[],[],[],[],[],[],[], [<0.583.0>], [],[],[],[],[],[]}}}, <0.672.0>, {state,fine,5000, #Ref<0.4039704202.3895984129.147347>}, false,1, {\{0,nil},\{0,nil}}, [], {\{0,nil},\{0,nil}}, [\{<<"publisher_confirms">>,bool,true}, {<<"exchange_exchange_bindings">>,bool,true}, {<<"basic.nack">>,bool,true}, {<<"consumer_cancel_notify">>,bool,true}, {<<"connection.blocked">>,bool,true}, {<<"authentication_failure_close">>,bool,true}], none,65535,none,flow,[]} ** Reason for termination == ** \{{badmatch,{empty,{[],[]}}}, [{amqp_channel,rpc_bottom_half,2, [\{file,"src/amqp_channel.erl"},\{line,623}]}, {amqp_channel,handle_method_from_server1,3, [\{file,"src/amqp_channel.erl"},\{line,800}]}, {gen_server,try_dispatch,4,[\{file,"gen_server.erl"},\{line,616}]}, {gen_server,handle_msg,6,[\{file,"gen_server.erl"},\{line,686}]}, {proc_lib,init_p_do_apply,3,[\{file,"proc_lib.erl"},\{line,247}]}]} gdb backtrace: (gdb) r Starting program: /mnt/e339307/dev/rabbit_mq_test/hello_world [Thread debugging using libthread_db enabled] Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1". broker: localhost:5672 address: topic.hello.world connectionOptions: [New Thread 0x7ffff3d53700 (LWP 26650)] [New Thread 0x7ffff3340700 (LWP 26651)] [New Thread 0x7ffff2b3f700 (LWP 26652)] Pre Receive ^C Thread 1 "hello_world" received signal SIGINT, Interrupt. 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory. 
(gdb) bt #0 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, expected=0, futex_word=0x555555785d04) at ../sysdeps/unix/sysv/linux/futex-internal.h:88 #1 __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, cond=0x555555785cd8) at pthread_cond_wait.c:502 #2 __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at pthread_cond_wait.c:655 #3 0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, mutex=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59 #4 0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41 #5 0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706 #6 0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait (this=0x5555557858e0, ssn=..., lnk=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721 #7 0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271 #8 0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, message=..., timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55 #9 0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch (this=0x55555579b200, timeout=...) at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61 #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch (this=0x7fffffffe780, timeout=...) 
at /home/e339307/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52 #11 0x000055555555589c in main () (gdb) -- This message was sent by Atlassian JIRA (v7.6.14#76016) --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@qpid.apache.org For additional commands, e-mail: dev-h...@qpid.apache.org