This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 87a6137 fix(consumer): send back error when consuming failed. (#210)
87a6137 is described below
commit 87a6137150b32b2c69b4e46d38f01f5c9fd8429c
Author: dinglei <[email protected]>
AuthorDate: Thu Jan 2 19:38:19 2020 +0800
fix(consumer): send back error when consuming failed. (#210)
---
include/DefaultMQPullConsumer.h | 2 +-
include/DefaultMQPushConsumer.h | 2 +-
include/MQConsumer.h | 2 +-
src/MQClientAPIImpl.cpp | 5 +++--
src/MQClientAPIImpl.h | 3 ++-
src/common/MQClient.cpp | 3 ++-
src/consumer/ConsumeMessageConcurrentlyService.cpp | 20 ++++++++++++--------
src/consumer/DefaultMQPullConsumer.cpp | 2 +-
src/consumer/DefaultMQPushConsumer.cpp | 9 +++++++--
src/consumer/Rebalance.cpp | 2 +-
10 files changed, 31 insertions(+), 19 deletions(-)
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 33765cd..899862b 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
//<!end mqadmin;
//<!begin MQConsumer
- virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string&
brokerName);
virtual void fetchSubscribeMessageQueues(const std::string& topic,
std::vector<MQMessageQueue>& mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b29ca72..43f0fbb 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
//<!end mqadmin;
//<!begin MQConsumer
- virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string&
brokerName);
virtual void fetchSubscribeMessageQueues(const std::string& topic,
std::vector<MQMessageQueue>& mqs);
virtual void doRebalance();
virtual void persistConsumerOffset();
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 54b535a..87e2c1b 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@ class ConsumerRunningInfo;
class ROCKETMQCLIENT_API MQConsumer : public MQClient {
public:
virtual ~MQConsumer() {}
- virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
+ virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string&
brokerName) = 0;
virtual void fetchSubscribeMessageQueues(const std::string& topic,
std::vector<MQMessageQueue>& mqs) = 0;
virtual void doRebalance() = 0;
virtual void persistConsumerOffset() = 0;
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f890968..0877a03 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -823,7 +823,8 @@ void MQClientAPIImpl::updateConsumerOffsetOneway(const string& addr,
m_pRemotingClient->invokeOneway(addr, request);
}
-void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
+void MQClientAPIImpl::consumerSendMessageBack(const string addr,
+ MQMessageExt& msg,
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
@@ -833,7 +834,7 @@ void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
pRequestHeader->offset = msg.getCommitLogOffset();
pRequestHeader->delayLevel = delayLevel;
- string addr = socketAddress2IPPort(msg.getStoreHost());
+ // string addr = socketAddress2IPPort(msg.getStoreHost());
RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
callSignatureBeforeRequest(addr, request, sessionCredentials);
request.Encode();
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 763e45d..9555d72 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,7 +167,8 @@ class MQClientAPIImpl {
int timeoutMillis,
const SessionCredentials&
sessionCredentials);
- void consumerSendMessageBack(MQMessageExt& msg,
+ void consumerSendMessageBack(const string addr,
+ MQMessageExt& msg,
const string& consumerGroup,
int delayLevel,
int timeoutMillis,
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index afdc5fa..068b8c4 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -53,7 +53,8 @@ MQClient::~MQClient() {}
string MQClient::getMQClientId() const {
string clientIP = UtilAll::getLocalAddress();
string processId = UtilAll::to_string(getpid());
- return processId + "-" + clientIP + "@" + m_instanceName;
+ // return processId + "-" + clientIP + "@" + m_instanceName;
+ return clientIP + "@" + processId + "#" + m_instanceName;
}
//<!groupName;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 93cdcc3..371faa2 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -189,14 +189,18 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
case CLUSTERING: {
// send back msg to broker;
for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
- LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:"
SIZET_FMT ", reconsume times is:%d",
- (request->m_messageQueue).toString().c_str(),
msgs[i].getMsgId().c_str(), i,
- msgs[i].getReconsumeTimes());
- if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY &&
!m_pConsumer->sendMessageBack(msgs[i], 0)) {
- LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index
is:%d, re-consume times is:%d",
- (request->m_messageQueue).toString().c_str(),
msgs[i].getMsgId().c_str(), i,
- msgs[i].getReconsumeTimes());
- localRetryMsgs.push_back(msgs[i]);
+ LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:"
SIZET_FMT ", reconsume times is:%d",
+ (request->m_messageQueue).toString().c_str(),
msgs[i].getMsgId().c_str(), i,
+ msgs[i].getReconsumeTimes());
+ if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
+ string brokerName = request->m_messageQueue.getBrokerName();
+ if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
+ LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index
is:%d, re-consume times is:%d",
+ (request->m_messageQueue).toString().c_str(),
msgs[i].getMsgId().c_str(), i,
+ msgs[i].getReconsumeTimes());
+ msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
+ localRetryMsgs.push_back(msgs[i]);
+ }
}
}
break;
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index c8aac49..1f58e86 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,7 @@ void DefaultMQPullConsumer::shutdown() {
}
}
-bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel)
{
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel,
string& brokerName) {
return true;
}
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 376c3e9..df77cac 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,9 +251,14 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
m_subTopics.clear();
}
-bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel)
{
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel,
string& brokerName) {
+ string brokerAddr;
+ if (!brokerName.empty())
+ brokerAddr = getFactory()->findBrokerAddressInPublish(brokerName);
+ else
+ brokerAddr = socketAddress2IPPort(msg.getStoreHost());
try {
- getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg,
getGroupName(), delayLevel, 3000,
+ getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr,
msg, getGroupName(), delayLevel, 3000,
getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 0a4c06f..5546b61 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -128,7 +128,7 @@ void Rebalance::doRebalance() {
std::stringstream ss;
ss << "Allocation result for [Consumer Group: " <<
m_pConsumer->getGroupName() << ", Topic: " << topic
<< ", Current Consumer ID: " << m_pConsumer->getMQClientId() <<
"] is changed.\n "
- << "Total Queue #: " << mqAll.size() << ", Total Consumer #: "
<< cidAll.size()
+ << "Total Queue :#" << mqAll.size() << ", Total Consumer :#" <<
cidAll.size()
<< " Allocated Queues are: \n";
for (vector<MQMessageQueue>::size_type i = 0; i <
allocateResult.size(); ++i) {