This is an automated email from the ASF dual-hosted git repository.
ifplusor pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 52feba9 pass by reference
52feba9 is described below
commit 52feba9c661546ca6159d7ea33ba488c84e39cf7
Author: SRC-xiaojin <[email protected]>
AuthorDate: Sat Jul 2 18:14:51 2022 +0800
pass by reference
(cherry picked from commit cb6de26394de73ed23d18ff8103fd359bfc35e24)
---
include/MQMessageExt.h | 2 +-
src/common/DefaultMQClient.cpp | 2 +-
src/common/MessageAccessor.cpp | 6 +-
src/common/MessageAccessor.h | 6 +-
src/common/NameSpaceUtil.cpp | 16 +--
src/common/NameSpaceUtil.h | 16 +--
src/common/TopAddressing.cpp | 2 +-
src/common/TopAddressing.h | 2 +-
src/consumer/PullAPIWrapper.cpp | 266 +++++++++++++++++++--------------------
src/consumer/PullAPIWrapper.h | 132 +++++++++----------
src/include/DefaultMQClient.h | 2 +-
src/message/MQMessageExt.cpp | 2 +-
src/protocol/LockBatchBody.cpp | 16 +--
src/protocol/LockBatchBody.h | 12 +-
src/protocol/RemotingCommand.cpp | 2 +-
src/protocol/RemotingCommand.h | 2 +-
16 files changed, 243 insertions(+), 243 deletions(-)
diff --git a/include/MQMessageExt.h b/include/MQMessageExt.h
index c269985..969f04b 100644
--- a/include/MQMessageExt.h
+++ b/include/MQMessageExt.h
@@ -40,7 +40,7 @@ class ROCKETMQCLIENT_API MQMessageExt : public MQMessage {
sockaddr bornHost,
int64 storeTimestamp,
sockaddr storeHost,
- std::string msgId);
+ const std::string& msgId);
virtual ~MQMessageExt();
diff --git a/src/common/DefaultMQClient.cpp b/src/common/DefaultMQClient.cpp
index 579cec6..56b2461 100644
--- a/src/common/DefaultMQClient.cpp
+++ b/src/common/DefaultMQClient.cpp
@@ -211,7 +211,7 @@ uint64_t DefaultMQClient::getTcpTransportTryLockTimeout()
const {
return m_tcpTransportTryLockTimeout;
}
-void DefaultMQClient::setUnitName(string unitName) {
+void DefaultMQClient::setUnitName(const std::string& unitName) {
m_unitName = unitName;
}
const string& DefaultMQClient::getUnitName() const {
diff --git a/src/common/MessageAccessor.cpp b/src/common/MessageAccessor.cpp
index 6fcfac1..7b8714a 100644
--- a/src/common/MessageAccessor.cpp
+++ b/src/common/MessageAccessor.cpp
@@ -23,7 +23,7 @@
using namespace std;
namespace rocketmq {
-void MessageAccessor::withNameSpace(MQMessage& msg, const string nameSpace) {
+void MessageAccessor::withNameSpace(MQMessage& msg, const string& nameSpace) {
if (!nameSpace.empty()) {
string originTopic = msg.getTopic();
string newTopic = nameSpace + NAMESPACE_SPLIT_FLAG + originTopic;
@@ -31,7 +31,7 @@ void MessageAccessor::withNameSpace(MQMessage& msg, const
string nameSpace) {
}
}
-void MessageAccessor::withoutNameSpaceSingle(MQMessageExt& msg, const string
nameSpace) {
+void MessageAccessor::withoutNameSpaceSingle(MQMessageExt& msg, const string&
nameSpace) {
if (!nameSpace.empty()) {
string originTopic = msg.getTopic();
auto index = originTopic.find(nameSpace);
@@ -44,7 +44,7 @@ void MessageAccessor::withoutNameSpaceSingle(MQMessageExt&
msg, const string nam
}
}
}
-void MessageAccessor::withoutNameSpace(vector<MQMessageExt>& msgs, const
string nameSpace) {
+void MessageAccessor::withoutNameSpace(vector<MQMessageExt>& msgs, const
string& nameSpace) {
if (!nameSpace.empty()) {
// for_each(msgs.cbegin(), msgs.cend(),
bind2nd(&MessageAccessor::withoutNameSpaceSingle, nameSpace));
for (auto iter = msgs.begin(); iter != msgs.end(); iter++) {
diff --git a/src/common/MessageAccessor.h b/src/common/MessageAccessor.h
index af42021..ce9bb68 100644
--- a/src/common/MessageAccessor.h
+++ b/src/common/MessageAccessor.h
@@ -25,9 +25,9 @@ namespace rocketmq {
//<!***************************************************************************
class MessageAccessor {
public:
- static void withNameSpace(MQMessage& msg, const std::string nameSpace);
- static void withoutNameSpaceSingle(MQMessageExt& msg, const std::string
nameSpace);
- static void withoutNameSpace(std::vector<MQMessageExt>& msgs, const
std::string nameSpace);
+ static void withNameSpace(MQMessage& msg, const std::string& nameSpace);
+ static void withoutNameSpaceSingle(MQMessageExt& msg, const std::string&
nameSpace);
+ static void withoutNameSpace(std::vector<MQMessageExt>& msgs, const
std::string& nameSpace);
};
//<!***************************************************************************
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
index 61866e9..a24e5e9 100644
--- a/src/common/NameSpaceUtil.cpp
+++ b/src/common/NameSpaceUtil.cpp
@@ -21,14 +21,14 @@
namespace rocketmq {
-bool NameSpaceUtil::isEndPointURL(string nameServerAddr) {
+bool NameSpaceUtil::isEndPointURL(const string& nameServerAddr) {
if (nameServerAddr.length() >= ENDPOINT_PREFIX_LENGTH &&
nameServerAddr.find(ENDPOINT_PREFIX) != string::npos) {
return true;
}
return false;
}
-string NameSpaceUtil::formatNameServerURL(string nameServerAddr) {
+string NameSpaceUtil::formatNameServerURL(const string& nameServerAddr) {
auto index = nameServerAddr.find(ENDPOINT_PREFIX);
if (index != string::npos) {
LOG_DEBUG("Get Name Server from endpoint [%s]",
@@ -38,7 +38,7 @@ string NameSpaceUtil::formatNameServerURL(string
nameServerAddr) {
return nameServerAddr;
}
-string NameSpaceUtil::getNameSpaceFromNsURL(string nameServerAddr) {
+string NameSpaceUtil::getNameSpaceFromNsURL(const string& nameServerAddr) {
LOG_DEBUG("Try to get Name Space from nameServerAddr [%s]",
nameServerAddr.c_str());
string nsAddr = formatNameServerURL(nameServerAddr);
string nameSpace;
@@ -54,7 +54,7 @@ string NameSpaceUtil::getNameSpaceFromNsURL(string
nameServerAddr) {
return "";
}
-bool NameSpaceUtil::checkNameSpaceExistInNsURL(string nameServerAddr) {
+bool NameSpaceUtil::checkNameSpaceExistInNsURL(const string& nameServerAddr) {
if (!isEndPointURL(nameServerAddr)) {
LOG_DEBUG("This nameServerAddr [%s] is not a endpoint. should not get Name
Space.", nameServerAddr.c_str());
return false;
@@ -67,7 +67,7 @@ bool NameSpaceUtil::checkNameSpaceExistInNsURL(string
nameServerAddr) {
return false;
}
-bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) {
+bool NameSpaceUtil::checkNameSpaceExistInNameServer(const string&
nameServerAddr) {
auto index = nameServerAddr.find(NAMESPACE_PREFIX);
if (index != string::npos) {
LOG_INFO("Find Name Space Prefix in nameServerAddr [%s]",
nameServerAddr.c_str());
@@ -76,7 +76,7 @@ bool NameSpaceUtil::checkNameSpaceExistInNameServer(string
nameServerAddr) {
return false;
}
-string NameSpaceUtil::withoutNameSpace(string source, string nameSpace) {
+string NameSpaceUtil::withoutNameSpace(const string& source, const string&
nameSpace) {
if (!nameSpace.empty()) {
auto index = source.find(nameSpace);
if (index != string::npos) {
@@ -85,14 +85,14 @@ string NameSpaceUtil::withoutNameSpace(string source,
string nameSpace) {
}
return source;
}
-string NameSpaceUtil::withNameSpace(string source, string ns) {
+string NameSpaceUtil::withNameSpace(const string& source, const string& ns) {
if (!ns.empty()) {
return ns + NAMESPACE_SPLIT_FLAG + source;
}
return source;
}
-bool NameSpaceUtil::hasNameSpace(string source, string ns) {
+bool NameSpaceUtil::hasNameSpace(const string& source, const string& ns) {
if (source.find(TraceContant::TRACE_TOPIC) != string::npos) {
LOG_DEBUG("Find Trace Topic [%s]", source.c_str());
return true;
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
index cdaaf6b..094fb49 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NameSpaceUtil.h
@@ -31,21 +31,21 @@ static const string NAMESPACE_SPLIT_FLAG = "%";
namespace rocketmq {
class NameSpaceUtil {
public:
- static bool isEndPointURL(string nameServerAddr);
+ static bool isEndPointURL(const string& nameServerAddr);
- static string formatNameServerURL(string nameServerAddr);
+ static string formatNameServerURL(const string& nameServerAddr);
- static string getNameSpaceFromNsURL(string nameServerAddr);
+ static string getNameSpaceFromNsURL(const string& nameServerAddr);
- static bool checkNameSpaceExistInNsURL(string nameServerAddr);
+ static bool checkNameSpaceExistInNsURL(const string& nameServerAddr);
- static bool checkNameSpaceExistInNameServer(string nameServerAddr);
+ static bool checkNameSpaceExistInNameServer(const string& nameServerAddr);
- static string withNameSpace(string source, string ns);
+ static string withNameSpace(const string& source, const string& ns);
- static string withoutNameSpace(string source, string ns);
+ static string withoutNameSpace(const string& source, const string& ns);
- static bool hasNameSpace(string source, string ns);
+ static bool hasNameSpace(const string& source, const string& ns);
};
} // namespace rocketmq
diff --git a/src/common/TopAddressing.cpp b/src/common/TopAddressing.cpp
index 9d94c43..384c3c0 100644
--- a/src/common/TopAddressing.cpp
+++ b/src/common/TopAddressing.cpp
@@ -21,7 +21,7 @@
#include "url.h"
namespace rocketmq {
-TopAddressing::TopAddressing(string unitName) : m_unitName(unitName) {}
+TopAddressing::TopAddressing(const std::string& unitName) :
m_unitName(unitName) {}
TopAddressing::~TopAddressing() {}
diff --git a/src/common/TopAddressing.h b/src/common/TopAddressing.h
index a698862..07297db 100644
--- a/src/common/TopAddressing.h
+++ b/src/common/TopAddressing.h
@@ -27,7 +27,7 @@
namespace rocketmq {
class TopAddressing {
public:
- TopAddressing(string unitName);
+ TopAddressing(const std::string& unitName);
virtual ~TopAddressing();
public:
diff --git a/src/consumer/PullAPIWrapper.cpp b/src/consumer/PullAPIWrapper.cpp
index a7f1156..3b5528b 100644
--- a/src/consumer/PullAPIWrapper.cpp
+++ b/src/consumer/PullAPIWrapper.cpp
@@ -1,133 +1,133 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "PullAPIWrapper.h"
-#include "CommunicationMode.h"
-#include "MQClientFactory.h"
-#include "PullResultExt.h"
-#include "PullSysFlag.h"
-namespace rocketmq {
-//<!************************************************************************
-PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory, const string&
consumerGroup) {
- m_MQClientFactory = mQClientFactory;
- m_consumerGroup = consumerGroup;
-}
-
-PullAPIWrapper::~PullAPIWrapper() {
- m_MQClientFactory = NULL;
- m_pullFromWhichNodeTable.clear();
-}
-
-void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq, int
brokerId) {
- boost::lock_guard<boost::mutex> lock(m_lock);
- m_pullFromWhichNodeTable[mq] = brokerId;
-}
-
-int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
- boost::lock_guard<boost::mutex> lock(m_lock);
- if (m_pullFromWhichNodeTable.find(mq) != m_pullFromWhichNodeTable.end()) {
- return m_pullFromWhichNodeTable[mq];
- }
- return MASTER_ID;
-}
-
-PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
- PullResult* pullResult,
- SubscriptionData*
subscriptionData) {
- PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult);
- if (pResultExt == NULL) {
- string errMsg("The pullResult NULL of");
- errMsg.append(mq.toString());
- THROW_MQEXCEPTION(MQClientException, errMsg, -1);
- }
-
- //<!update;
- updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId);
-
- vector<MQMessageExt> msgFilterList;
- if (pResultExt->pullStatus == FOUND) {
- //<!decode all msg list;
- vector<MQMessageExt> msgAllList;
- MQDecoder::decodes(&pResultExt->msgMemBlock, msgAllList);
-
- //<!filter msg list again;
- if (subscriptionData != NULL && !subscriptionData->getTagsSet().empty()) {
- msgFilterList.reserve(msgAllList.size());
- vector<MQMessageExt>::iterator it = msgAllList.begin();
- for (; it != msgAllList.end(); ++it) {
- string msgTag = (*it).getTags();
- if (subscriptionData->containTag(msgTag)) {
- msgFilterList.push_back(*it);
- }
- }
- } else {
- msgFilterList.swap(msgAllList);
- }
- }
-
- return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset,
pResultExt->minOffset, pResultExt->maxOffset,
- msgFilterList);
-}
-
-PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq, //
1
- string subExpression, //
2
- int64 subVersion, //
3
- int64 offset, //
4
- int maxNums, //
5
- int sysFlag, //
6
- int64 commitOffset, //
7
- int brokerSuspendMaxTimeMillis, //
8
- int timeoutMillis, //
9
- int communicationMode, //
10
- PullCallback* pullCallback,
- const SessionCredentials&
session_credentials,
- void* pArg /*= NULL*/) {
- unique_ptr<FindBrokerResult> pFindBrokerResult(
- m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
recalculatePullFromWhichNode(mq), false));
- //<!goto nameserver;
- if (pFindBrokerResult == NULL) {
- m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
session_credentials);
- pFindBrokerResult.reset(
- m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
recalculatePullFromWhichNode(mq), false));
- }
-
- if (pFindBrokerResult != NULL) {
- int sysFlagInner = sysFlag;
-
- if (pFindBrokerResult->slave) {
- sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner);
- }
-
- PullMessageRequestHeader* pRequestHeader = new PullMessageRequestHeader();
- pRequestHeader->consumerGroup = m_consumerGroup;
- pRequestHeader->topic = mq.getTopic();
- pRequestHeader->queueId = mq.getQueueId();
- pRequestHeader->queueOffset = offset;
- pRequestHeader->maxMsgNums = maxNums;
- pRequestHeader->sysFlag = sysFlagInner;
- pRequestHeader->commitOffset = commitOffset;
- pRequestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
- pRequestHeader->subscription = subExpression;
- pRequestHeader->subVersion = subVersion;
-
- return
m_MQClientFactory->getMQClientAPIImpl()->pullMessage(pFindBrokerResult->brokerAddr,
pRequestHeader,
- timeoutMillis,
communicationMode, pullCallback, pArg,
-
session_credentials);
- }
- THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
-}
-
-} // namespace rocketmq
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "PullAPIWrapper.h"
+#include "CommunicationMode.h"
+#include "MQClientFactory.h"
+#include "PullResultExt.h"
+#include "PullSysFlag.h"
+namespace rocketmq {
+//<!************************************************************************
+PullAPIWrapper::PullAPIWrapper(MQClientFactory* mQClientFactory, const string&
consumerGroup) {
+ m_MQClientFactory = mQClientFactory;
+ m_consumerGroup = consumerGroup;
+}
+
+PullAPIWrapper::~PullAPIWrapper() {
+ m_MQClientFactory = NULL;
+ m_pullFromWhichNodeTable.clear();
+}
+
+void PullAPIWrapper::updatePullFromWhichNode(const MQMessageQueue& mq, int
brokerId) {
+ boost::lock_guard<boost::mutex> lock(m_lock);
+ m_pullFromWhichNodeTable[mq] = brokerId;
+}
+
+int PullAPIWrapper::recalculatePullFromWhichNode(const MQMessageQueue& mq) {
+ boost::lock_guard<boost::mutex> lock(m_lock);
+ if (m_pullFromWhichNodeTable.find(mq) != m_pullFromWhichNodeTable.end()) {
+ return m_pullFromWhichNodeTable[mq];
+ }
+ return MASTER_ID;
+}
+
+PullResult PullAPIWrapper::processPullResult(const MQMessageQueue& mq,
+ PullResult* pullResult,
+ SubscriptionData*
subscriptionData) {
+ PullResultExt* pResultExt = static_cast<PullResultExt*>(pullResult);
+ if (pResultExt == NULL) {
+ string errMsg("The pullResult NULL of");
+ errMsg.append(mq.toString());
+ THROW_MQEXCEPTION(MQClientException, errMsg, -1);
+ }
+
+ //<!update;
+ updatePullFromWhichNode(mq, pResultExt->suggestWhichBrokerId);
+
+ vector<MQMessageExt> msgFilterList;
+ if (pResultExt->pullStatus == FOUND) {
+ //<!decode all msg list;
+ vector<MQMessageExt> msgAllList;
+ MQDecoder::decodes(&pResultExt->msgMemBlock, msgAllList);
+
+ //<!filter msg list again;
+ if (subscriptionData != NULL && !subscriptionData->getTagsSet().empty()) {
+ msgFilterList.reserve(msgAllList.size());
+ vector<MQMessageExt>::iterator it = msgAllList.begin();
+ for (; it != msgAllList.end(); ++it) {
+ string msgTag = (*it).getTags();
+ if (subscriptionData->containTag(msgTag)) {
+ msgFilterList.push_back(*it);
+ }
+ }
+ } else {
+ msgFilterList.swap(msgAllList);
+ }
+ }
+
+ return PullResult(pResultExt->pullStatus, pResultExt->nextBeginOffset,
pResultExt->minOffset, pResultExt->maxOffset,
+ msgFilterList);
+}
+
+PullResult* PullAPIWrapper::pullKernelImpl(const MQMessageQueue& mq, //
1
+ const string& subExpression,
// 2
+ int64 subVersion, //
3
+ int64 offset, //
4
+ int maxNums, //
5
+ int sysFlag, //
6
+ int64 commitOffset, //
7
+ int brokerSuspendMaxTimeMillis, //
8
+ int timeoutMillis, //
9
+ int communicationMode, //
10
+ PullCallback* pullCallback,
+ const SessionCredentials&
session_credentials,
+ void* pArg /*= NULL*/) {
+ unique_ptr<FindBrokerResult> pFindBrokerResult(
+ m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
recalculatePullFromWhichNode(mq), false));
+ //<!goto nameserver;
+ if (pFindBrokerResult == NULL) {
+ m_MQClientFactory->updateTopicRouteInfoFromNameServer(mq.getTopic(),
session_credentials);
+ pFindBrokerResult.reset(
+ m_MQClientFactory->findBrokerAddressInSubscribe(mq.getBrokerName(),
recalculatePullFromWhichNode(mq), false));
+ }
+
+ if (pFindBrokerResult != NULL) {
+ int sysFlagInner = sysFlag;
+
+ if (pFindBrokerResult->slave) {
+ sysFlagInner = PullSysFlag::clearCommitOffsetFlag(sysFlagInner);
+ }
+
+ PullMessageRequestHeader* pRequestHeader = new PullMessageRequestHeader();
+ pRequestHeader->consumerGroup = m_consumerGroup;
+ pRequestHeader->topic = mq.getTopic();
+ pRequestHeader->queueId = mq.getQueueId();
+ pRequestHeader->queueOffset = offset;
+ pRequestHeader->maxMsgNums = maxNums;
+ pRequestHeader->sysFlag = sysFlagInner;
+ pRequestHeader->commitOffset = commitOffset;
+ pRequestHeader->suspendTimeoutMillis = brokerSuspendMaxTimeMillis;
+ pRequestHeader->subscription = subExpression;
+ pRequestHeader->subVersion = subVersion;
+
+ return
m_MQClientFactory->getMQClientAPIImpl()->pullMessage(pFindBrokerResult->brokerAddr,
pRequestHeader,
+ timeoutMillis,
communicationMode, pullCallback, pArg,
+
session_credentials);
+ }
+ THROW_MQEXCEPTION(MQClientException, "The broker not exist", -1);
+}
+
+} // namespace rocketmq
diff --git a/src/consumer/PullAPIWrapper.h b/src/consumer/PullAPIWrapper.h
index 29a64fb..dca370a 100644
--- a/src/consumer/PullAPIWrapper.h
+++ b/src/consumer/PullAPIWrapper.h
@@ -1,66 +1,66 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#ifndef _PULLAPIWRAPPER_H_
-#define _PULLAPIWRAPPER_H_
-
-#include <boost/thread/mutex.hpp>
-#include <boost/thread/thread.hpp>
-#include "AsyncCallback.h"
-#include "MQMessageQueue.h"
-#include "SessionCredentials.h"
-#include "SubscriptionData.h"
-
-namespace rocketmq {
-class MQClientFactory;
-//<!***************************************************************************
-class PullAPIWrapper {
- public:
- PullAPIWrapper(MQClientFactory* mQClientFactory, const string&
consumerGroup);
- ~PullAPIWrapper();
-
- PullResult processPullResult(const MQMessageQueue& mq, PullResult*
pullResult, SubscriptionData* subscriptionData);
-
- PullResult* pullKernelImpl(const MQMessageQueue& mq, // 1
- string subExpression, // 2
- int64 subVersion, // 3
- int64 offset, // 4
- int maxNums, // 5
- int sysFlag, // 6
- int64 commitOffset, // 7
- int brokerSuspendMaxTimeMillis, // 8
- int timeoutMillis, // 9
- int communicationMode, // 10
- PullCallback* pullCallback,
- const SessionCredentials& session_credentials,
- void* pArg = NULL);
-
- private:
- void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
-
- int recalculatePullFromWhichNode(const MQMessageQueue& mq);
-
- private:
- MQClientFactory* m_MQClientFactory;
- string m_consumerGroup;
- boost::mutex m_lock;
- map<MQMessageQueue, int /* brokerId */> m_pullFromWhichNodeTable;
-};
-
-//<!***************************************************************************
-} // namespace rocketmq
-
-#endif //<! _PULLAPIWRAPPER_H_
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#ifndef _PULLAPIWRAPPER_H_
+#define _PULLAPIWRAPPER_H_
+
+#include <boost/thread/mutex.hpp>
+#include <boost/thread/thread.hpp>
+#include "AsyncCallback.h"
+#include "MQMessageQueue.h"
+#include "SessionCredentials.h"
+#include "SubscriptionData.h"
+
+namespace rocketmq {
+class MQClientFactory;
+//<!***************************************************************************
+class PullAPIWrapper {
+ public:
+ PullAPIWrapper(MQClientFactory* mQClientFactory, const string&
consumerGroup);
+ ~PullAPIWrapper();
+
+ PullResult processPullResult(const MQMessageQueue& mq, PullResult*
pullResult, SubscriptionData* subscriptionData);
+
+ PullResult* pullKernelImpl(const MQMessageQueue& mq, // 1
+ const string& subExpression, // 2
+ int64 subVersion, // 3
+ int64 offset, // 4
+ int maxNums, // 5
+ int sysFlag, // 6
+ int64 commitOffset, // 7
+ int brokerSuspendMaxTimeMillis, // 8
+ int timeoutMillis, // 9
+ int communicationMode, // 10
+ PullCallback* pullCallback,
+ const SessionCredentials& session_credentials,
+ void* pArg = NULL);
+
+ private:
+ void updatePullFromWhichNode(const MQMessageQueue& mq, int brokerId);
+
+ int recalculatePullFromWhichNode(const MQMessageQueue& mq);
+
+ private:
+ MQClientFactory* m_MQClientFactory;
+ string m_consumerGroup;
+ boost::mutex m_lock;
+ map<MQMessageQueue, int /* brokerId */> m_pullFromWhichNodeTable;
+};
+
+//<!***************************************************************************
+} // namespace rocketmq
+
+#endif //<! _PULLAPIWRAPPER_H_
diff --git a/src/include/DefaultMQClient.h b/src/include/DefaultMQClient.h
index 92ce318..10399c5 100644
--- a/src/include/DefaultMQClient.h
+++ b/src/include/DefaultMQClient.h
@@ -161,7 +161,7 @@ class DefaultMQClient {
void setTcpTransportTryLockTimeout(uint64_t timeout); // ms
uint64_t getTcpTransportTryLockTimeout() const;
- void setUnitName(std::string unitName);
+ void setUnitName(const std::string& unitName);
const std::string& getUnitName() const;
void setSessionCredentials(const std::string& input_accessKey,
diff --git a/src/message/MQMessageExt.cpp b/src/message/MQMessageExt.cpp
index fc022a2..17b2304 100644
--- a/src/message/MQMessageExt.cpp
+++ b/src/message/MQMessageExt.cpp
@@ -38,7 +38,7 @@ MQMessageExt::MQMessageExt(int queueId,
sockaddr bornHost,
int64 storeTimestamp,
sockaddr storeHost,
- string msgId)
+ const std::string& msgId)
: m_queueOffset(0),
m_commitLogOffset(0),
m_bornTimestamp(bornTimestamp),
diff --git a/src/protocol/LockBatchBody.cpp b/src/protocol/LockBatchBody.cpp
index faecbbc..5ed2bcf 100644
--- a/src/protocol/LockBatchBody.cpp
+++ b/src/protocol/LockBatchBody.cpp
@@ -22,20 +22,20 @@ namespace rocketmq { //<!end namespace;
string LockBatchRequestBody::getConsumerGroup() {
return consumerGroup;
}
-void LockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+void LockBatchRequestBody::setConsumerGroup(const string& in_consumerGroup) {
consumerGroup = in_consumerGroup;
}
string LockBatchRequestBody::getClientId() {
return clientId;
}
-void LockBatchRequestBody::setClientId(string in_clientId) {
+void LockBatchRequestBody::setClientId(const string& in_clientId) {
clientId = in_clientId;
}
vector<MQMessageQueue> LockBatchRequestBody::getMqSet() {
return mqSet;
}
-void LockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
- mqSet.swap(in_mqSet);
+void LockBatchRequestBody::setMqSet(const vector<MQMessageQueue>& in_mqSet) {
+ mqSet = in_mqSet;
}
void LockBatchRequestBody::Encode(string& outData) {
Json::Value root;
@@ -94,20 +94,20 @@ void LockBatchResponseBody::Decode(const MemoryBlock* mem,
vector<MQMessageQueue
string UnlockBatchRequestBody::getConsumerGroup() {
return consumerGroup;
}
-void UnlockBatchRequestBody::setConsumerGroup(string in_consumerGroup) {
+void UnlockBatchRequestBody::setConsumerGroup(const string& in_consumerGroup) {
consumerGroup = in_consumerGroup;
}
string UnlockBatchRequestBody::getClientId() {
return clientId;
}
-void UnlockBatchRequestBody::setClientId(string in_clientId) {
+void UnlockBatchRequestBody::setClientId(const string& in_clientId) {
clientId = in_clientId;
}
vector<MQMessageQueue> UnlockBatchRequestBody::getMqSet() {
return mqSet;
}
-void UnlockBatchRequestBody::setMqSet(vector<MQMessageQueue> in_mqSet) {
- mqSet.swap(in_mqSet);
+void UnlockBatchRequestBody::setMqSet(const vector<MQMessageQueue>& in_mqSet) {
+ mqSet = in_mqSet;
}
void UnlockBatchRequestBody::Encode(string& outData) {
Json::Value root;
diff --git a/src/protocol/LockBatchBody.h b/src/protocol/LockBatchBody.h
index 7f92e89..fd60c28 100644
--- a/src/protocol/LockBatchBody.h
+++ b/src/protocol/LockBatchBody.h
@@ -32,11 +32,11 @@ class LockBatchRequestBody {
public:
virtual ~LockBatchRequestBody() { mqSet.clear(); }
string getConsumerGroup();
- void setConsumerGroup(string consumerGroup);
+ void setConsumerGroup(const string& consumerGroup);
string getClientId();
- void setClientId(string clientId);
+ void setClientId(const string& clientId);
vector<MQMessageQueue> getMqSet();
- void setMqSet(vector<MQMessageQueue> mqSet);
+ void setMqSet(const vector<MQMessageQueue>& mqSet);
void Encode(string& outData);
Json::Value toJson(const MQMessageQueue& mq) const;
@@ -61,11 +61,11 @@ class UnlockBatchRequestBody {
public:
virtual ~UnlockBatchRequestBody() { mqSet.clear(); }
string getConsumerGroup();
- void setConsumerGroup(string consumerGroup);
+ void setConsumerGroup(const string& consumerGroup);
string getClientId();
- void setClientId(string clientId);
+ void setClientId(const string& clientId);
vector<MQMessageQueue> getMqSet();
- void setMqSet(vector<MQMessageQueue> mqSet);
+ void setMqSet(const vector<MQMessageQueue>& mqSet);
void Encode(string& outData);
Json::Value toJson(const MQMessageQueue& mq) const;
diff --git a/src/protocol/RemotingCommand.cpp b/src/protocol/RemotingCommand.cpp
index cb7bd1d..91d023e 100644
--- a/src/protocol/RemotingCommand.cpp
+++ b/src/protocol/RemotingCommand.cpp
@@ -280,7 +280,7 @@ CommandHeader* RemotingCommand::getCommandHeader() const {
return m_pExtHeader.get();
}
-void RemotingCommand::setParsedJson(Json::Value json) {
+void RemotingCommand::setParsedJson(const Json::Value& json) {
m_parsedJson = json;
}
diff --git a/src/protocol/RemotingCommand.h b/src/protocol/RemotingCommand.h
index bc137f7..1e039c8 100644
--- a/src/protocol/RemotingCommand.h
+++ b/src/protocol/RemotingCommand.h
@@ -57,7 +57,7 @@ class RemotingCommand {
bool isResponseType();
void markOnewayRPC();
bool isOnewayRPC();
- void setParsedJson(Json::Value json);
+ void setParsedJson(const Json::Value& json);
CommandHeader* getCommandHeader() const;
const int getFlag() const;
const int getVersion() const;