[ 
https://issues.apache.org/jira/browse/QPID-8347?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seth Zegelstein updated QPID-8347:
----------------------------------
    Description: 
Hello All,

I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating 
over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker 
throws an error and closes the session. The QPID C++ client then hangs 
indefinitely in the receive call. I believe that this is a bug in the QPID C++ 
API because it should always respect the client provided timeout, no mater what 
the Broker does. I also wrote a bug for RabbitMQ AMQP 1.0 plugin: 
[https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90]

 

Example Code (Modified HelloWorld example):

#include <qpid/messaging/Connection.h>

#include <qpid/messaging/Message.h>

#include <qpid/messaging/Receiver.h>

#include <qpid/messaging/Sender.h>

#include <qpid/messaging/Session.h>

#include <iostream>

 

using namespace qpid::messaging;

 

int main(int argc, char** argv) {

    std::string broker = argc > 1 ? argv[1] : "localhost:5672";

    std::cout << "broker: " << broker << std::endl;

    std::string address = argc > 2 ? argv[2] : "topic.hello.world";

    std::cout << "address: " << address << std::endl;

    std::string connectionOptions = argc > 3 ? argv[3] : "";

    std::cout << "connectionOptions: " << connectionOptions << std::endl;

 

    try

{                 Connection connection(broker, connectionOptions);         
connection.open();         Session session = connection.createSession();        
  Receiver receiver = session.createReceiver(address);         Message message; 
        std::cout << "Pre Receive" << std::endl;         message = 
receiver.fetch(Duration::SECOND * 10);         std::cout << "Post Receive" << 
std::endl;         session.acknowledge();           connection.close();         
return 0;     }

catch(const std::exception& error)

{         std::cerr << error.what() << std::endl;         return 1;     }

}

}

 

Server Error (Occurs after 10 second timeout):

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:49:54 ===

closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):

{bad_version,\{1,1,0,10}}

 

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
 * 
 ** Generic server <0.675.0> terminating 

 * 
 ** Last message in was {send_command,

                           {'basic.credit_drained',

                               <<99,116,97,103,45,0,0,0,0>>,

                               1}}
 * 
 ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,

                              

