Antal Ispánovity created AMQCPP-649:
---------------------------------------

             Summary: cms::MessageConsumer with pending messages cannot be 
closed 
                 Key: AMQCPP-649
                 URL: https://issues.apache.org/jira/browse/AMQCPP-649
             Project: ActiveMQ C++ Client
          Issue Type: Bug
          Components: CMS Impl
    Affects Versions: 3.9.4
         Environment: The above stacktraces are produced with Visual Studio 2013

The issue is reproduced on :
 * MS Windows 10 x86_64 + Visual Studio 2013
 * Void Linux x86_64 + g++ 9.3
            Reporter: Antal Ispánovity
            Assignee: Timothy A. Bish


Dear Support,

 

I found an issue that cms::MessageConsumer cannot be deleted and therefore the 
cms::Session and cms::Connection cannot be closed while there are still 
incoming messages.

I can reproduce the problem with a slightly modified version of the example 
code posted here:

[https://activemq.apache.org/components/cms/example]

 

Here is the required modification:

Close the 'connection' in the onMessage(const Message* message)  function like 
this:

*original:*

 
{code:java}
count++;
{code}
 

 

*modified:*

 
{code:java}
count++;
if (connection)
{
    connection->close();
    delete connection;
    connection = nullptr;
}
else
{
    return;
}{code}
This is not the exact code that I used in our production environment, but 
unfortunately I cannot share that code with you.

 

What happens:

the 
{code:java}
connection->close();{code}
hangs. Here is the stacktrace from the modified example application:

  [External Code]   [External Code]   
activemq-cppd.dll!decaf::internal::util::concurrent::PlatformThread::interruptibleWaitOnCondition(void
 * condition, _RTL_CRITICAL_SECTION * mutex, 
decaf::internal::util::concurrent::CompletionCondition & complete) Line 282 C++ 
 
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::join(decaf::internal::util::concurrent::ThreadHandle
 * thread, __int64 mills, int nanos) Line 1147 C++  
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::destroyThread(decaf::internal::util::concurrent::ThreadHandle
 * thread) Line 946 C++  activemq-cppd.dll!decaf::lang::Thread::~Thread() Line 
131 C++  [External Code]   
activemq-cppd.dll!decaf::lang::Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>::onDeleteFunc(decaf::lang::Thread
 * value) Line 317 C++  
activemq-cppd.dll!decaf::lang::Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>::~Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>()
 Line 148 C++  
activemq-cppd.dll!activemq::threads::DedicatedTaskRunner::~DedicatedTaskRunner()
 Line 46 C++  [External Code]   
activemq-cppd.dll!decaf::lang::Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>::onDeleteFunc(activemq::threads::TaskRunner
 * value) Line 317 C++  
activemq-cppd.dll!decaf::lang::Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>::~Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>()
 Line 148 C++  
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::stop() Line 153 C++  
activemq-cppd.dll!activemq::core::kernels::ActiveMQSessionKernel::stop() Line 
1121 C++  activemq-cppd.dll!activemq::core::ActiveMQConnection::stop() Line 898 
C++> activemq-cppd.dll!activemq::core::ActiveMQConnection::close() Line 695 C++ 
 consumer_producer.exe!HelloWorldConsumer::onMessage(const cms::Message * 
message) Line 281 C++  
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::dispatch(const
 
decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
 & dispatch) Line 1646 C++  
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::dispatch(const 
decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
 & dispatch) Line 166 C++  
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::iterate() Line 192 
C++  activemq-cppd.dll!activemq::threads::DedicatedTaskRunner::run() Line 141 
C++  activemq-cppd.dll!decaf::lang::Thread::run() Line 143 C++  
activemq-cppd.dll!`anonymous namespace'::runCallback(void * arg) Line 268 C++  
activemq-cppd.dll!`anonymous namespace'::threadEntryMethod(void * arg) Line 258 
C++  [External Code]

 

In our production environment the Connection is closed outside of the onMessage 
function and in that case the following is the stacktrace on the blocked thread:

  [External Code]   [External Code] > 
activemq-cppd.dll!decaf::internal::util::concurrent::PlatformThread::waitOnCondition(void
 * condition, _RTL_CRITICAL_SECTION * mutex) Line 254 C++  
activemq-cppd.dll!`anonymous 
namespace'::doMonitorEnter(decaf::internal::util::concurrent::MonitorHandle * 
monitor, decaf::internal::util::concurrent::ThreadHandle * thread) Line 664 C++ 
 
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::enterMonitor(decaf::internal::util::concurrent::MonitorHandle
 * monitor) Line 1541 C++  
activemq-cppd.dll!decaf::util::concurrent::Mutex::lock() Line 130 C++  
activemq-cppd.dll!decaf::util::AbstractCollection<decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
 >::lock() Line 346 C++  
activemq-cppd.dll!decaf::util::concurrent::Lock::lock() Line 55 C++  
activemq-cppd.dll!decaf::util::concurrent::Lock::Lock(decaf::util::concurrent::Synchronizable
 * object, const bool intiallyLocked) Line 32 C++  
activemq-cppd.dll!activemq::core::FifoMessageDispatchChannel::close() Line 131 
C++  
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::dispose() 
Line 972 C++  
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::doClose() 
Line 894 C++  
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::close() Line 
875 C++  
activemq-cppd.dll!activemq::core::ActiveMQConsumer::~ActiveMQConsumer() Line 74 
C++  [External Code]

...

 

The execution is waiting at this line: 

::WaitForSingleObject(condition, INFINITE);

condition's value is: 0x0000000000000484

mutex'x value is:

- mutex 0x0000029337da8ff0 \{DebugInfo=0xffffffffffffffff {Type=??? 
CreatorBackTraceIndex=??? CriticalSection=...} ...} _RTL_CRITICAL_SECTION *- 
mutex 0x0000029337da8ff0 \{DebugInfo=0xffffffffffffffff {Type=??? 
CreatorBackTraceIndex=??? CriticalSection=...} ...} _RTL_CRITICAL_SECTION *- 
DebugInfo 0xffffffffffffffff \{Type=??? CreatorBackTraceIndex=??? 
CriticalSection=??? ...} _RTL_CRITICAL_SECTION_DEBUG * Type <Unable to read 
memory> CreatorBackTraceIndex <Unable to read memory> CriticalSection <Unable 
to read memory> + ProcessLocksList \{Flink=??? Blink=??? } _LIST_ENTRY 
EntryCount <Unable to read memory> ContentionCount <Unable to read memory> 
Flags <Unable to read memory> CreatorBackTraceIndexHigh <Unable to read memory> 
SpareWORD <Unable to read memory> LockCount -1 long RecursionCount 0 long 
OwningThread 0x0000000000000000 void * LockSemaphore 0x0000000000000000 void * 
SpinCount 33556432 unsigned __int64

 

 

 

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to