This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 5cdb268 [ISSUE-435][PART-A]Support receiving traceOn signal. (#434)
5cdb268 is described below
commit 5cdb268fafbccafdf0deafae5eb08037017ba7be
Author: BeautyYuYanli <[email protected]>
AuthorDate: Fri Aug 12 15:22:38 2022 +0800
[ISSUE-435][PART-A]Support receiving traceOn signal. (#434)
* add traceOn to sendRedult
* add getExtField to RemoteCommand; use unordered_map to store extField
* complete extFields
* complete traceOn
---
include/SendResult.h | 6 +++++-
src/MQClientAPIImpl.cpp | 4 +++-
src/producer/SendResult.cpp | 14 ++++++++++++--
src/protocol/RemotingCommand.cpp | 10 ++++++++++
src/protocol/RemotingCommand.h | 4 +++-
5 files changed, 33 insertions(+), 5 deletions(-)
diff --git a/include/SendResult.h b/include/SendResult.h
index cfe83ce..94ba543 100644
--- a/include/SendResult.h
+++ b/include/SendResult.h
@@ -37,7 +37,8 @@ class ROCKETMQCLIENT_API SendResult {
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
- const std::string& regionId);
+ const std::string& regionId,
+ const bool traceOn);
virtual ~SendResult();
SendResult(const SendResult& other);
@@ -55,6 +56,8 @@ class ROCKETMQCLIENT_API SendResult {
SendStatus getSendStatus() const;
MQMessageQueue getMessageQueue() const;
int64 getQueueOffset() const;
+ bool getTraceOn() const;
+
std::string toString() const;
private:
@@ -65,6 +68,7 @@ class ROCKETMQCLIENT_API SendResult {
int64 m_queueOffset;
std::string m_transactionId;
std::string m_regionId;
+ bool m_traceOn;
};
} // namespace rocketmq
diff --git a/src/MQClientAPIImpl.cpp b/src/MQClientAPIImpl.cpp
index 19cf3f5..671f328 100644
--- a/src/MQClientAPIImpl.cpp
+++ b/src/MQClientAPIImpl.cpp
@@ -542,10 +542,12 @@ SendResult MQClientAPIImpl::processSendResponse(const
string& brokerName,
}
if (res == 0) {
SendMessageResponseHeader* responseHeader =
(SendMessageResponseHeader*)pResponse->getCommandHeader();
+ auto extFields = pResponse->getExtFields();
+ bool traceOn = (extFields->count("TRACE_ON") && extFields->at("TRACE_ON")
== "true");
MQMessageQueue messageQueue(msg.getTopic(), brokerName,
responseHeader->queueId);
string unique_msgId =
msg.getProperty(MQMessage::PROPERTY_UNIQ_CLIENT_MESSAGE_ID_KEYIDX);
return SendResult(sendStatus, unique_msgId, responseHeader->msgId,
messageQueue, responseHeader->queueOffset,
- responseHeader->regionId);
+ responseHeader->regionId, traceOn);
}
LOG_ERROR("processSendResponse error remark:%s, error code:%d",
(pResponse->getRemark()).c_str(),
pResponse->getCode());
diff --git a/src/producer/SendResult.cpp b/src/producer/SendResult.cpp
index 3b56ff3..f6926d5 100644
--- a/src/producer/SendResult.cpp
+++ b/src/producer/SendResult.cpp
@@ -39,13 +39,15 @@ SendResult::SendResult(const SendStatus& sendStatus,
const std::string& offsetMsgId,
const MQMessageQueue& messageQueue,
int64 queueOffset,
- const string& regionId)
+ const string& regionId,
+ const bool traceOn)
: m_sendStatus(sendStatus),
m_msgId(msgId),
m_offsetMsgId(offsetMsgId),
m_messageQueue(messageQueue),
m_queueOffset(queueOffset),
- m_regionId(regionId) {}
+ m_regionId(regionId),
+ m_traceOn(traceOn) {}
SendResult::SendResult(const SendResult& other) {
m_sendStatus = other.m_sendStatus;
@@ -54,6 +56,7 @@ SendResult::SendResult(const SendResult& other) {
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
+ m_traceOn = other.m_traceOn;
}
SendResult& SendResult::operator=(const SendResult& other) {
@@ -64,6 +67,7 @@ SendResult& SendResult::operator=(const SendResult& other) {
m_messageQueue = other.m_messageQueue;
m_queueOffset = other.m_queueOffset;
m_regionId = other.m_regionId;
+ m_traceOn = other.m_traceOn;
}
return *this;
}
@@ -96,6 +100,10 @@ int64 SendResult::getQueueOffset() const {
return m_queueOffset;
}
+bool SendResult::getTraceOn() const {
+ return m_traceOn;
+}
+
std::string SendResult::toString() const {
stringstream ss;
ss << "SendResult: ";
@@ -105,6 +113,8 @@ std::string SendResult::toString() const {
ss << ",queueOffset:" << m_queueOffset;
ss << ",transactionId:" << m_transactionId;
ss << ",messageQueue:" << m_messageQueue.toString();
+ ss << ",regionId:" << m_regionId;
+ ss << ",traceOn:" << m_traceOn;
return ss.str();
}
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index 91d023e..bdde864 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -180,6 +180,12 @@ RemotingCommand* RemotingCommand::Decode(const
MemoryBlock& mem) {
if (bodyLen > 0) {
cmd->SetBody(pData + 4 + headLen, bodyLen);
}
+ if (object.isMember("extFields")) {
+ Json::Value& extFields = object["extFields"];
+ for (auto& it : extFields.getMemberNames()) {
+ cmd->m_extFields[it] = extFields[it].asString();
+ }
+ }
return cmd;
}
@@ -304,6 +310,10 @@ void RemotingCommand::addExtField(const string& key, const
string& value) {
m_extFields[key] = value;
}
+const unordered_map<string, string>* RemotingCommand::getExtFields() const{
+ return &m_extFields;
+}
+
std::string RemotingCommand::ToString() const {
std::stringstream ss;
ss << "code:" << m_code << ",opaque:" << m_opaque << ",flag:" << m_flag <<
",body.size:" << m_body.getSize()
diff --git a/src/protocol/RemotingCommand.h b/src/protocol/RemotingCommand.h
index 1e039c8..b0525b7 100644
--- a/src/protocol/RemotingCommand.h
+++ b/src/protocol/RemotingCommand.h
@@ -21,6 +21,7 @@
#include <boost/thread/thread.hpp>
#include <memory>
#include <sstream>
+#include <unordered_map>
#include "CommandHeader.h"
#include "dataBlock.h"
@@ -62,6 +63,7 @@ class RemotingCommand {
const int getFlag() const;
const int getVersion() const;
void addExtField(const string& key, const string& value);
+ const unordered_map<string, string>* getExtFields() const;
string getMsgBody() const;
void setMsgBody(const string& body);
@@ -81,7 +83,7 @@ class RemotingCommand {
int m_flag;
string m_remark;
string m_msgBody;
- map<string, string> m_extFields;
+ unordered_map<string, string> m_extFields;
MemoryBlock m_head;
MemoryBlock m_body;