mikehb opened a new issue, #421:
URL: https://github.com/apache/rocketmq-clients/issues/421
Modify ExamplePushConsumer.cpp as shown below:
```
#include <chrono>
#include <iostream>
#include <mutex>
#include <thread>
#include "gflags/gflags.h"
#include "rocketmq/Logger.h"
#include "rocketmq/PushConsumer.h"
using namespace ROCKETMQ_NAMESPACE;
// Command-line flags (gflags) read by test() below.
// Each help string is kept on a single line: a C++ string literal cannot
// contain a raw newline, so a literal wrapped across lines does not compile.
DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published");
DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider");
DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console");
// Credentials are optional; test() only builds a credentials provider when
// both of the following flags are non-empty.
DEFINE_string(access_key, "", "Your access key ID");
DEFINE_string(access_secret, "", "Your access secret");
// Builds a PushConsumer from the command-line flags, keeps it alive for three
// seconds, and then returns. `push_consumer` is a local variable, so it is
// destroyed as soon as this function returns; the second backtrace in this
// report shows ~PushConsumer being invoked from test().
void test() {
  // Tag expression handed to subscribe() together with the topic.
  std::string tag = "*";

  // Per-message callback: prints the topic and message id, then reports
  // success to the consumer.
  auto listener = [](const Message& message) {
    std::cout << "Received a message[topic=" << message.topic() << ", MsgId=" << message.id() << "]" << std::endl;
    return ConsumeResult::SUCCESS;
  };

  // Left default-constructed unless both credential flags are non-empty.
  CredentialsProviderPtr credentials_provider;
  if (!FLAGS_access_key.empty() && !FLAGS_access_secret.empty()) {
    credentials_provider =
        std::make_shared<StaticCredentialsProvider>(FLAGS_access_key, FLAGS_access_secret);
  }

  auto push_consumer = PushConsumer::newBuilder()
                           .withGroup(FLAGS_group)
                           .withConfiguration(Configuration::newBuilder()
                                                  .withEndpoints(FLAGS_access_point)
                                                  .withRequestTimeout(std::chrono::seconds(3))
                                                  .withCredentialsProvider(credentials_provider)
                                                  .build())
                           .withConsumeThreads(4)
                           .withListener(listener)
                           .subscribe(FLAGS_topic, tag)
                           .build();

  // Give the consumer a short window to run before it goes out of scope.
  std::this_thread::sleep_for(std::chrono::seconds(3));
}
int main(int argc, char* argv[]) {
gflags::ParseCommandLineFlags(&argc, &argv, true);
auto& logger = getLogger();
logger.setConsoleLevel(Level::Info);
logger.setLevel(Level::Info);
logger.init();
test();
return EXIT_SUCCESS;
}
```
Sometimes the program crashes and the following core dump is produced:
```
(gdb) bt
#0 0x00000000005984f7 in
rocketmq::ReceiveMessageStreamReader::OnReadDone(bool) ()
#1 0x000000000064ecd4 in
grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}::operator()(bool)
const ()
#2 0x000000000065b514 in void std::__invoke_impl<void,
grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}&,
bool>(std::__invoke_other,
grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}&,
bool&&) ()
#3 0x000000000065aa12 in std::enable_if<std::__and_<std::is_void<void>,
std::__is_invocable<grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}&,
bool> >::value, void>::type std::__invoke_r<void,
grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}&,
bool>(grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}&,
bool&&) ()
#4 0x0000000000659b5d in std::_Function_handler<void (bool),
grpc::internal::ClientCallbackReaderImpl<apache::rocketmq::v2::ReceiveMessageResponse>::StartCall()::{lambda(bool)#2}>::_M_invoke(std::_Any_data
const&, bool&&) ()
#5 0x0000000000523272 in std::function<void (bool)>::operator()(bool) const
()
#6 0x0000000000522351 in void
grpc::internal::CatchingCallback<std::function<void (bool)>&,
bool&>(std::function<void (bool)>&, bool&) ()
#7 0x00000000005209ec in grpc::internal::CallbackWithSuccessTag::Run(bool)
()
#8 0x000000000052092b in
grpc::internal::CallbackWithSuccessTag::StaticRun(grpc_completion_queue_functor*,
int) ()
#9 0x000000000067a9a1 in grpc::(anonymous
namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}::operator()(void*)
const ()
#10 0x000000000067a9cf in grpc::(anonymous
namespace)::CallbackAlternativeCQ::Ref()::{lambda(void*)#1}::_FUN(void*) ()
#11 0x0000000000ce348c in grpc_core::(anonymous
namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void
(*)(void*), void*, bool*, grpc_core::Thread::Options
const&)::{lambda(void*)#1}::operator()(void*) const ()
#12 0x0000000000ce34d5 in grpc_core::(anonymous
namespace)::ThreadInternalsPosix::ThreadInternalsPosix(char const*, void
(*)(void*), void*, bool*, grpc_core::Thread::Options
const&)::{lambda(void*)#1}::_FUN(void*) ()
#13 0x00007feef50ae12d in start_thread () from /lib64/libc.so.6
#14 0x00007feef512fbc0 in clone3 () from /lib64/libc.so.6
(gdb)
```
At other times it crashes with a different core dump:
```
(gdb) bt
#0 0x00007fc706eafe5c in __pthread_kill_implementation () from
/lib64/libc.so.6
#1 0x00007fc706e5fa76 in raise () from /lib64/libc.so.6
#2 0x00007fc706e497fc in abort () from /lib64/libc.so.6
#3 0x00007fc7070a2b97 in __gnu_cxx::__verbose_terminate_handler() [clone
.cold] () from /lib64/libstdc++.so.6
#4 0x00007fc7070ae48c in __cxxabiv1::__terminate(void (*)()) () from
/lib64/libstdc++.so.6
#5 0x00007fc7070ae4f7 in std::terminate() () from /lib64/libstdc++.so.6
#6 0x000000000047aafd in std::thread::~thread() ()
#7 0x00000000005b45ff in void std::_Destroy<std::thread>(std::thread*) ()
#8 0x00000000005b3b85 in void
std::_Destroy_aux<false>::__destroy<std::thread*>(std::thread*, std::thread*) ()
#9 0x00000000005b2686 in void std::_Destroy<std::thread*>(std::thread*,
std::thread*) ()
#10 0x00000000005b048f in void std::_Destroy<std::thread*,
std::thread>(std::thread*, std::thread*, std::allocator<std::thread>&) ()
#11 0x00000000005aeffd in std::vector<std::thread,
std::allocator<std::thread> >::~vector() ()
#12 0x00000000005c504c in rocketmq::ThreadPoolImpl::~ThreadPoolImpl() ()
#13 0x00000000005c5094 in rocketmq::ThreadPoolImpl::~ThreadPoolImpl() ()
#14 0x00000000004c885a in
std::default_delete<rocketmq::ThreadPool>::operator()(rocketmq::ThreadPool*)
const ()
#15 0x00000000004c8230 in std::unique_ptr<rocketmq::ThreadPool,
std::default_delete<rocketmq::ThreadPool> >::~unique_ptr() ()
#16 0x00000000004c9d62 in
rocketmq::ConsumeMessageServiceImpl::~ConsumeMessageServiceImpl() ()
#17 0x000000000049e1e9 in void
std::_Destroy<rocketmq::ConsumeMessageServiceImpl>(rocketmq::ConsumeMessageServiceImpl*)
()
#18 0x000000000049dff0 in void std::allocator_traits<std::allocator<void>
>::destroy<rocketmq::ConsumeMessageServiceImpl>(std::allocator<void>&,
rocketmq::ConsumeMessageServiceImpl*) ()
#19 0x000000000049d6db in
std::_Sp_counted_ptr_inplace<rocketmq::ConsumeMessageServiceImpl,
std::allocator<void>, (__gnu_cxx::_Lock_policy)2>::_M_dispose() ()
#20 0x000000000041f54f in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release_last_use() ()
#21 0x000000000041f528 in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release_last_use_cold() ()
#22 0x000000000041ee68 in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#23 0x000000000041f6f7 in
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ()
#24 0x000000000047c8fc in std::__shared_ptr<rocketmq::ConsumeMessageService,
(__gnu_cxx::_Lock_policy)2>::~__shared_ptr() ()
#25 0x000000000047c918 in
std::shared_ptr<rocketmq::ConsumeMessageService>::~shared_ptr() ()
#26 0x000000000046fb82 in rocketmq::PushConsumerImpl::~PushConsumerImpl() ()
#27 0x000000000046f0bf in void
std::_Destroy<rocketmq::PushConsumerImpl>(rocketmq::PushConsumerImpl*) ()
#28 0x000000000046eb5e in void std::allocator_traits<std::allocator<void>
>::destroy<rocketmq::PushConsumerImpl>(std::allocator<void>&,
rocketmq::PushConsumerImpl*) ()
#29 0x000000000046aa93 in
std::_Sp_counted_ptr_inplace<rocketmq::PushConsumerImpl, std::allocator<void>,
(__gnu_cxx::_Lock_policy)2>::_M_dispose() ()
#30 0x000000000041f54f in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release_last_use() ()
#31 0x000000000041f528 in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release_last_use_cold() ()
#32 0x000000000041ee68 in
std::_Sp_counted_base<(__gnu_cxx::_Lock_policy)2>::_M_release() ()
#33 0x000000000041f6f7 in
std::__shared_count<(__gnu_cxx::_Lock_policy)2>::~__shared_count() ()
#34 0x000000000041f0d8 in std::__shared_ptr<rocketmq::PushConsumerImpl,
(__gnu_cxx::_Lock_policy)2>::~__shared_ptr() ()
#35 0x000000000041f0f4 in
std::shared_ptr<rocketmq::PushConsumerImpl>::~shared_ptr() ()
#36 0x000000000041f4ae in rocketmq::PushConsumer::~PushConsumer() ()
#37 0x000000000041ded3 in test() ()
#38 0x000000000041e0be in main ()
(gdb)
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]