This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 87a6137  fix(consumer): send back error when consuming failed. (#210)
87a6137 is described below

commit 87a6137150b32b2c69b4e46d38f01f5c9fd8429c
Author: dinglei <[email protected]>
AuthorDate: Thu Jan 2 19:38:19 2020 +0800

    fix(consumer): send back error when consuming failed. (#210)
---
 include/DefaultMQPullConsumer.h                    |  2 +-
 include/DefaultMQPushConsumer.h                    |  2 +-
 include/MQConsumer.h                               |  2 +-
 src/MQClientAPIImpl.cpp                            |  5 +++--
 src/MQClientAPIImpl.h                              |  3 ++-
 src/common/MQClient.cpp                            |  3 ++-
 src/consumer/ConsumeMessageConcurrentlyService.cpp | 20 ++++++++++++--------
 src/consumer/DefaultMQPullConsumer.cpp             |  2 +-
 src/consumer/DefaultMQPushConsumer.cpp             |  9 +++++++--
 src/consumer/Rebalance.cpp                         |  2 +-
 10 files changed, 31 insertions(+), 19 deletions(-)

diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index 33765cd..899862b 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -43,7 +43,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b29ca72..43f0fbb 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -55,7 +55,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public MQConsumer {
   //<!end mqadmin;
 
   //<!begin MQConsumer
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel);
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName);
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs);
   virtual void doRebalance();
   virtual void persistConsumerOffset();
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index 54b535a..87e2c1b 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -32,7 +32,7 @@ class ConsumerRunningInfo;
 class ROCKETMQCLIENT_API MQConsumer : public MQClient {
  public:
   virtual ~MQConsumer() {}
-  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel) = 0;
+  virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName) = 0;
   virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs) = 0;
   virtual void doRebalance() = 0;
   virtual void persistConsumerOffset() = 0;
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index f890968..0877a03 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -823,7 +823,8 @@ void MQClientAPIImpl::updateConsumerOffsetOneway(const string& addr,
   m_pRemotingClient->invokeOneway(addr, request);
 }
 