{[],[]},
 
                                false,<0.678.0>,none,none,0,true,none,
 
                                \{0,nil},
 
                                \{0,nil},
 
                                true,false}
 
 ** Reason for termination == 
 
 ** \{{badmatch,{empty,{[],[]}

}},

    [

{amqp_channel,rpc_bottom_half,2,                    [

{file,"src/amqp_channel.erl"},\{line,623}]},
 
      \{amqp_channel,handle_method_from_server1,3,                    
[{file,"src/amqp_channel.erl"}

,

{line,800}

]},

     {gen_server,try_dispatch,4,

{line,616}

]},

     {gen_server,handle_msg,6,[

{file,"gen_server.erl"},//\{line,686}" class="external-link" 
rel="nofollow">\\\\\\{file,"gen_server.erl"}
 
 ,\{line,616}]},
 
      {gen_server,handle_msg,6,[
 \{file,"gen_server.erl"}

,\\\\\\{line,686}},

     {proc_lib,init_p_do_apply,3,[\\\\

{file,"proc_lib.erl"},\\\\\\{line,247}|file://\{file,/]}]}
 
  
 
 =WARNING REPORT==== 23-Jul-2019::17:50:04 ===
 
 Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): 
{{badmatch,
 
                                                                          
\{empty,                                                                        
   {[],                                                                         
   []}}},
 
                                                                         
[\{amqp_channel,                                                                
           rpc_bottom_half,                                                     
                      2,                                                        
                   [{file,                                                      
                       "src/amqp_channel.erl"},
 
                                                                            
\{line,                                                                         
    623}]},
 
                                                                          
\{amqp_channel,                                                                 
          handle_method_from_server1,                                           
                                3,                                              
                             [{file,                                            
                                 "src/amqp_channel.erl"},
 
                                                                            
\{line,                                                                         
    800}]},
 
                                                                          
\{gen_server,                                                                   
        try_dispatch,                                                           
                4,                                                              
             [{file,                                                            
                 "gen_server.erl"},
 
                                                                            
\{line,                                                                         
    616}]},
 
                                                                          
\{gen_server,                                                                   
        handle_msg,                                                             
              6,                                                                
           [{file,                                                              
               "gen_server.erl"},
 
                                                                            
\{line,                                                                         
    686}]},
 
                                                                          
\{proc_lib,                                                                     
      init_p_do_apply,                                                          
                 3,                                                             
              [{file,                                                           
                  "proc_lib.erl"},
 
                                                                            
\{line,                                                                         
    247}]}]}
 
  
 
 =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
 
 ** Generic server <0.678.0> terminating
 
 ** Last message in was {'EXIT',<0.675.0>,
 
                            \{{badmatch,{empty,{[],[]}}},}}
 
 \{{                             [{amqp_channel,rpc_bottom_half,2,              
                    [ {file,"src/amqp_channel.erl"},\\\\\\\{line,623}]},}}
 
 \{{                              {amqp_channel,handle_method_from_server1,3,   
                               [{file,"src/amqp_channel.erl"},\{line,800}
 
 ]},}}
 
 \{{                              {gen_server,try_dispatch,4,                   
               [ \{file,"gen_server.erl"}
 
 , \{line,616}]},}}
 
 {{                              {gen_server,handle_msg,6,                      
            [
 \{file,"gen_server.erl"}
 
 ,
 \{line,686}
 
 ]},}}
 
 \{{                              {proc_lib,init_p_do_apply,3,                  
                [ \{file,"proc_lib.erl"}
 
 ,
 \{line,247}
 
 ]}]}}}}
 
 {{ ** When Server state ==
 
 {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,                          
<0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,                          
\{lstate,<0.677.0>,false}
 
 ,}}
 
 \{{                          none,1,}}
 
 {{                         
 \{[],[]},}}
 
 \{{                          {user,<<"guest">>,                           
[administrator],                           
[\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}}
 
 \{{                          <<"/">>,<<>>,}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<0.583.0>|}}
 
 \{{                               {resource,<<"/">>,queue,                     
           <<"topic.hello.world">>}]],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          {state,}}
 
 \{{                           {dict,1,16,16,8,80,48,                           
 {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                            {{[],[],[],[],[],[],[],[],[],}}
 
 {{                              
[[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}}
 
 \{{                              [],[],[],[],[],[]}}},}}
 
 \{{                           erlang},}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<<99,116,97,103,45,0,0,0,0>>|}}
 
 {{                               {{amqqueue,}}
 
 \{{                                 {resource,<<"/">>,queue,                   
               <<"topic.hello.world">>},}}
 
 \{{                                 false,false,none,[],<0.583.0>,[],[],[],}}
 
 \{{                                 undefined,[],[],live,0},}}
 
 \{{                                {false,65535,false,                         
        [{<<"x-credit">>,table,                                   
[{<<"credit">>,long,0},}}
 
 \{{                                    {<<"drain">>,boolean,false}]}]}}]],}}
 
 \{{                             [],[],[],[]}}},}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<0.583.0>|}}
 
 \{{                               
{1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          {set,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 {{                             [<0.583.0>],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          <0.672.0>,}}
 
 \{{                          {state,fine,5000,                           
#Ref<0.4039704202.3895984129.147347>},}}
 
 \{{                          false,1,}}
 
 \{{                          }}{{0,nil},{0,nil}}{{,}}
 
 \{{                          [],}}
 
 {{                          {{0,nil},{0,nil}},}}
 
 \{{                          [{<<"publisher_confirms">>,bool,true},}}
 
 \{{                           {<<"exchange_exchange_bindings">>,bool,true},}}
 
 \{{                           {<<"basic.nack">>,bool,true},}}
 
 \{{                           {<<"consumer_cancel_notify">>,bool,true},}}
 
 \{{                           {<<"connection.blocked">>,bool,true},}}
 
 \{{                           
{<<"authentication_failure_close">>,bool,true}],}}
 
 \{{                          none,65535,none,flow,[]}}}
 
 \{{ ** Reason for termination == }}
 
 \{{ ** {{badmatch,{empty,{[],[]}
 
 }},
 
     [\{amqp_channel,rpc_bottom_half,2,                    [ 
\{file,"src/amqp_channel.erl"}
 
 ,\{line,623}]},
 
     
 
 \{amqp_channel,handle_method_from_server1,3,                    [ 
{file,"src/amqp_channel.erl"}
 
 ,\{line,800}]},
 
     
 
 \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"}
 
 ,\{line,616}]},
 
     
 
 {gen_server,handle_msg,6,[
 \{file,"gen_server.erl"}
 
 ,\{line,686}]},
 
     
 
 {proc_lib,init_p_do_apply,3,[ 
 \{file,"proc_lib.erl"}

,\{line,247}]}]}

 

 

gdb backtrace:

(gdb) r

Starting program: /mnt/user/dev/rabbit_mq_test/hello_world 

[Thread debugging using libthread_db enabled]

Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

broker: localhost:5672

address: topic.hello.world

connectionOptions: 

[New Thread 0x7ffff3d53700 (LWP 26650)]

[New Thread 0x7ffff3340700 (LWP 26651)]

[New Thread 0x7ffff2b3f700 (LWP 26652)]

Pre Receive

^C

Thread 1 "hello_world" received signal SIGINT, Interrupt.

0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
expected=0, futex_word=0x555555785d04) at 
../sysdeps/unix/sysv/linux/futex-internal.h:88

88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.

(gdb) bt

#0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
expected=0, futex_word=0x555555785d04) at 
../sysdeps/unix/sysv/linux/futex-internal.h:88

#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, 
cond=0x555555785cd8) at pthread_cond_wait.c:502

#2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at 
pthread_cond_wait.c:655

#3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, 
mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59

#4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at 
/home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41

#5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait 
(this=0x5555557858e0) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706

#6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait 
(this=0x5555557858e0, ssn=..., lnk=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721

#7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch 
(this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)

    at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271

#8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch 
(this=0x55555579b200, message=..., timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55

#9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch 
(this=0x55555579b200, timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61

#10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch 
(this=0x7fffffffe780, timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52

#11 0x000055555555589c in main ()

(gdb) 

 

  was:
Hello All,

I am trying to use QPID C++ messaging API with a RabbitMQ Broker communicating 
over AMQP 1.0 protocol. When the receive() call times out, the RabbitMQ Broker 
throws an error and closes the session. The QPID C++ client then hangs 
indefinitely in the receive call. I believe that this is a bug in the QPID C++ 
API because it should always respect the client provided timeout, no mater what 
the Broker does. (Sorry about formatting, Copy and paste didn't work out very 
well)

 

Example Code (Modified HelloWorld example):

#include <qpid/messaging/Connection.h>

#include <qpid/messaging/Message.h>

#include <qpid/messaging/Receiver.h>

#include <qpid/messaging/Sender.h>

#include <qpid/messaging/Session.h>

#include <iostream>

 

using namespace qpid::messaging;

 

int main(int argc, char** argv) {

    std::string broker = argc > 1 ? argv[1] : "localhost:5672";

    std::cout << "broker: " << broker << std::endl;

    std::string address = argc > 2 ? argv[2] : "topic.hello.world";

    std::cout << "address: " << address << std::endl;

    std::string connectionOptions = argc > 3 ? argv[3] : "";

    std::cout << "connectionOptions: " << connectionOptions << std::endl;

 

    try

{                 Connection connection(broker, connectionOptions);         
connection.open();         Session session = connection.createSession();        
  Receiver receiver = session.createReceiver(address);         Message message; 
        std::cout << "Pre Receive" << std::endl;         message = 
receiver.fetch(Duration::SECOND * 10);         std::cout << "Post Receive" << 
std::endl;         session.acknowledge();           connection.close();         
return 0;     }

catch(const std::exception& error)

{         std::cerr << error.what() << std::endl;         return 1;     }

}

}

 

Server Error (Occurs after 10 second timeout):

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:49:54 ===

closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):

{bad_version,\{1,1,0,10}}

 

=INFO REPORT==== 23-Jul-2019::17:49:54 ===

accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)

 

=ERROR REPORT==== 23-Jul-2019::17:50:04 ===
 * 
 ** Generic server <0.675.0> terminating 

 * 
 ** Last message in was {send_command,

                           {'basic.credit_drained',

                               <<99,116,97,103,45,0,0,0,0>>,

                               1}}
 * 
 ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,

                              

{[],[]},
 
                                false,<0.678.0>,none,none,0,true,none,
 
                                \{0,nil},
 
                                \{0,nil},
 
                                true,false}
 
 ** Reason for termination == 
 
 ** \{{badmatch,{empty,{[],[]}

}},

    [

{amqp_channel,rpc_bottom_half,2,                    [

{file,"src/amqp_channel.erl"},\{line,623}]},
 
      \{amqp_channel,handle_method_from_server1,3,                    
[{file,"src/amqp_channel.erl"}

,

{line,800}

]},

     {gen_server,try_dispatch,4,

{line,616}

]},

     {gen_server,handle_msg,6,[

{file,"gen_server.erl"},//\{line,686}" class="external-link" 
rel="nofollow">\\\\\{file,"gen_server.erl"}

,\{line,616}]},

     {gen_server,handle_msg,6,[

{file,"gen_server.erl"},\\\\\{line,686}},
 
      
{proc_lib,init_p_do_apply,3,[\\\\{file,"proc_lib.erl"},\\\\\{line,247}|file://{file,/]}]}
 
  
 
 =WARNING REPORT==== 23-Jul-2019::17:50:04 ===
 
 Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): 
{{badmatch,
 
                                                                          
\{empty,                                                                        
   {[],                                                                         
   []}}},
 
                                                                         
[\{amqp_channel,                                                                
           rpc_bottom_half,                                                     
                      2,                                                        
                   [{file,                                                      
                       "src/amqp_channel.erl"},
 
                                                                            
\{line,                                                                         
    623}]},
 
                                                                          
\{amqp_channel,                                                                 
          handle_method_from_server1,                                           
                                3,                                              
                             [{file,                                            
                                 "src/amqp_channel.erl"},
 
                                                                            
\{line,                                                                         
    800}]},
 
                                                                          
\{gen_server,                                                                   
        try_dispatch,                                                           
                4,                                                              
             [{file,                                                            
                 "gen_server.erl"},
 
                                                                            
\{line,                                                                         
    616}]},
 
                                                                          
\{gen_server,                                                                   
        handle_msg,                                                             
              6,                                                                
           [{file,                                                              
               "gen_server.erl"},
 
                                                                            
\{line,                                                                         
    686}]},
 
                                                                          
\{proc_lib,                                                                     
      init_p_do_apply,                                                          
                 3,                                                             
              [{file,                                                           
                  "proc_lib.erl"},
 
                                                                            
\{line,                                                                         
    247}]}]}
 
  
 
 =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
 
 ** Generic server <0.678.0> terminating
 
 ** Last message in was {'EXIT',<0.675.0>,
 
                            {{badmatch,\{empty,{[],[]}}},}}
 
 \{{                             [{amqp_channel,rpc_bottom_half,2,              
                    [ {file,"src/amqp_channel.erl"},\\\\\\{line,623}]},}}
 
 \{{                              {amqp_channel,handle_method_from_server1,3,   
                               [{file,"src/amqp_channel.erl"},\{line,800}
 
 ]},}}
 
 {{                              {gen_server,try_dispatch,4,                    
              [
 \{file,"gen_server.erl"}

, \{line,616}]},}}

{{                              {gen_server,handle_msg,6,                       
           [

{file,"gen_server.erl"}
 
 ,
 \{line,686}
 
 ]},}}
 
 {{                              {proc_lib,init_p_do_apply,3,                   
               [
 \{file,"proc_lib.erl"}
 
 ,
 \{line,247}
 
 ]}]}}}}
 
 {{ ** When Server state ==
 
 {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,                          
<0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,                          
{lstate,<0.677.0>,false}
 
 ,}}
 
 \{{                          none,1,}}
 
 {{                         
 \{[],[]},}}
 
 \{{                          {user,<<"guest">>,                           
[administrator],                           
[\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}}
 
 \{{                          <<"/">>,<<>>,}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<0.583.0>|}}
 
 \{{                               {resource,<<"/">>,queue,                     
           <<"topic.hello.world">>}]],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          {state,}}
 
 \{{                           {dict,1,16,16,8,80,48,                           
 {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                            {{[],[],[],[],[],[],[],[],[],}}
 
 {{                              
[[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}}
 
 \{{                              [],[],[],[],[],[]}}},}}
 
 \{{                           erlang},}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<<99,116,97,103,45,0,0,0,0>>|}}
 
 {{                               {{amqqueue,}}
 
 \{{                                 {resource,<<"/">>,queue,                   
               <<"topic.hello.world">>},}}
 
 \{{                                 false,false,none,[],<0.583.0>,[],[],[],}}
 
 \{{                                 undefined,[],[],live,0},}}
 
 \{{                                {false,65535,false,                         
        [{<<"x-credit">>,table,                                   
[{<<"credit">>,long,0},}}
 
 \{{                                    {<<"drain">>,boolean,false}]}]}}]],}}
 
 \{{                             [],[],[],[]}}},}}
 
 \{{                          {dict,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 \{{                             [[<0.583.0>|}}
 
 \{{                               
{1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          {set,1,16,16,8,80,48,                           
{[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
 
 {{                           {{[],[],[],[],[],[],[],[],[],}}
 
 {{                             [<0.583.0>],}}
 
 \{{                             [],[],[],[],[],[]}}},}}
 
 \{{                          <0.672.0>,}}
 
 \{{                          {state,fine,5000,                           
#Ref<0.4039704202.3895984129.147347>},}}
 
 \{{                          false,1,}}
 
 \{{                          }}{{0,nil},{0,nil}}{{,}}
 
 \{{                          [],}}
 
 {{                          {{0,nil},{0,nil}},}}
 
 \{{                          [{<<"publisher_confirms">>,bool,true},}}
 
 \{{                           {<<"exchange_exchange_bindings">>,bool,true},}}
 
 \{{                           {<<"basic.nack">>,bool,true},}}
 
 \{{                           {<<"consumer_cancel_notify">>,bool,true},}}
 
 \{{                           {<<"connection.blocked">>,bool,true},}}
 
 \{{                           
{<<"authentication_failure_close">>,bool,true}],}}
 
 \{{                          none,65535,none,flow,[]}}}
 
 \{{ ** Reason for termination == }}
 
 \{{ ** {{badmatch,{empty,{[],[]}
 
 }},
 
     [{amqp_channel,rpc_bottom_half,2,                    [
 \{file,"src/amqp_channel.erl"}
 
 ,\{line,623}]},
 
     
 
 {amqp_channel,handle_method_from_server1,3,                    [ 
{file,"src/amqp_channel.erl"}
 
 ,\{line,800}]},
 
     
 
 {gen_server,try_dispatch,4,[ 
{file,"gen_server.erl"}

,\{line,616}]},

    

{gen_server,handle_msg,6,[

{file,"gen_server.erl"}

,\{line,686}]},

    

{proc_lib,init_p_do_apply,3,[ 

{file,"proc_lib.erl"}

,\{line,247}]}]}

 

 

gdb backtrace:

(gdb) r

Starting program: /mnt/user/dev/rabbit_mq_test/hello_world 

[Thread debugging using libthread_db enabled]

Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".

broker: localhost:5672

address: topic.hello.world

connectionOptions: 

[New Thread 0x7ffff3d53700 (LWP 26650)]

[New Thread 0x7ffff3340700 (LWP 26651)]

[New Thread 0x7ffff2b3f700 (LWP 26652)]

Pre Receive

^C

Thread 1 "hello_world" received signal SIGINT, Interrupt.

0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
expected=0, futex_word=0x555555785d04) at 
../sysdeps/unix/sysv/linux/futex-internal.h:88

88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.

(gdb) bt

#0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
expected=0, futex_word=0x555555785d04) at 
../sysdeps/unix/sysv/linux/futex-internal.h:88

#1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, 
cond=0x555555785cd8) at pthread_cond_wait.c:502

#2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at 
pthread_cond_wait.c:655

#3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, 
mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59

#4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at 
/home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41

#5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait 
(this=0x5555557858e0) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706

#6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait 
(this=0x5555557858e0, ssn=..., lnk=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721

#7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch 
(this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)

    at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271

#8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch 
(this=0x55555579b200, message=..., timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55

#9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch 
(this=0x55555579b200, timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61

#10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch 
(this=0x7fffffffe780, timeout=...) at 
/home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52

#11 0x000055555555589c in main ()

(gdb) 

 


> Receive() blocks forever once timeout has been exceeded with a RabbitMQ Broker
> ------------------------------------------------------------------------------
>
>                 Key: QPID-8347
>                 URL: https://issues.apache.org/jira/browse/QPID-8347
>             Project: Qpid
>          Issue Type: Bug
>          Components: C++ Client
>    Affects Versions: qpid-cpp-1.39.0
>         Environment: OS: Ubuntu Bionic
> QPID C++ version 1.39.0
> QPID Proton version 0.28.0
> RabbitMQ Broker version 3.6.10
> Erlang version 20.2.2
> RabbimtMQ AMQP 1.0 plugin Version 3.6.10
>  
>            Reporter: Seth Zegelstein
>            Priority: Blocker
>
> Hello All,
> I am trying to use QPID C++ messaging API with a RabbitMQ Broker 
> communicating over AMQP 1.0 protocol. When the receive() call times out, the 
> RabbitMQ Broker throws an error and closes the session. The QPID C++ client 
> then hangs indefinitely in the receive call. I believe that this is a bug in 
> the QPID C++ API because it should always respect the client provided 
> timeout, no mater what the Broker does. I also wrote a bug for RabbitMQ AMQP 
> 1.0 plugin: [https://github.com/rabbitmq/rabbitmq-amqp1.0/issues/90]
>  
> Example Code (Modified HelloWorld example):
> #include <qpid/messaging/Connection.h>
> #include <qpid/messaging/Message.h>
> #include <qpid/messaging/Receiver.h>
> #include <qpid/messaging/Sender.h>
> #include <qpid/messaging/Session.h>
> #include <iostream>
>  
> using namespace qpid::messaging;
>  
> int main(int argc, char** argv) {
>     std::string broker = argc > 1 ? argv[1] : "localhost:5672";
>     std::cout << "broker: " << broker << std::endl;
>     std::string address = argc > 2 ? argv[2] : "topic.hello.world";
>     std::cout << "address: " << address << std::endl;
>     std::string connectionOptions = argc > 3 ? argv[3] : "";
>     std::cout << "connectionOptions: " << connectionOptions << std::endl;
>  
>     try
> {                 Connection connection(broker, connectionOptions);         
> connection.open();         Session session = connection.createSession();      
>     Receiver receiver = session.createReceiver(address);         Message 
> message;         std::cout << "Pre Receive" << std::endl;         message = 
> receiver.fetch(Duration::SECOND * 10);         std::cout << "Post Receive" << 
> std::endl;         session.acknowledge();           connection.close();       
>   return 0;     }
> catch(const std::exception& error)
> {         std::cerr << error.what() << std::endl;         return 1;     }
> }
> }
>  
> Server Error (Occurs after 10 second timeout):
> =INFO REPORT==== 23-Jul-2019::17:49:54 ===
> accepting AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672)
>  
> =ERROR REPORT==== 23-Jul-2019::17:49:54 ===
> closing AMQP connection <0.657.0> ([::1]:40358 -> [::1]:5672):
> {bad_version,\{1,1,0,10}}
>  
> =INFO REPORT==== 23-Jul-2019::17:49:54 ===
> accepting AMQP connection <0.660.0> ([::1]:40360 -> [::1]:5672)
>  
> =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
>  * 
>  ** Generic server <0.675.0> terminating 
>  * 
>  ** Last message in was {send_command,
>                            {'basic.credit_drained',
>                                <<99,116,97,103,45,0,0,0,0>>,
>                                1}}
>  * 
>  ** When Server state == {state,1,<0.669.0>,<0.674.0>,direct,
>                               
> {[],[]},
>  
>                                 false,<0.678.0>,none,none,0,true,none,
>  
>                                 \{0,nil},
>  
>                                 \{0,nil},
>  
>                                 true,false}
>  
>  ** Reason for termination == 
>  
>  ** \{{badmatch,{empty,{[],[]}
> }},
>     [
> {amqp_channel,rpc_bottom_half,2,                    [
> {file,"src/amqp_channel.erl"},\{line,623}]},
>  
>       \{amqp_channel,handle_method_from_server1,3,                    
> [{file,"src/amqp_channel.erl"}
> ,
> {line,800}
> ]},
>      {gen_server,try_dispatch,4,
> {line,616}
> ]},
>      {gen_server,handle_msg,6,[
> {file,"gen_server.erl"},//\{line,686}" class="external-link" 
> rel="nofollow">\\\\\\{file,"gen_server.erl"}
>  
>  ,\{line,616}]},
>  
>       {gen_server,handle_msg,6,[
>  \{file,"gen_server.erl"}
> ,\\\\\\{line,686}},
>      {proc_lib,init_p_do_apply,3,[\\\\
> {file,"proc_lib.erl"},\\\\\\{line,247}|file://\{file,/]}]}
>  
>   
>  
>  =WARNING REPORT==== 23-Jul-2019::17:50:04 ===
>  
>  Connection (<0.669.0>) closing: internal error in channel (<0.675.0>): 
> {{badmatch,
>  
>                                                                           
> \{empty,                                                                      
>      {[],                                                                     
>        []}}},
>  
>                                                                          
> [\{amqp_channel,                                                              
>              rpc_bottom_half,                                                 
>                           2,                                                  
>                          [{file,                                              
>                                "src/amqp_channel.erl"},
>  
>                                                                             
> \{line,                                                                       
>       623}]},
>  
>                                                                           
> \{amqp_channel,                                                               
>             handle_method_from_server1,                                       
>                                     3,                                        
>                                    [{file,                                    
>                                          "src/amqp_channel.erl"},
>  
>                                                                             
> \{line,                                                                       
>       800}]},
>  
>                                                                           
> \{gen_server,                                                                 
>           try_dispatch,                                                       
>                     4,                                                        
>                    [{file,                                                    
>                          "gen_server.erl"},
>  
>                                                                             
> \{line,                                                                       
>       616}]},
>  
>                                                                           
> \{gen_server,                                                                 
>           handle_msg,                                                         
>                   6,                                                          
>                  [{file,                                                      
>                        "gen_server.erl"},
>  
>                                                                             
> \{line,                                                                       
>       686}]},
>  
>                                                                           
> \{proc_lib,                                                                   
>         init_p_do_apply,                                                      
>                      3,                                                       
>                     [{file,                                                   
>                           "proc_lib.erl"},
>  
>                                                                             
> \{line,                                                                       
>       247}]}]}
>  
>   
>  
>  =ERROR REPORT==== 23-Jul-2019::17:50:04 ===
>  
>  ** Generic server <0.678.0> terminating
>  
>  ** Last message in was {'EXIT',<0.675.0>,
>  
>                             \{{badmatch,{empty,{[],[]}}},}}
>  
>  \{{                             [{amqp_channel,rpc_bottom_half,2,            
>                       [ {file,"src/amqp_channel.erl"},\\\\\\\{line,623}]},}}
>  
>  \{{                              {amqp_channel,handle_method_from_server1,3, 
>                                  [{file,"src/amqp_channel.erl"},\{line,800}
>  
>  ]},}}
>  
>  \{{                              {gen_server,try_dispatch,4,                 
>                  [ \{file,"gen_server.erl"}
>  
>  , \{line,616}]},}}
>  
>  {{                              {gen_server,handle_msg,6,                    
>               [
>  \{file,"gen_server.erl"}
>  
>  ,
>  \{line,686}
>  
>  ]},}}
>  
>  \{{                              {proc_lib,init_p_do_apply,3,                
>                   [ \{file,"proc_lib.erl"}
>  
>  ,
>  \{line,247}
>  
>  ]}]}}}}
>  
>  {{ ** When Server state ==
>  
>  {ch,running,rabbit_framing_amqp_0_9_1,1,<0.675.0>,                          
> <0.675.0>,<0.669.0>,<<"[::1]:40360 -> [::1]:5672">>,                          
> \{lstate,<0.677.0>,false}
>  
>  ,}}
>  
>  \{{                          none,1,}}
>  
>  {{                         
>  \{[],[]},}}
>  
>  \{{                          {user,<<"guest">>,                           
> [administrator],                           
> [\\\{rabbit_auth_backend_internal,none}|file://\{rabbit_auth_backend_internal,none}/]},}}
>  
>  \{{                          <<"/">>,<<>>,}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                          
>  {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<0.583.0>|}}
>  
>  \{{                               {resource,<<"/">>,queue,                   
>              <<"topic.hello.world">>}]],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          {state,}}
>  
>  \{{                           {dict,1,16,16,8,80,48,                         
>    {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                            {{[],[],[],[],[],[],[],[],[],}}
>  
>  {{                              
> [[<0.583.0>|#Ref<0.4039704202.3895984130.68325>]],}}
>  
>  \{{                              [],[],[],[],[],[]}}},}}
>  
>  \{{                           erlang},}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                          
>  {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<<99,116,97,103,45,0,0,0,0>>|}}
>  
>  {{                               {{amqqueue,}}
>  
>  \{{                                 {resource,<<"/">>,queue,                 
>                  <<"topic.hello.world">>},}}
>  
>  \{{                                 false,false,none,[],<0.583.0>,[],[],[],}}
>  
>  \{{                                 undefined,[],[],live,0},}}
>  
>  \{{                                {false,65535,false,                       
>           [{<<"x-credit">>,table,                                   
> [{<<"credit">>,long,0},}}
>  
>  \{{                                    {<<"drain">>,boolean,false}]}]}}]],}}
>  
>  \{{                             [],[],[],[]}}},}}
>  
>  \{{                          {dict,1,16,16,8,80,48,                          
>  {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  \{{                             [[<0.583.0>|}}
>  
>  \{{                               
> {1,{<<99,116,97,103,45,0,0,0,0>>,nil,nil}}]],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          {set,1,16,16,8,80,48,                           
> {[],[],[],[],[],[],[],[],[],[],[],[],[],[],[],[]},}}
>  
>  {{                           {{[],[],[],[],[],[],[],[],[],}}
>  
>  {{                             [<0.583.0>],}}
>  
>  \{{                             [],[],[],[],[],[]}}},}}
>  
>  \{{                          <0.672.0>,}}
>  
>  \{{                          {state,fine,5000,                           
> #Ref<0.4039704202.3895984129.147347>},}}
>  
>  \{{                          false,1,}}
>  
>  \{{                          }}{{0,nil},{0,nil}}{{,}}
>  
>  \{{                          [],}}
>  
>  {{                          {{0,nil},{0,nil}},}}
>  
>  \{{                          [{<<"publisher_confirms">>,bool,true},}}
>  
>  \{{                           {<<"exchange_exchange_bindings">>,bool,true},}}
>  
>  \{{                           {<<"basic.nack">>,bool,true},}}
>  
>  \{{                           {<<"consumer_cancel_notify">>,bool,true},}}
>  
>  \{{                           {<<"connection.blocked">>,bool,true},}}
>  
>  \{{                           
> {<<"authentication_failure_close">>,bool,true}],}}
>  
>  \{{                          none,65535,none,flow,[]}}}
>  
>  \{{ ** Reason for termination == }}
>  
>  \{{ ** {{badmatch,{empty,{[],[]}
>  
>  }},
>  
>      [\{amqp_channel,rpc_bottom_half,2,                    [ 
> \{file,"src/amqp_channel.erl"}
>  
>  ,\{line,623}]},
>  
>      
>  
>  \{amqp_channel,handle_method_from_server1,3,                    [ 
> {file,"src/amqp_channel.erl"}
>  
>  ,\{line,800}]},
>  
>      
>  
>  \{gen_server,try_dispatch,4,[ {file,"gen_server.erl"}
>  
>  ,\{line,616}]},
>  
>      
>  
>  {gen_server,handle_msg,6,[
>  \{file,"gen_server.erl"}
>  
>  ,\{line,686}]},
>  
>      
>  
>  {proc_lib,init_p_do_apply,3,[ 
>  \{file,"proc_lib.erl"}
> ,\{line,247}]}]}
>  
>  
> gdb backtrace:
> (gdb) r
> Starting program: /mnt/user/dev/rabbit_mq_test/hello_world 
> [Thread debugging using libthread_db enabled]
> Using host libthread_db library "/lib/x86_64-linux-gnu/libthread_db.so.1".
> broker: localhost:5672
> address: topic.hello.world
> connectionOptions: 
> [New Thread 0x7ffff3d53700 (LWP 26650)]
> [New Thread 0x7ffff3340700 (LWP 26651)]
> [New Thread 0x7ffff2b3f700 (LWP 26652)]
> Pre Receive
> ^C
> Thread 1 "hello_world" received signal SIGINT, Interrupt.
> 0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
> expected=0, futex_word=0x555555785d04) at 
> ../sysdeps/unix/sysv/linux/futex-internal.h:88
> 88 ../sysdeps/unix/sysv/linux/futex-internal.h: No such file or directory.
> (gdb) bt
> #0  0x00007ffff610f9f3 in futex_wait_cancelable (private=<optimized out>, 
> expected=0, futex_word=0x555555785d04) at 
> ../sysdeps/unix/sysv/linux/futex-internal.h:88
> #1  __pthread_cond_wait_common (abstime=0x0, mutex=0x555555785cb0, 
> cond=0x555555785cd8) at pthread_cond_wait.c:502
> #2  __pthread_cond_wait (cond=0x555555785cd8, mutex=0x555555785cb0) at 
> pthread_cond_wait.c:655
> #3  0x00007ffff7ad70b1 in qpid::sys::Condition::wait (this=0x555555785cd8, 
> mutex=...) at /home/user/qpid-cpp-1.39.0/src/qpid/sys/posix/Condition.h:59
> #4  0x00007ffff7ad7333 in qpid::sys::Monitor::wait (this=0x555555785cb0) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/sys/Monitor.h:41
> #5  0x00007ffff7acd008 in qpid::messaging::amqp::ConnectionContext::wait 
> (this=0x5555557858e0) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:706
> #6  0x00007ffff7acd11a in qpid::messaging::amqp::ConnectionContext::wait 
> (this=0x5555557858e0, ssn=..., lnk=...) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:721
> #7  0x00007ffff7ac8a5a in qpid::messaging::amqp::ConnectionContext::fetch 
> (this=0x5555557858e0, ssn=..., lnk=..., message=..., timeout=...)
>     at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ConnectionContext.cpp:271
> #8  0x00007ffff7ae7b61 in qpid::messaging::amqp::ReceiverHandle::fetch 
> (this=0x55555579b200, message=..., timeout=...) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:55
> #9  0x00007ffff7ae7c30 in qpid::messaging::amqp::ReceiverHandle::fetch 
> (this=0x55555579b200, timeout=...) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/amqp/ReceiverHandle.cpp:61
> #10 0x00007ffff7b575e4 in qpid::messaging::Receiver::fetch 
> (this=0x7fffffffe780, timeout=...) at 
> /home/user/qpid-cpp-1.39.0/src/qpid/messaging/Receiver.cpp:52
> #11 0x000055555555589c in main ()
> (gdb) 
>  



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to