yetengfei opened a new issue #367:
URL: https://github.com/apache/rocketmq-client-cpp/issues/367
**The code is as follows:**
/**
 * Message callback registered with the push consumer.
 *
 * Writes the topic, tags, keys and body of the received message to stdout
 * on a single line and then reports the message as successfully consumed.
 *
 * @param consumer  the consumer delivering the message (not used here)
 * @param msgExt    the received message
 * @return E_CONSUME_SUCCESS, always
 */
int doConsumeMessage(struct CPushConsumer *consumer, CMessageExt *msgExt)
{
    // One statement per field; the emitted text is a single line.
    cout << "[Consume Message] MsgTopic:" << GetMessageTopic(msgExt);
    cout << ", MsgTags:" << GetMessageTags(msgExt);
    cout << ", MsgKeys:" << GetMessageKeys(msgExt);
    cout << ", MsgBody:" << GetMessageBody(msgExt);
    cout << endl;

    return E_CONSUME_SUCCESS;
}
/**
 * Demo entry point: creates a push consumer for group "Group_Consumer_Test",
 * subscribes it to every tag of topic "canal_manager", lets it receive
 * messages for about one minute, then shuts it down and destroys it.
 *
 * @return 0 on a normal run, 1 if the consumer could not be created or started
 */
int main(int argc, char *argv[])
{
    cout << "Push consumer Initializing...." << endl;

    // create push consumer and set some values for it
    CPushConsumer *consumer = CreatePushConsumer("Group_Consumer_Test");
    if (!consumer)
    {
        // Every call below dereferences the handle, so stop here.
        cerr << "CreatePushConsumer failed" << endl;
        return 1;
    }
    SetPushConsumerNameServerAddress(consumer, "192.168.159.202:5432");
    Subscribe(consumer, "canal_manager", "*");

    // register message callback
    RegisterMessageCallback(consumer, doConsumeMessage);

    // start push consumer
    // NOTE(review): the C API is expected to return 0 (OK) on success --
    // confirm against the client's CCommon.h status codes.
    int startStatus = StartPushConsumer(consumer);
    if (startStatus != 0)
    {
        cerr << "StartPushConsumer failed, status: " << startStatus << endl;
        DestroyPushConsumer(consumer);
        return 1;
    }

    cout << "Push consumer start, and listening message within 1min..." << endl;
    for (int i = 0; i < 6; i++)
    {
        cout << "Already Running: " << (i * 10) << "S" << endl;
        // sleep() instead of usleep(10000000): POSIX permits usleep() to fail
        // with EINVAL when its argument is 1,000,000 or more.
        sleep(10);
    }

    // shutdown push consumer
    ShutdownPushConsumer(consumer);

    // destroy push consumer
    DestroyPushConsumer(consumer);
    cout << "PushConsumer Shutdown!" << endl;
    return 0;
}
**After the push consumer is forcibly closed and then restarted, previously
consumed messages are consumed again.**
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]