Antal Ispánovity created AMQCPP-649:
---------------------------------------
Summary: cms::MessageConsumer with pending messages cannot be
closed
Key: AMQCPP-649
URL: https://issues.apache.org/jira/browse/AMQCPP-649
Project: ActiveMQ C++ Client
Issue Type: Bug
Components: CMS Impl
Affects Versions: 3.9.4
Environment: The above stacktraces are produced with Visual Studio 2013
The issue is reproduced on :
* MS Windows 10 x86_64 + Visual Studio 2013
* Void Linux x86_64 + g++ 9.3
Reporter: Antal Ispánovity
Assignee: Timothy A. Bish
Dear Support,
I found an issue that cms::MessageConsumer cannot be deleted and therefore the
cms::Session and cms::Connection cannot be closed while there are still
incoming messages.
I can reproduce the problem with a slightly modified version of the example
code posted here:
[https://activemq.apache.org/components/cms/example]
Here is the required modification:
Close the 'connection' in the onMessage(const Message* message) function like
this:
*original:*
{code:java}
count++;
{code}
*modified:*
{code:java}
count++;
if (connection)
{
connection->close();
delete connection;
connection = nullptr;
}
else
{
return;
}{code}
This is not the exact code that I used in our production environment, but
unfortunately I cannot share that code with you.
What happens:
the
{code:java}
connection->close();{code}
hangs. Here is the stacktrace from the modified example application:
[External Code] [External Code]
activemq-cppd.dll!decaf::internal::util::concurrent::PlatformThread::interruptibleWaitOnCondition(void
* condition, _RTL_CRITICAL_SECTION * mutex,
decaf::internal::util::concurrent::CompletionCondition & complete) Line 282 C++
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::join(decaf::internal::util::concurrent::ThreadHandle
* thread, __int64 mills, int nanos) Line 1147 C++
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::destroyThread(decaf::internal::util::concurrent::ThreadHandle
* thread) Line 946 C++ activemq-cppd.dll!decaf::lang::Thread::~Thread() Line
131 C++ [External Code]
activemq-cppd.dll!decaf::lang::Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>::onDeleteFunc(decaf::lang::Thread
* value) Line 317 C++
activemq-cppd.dll!decaf::lang::Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>::~Pointer<decaf::lang::Thread,decaf::util::concurrent::atomic::AtomicRefCounter>()
Line 148 C++
activemq-cppd.dll!activemq::threads::DedicatedTaskRunner::~DedicatedTaskRunner()
Line 46 C++ [External Code]
activemq-cppd.dll!decaf::lang::Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>::onDeleteFunc(activemq::threads::TaskRunner
* value) Line 317 C++
activemq-cppd.dll!decaf::lang::Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>::~Pointer<activemq::threads::TaskRunner,decaf::util::concurrent::atomic::AtomicRefCounter>()
Line 148 C++
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::stop() Line 153 C++
activemq-cppd.dll!activemq::core::kernels::ActiveMQSessionKernel::stop() Line
1121 C++ activemq-cppd.dll!activemq::core::ActiveMQConnection::stop() Line 898
C++> activemq-cppd.dll!activemq::core::ActiveMQConnection::close() Line 695 C++
consumer_producer.exe!HelloWorldConsumer::onMessage(const cms::Message *
message) Line 281 C++
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::dispatch(const
decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
& dispatch) Line 1646 C++
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::dispatch(const
decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
& dispatch) Line 166 C++
activemq-cppd.dll!activemq::core::ActiveMQSessionExecutor::iterate() Line 192
C++ activemq-cppd.dll!activemq::threads::DedicatedTaskRunner::run() Line 141
C++ activemq-cppd.dll!decaf::lang::Thread::run() Line 143 C++
activemq-cppd.dll!`anonymous namespace'::runCallback(void * arg) Line 268 C++
activemq-cppd.dll!`anonymous namespace'::threadEntryMethod(void * arg) Line 258
C++ [External Code]
In our production environment the Connection is closed outside of the onMessage
function and in that case the following is the stacktrace on the blocked thread:
[External Code] [External Code] >
activemq-cppd.dll!decaf::internal::util::concurrent::PlatformThread::waitOnCondition(void
* condition, _RTL_CRITICAL_SECTION * mutex) Line 254 C++
activemq-cppd.dll!`anonymous
namespace'::doMonitorEnter(decaf::internal::util::concurrent::MonitorHandle *
monitor, decaf::internal::util::concurrent::ThreadHandle * thread) Line 664 C++
activemq-cppd.dll!decaf::internal::util::concurrent::Threading::enterMonitor(decaf::internal::util::concurrent::MonitorHandle
* monitor) Line 1541 C++
activemq-cppd.dll!decaf::util::concurrent::Mutex::lock() Line 130 C++
activemq-cppd.dll!decaf::util::AbstractCollection<decaf::lang::Pointer<activemq::commands::MessageDispatch,decaf::util::concurrent::atomic::AtomicRefCounter>
>::lock() Line 346 C++
activemq-cppd.dll!decaf::util::concurrent::Lock::lock() Line 55 C++
activemq-cppd.dll!decaf::util::concurrent::Lock::Lock(decaf::util::concurrent::Synchronizable
* object, const bool intiallyLocked) Line 32 C++
activemq-cppd.dll!activemq::core::FifoMessageDispatchChannel::close() Line 131
C++
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::dispose()
Line 972 C++
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::doClose()
Line 894 C++
activemq-cppd.dll!activemq::core::kernels::ActiveMQConsumerKernel::close() Line
875 C++
activemq-cppd.dll!activemq::core::ActiveMQConsumer::~ActiveMQConsumer() Line 74
C++ [External Code]
...
The execution is waiting at this line:
::WaitForSingleObject(condition, INFINITE);
condition's value is: 0x0000000000000484
mutex'x value is:
- mutex 0x0000029337da8ff0 \{DebugInfo=0xffffffffffffffff {Type=???
CreatorBackTraceIndex=??? CriticalSection=...} ...} _RTL_CRITICAL_SECTION *-
mutex 0x0000029337da8ff0 \{DebugInfo=0xffffffffffffffff {Type=???
CreatorBackTraceIndex=??? CriticalSection=...} ...} _RTL_CRITICAL_SECTION *-
DebugInfo 0xffffffffffffffff \{Type=??? CreatorBackTraceIndex=???
CriticalSection=??? ...} _RTL_CRITICAL_SECTION_DEBUG * Type <Unable to read
memory> CreatorBackTraceIndex <Unable to read memory> CriticalSection <Unable
to read memory> + ProcessLocksList \{Flink=??? Blink=??? } _LIST_ENTRY
EntryCount <Unable to read memory> ContentionCount <Unable to read memory>
Flags <Unable to read memory> CreatorBackTraceIndexHigh <Unable to read memory>
SpareWORD <Unable to read memory> LockCount -1 long RecursionCount 0 long
OwningThread 0x0000000000000000 void * LockSemaphore 0x0000000000000000 void *
SpinCount 33556432 unsigned __int64
--
This message was sent by Atlassian Jira
(v8.3.4#803005)