This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new d321677 feat(namespace): add name space support (#207)
d321677 is described below
commit d3216777d010b984f2f3e4b8093b7a022a4db20c
Author: dinglei <[email protected]>
AuthorDate: Tue Dec 31 11:02:23 2019 +0800
feat(namespace): add name space support (#207)
* Add name space util support
* feat(namespace): add name space support
* feat(namespace): add name space support
* fix(travisci): fix format error in supportting name space
---
include/DefaultMQProducer.h | 1 +
include/DefaultMQPullConsumer.h | 1 +
include/DefaultMQPushConsumer.h | 1 +
include/MQClient.h | 7 +--
include/MQConsumer.h | 2 +
src/common/MQClient.cpp | 11 +++-
src/common/MessageAccessor.cpp | 56 ++++++++++++++++++++
src/common/{NameSpaceUtil.h => MessageAccessor.h} | 25 +++++----
src/common/NameSpaceUtil.cpp | 52 +++++++++++++++++++
src/common/NameSpaceUtil.h | 14 +++++
src/consumer/ConsumeMessageConcurrentlyService.cpp | 6 +++
src/consumer/ConsumeMessageOrderlyService.cpp | 5 ++
src/consumer/DefaultMQPullConsumer.cpp | 48 ++++++++++++++---
src/consumer/DefaultMQPushConsumer.cpp | 37 +++++++++++++
src/producer/DefaultMQProducer.cpp | 60 +++++++++++++++++++++-
15 files changed, 301 insertions(+), 25 deletions(-)
diff --git a/include/DefaultMQProducer.h b/include/DefaultMQProducer.h
index fd657ad..630e765 100644
--- a/include/DefaultMQProducer.h
+++ b/include/DefaultMQProducer.h
@@ -102,6 +102,7 @@ class ROCKETMQCLIENT_API DefaultMQProducer : public
MQProducer {
SendCallback* pSendCallback);
bool tryToCompressMessage(MQMessage& msg);
BatchMessage buildBatchMessage(std::vector<MQMessage>& msgs);
+ bool dealWithNameSpace();
private:
int m_sendMsgTimeout;
diff --git a/include/DefaultMQPullConsumer.h b/include/DefaultMQPullConsumer.h
index af01941..939ce0e 100644
--- a/include/DefaultMQPullConsumer.h
+++ b/include/DefaultMQPullConsumer.h
@@ -123,6 +123,7 @@ class ROCKETMQCLIENT_API DefaultMQPullConsumer : public
MQConsumer {
private:
void checkConfig();
void copySubscription();
+ bool dealWithNameSpace();
PullResult pullSyncImpl(const MQMessageQueue& mq,
const std::string& subExpression,
diff --git a/include/DefaultMQPushConsumer.h b/include/DefaultMQPushConsumer.h
index b6de085..894c6b5 100644
--- a/include/DefaultMQPushConsumer.h
+++ b/include/DefaultMQPushConsumer.h
@@ -130,6 +130,7 @@ class ROCKETMQCLIENT_API DefaultMQPushConsumer : public
MQConsumer {
void checkConfig();
void copySubscription();
void updateTopicSubscribeInfoWhenSubscriptionChanged();
+ bool dealWithNameSpace();
private:
uint64_t m_startTime;
diff --git a/include/MQClient.h b/include/MQClient.h
index 5632c95..0e23339 100644
--- a/include/MQClient.h
+++ b/include/MQClient.h
@@ -57,7 +57,9 @@ class ROCKETMQCLIENT_API MQClient {
void setNamesrvDomain(const std::string& namesrvDomain);
const std::string& getInstanceName() const;
void setInstanceName(const std::string& instanceName);
- //<!groupName;
+ // nameSpace
+ const std::string& getNameSpace() const;
+ void setNameSpace(const std::string& nameSpace);
const std::string& getGroupName() const;
void setGroupName(const std::string& groupname);
@@ -184,9 +186,8 @@ class ROCKETMQCLIENT_API MQClient {
std::string m_namesrvAddr;
std::string m_namesrvDomain;
std::string m_instanceName;
- //<! the name is globle only
+ std::string m_nameSpace;
std::string m_GroupName;
- //<!factory;
MQClientFactory* m_clientFactory;
int m_serviceState;
int m_pullThreadNum;
diff --git a/include/MQConsumer.h b/include/MQConsumer.h
index b6cd613..48d78f2 100644
--- a/include/MQConsumer.h
+++ b/include/MQConsumer.h
@@ -56,9 +56,11 @@ class ROCKETMQCLIENT_API MQConsumer : public MQClient {
public:
MessageModel getMessageModel() const { return m_messageModel; }
void setMessageModel(MessageModel messageModel) { m_messageModel =
messageModel; }
+ bool isUseNameSpaceMode() const { return m_useNameSpaceMode; }
protected:
MessageModel m_messageModel;
+ bool m_useNameSpaceMode = false;
};
//<!***************************************************************************
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index 8eb3b13..afdc5fa 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -19,9 +19,9 @@
#include "Logging.h"
#include "MQClientFactory.h"
#include "MQClientManager.h"
+#include "NameSpaceUtil.h"
#include "TopicPublishInfo.h"
#include "UtilAll.h"
-#include "NameSpaceUtil.h"
namespace rocketmq {
@@ -39,6 +39,7 @@ MQClient::MQClient() {
m_namesrvAddr = "";
m_instanceName = "DEFAULT";
+ m_nameSpace = "";
m_clientFactory = NULL;
m_serviceState = CREATE_JUST;
m_pullThreadNum = std::thread::hardware_concurrency();
@@ -87,7 +88,13 @@ const string& MQClient::getInstanceName() const {
void MQClient::setInstanceName(const string& instanceName) {
m_instanceName = instanceName;
}
+// Returns a reference to the name space string currently stored on this client.
+const string& MQClient::getNameSpace() const {
+ return m_nameSpace;
+}
+// Stores nameSpace as this client's name space, replacing any previous value.
+void MQClient::setNameSpace(const string& nameSpace) {
+ m_nameSpace = nameSpace;
+}
void MQClient::createTopic(const string& key, const string& newTopic, int
queueNum) {
try {
getFactory()->createTopic(key, newTopic, queueNum, m_SessionCredentials);
@@ -212,4 +219,4 @@ const SessionCredentials& MQClient::getSessionCredentials()
const {
}
//<!************************************************************************
-} //<!end namespace;
+} // namespace rocketmq
diff --git a/src/common/MessageAccessor.cpp b/src/common/MessageAccessor.cpp
new file mode 100644
index 0000000..6fcfac1
--- /dev/null
+++ b/src/common/MessageAccessor.cpp
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "MessageAccessor.h"
+#include <functional>
+#include <vector>
+#include "Logging.h"
+#include "NameSpaceUtil.h"
+
+using namespace std;
+namespace rocketmq {
+
+// Prefixes the topic of msg with nameSpace followed by NAMESPACE_SPLIT_FLAG.
+// An empty nameSpace leaves the message untouched.
+void MessageAccessor::withNameSpace(MQMessage& msg, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  msg.setTopic(nameSpace + NAMESPACE_SPLIT_FLAG + msg.getTopic());
+}
+
+// Removes the name space prefix from the topic of a single message.
+//
+// The topic is rewritten only when it contains nameSpace immediately followed
+// by NAMESPACE_SPLIT_FLAG; everything up to and including that marker is
+// dropped. An empty nameSpace, or a topic without the marker, leaves the
+// message unchanged.
+void MessageAccessor::withoutNameSpaceSingle(MQMessageExt& msg, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  const string originTopic = msg.getTopic();
+  // Search for the name space together with its split flag. Matching the bare
+  // name space and then skipping one extra character could run past the end of
+  // the topic (substr would throw std::out_of_range) or swallow a character
+  // that is not the split flag.
+  const string marker = nameSpace + NAMESPACE_SPLIT_FLAG;
+  const auto index = originTopic.find(marker);
+  if (index == string::npos) {
+    return;
+  }
+  const string newTopic = originTopic.substr(index + marker.length());
+  msg.setTopic(newTopic);
+  LOG_DEBUG("Find Name Space Prefix in MessageID[%s], OriginTopic[%s], NewTopic[%s]", msg.getMsgId().c_str(),
+            originTopic.c_str(), newTopic.c_str());
+}
+// Applies withoutNameSpaceSingle() to every message in msgs, in order.
+// Nothing is touched when nameSpace is empty.
+void MessageAccessor::withoutNameSpace(vector<MQMessageExt>& msgs, const string nameSpace) {
+  if (nameSpace.empty()) {
+    return;
+  }
+  for (auto& msg : msgs) {
+    withoutNameSpaceSingle(msg, nameSpace);
+  }
+}
+//<!***************************************************************************
+} // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/MessageAccessor.h
similarity index 60%
copy from src/common/NameSpaceUtil.h
copy to src/common/MessageAccessor.h
index 1d4afcf..af42021 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/MessageAccessor.h
@@ -14,23 +14,22 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
-#ifndef __NAMESPACEUTIL_H__
-#define __NAMESPACEUTIL_H__
+#ifndef __MESSAGE_ACCESSOR_H__
+#define __MESSAGE_ACCESSOR_H__
#include <string>
-
-using namespace std;
-
-static const string ENDPOINT_PREFIX = "http://";
-static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+#include <vector>
+#include "MQMessage.h"
+#include "MQMessageExt.h"
namespace rocketmq {
-class NameSpaceUtil {
+//<!***************************************************************************
+class MessageAccessor {
public:
- static bool isEndPointURL(string nameServerAddr);
-
- static string formatNameServerURL(string nameServerAddr);
+ static void withNameSpace(MQMessage& msg, const std::string nameSpace);
+ static void withoutNameSpaceSingle(MQMessageExt& msg, const std::string
nameSpace);
+ static void withoutNameSpace(std::vector<MQMessageExt>& msgs, const
std::string nameSpace);
};
+//<!***************************************************************************
} // namespace rocketmq
-#endif //__NAMESPACEUTIL_H__
+#endif
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
index ff44a00..118bd4f 100644
--- a/src/common/NameSpaceUtil.cpp
+++ b/src/common/NameSpaceUtil.cpp
@@ -36,4 +36,56 @@ string NameSpaceUtil::formatNameServerURL(string
nameServerAddr) {
}
return nameServerAddr;
}
+
+// Extracts the name space embedded in a name server address.
+//
+// The name space is the text that starts at the first occurrence of
+// NAMESPACE_PREFIX and ends just before the next '.' character.
+// Returns an empty string when the prefix is absent or no '.' follows it.
+string NameSpaceUtil::getNameSpaceFromNsURL(string nameServerAddr) {
+  LOG_DEBUG("Try to get Name Space from nameServerAddr [%s]", nameServerAddr.c_str());
+  const auto index = nameServerAddr.find(NAMESPACE_PREFIX);
+  if (index == string::npos) {
+    return "";
+  }
+  // Only a dot located after the prefix can terminate the name space; a dot
+  // earlier in the address must not be picked up.
+  const auto indexDot = nameServerAddr.find('.', index);
+  if (indexDot == string::npos) {
+    return "";
+  }
+  // substr() takes (position, count), so the length is the distance between
+  // the prefix and the dot, not the absolute position of the dot.
+  string nameSpace = nameServerAddr.substr(index, indexDot - index);
+  LOG_INFO("Get Name Space [%s] from nameServerAddr [%s]", nameSpace.c_str(), nameServerAddr.c_str());
+  return nameSpace;
+}
+
+// Reports whether nameServerAddr contains NAMESPACE_PREFIX.
+// Addresses for which isEndPointURL() returns false are rejected up front and
+// always yield false.
+bool NameSpaceUtil::checkNameSpaceExistInNsURL(string nameServerAddr) {
+  if (!isEndPointURL(nameServerAddr)) {
+    LOG_DEBUG("This nameServerAddr [%s] is not a endpoint. should not get Name Space.", nameServerAddr.c_str());
+    return false;
+  }
+  const bool found = nameServerAddr.find(NAMESPACE_PREFIX) != string::npos;
+  if (found) {
+    LOG_INFO("Find Name Space Prefix in nameServerAddr [%s]", nameServerAddr.c_str());
+  }
+  return found;
+}
+
+// Reports whether nameServerAddr contains NAMESPACE_PREFIX anywhere in the string.
+bool NameSpaceUtil::checkNameSpaceExistInNameServer(string nameServerAddr) {
+  const bool found = nameServerAddr.find(NAMESPACE_PREFIX) != string::npos;
+  if (found) {
+    LOG_INFO("Find Name Space Prefix in nameServerAddr [%s]", nameServerAddr.c_str());
+  }
+  return found;
+}
+
+// Returns source prefixed with ns and NAMESPACE_SPLIT_FLAG.
+// When ns is empty, source is returned unchanged.
+string NameSpaceUtil::withNameSpace(string source, string ns) {
+  return ns.empty() ? source : ns + NAMESPACE_SPLIT_FLAG + source;
+}
+
+// Returns true when ns occurs anywhere inside source.
+// Note that an empty ns is found in every string, so it always yields true.
+bool NameSpaceUtil::hasNameSpace(string source, string ns) {
+  return source.length() >= ns.length() && source.find(ns) != string::npos;
+}
} // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
index 1d4afcf..a63d647 100644
--- a/src/common/NameSpaceUtil.h
+++ b/src/common/NameSpaceUtil.h
@@ -24,12 +24,26 @@ using namespace std;
static const string ENDPOINT_PREFIX = "http://";
static const unsigned int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+static const string NAMESPACE_PREFIX = "MQ_INST_";
+static const int NAMESPACE_PREFIX_LENGTH = NAMESPACE_PREFIX.length();
+static const string NAMESPACE_SPLIT_FLAG = "%";
+
namespace rocketmq {
class NameSpaceUtil {
public:
static bool isEndPointURL(string nameServerAddr);
static string formatNameServerURL(string nameServerAddr);
+
+ static string getNameSpaceFromNsURL(string nameServerAddr);
+
+ static bool checkNameSpaceExistInNsURL(string nameServerAddr);
+
+ static bool checkNameSpaceExistInNameServer(string nameServerAddr);
+
+ static string withNameSpace(string source, string ns);
+
+ static bool hasNameSpace(string source, string ns);
};
} // namespace rocketmq
diff --git a/src/consumer/ConsumeMessageConcurrentlyService.cpp
b/src/consumer/ConsumeMessageConcurrentlyService.cpp
index e5df16e..3b0204a 100644
--- a/src/consumer/ConsumeMessageConcurrentlyService.cpp
+++ b/src/consumer/ConsumeMessageConcurrentlyService.cpp
@@ -20,6 +20,7 @@
#include "ConsumeMsgService.h"
#include "DefaultMQPushConsumer.h"
#include "Logging.h"
+#include "MessageAccessor.h"
#include "UtilAll.h"
namespace rocketmq {
@@ -98,6 +99,11 @@ void
ConsumeMessageConcurrentlyService::ConsumeRequest(boost::weak_ptr<PullReque
if (m_pMessageListener != NULL) {
resetRetryTopic(msgs);
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
+ LOG_DEBUG("=====Receive Messages:[%s][%s][%s]",
msgs[0].getTopic().c_str(), msgs[0].getMsgId().c_str(),
+ msgs[0].getBody().c_str());
+ if (m_pConsumer->isUseNameSpaceMode()) {
+ MessageAccessor::withoutNameSpace(msgs, m_pConsumer->getNameSpace());
+ }
status = m_pMessageListener->consumeMessage(msgs);
}
diff --git a/src/consumer/ConsumeMessageOrderlyService.cpp
b/src/consumer/ConsumeMessageOrderlyService.cpp
index f68fe44..4d06e5f 100644
--- a/src/consumer/ConsumeMessageOrderlyService.cpp
+++ b/src/consumer/ConsumeMessageOrderlyService.cpp
@@ -17,6 +17,8 @@
#if !defined(WIN32) && !defined(__APPLE__)
#include <sys/prctl.h>
#endif
+
+#include <MessageAccessor.h>
#include "ConsumeMsgService.h"
#include "DefaultMQPushConsumer.h"
#include "Logging.h"
@@ -181,6 +183,9 @@ void
ConsumeMessageOrderlyService::ConsumeRequest(boost::weak_ptr<PullRequest> p
request->takeMessages(msgs,
pConsumer->getConsumeMessageBatchMaxSize());
if (!msgs.empty()) {
request->setLastConsumeTimestamp(UtilAll::currentTimeMillis());
+ if (m_pConsumer->isUseNameSpaceMode()) {
+ MessageAccessor::withoutNameSpace(msgs,
m_pConsumer->getNameSpace());
+ }
ConsumeStatus consumeStatus =
m_pMessageListener->consumeMessage(msgs);
if (consumeStatus == RECONSUME_LATER) {
request->makeMessageToCosumeAgain(msgs);
diff --git a/src/consumer/DefaultMQPullConsumer.cpp
b/src/consumer/DefaultMQPullConsumer.cpp
index d3ce978..f549883 100644
--- a/src/consumer/DefaultMQPullConsumer.cpp
+++ b/src/consumer/DefaultMQPullConsumer.cpp
@@ -20,10 +20,9 @@
#include "CommunicationMode.h"
#include "FilterAPI.h"
#include "Logging.h"
-#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
-#include "MQClientManager.h"
-#include "MQProtos.h"
+#include "MessageAccessor.h"
+#include "NameSpaceUtil.h"
#include "OffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
@@ -64,6 +63,7 @@ void DefaultMQPullConsumer::start() {
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
+ dealWithNameSpace();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
@@ -147,7 +147,8 @@ void DefaultMQPullConsumer::sendMessageBack(MQMessageExt&
msg, int delayLevel) {
void DefaultMQPullConsumer::fetchSubscribeMessageQueues(const string& topic,
vector<MQMessageQueue>& mqs) {
mqs.clear();
try {
- getFactory()->fetchSubscribeMessageQueues(topic, mqs,
getSessionCredentials());
+ const string localTopic = NameSpaceUtil::withNameSpace(topic,
getNameSpace());
+ getFactory()->fetchSubscribeMessageQueues(localTopic, mqs,
getSessionCredentials());
} catch (MQException& e) {
LOG_ERROR(e.what());
}
@@ -226,7 +227,11 @@ PullResult DefaultMQPullConsumer::pullSyncImpl(const
MQMessageQueue& mq,
ComMode_SYNC, // 10
NULL,
//<!callback;
getSessionCredentials(), NULL));
- return m_pPullAPIWrapper->processPullResult(mq, pullResult.get(),
pSData.get());
+ PullResult pr = m_pPullAPIWrapper->processPullResult(mq, pullResult.get(),
pSData.get());
+ if (m_useNameSpaceMode) {
+ MessageAccessor::withoutNameSpace(pr.msgFoundList, m_nameSpace);
+ }
+ return pr;
} catch (MQException& e) {
LOG_ERROR(e.what());
}
@@ -265,6 +270,7 @@ void DefaultMQPullConsumer::pullAsyncImpl(const
MQMessageQueue& mq,
arg.pPullWrapper = m_pPullAPIWrapper;
try {
+ // not support name space
unique_ptr<PullResult> pullResult(m_pPullAPIWrapper->pullKernelImpl(mq,
// 1
pSData->getSubString(), // 2
0L,
// 3
@@ -376,6 +382,36 @@ bool
DefaultMQPullConsumer::producePullMsgTask(boost::weak_ptr<PullRequest> pull
Rebalance* DefaultMQPullConsumer::getRebalance() const {
return NULL;
}
-
+// Applies the name space to this pull consumer; start() calls this before
+// it switches on the service state.
+//
+// If no name space was set explicitly, one is derived from the name server
+// address; when the address carries none either, nothing is changed. Otherwise
+// the group name and every registered topic that does not already contain the
+// name space are prefixed with it. Always returns true.
+bool DefaultMQPullConsumer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
+      return true;
+    }
+    // Remember the derived name space on the client.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+
+  // Qualify the group name unless it already carries the name space.
+  const string groupName = getGroupName();
+  if (!NameSpaceUtil::hasNameSpace(groupName, ns)) {
+    setGroupName(NameSpaceUtil::withNameSpace(groupName, ns));
+  }
+
+  // Rebuild the registered topic set with qualified names.
+  set<string> qualifiedTopics;
+  for (const string& registered : m_registerTopics) {
+    if (NameSpaceUtil::hasNameSpace(registered, ns)) {
+      qualifiedTopics.insert(registered);
+      continue;
+    }
+    LOG_INFO("Update Subscribe Topic[%s] with NameSpace:%s", registered.c_str(), ns.c_str());
+    // Flag that topics were rewritten; pullSyncImpl() checks this flag to
+    // strip the name space from pulled messages.
+    m_useNameSpaceMode = true;
+    qualifiedTopics.insert(NameSpaceUtil::withNameSpace(registered, ns));
+  }
+  m_registerTopics.swap(qualifiedTopics);
+  return true;
+}
//<!************************************************************************
} // namespace rocketmq
diff --git a/src/consumer/DefaultMQPushConsumer.cpp
b/src/consumer/DefaultMQPushConsumer.cpp
index 0cd5427..c6ab2b2 100644
--- a/src/consumer/DefaultMQPushConsumer.cpp
+++ b/src/consumer/DefaultMQPushConsumer.cpp
@@ -23,6 +23,7 @@
#include "Logging.h"
#include "MQClientAPIImpl.h"
#include "MQClientFactory.h"
+#include "NameSpaceUtil.h"
#include "OffsetStore.h"
#include "PullAPIWrapper.h"
#include "PullSysFlag.h"
@@ -206,6 +207,7 @@ DefaultMQPushConsumer::DefaultMQPushConsumer(const string&
groupname)
string gname = groupname.empty() ? DEFAULT_CONSUMER_GROUP : groupname;
setGroupName(gname);
m_asyncPull = true;
+ m_useNameSpaceMode = false;
m_asyncPullTimeout = 30 * 1000;
setMessageModel(CLUSTERING);
@@ -298,6 +300,8 @@ void DefaultMQPushConsumer::start() {
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
+ // deal with name space before start
+ dealWithNameSpace();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
@@ -972,6 +976,39 @@ ConsumerRunningInfo*
DefaultMQPushConsumer::getConsumerRunningInfo() {
return info;
}
+// Applies the name space to this push consumer; start() calls this before
+// it switches on the service state.
+//
+// If no name space was set explicitly, one is derived from the name server
+// address; when the address carries none either, nothing is changed. Otherwise
+// the group name and every subscribed topic that does not already contain the
+// name space are prefixed with it. Always returns true.
+bool DefaultMQPushConsumer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
+      return true;
+    }
+    // Remember the derived name space on the client.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+
+  // Qualify the group name unless it already carries the name space.
+  const string groupName = getGroupName();
+  if (!NameSpaceUtil::hasNameSpace(groupName, ns)) {
+    setGroupName(NameSpaceUtil::withNameSpace(groupName, ns));
+  }
+
+  // Rebuild the subscription table keyed by qualified topic names.
+  map<string, string> qualifiedSubs;
+  for (const auto& entry : m_subTopics) {
+    string topic = entry.first;
+    if (!NameSpaceUtil::hasNameSpace(topic, ns)) {
+      LOG_INFO("Update Subscribe[%s:%s] with NameSpace:%s", entry.first.c_str(), entry.second.c_str(), ns.c_str());
+      topic = NameSpaceUtil::withNameSpace(topic, ns);
+      // Flag that topics were rewritten; the consume services check this flag
+      // to strip the name space before invoking the message listener.
+      m_useNameSpaceMode = true;
+    }
+    qualifiedSubs[topic] = entry.second;
+  }
+  m_subTopics.swap(qualifiedSubs);
+  return true;
+}
//<!************************************************************************
} // namespace rocketmq
diff --git a/src/producer/DefaultMQProducer.cpp
b/src/producer/DefaultMQProducer.cpp
index 980a373..bd9cbc0 100644
--- a/src/producer/DefaultMQProducer.cpp
+++ b/src/producer/DefaultMQProducer.cpp
@@ -30,6 +30,8 @@
#include "MQClientManager.h"
#include "MQDecoder.h"
#include "MQProtos.h"
+#include "MessageAccessor.h"
+#include "NameSpaceUtil.h"
#include "StringIdMaker.h"
#include "TopicPublishInfo.h"
#include "Validators.h"
@@ -61,7 +63,8 @@ void DefaultMQProducer::start() {
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
-
+ // we should deal with namespaced before start.
+ dealWithNameSpace();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
@@ -109,6 +112,9 @@ void DefaultMQProducer::shutdown() {
SendResult DefaultMQProducer::send(MQMessage& msg, bool bSelectActiveBroker) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
try {
return sendDefaultImpl(msg, ComMode_SYNC, NULL, bSelectActiveBroker);
} catch (MQException& e) {
@@ -120,6 +126,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg, bool
bSelectActiveBroker) {
void DefaultMQProducer::send(MQMessage& msg, SendCallback* pSendCallback, bool
bSelectActiveBroker) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
try {
sendDefaultImpl(msg, ComMode_ASYNC, pSendCallback, bSelectActiveBroker);
} catch (MQException& e) {
@@ -162,6 +171,9 @@ BatchMessage
DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>& msgs)
bool waitStoreMsgOK = false;
for (auto& msg : msgs) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
if (firstFlag) {
topic = msg.getTopic();
waitStoreMsgOK = msg.isWaitStoreMsgOK();
@@ -190,6 +202,9 @@ BatchMessage
DefaultMQProducer::buildBatchMessage(std::vector<MQMessage>& msgs)
SendResult DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
if (msg.getTopic() != mq.getTopic()) {
LOG_WARN("message's topic not equal mq's topic");
}
@@ -204,6 +219,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg, const
MQMessageQueue& mq) {
void DefaultMQProducer::send(MQMessage& msg, const MQMessageQueue& mq,
SendCallback* pSendCallback) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
if (msg.getTopic() != mq.getTopic()) {
LOG_WARN("message's topic not equal mq's topic");
}
@@ -217,6 +235,9 @@ void DefaultMQProducer::send(MQMessage& msg, const
MQMessageQueue& mq, SendCallb
void DefaultMQProducer::sendOneway(MQMessage& msg, bool bSelectActiveBroker) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
try {
sendDefaultImpl(msg, ComMode_ONEWAY, NULL, bSelectActiveBroker);
} catch (MQException& e) {
@@ -227,6 +248,9 @@ void DefaultMQProducer::sendOneway(MQMessage& msg, bool
bSelectActiveBroker) {
void DefaultMQProducer::sendOneway(MQMessage& msg, const MQMessageQueue& mq) {
Validators::checkMessage(msg, getMaxMessageSize());
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
if (msg.getTopic() != mq.getTopic()) {
LOG_WARN("message's topic not equal mq's topic");
}
@@ -240,6 +264,9 @@ void DefaultMQProducer::sendOneway(MQMessage& msg, const
MQMessageQueue& mq) {
SendResult DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector*
pSelector, void* arg) {
try {
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
return sendSelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL);
} catch (MQException& e) {
LOG_ERROR(e.what());
@@ -254,6 +281,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg,
int autoRetryTimes,
bool bActiveBroker) {
try {
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
return sendAutoRetrySelectImpl(msg, pSelector, arg, ComMode_SYNC, NULL,
autoRetryTimes, bActiveBroker);
} catch (MQException& e) {
LOG_ERROR(e.what());
@@ -264,6 +294,9 @@ SendResult DefaultMQProducer::send(MQMessage& msg,
void DefaultMQProducer::send(MQMessage& msg, MessageQueueSelector* pSelector,
void* arg, SendCallback* pSendCallback) {
try {
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
sendSelectImpl(msg, pSelector, arg, ComMode_ASYNC, pSendCallback);
} catch (MQException& e) {
LOG_ERROR(e.what());
@@ -273,6 +306,9 @@ void DefaultMQProducer::send(MQMessage& msg,
MessageQueueSelector* pSelector, vo
void DefaultMQProducer::sendOneway(MQMessage& msg, MessageQueueSelector*
pSelector, void* arg) {
try {
+ if (!NameSpaceUtil::hasNameSpace(msg.getTopic(), getNameSpace())) {
+ MessageAccessor::withNameSpace(msg, getNameSpace());
+ }
sendSelectImpl(msg, pSelector, arg, ComMode_ONEWAY, NULL);
} catch (MQException& e) {
LOG_ERROR(e.what());
@@ -535,9 +571,11 @@ bool DefaultMQProducer::tryToCompressMessage(MQMessage&
msg) {
return false;
}
+
int DefaultMQProducer::getRetryTimes() const {
return m_retryTimes;
}
+
void DefaultMQProducer::setRetryTimes(int times) {
if (times <= 0) {
LOG_WARN("set retry times illegal, use default value:5");
@@ -556,6 +594,7 @@ void DefaultMQProducer::setRetryTimes(int times) {
int DefaultMQProducer::getRetryTimes4Async() const {
return m_retryTimes4Async;
}
+
void DefaultMQProducer::setRetryTimes4Async(int times) {
if (times <= 0) {
LOG_WARN("set retry times illegal, use default value:1");
@@ -572,5 +611,24 @@ void DefaultMQProducer::setRetryTimes4Async(int times) {
m_retryTimes4Async = times;
}
+// Applies the name space to this producer; start() calls this before it
+// switches on the service state.
+//
+// If no name space was set explicitly, one is derived from the name server
+// address; when the address carries none either, nothing is changed. Otherwise
+// the group name is prefixed with the name space unless it already contains
+// it. Always returns true.
+bool DefaultMQProducer::dealWithNameSpace() {
+  string ns = getNameSpace();
+  if (ns.empty()) {
+    const string nsAddr = getNamesrvAddr();
+    if (!NameSpaceUtil::checkNameSpaceExistInNameServer(nsAddr)) {
+      return true;
+    }
+    // Remember the derived name space on the client.
+    ns = NameSpaceUtil::getNameSpaceFromNsURL(nsAddr);
+    setNameSpace(ns);
+  }
+
+  // Qualify the group name unless it already carries the name space.
+  const string groupName = getGroupName();
+  if (!NameSpaceUtil::hasNameSpace(groupName, ns)) {
+    setGroupName(NameSpaceUtil::withNameSpace(groupName, ns));
+  }
+  return true;
+}
//<!***************************************************************************
} // namespace rocketmq