-void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
+void MQClientAPIImpl::consumerSendMessageBack(const string addr,
+                                              MQMessageExt& msg,
                                               const string& consumerGroup,
                                               int delayLevel,
                                               int timeoutMillis,
@@ -833,7 +834,7 @@ void MQClientAPIImpl::consumerSendMessageBack(MQMessageExt& msg,
   pRequestHeader->offset = msg.getCommitLogOffset();
   pRequestHeader->delayLevel = delayLevel;
 
-  string addr = socketAddress2IPPort(msg.getStoreHost());
+  // string addr = socketAddress2IPPort(msg.getStoreHost());
   RemotingCommand request(CONSUMER_SEND_MSG_BACK, pRequestHeader);
   callSignatureBeforeRequest(addr, request, sessionCredentials);
   request.Encode();
diff --git a/src/MQClientAPIImpl.h b/src/MQClientAPIImpl.h
index 763e45d..9555d72 100644
--- a/src/MQClientAPIImpl.h
+++ b/src/MQClientAPIImpl.h
@@ -167,7 +167,8 @@ class MQClientAPIImpl {
                                   int timeoutMillis,
                                   const SessionCredentials& sessionCredentials);
 
-  void consumerSendMessageBack(MQMessageExt& msg,
+  void consumerSendMessageBack(const string addr,
+                               MQMessageExt& msg,
                                const string& consumerGroup,
                                int delayLevel,
                                int timeoutMillis,
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index afdc5fa..068b8c4 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -53,7 +53,8 @@ MQClient::~MQClient() {}
 string MQClient::getMQClientId() const {
   string clientIP = UtilAll::getLocalAddress();
   string processId = UtilAll::to_string(getpid());
-  return processId + "-" + clientIP + "@" + m_instanceName;
+  // return processId + "-" + clientIP + "@" + m_instanceName;
+  return clientIP + "@" + processId + "#" + m_instanceName;
 }
 
 //<!groupName;
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index 93cdcc3..371faa2 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -189,14 +189,18 @@ void ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
     case CLUSTERING: {
       // send back msg to broker;
       for (size_t i = ackIndex + 1; i < msgs.size(); i++) {
-        LOG_WARN("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
-                 (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
-                 msgs[i].getReconsumeTimes());
-        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY && !m_pConsumer->sendMessageBack(msgs[i], 0)) {
-          LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
-                   (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
-                   msgs[i].getReconsumeTimes());
-          localRetryMsgs.push_back(msgs[i]);
+        LOG_DEBUG("consume fail, MQ is:%s, its msgId is:%s, index is:" SIZET_FMT ", reconsume times is:%d",
+                  (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
+                  msgs[i].getReconsumeTimes());
+        if (m_pConsumer->getConsumeType() == CONSUME_PASSIVELY) {
+          string brokerName = request->m_messageQueue.getBrokerName();
+          if (!m_pConsumer->sendMessageBack(msgs[i], 0, brokerName)) {
+            LOG_WARN("Send message back fail, MQ is:%s, its msgId is:%s, index is:%d, re-consume times is:%d",
+                     (request->m_messageQueue).toString().c_str(), msgs[i].getMsgId().c_str(), i,
+                     msgs[i].getReconsumeTimes());
+            msgs[i].setReconsumeTimes(msgs[i].getReconsumeTimes() + 1);
+            localRetryMsgs.push_back(msgs[i]);
+          }
         }
       }
       break;
diff --git a/src/consumer/DefaultMQPullConsumer.cpp b/src/consumer/DefaultMQPullConsumer.cpp
index c8aac49..1f58e86 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -142,7 +142,7 @@ void DefaultMQPullConsumer::shutdown() {
   }
 }
 
-bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPullConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
   return true;
 }
 
diff --git a/src/consumer/DefaultMQPushConsumer.cpp b/src/consumer/DefaultMQPushConsumer.cpp
index 376c3e9..df77cac 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -251,9 +251,14 @@ DefaultMQPushConsumer::~DefaultMQPushConsumer() {
   m_subTopics.clear();
 }
 
-bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel) {
+bool DefaultMQPushConsumer::sendMessageBack(MQMessageExt& msg, int delayLevel, string& brokerName) {
+  string brokerAddr;
+  if (!brokerName.empty())
+    brokerAddr = getFactory()->findBrokerAddressInPublish(brokerName);
+  else
+    brokerAddr = socketAddress2IPPort(msg.getStoreHost());
   try {
-    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(msg, getGroupName(), delayLevel, 3000,
+    getFactory()->getMQClientAPIImpl()->consumerSendMessageBack(brokerAddr, msg, getGroupName(), delayLevel, 3000,
                                                                 getSessionCredentials());
   } catch (MQException& e) {
     LOG_ERROR(e.what());
diff --git a/src/consumer/Rebalance.cpp b/src/consumer/Rebalance.cpp
index 0a4c06f..5546b61 100644
--- a/src/consumer/Rebalance.cpp
+++ b/src/consumer/Rebalance.cpp
@@ -128,7 +128,7 @@ void Rebalance::doRebalance() {
             std::stringstream ss;
             ss << "Allocation result for [Consumer Group: " << m_pConsumer->getGroupName() << ", Topic: " << topic
                << ", Current Consumer ID: " << m_pConsumer->getMQClientId() << "] is changed.\n "
-               << "Total Queue #: " << mqAll.size() << ", Total Consumer #: " << cidAll.size()
+               << "Total Queue :#" << mqAll.size() << ", Total Consumer :#" << cidAll.size()
                << " Allocated Queues are: \n";
 
             for (vector<MQMessageQueue>::size_type i = 0; i < allocateResult.size(); ++i) {

Reply via email to