This is an automated email from the ASF dual-hosted git repository.
dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git
The following commit(s) were added to refs/heads/master by this push:
new 28c4cf0 [ISSUE #121]Support connect MQ endpoint with ACL (#133)
28c4cf0 is described below
commit 28c4cf0bd1e1b9d1f639fad6e59b7b62b243913a
Author: dinglei <[email protected]>
AuthorDate: Wed Apr 17 10:14:16 2019 +0800
[ISSUE #121]Support connect MQ endpoint with ACL (#133)
* Add function name and line in the log format
* Support connect aliyun endpoint
* Modify signature context to link aliyun mq
* Format NameSpace file
* Format file style
* Update MQClientFactory.cpp
* Remove unnecessary comments
---
src/MQClientFactory.cpp | 4 +-
src/common/MQClient.cpp | 3 +-
src/common/NameSpaceUtil.cpp | 39 +
src/common/NameSpaceUtil.h | 36 +
src/log/Logging.cpp | 42 +-
src/log/Logging.h | 63 +-
src/protocol/CommandHeader.cpp | 8 +-
src/transport/TcpRemotingClient.cpp | 1397 ++++++++++++++++++-----------------
src/transport/TcpTransport.cpp | 2 +
9 files changed, 849 insertions(+), 745 deletions(-)
diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 66cc243..cfa62dd 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -164,12 +164,12 @@ bool MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
vector<QueueData>& queueDatas = pTopicRouteData->getQueueDatas();
vector<QueueData>::iterator it = queueDatas.begin();
for (; it != queueDatas.end(); ++it) {
- // 读写分区个数是一致,故只做一次判断;
int queueNums = std::min(4, it->readQueueNums);
it->readQueueNums = queueNums;
it->writeQueueNums = queueNums;
}
}
+ LOG_DEBUG("getTopicRouteInfoFromNameServer is null for topic :%s", topic.c_str());
} else {
pTopicRouteData.reset(m_pClientAPIImpl->getTopicRouteInfoFromNameServer(topic,
1000 * 5, session_credentials));
}
@@ -280,7 +280,6 @@ void MQClientFactory::shutdown() {
switch (m_serviceState) {
case RUNNING: {
- //<! stop;
if (m_consumer_async_service_thread) {
m_consumer_async_ioService.stop();
m_consumer_async_service_thread->interrupt();
@@ -305,7 +304,6 @@ void MQClientFactory::shutdown() {
break;
}
- //<!删除自己;
MQClientManager::getInstance()->removeClientFactory(m_clientId);
}
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index 6cc451c..42d77a5 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -21,6 +21,7 @@
#include "MQClientManager.h"
#include "TopicPublishInfo.h"
#include "UtilAll.h"
+#include "NameSpaceUtil.h"
namespace rocketmq {
@@ -68,7 +69,7 @@ const string& MQClient::getNamesrvAddr() const {
}
void MQClient::setNamesrvAddr(const string& namesrvAddr) {
- m_namesrvAddr = namesrvAddr;
+ m_namesrvAddr = NameSpaceUtil::formatNameServerURL(namesrvAddr);
}
const string& MQClient::getNamesrvDomain() const {
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
new file mode 100644
index 0000000..74f87a2
--- /dev/null
+++ b/src/common/NameSpaceUtil.cpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "NameSpaceUtil.h"
+#include "Logging.h"
+
+namespace rocketmq {
+
+bool NameSpaceUtil::isEndPointURL(string nameServerAddr) {
+ if (nameServerAddr.length() >= ENDPOINT_PREFIX_LENGTH && nameServerAddr.find(ENDPOINT_PREFIX) != string::npos) {
+ return true;
+ }
+ return false;
+}
+
+string NameSpaceUtil::formatNameServerURL(string nameServerAddr) {
+ int index = nameServerAddr.find(ENDPOINT_PREFIX);
+ if (index != string::npos) {
+ LOG_DEBUG("Get Name Server from endpoint [%s]",
+ nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH).c_str());
+ return nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH);
+ }
+ return nameServerAddr;
+}
+} // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
new file mode 100644
index 0000000..39f5da7
--- /dev/null
+++ b/src/common/NameSpaceUtil.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NAMESPACEUTIL_H__
+#define __NAMESPACEUTIL_H__
+
+#include <string>
+
+using namespace std;
+
+static const string ENDPOINT_PREFIX = "http://";
+static const int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+namespace rocketmq {
+class NameSpaceUtil {
+ public:
+ static bool isEndPointURL(string nameServerAddr);
+
+ static string formatNameServerURL(string nameServerAddr);
+};
+
+} // namespace rocketmq
+#endif //__NAMESPACEUTIL_H__
diff --git a/src/log/Logging.cpp b/src/log/Logging.cpp
index 6e6a939..9ecd1d8 100644
--- a/src/log/Logging.cpp
+++ b/src/log/Logging.cpp
@@ -1,19 +1,19 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#include "Logging.h"
#include <boost/date_time/gregorian/gregorian.hpp>
#include "UtilAll.h"
@@ -53,13 +53,13 @@ logAdapter::logAdapter() : m_logLevel(eLOG_LEVEL_INFO) {
keywords::min_free_space = 300 * 1024 *
1024, keywords::target = homeDir,
keywords::max_size = 200 * 1024 * 1024,
// max keep 3 log file defaultly
keywords::auto_flush = true);
- logging::core::get()->set_filter(logging::trivial::severity >=
logging::trivial::info);
+ // logging::core::get()->set_filter(logging::trivial::severity >= logging::trivial::info);
+ setLogLevelInner(m_logLevel);
logging::add_common_attributes();
}
-void logAdapter::setLogLevel(elogLevel logLevel) {
- m_logLevel = logLevel;
+void logAdapter::setLogLevelInner(elogLevel logLevel) {
switch (logLevel) {
case eLOG_LEVEL_FATAL:
logging::core::get()->set_filter(logging::trivial::severity >=
logging::trivial::fatal);
@@ -90,6 +90,10 @@ void logAdapter::setLogLevel(elogLevel logLevel) {
break;
}
}
+void logAdapter::setLogLevel(elogLevel logLevel) {
+ m_logLevel = logLevel;
+ setLogLevelInner(logLevel);
+}
elogLevel logAdapter::getLogLevel() {
return m_logLevel;
@@ -101,4 +105,4 @@ void logAdapter::setLogFileNumAndSize(int logNum, int sizeOfPerFile) {
m_logSink->locked_backend()->set_file_collector(sinks::file::make_collector(
keywords::target = homeDir, keywords::max_size = logNum * sizeOfPerFile
* 1024 * 1024));
}
-}
+} // namespace rocketmq
diff --git a/src/log/Logging.h b/src/log/Logging.h
index 5729148..dfede34 100644
--- a/src/log/Logging.h
+++ b/src/log/Logging.h
@@ -1,23 +1,24 @@
/*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements. See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License. You may obtain a copy of the License at
-*
-* http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
#ifndef _ALOG_ADAPTER_H_
#define _ALOG_ADAPTER_H_
+#include <string.h>
#include <boost/date_time/posix_time/posix_time_types.hpp>
#include <boost/log/core.hpp>
#include <boost/log/expressions.hpp>
@@ -51,6 +52,7 @@ class logAdapter {
private:
logAdapter();
+ void setLogLevelInner(elogLevel logLevel);
elogLevel m_logLevel;
std::string m_logFile;
src::severity_logger<boost::log::trivial::severity_level> m_severityLogger;
@@ -74,12 +76,31 @@ class LogUtil {
BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get();
va_end(arg_ptr);
}
+ static void LogMessageFull(boost::log::trivial::severity_level level,
+ const char* file,
+ const char* func,
+ int line,
+ const char* format,
+ ...) {
+ va_list arg_ptr;
+ va_start(arg_ptr, format);
+ boost::scoped_array<char> formattedString(new char[1024]);
+ vsnprintf(formattedString.get(), 1024, format, arg_ptr);
+ // BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get() << "[" << file << ":" << func << ":"<< line << "]";
+ BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get() << "[" << func << ":" << line << "]";
+ va_end(arg_ptr);
+ }
};
-#define LOG_FATAL(...) LogUtil::LogMessage(boost::log::trivial::fatal, __LINE__, __VA_ARGS__)
-#define LOG_ERROR(...) LogUtil::LogMessage(boost::log::trivial::error, __LINE__, __VA_ARGS__)
-#define LOG_WARN(...) LogUtil::LogMessage(boost::log::trivial::warning, __LINE__, __VA_ARGS__)
-#define LOG_INFO(...) LogUtil::LogMessage(boost::log::trivial::info, __LINE__, __VA_ARGS__)
-#define LOG_DEBUG(...) LogUtil::LogMessage(boost::log::trivial::debug, __LINE__, __VA_ARGS__)
-}
+#define LOG_FATAL(...) \
+ LogUtil::LogMessageFull(boost::log::trivial::fatal, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+#define LOG_ERROR(...) \
+ LogUtil::LogMessageFull(boost::log::trivial::error, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+#define LOG_WARN(...) \
+ LogUtil::LogMessageFull(boost::log::trivial::warning, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+//#define LOG_INFO(...) LogUtil::LogMessage(boost::log::trivial::info, __LINE__, __VA_ARGS__)
+#define LOG_INFO(...) LogUtil::LogMessageFull(boost::log::trivial::info, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+#define LOG_DEBUG(...) \
+ LogUtil::LogMessageFull(boost::log::trivial::debug, __FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+} // namespace rocketmq
#endif
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 025d5c6..4e4b0bb 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -71,11 +71,9 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) {
outData["bornTimestamp"] = UtilAll::to_string(bornTimestamp);
outData["flag"] = flag;
outData["properties"] = properties;
-#ifdef ONS
outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
outData["unitMode"] = UtilAll::to_string(unitMode);
-#endif
- outData["batch"] = batch;
+ // outData["batch"] = batch;
}
int SendMessageRequestHeader::getReconsumeTimes() {
@@ -106,11 +104,9 @@ void SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin
requestMap.insert(pair<string, string>("bornTimestamp",
UtilAll::to_string(bornTimestamp)));
requestMap.insert(pair<string, string>("flag", UtilAll::to_string(flag)));
requestMap.insert(pair<string, string>("properties", properties));
-#ifdef ONS
requestMap.insert(pair<string, string>("reconsumeTimes",
UtilAll::to_string(reconsumeTimes)));
requestMap.insert(pair<string, string>("unitMode",
UtilAll::to_string(unitMode)));
-#endif
- requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
+ // requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
}
//<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.cpp b/src/transport/TcpRemotingClient.cpp
index 26060c1..e72af1d 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -1,695 +1,702 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "TcpRemotingClient.h"
-#include <stddef.h>
-#if !defined(WIN32) && !defined(__APPLE__)
-#include <sys/prctl.h>
-#endif
-#include "Logging.h"
-#include "MemoryOutputStream.h"
-#include "TopAddressing.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-//<!************************************************************************
-TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t
tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
- : m_pullThreadNum(pullThreadNum),
- m_tcpConnectTimeout(tcpConnectTimeout),
- m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
- m_namesrvIndex(0),
- m_ioServiceWork(m_ioService) {
-#if !defined(WIN32) && !defined(__APPLE__)
- string taskName = UtilAll::getProcessName();
- prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
-#endif
- for (int i = 0; i != pullThreadNum; ++i) {
- m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run,
&m_ioService));
- }
-#if !defined(WIN32) && !defined(__APPLE__)
- prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
-#endif
- LOG_INFO(
- "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
- "m_pullThreadNum:%d",
- m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
- m_async_service_thread.reset(new
boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
-}
-
-void TcpRemotingClient::boost_asio_work() {
- LOG_INFO("TcpRemotingClient::boost asio async service runing");
- boost::asio::io_service::work work(m_async_ioService); // avoid async io
- // service stops after
- // first timer timeout
- // callback
- m_async_ioService.run();
-}
-
-TcpRemotingClient::~TcpRemotingClient() {
- m_tcpTable.clear();
- m_futureTable.clear();
- m_asyncFutureTable.clear();
- m_namesrvAddrList.clear();
- removeAllTimerCallback();
-}
-
-void TcpRemotingClient::stopAllTcpTransportThread() {
- LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
- m_async_ioService.stop();
- m_async_service_thread->interrupt();
- m_async_service_thread->join();
- removeAllTimerCallback();
-
- {
- TcpMap::iterator it = m_tcpTable.begin();
- for (; it != m_tcpTable.end(); ++it) {
- it->second->disconnect(it->first);
- }
- m_tcpTable.clear();
- }
-
- m_ioService.stop();
- m_threadpool.join_all();
-
- {
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- for (ResMap::iterator it = m_futureTable.begin(); it !=
m_futureTable.end(); ++it) {
- if (it->second)
- it->second->releaseThreadCondition();
- }
- }
- LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
-}
-
-void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
- if (!addrs.empty()) {
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() +
boost::posix_time::seconds(10))) {
- LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
- return;
- }
- }
- // clear first;
- m_namesrvAddrList.clear();
-
- vector<string> out;
- UtilAll::Split(out, addrs, ";");
- for (size_t i = 0; i < out.size(); i++) {
- string addr = out[i];
- UtilAll::Trim(addr);
-
- string hostName;
- short portNumber;
- if (UtilAll::SplitURL(addr, hostName, portNumber)) {
- LOG_INFO("update Namesrv:%s", addr.c_str());
- m_namesrvAddrList.push_back(addr);
- }
- }
- out.clear();
- }
-}
-
-bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand&
request) {
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
- int code = request.getCode();
- int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code,
opaque, this, 3000, false, NULL));
- addResponseFuture(opaque, responseFuture);
- // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
- // timeoutms:%d", addr.c_str(), code, opaque, 3000);
-
- if (SendCommand(pTcp, request)) {
- responseFuture->setSendRequestOK(true);
- unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
- if (pRsp == NULL) {
- LOG_ERROR("wait response timeout of heartbeat, so closeTransport of
addr:%s", addr.c_str());
- CloseTransport(addr, pTcp);
- return false;
- } else if (pRsp->getCode() == SUCCESS_VALUE) {
- return true;
- } else {
- LOG_WARN("get error response:%d of heartbeat to addr:%s",
pRsp->getCode(), addr.c_str());
- return false;
- }
- } else {
- CloseTransport(addr, pTcp);
- }
- }
- return false;
-}
-
-RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
- RemotingCommand& request,
- int timeoutMillis /* = 3000 */)
{
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
- int code = request.getCode();
- int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(
- new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
- addResponseFuture(opaque, responseFuture);
-
- if (SendCommand(pTcp, request)) {
- // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
- // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
- responseFuture->setSendRequestOK(true);
- RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
- if (pRsp == NULL) {
- if (code != GET_CONSUMER_LIST_BY_GROUP) {
- LOG_WARN(
- "wait response timeout or get NULL response of code:%d, so "
- "closeTransport of addr:%s",
- code, addr.c_str());
- CloseTransport(addr, pTcp);
- }
- // avoid responseFuture leak;
- findAndDeleteResponseFuture(opaque);
- return NULL;
- } else {
- return pRsp;
- }
- } else {
- // avoid responseFuture leak;
- findAndDeleteResponseFuture(opaque);
- CloseTransport(addr, pTcp);
- }
- }
- return NULL;
-}
-
-bool TcpRemotingClient::invokeAsync(const string& addr,
- RemotingCommand& request,
- AsyncCallbackWrap* cbw,
- int64 timeoutMilliseconds,
- int maxRetrySendTimes,
- int retrySendTimes) {
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
- //<!not delete, for callback to delete;
- int code = request.getCode();
- int opaque = request.getOpaque();
- boost::shared_ptr<ResponseFuture> responseFuture(
- new ResponseFuture(code, opaque, this, timeoutMilliseconds, true,
cbw));
- responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
- responseFuture->setRetrySendTimes(retrySendTimes);
- responseFuture->setBrokerAddr(addr);
- responseFuture->setRequestCommand(request);
- addAsyncResponseFuture(opaque, responseFuture);
- if (cbw) {
- boost::asio::deadline_timer* t =
- new boost::asio::deadline_timer(m_async_ioService,
boost::posix_time::milliseconds(timeoutMilliseconds));
- addTimerCallback(t, opaque);
- boost::system::error_code e;
-
t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout,
this, e, opaque));
- }
-
- if (SendCommand(pTcp, request)) // Even if send failed, asyncTimerThread
- // will trigger next pull request or report
- // send msg failed
- {
- LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d",
addr.c_str(), code, opaque);
- responseFuture->setSendRequestOK(true);
- }
- return true;
- }
- LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
- return false;
-}
-
-void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand&
request) {
- //<!not need callback;
- boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
- if (pTcp != NULL) {
- request.markOnewayRPC();
- LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(),
request.getCode());
- SendCommand(pTcp, request);
- }
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string&
addr, bool needRespons) {
- if (addr.empty())
- return CreateNameserverTransport(needRespons);
-
- return CreateTransport(addr, needRespons);
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const
string& addr, bool needRespons) {
- boost::shared_ptr<TcpTransport> tts;
- {
- // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
- // long
- // time, if could not get m_tcpLock, return NULL
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() +
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
- LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- bGetMutex = true;
- }
- } else {
- bGetMutex = true;
- }
- if (bGetMutex) {
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
- boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
- if (tcp) {
- tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
- if (connectStatus == e_connectWaitResponse) {
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else if (connectStatus == e_connectFail) {
- LOG_ERROR("tcpTransport with server disconnected, erase
server:%s", addr.c_str());
- tcp->disconnect(addr); // avoid coredump when connection with
broker was broken
- m_tcpTable.erase(addr);
- } else if (connectStatus == e_connectSuccess) {
- return tcp;
- } else {
- LOG_ERROR(
- "go to fault state, erase:%s from tcpMap, and reconnect "
- "it",
- addr.c_str());
- m_tcpTable.erase(addr);
- }
- }
- }
-
- //<!callback;
- READ_CALLBACK callback = needRespons ?
&TcpRemotingClient::static_messageReceived : NULL;
-
- tts.reset(new TcpTransport(this, callback));
- tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
- if (connectStatus != e_connectWaitResponse) {
- LOG_WARN("can not connect to :%s", addr.c_str());
- tts->disconnect(addr);
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- m_tcpTable[addr] = tts; // even if connecting failed finally, this
- // server transport will be erased by next
- // CreateTransport
- }
- } else {
- LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- }
- }
-
- tcpConnectStatus connectStatus =
tts->waitTcpConnectEvent(m_tcpConnectTimeout);
- if (connectStatus != e_connectSuccess) {
- LOG_WARN("can not connect to server:%s", addr.c_str());
- tts->disconnect(addr);
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- LOG_INFO("connect server with addr:%s success", addr.c_str());
- return tts;
- }
-}
-
-boost::shared_ptr<TcpTransport>
TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
- // m_namesrvLock was added to avoid operation of nameServer was blocked by
- // m_tcpLock, it was used by single Thread mostly, so no performance impact
- // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
- // time, if could not get m_namesrvlock, return NULL
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() +
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
- LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- bGetMutex = true;
- }
- } else {
- bGetMutex = true;
- }
-
- if (bGetMutex) {
- if (!m_namesrvAddrChoosed.empty()) {
- boost::shared_ptr<TcpTransport> pTcp =
GetTransport(m_namesrvAddrChoosed, true);
- if (pTcp)
- return pTcp;
- else
- m_namesrvAddrChoosed.clear();
- }
-
- vector<string>::iterator itp = m_namesrvAddrList.begin();
- for (; itp != m_namesrvAddrList.end(); ++itp) {
- unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
- if (m_namesrvIndex == numeric_limits<unsigned int>::max())
- m_namesrvIndex = 0;
- m_namesrvIndex++;
- LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT
"", m_namesrvIndex, index,
- m_namesrvAddrList.size());
- boost::shared_ptr<TcpTransport> pTcp =
GetTransport(m_namesrvAddrList[index], true);
- if (pTcp) {
- m_namesrvAddrChoosed = m_namesrvAddrList[index];
- return pTcp;
- }
- }
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- } else {
- LOG_WARN("get nameServer tcpTransport mutex failed");
- boost::shared_ptr<TcpTransport> pTcp;
- return pTcp;
- }
-}
-
-void TcpRemotingClient::CloseTransport(const string& addr,
boost::shared_ptr<TcpTransport> pTcp) {
- if (addr.empty()) {
- return CloseNameServerTransport(pTcp);
- }
-
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() +
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
- LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
- return;
- } else {
- bGetMutex = true;
- }
- } else {
- bGetMutex = true;
- }
- LOG_ERROR("CloseTransport of:%s", addr.c_str());
- if (bGetMutex) {
- bool removeItemFromTable = true;
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
- LOG_INFO(
- "tcpTransport with addr:%s has been closed before, and has been "
- "created again, nothing to do",
- addr.c_str());
- removeItemFromTable = false;
- }
- } else {
- LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable
before", addr.c_str());
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable == true) {
- LOG_WARN("closeTransport: disconnect broker:%s with state:%d",
addr.c_str(),
- m_tcpTable[addr]->getTcpConnectStatus());
- if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
- m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection
with server was broken
- LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
- m_tcpTable.erase(addr);
- }
- } else {
- LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
- return;
- }
- LOG_ERROR("CloseTransport of:%s end", addr.c_str());
-}
-
-void
TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport>
pTcp) {
- bool bGetMutex = false;
- boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock,
boost::try_to_lock);
- if (!lock.owns_lock()) {
- if (!lock.timed_lock(boost::get_system_time() +
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
- LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
- return;
- } else {
- bGetMutex = true;
- }
- } else {
- bGetMutex = true;
- }
- if (bGetMutex) {
- string addr = m_namesrvAddrChoosed;
- bool removeItemFromTable = true;
- if (m_tcpTable.find(addr) != m_tcpTable.end()) {
- if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
- LOG_INFO(
- "tcpTransport with addr:%s has been closed before, and has been "
- "created again, nothing to do",
- addr.c_str());
- removeItemFromTable = false;
- }
- } else {
- LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable
before", addr.c_str());
- removeItemFromTable = false;
- }
-
- if (removeItemFromTable == true) {
- m_tcpTable[addr]->disconnect(addr); // avoid coredump when connection
with server was broken
- LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
- m_tcpTable.erase(addr);
- m_namesrvAddrChoosed.clear();
- }
- } else {
- LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s",
m_namesrvAddrChoosed.c_str());
- return;
- }
-}
-
-bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts,
RemotingCommand& msg) {
- const MemoryBlock* phead = msg.GetHead();
- const MemoryBlock* pbody = msg.GetBody();
-
- unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
- if (phead->getData()) {
- result->write(phead->getData(), phead->getSize());
- }
- if (pbody->getData()) {
- result->write(pbody->getData(), pbody->getSize());
- }
- const char* pData = static_cast<const char*>(result->getData());
- int len = result->getDataSize();
- return pTts->sendMessage(pData, len);
-}
-
-void TcpRemotingClient::static_messageReceived(void* context, const
MemoryBlock& mem, const string& addr) {
- TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
- if (pTcpRemotingClient)
- pTcpRemotingClient->messageReceived(mem, addr);
-}
-
-void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string&
addr) {
- m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem,
addr));
-}
-
-void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string&
addr) {
- RemotingCommand* pRespondCmd = NULL;
- try {
- pRespondCmd = RemotingCommand::Decode(mem);
- } catch (...) {
- LOG_ERROR("processData_error");
- return;
- }
-
- int opaque = pRespondCmd->getOpaque();
-
- //<!process self;
- if (pRespondCmd->isResponseType()) {
- boost::shared_ptr<ResponseFuture>
pFuture(findAndDeleteAsyncResponseFuture(opaque));
- if (!pFuture) {
- pFuture = findAndDeleteResponseFuture(opaque);
- if (pFuture) {
- if (pFuture->getSyncResponseFlag()) {
- LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
- deleteAndZero(pRespondCmd);
- return;
- }
- LOG_DEBUG("find_response opaque:%d", opaque);
- } else {
- LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d",
opaque);
- deleteAndZero(pRespondCmd);
- return;
- }
- }
- processResponseCommand(pRespondCmd, pFuture);
- } else {
- processRequestCommand(pRespondCmd, addr);
- }
-}
-
-void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd,
boost::shared_ptr<ResponseFuture> pfuture) {
- int code = pfuture->getRequestCode();
- int opaque = pCmd->getOpaque();
- LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d,
retrySendTimes:%d", code, opaque,
- pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
- pCmd->SetExtHeader(code); // set head , for response use
-
- pfuture->setResponse(pCmd);
-
- if (pfuture->getASyncFlag()) {
- if (!pfuture->getAsyncResponseFlag()) {
- pfuture->setAsyncResponseFlag();
- pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
- cancelTimerCallback(opaque);
- pfuture->executeInvokeCallback();
- }
- }
-}
-
-void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const
string& addr) {
- unique_ptr<RemotingCommand> pRequestCommand(pCmd);
- int requestCode = pRequestCommand->getCode();
- if (m_requestTable.find(requestCode) == m_requestTable.end()) {
- LOG_ERROR("can_not_find request:%d processor", requestCode);
- } else {
- unique_ptr<RemotingCommand>
pResponse(m_requestTable[requestCode]->processRequest(addr,
pRequestCommand.get()));
- if (!pRequestCommand->isOnewayRPC()) {
- if (pResponse) {
- pResponse->setOpaque(pRequestCommand->getOpaque());
- pResponse->markResponseType();
- pResponse->Encode();
-
- invokeOneway(addr, *pResponse);
- }
- }
- }
-}
-
-void TcpRemotingClient::addResponseFuture(int opaque,
boost::shared_ptr<ResponseFuture> pfuture) {
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- m_futureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture>
TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
- boost::shared_ptr<ResponseFuture> pResponseFuture;
- if (m_futureTable.find(opaque) != m_futureTable.end()) {
- pResponseFuture = m_futureTable[opaque];
- m_futureTable.erase(opaque);
- }
- return pResponseFuture;
-}
-
-void TcpRemotingClient::handleAsyncPullForResponseTimeout(const
boost::system::error_code& e, int opaque) {
- if (e == boost::asio::error::operation_aborted) {
- LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d,
msg:%s", opaque, e.value(),
- e.message().data());
- return;
- }
-
- LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s",
opaque, e.value(), e.message().data());
- boost::shared_ptr<ResponseFuture>
pFuture(findAndDeleteAsyncResponseFuture(opaque));
- if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap()))
{
- if ((pFuture->getAsyncResponseFlag() != true)) // if no response
received, then check timeout or not
- {
- LOG_ERROR("no response got for opaque:%d", opaque);
- pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
- pFuture->executeInvokeCallbackException();
- }
- }
-
- eraseTimerCallback(opaque);
-}
-
-void TcpRemotingClient::addAsyncResponseFuture(int opaque,
boost::shared_ptr<ResponseFuture> pfuture) {
- boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
- m_asyncFutureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque]
will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture>
TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
- boost::shared_ptr<ResponseFuture> pResponseFuture;
- if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
- pResponseFuture = m_asyncFutureTable[opaque];
- m_asyncFutureTable.erase(opaque);
- }
-
- return pResponseFuture;
-}
-
-void TcpRemotingClient::registerProcessor(MQRequestCode requestCode,
ClientRemotingProcessor* clientRemotingProcessor) {
- if (m_requestTable.find(requestCode) != m_requestTable.end())
- m_requestTable.erase(requestCode);
- m_requestTable[requestCode] = clientRemotingProcessor;
-}
-
-void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int
opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
- LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
- boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
- old_t->cancel();
- delete old_t;
- old_t = NULL;
- m_async_timer_map.erase(opaque);
- }
- m_async_timer_map[opaque] = t;
-}
-
-void TcpRemotingClient::eraseTimerCallback(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
- LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
- boost::asio::deadline_timer* t = m_async_timer_map[opaque];
- delete t;
- t = NULL;
- m_async_timer_map.erase(opaque);
- }
-}
-
-void TcpRemotingClient::cancelTimerCallback(int opaque) {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
- LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);
- boost::asio::deadline_timer* t = m_async_timer_map[opaque];
- t->cancel();
- delete t;
- t = NULL;
- m_async_timer_map.erase(opaque);
- }
-}
-
-void TcpRemotingClient::removeAllTimerCallback() {
- boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
- for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it !=
m_async_timer_map.end(); ++it) {
- boost::asio::deadline_timer* t = it->second;
- t->cancel();
- delete t;
- t = NULL;
- }
- m_async_timer_map.clear();
-}
-
-void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue&
mq, int opaque) {
- // delete the map record of opaque<->ResponseFuture, so the answer for the
pull request will discard when receive it
- // later
- boost::shared_ptr<ResponseFuture>
pFuture(findAndDeleteAsyncResponseFuture(opaque));
- if (!pFuture) {
- pFuture = findAndDeleteResponseFuture(opaque);
- if (pFuture) {
- LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s",
opaque, mq.toString().data());
- }
- } else {
- LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s",
opaque, mq.toString().data());
- }
- // delete the timeout timer for opaque for pullrequest
- cancelTimerCallback(opaque);
-}
-
-//<!************************************************************************
-} //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TcpRemotingClient.h"
+#include <stddef.h>
+#if !defined(WIN32) && !defined(__APPLE__)
+#include <sys/prctl.h>
+#endif
+#include <boost/asio/placeholders.hpp>
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "TopAddressing.h"
+#include "UtilAll.h"
+namespace rocketmq {
+
+//<!************************************************************************
+// Builds the remoting client and starts its threads.
+//
+// pullThreadNum: number of worker threads that run m_ioService, the service
+//     messageReceived() posts ProcessData onto. A value <= 0 starts none.
+// tcpConnectTimeout: kept in m_tcpConnectTimeout and handed to
+//     TcpTransport::connect() / waitTcpConnectEvent().
+// tcpTransportTryLockTimeout: kept in m_tcpTransportTryLockTimeout and used
+//     as the number of seconds to wait for m_tcpLock / m_namesrvlock.
+TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
+    : m_pullThreadNum(pullThreadNum),
+      m_tcpConnectTimeout(tcpConnectTimeout),
+      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
+      m_namesrvIndex(0),
+      m_ioServiceWork(m_ioService) {
+#if !defined(WIN32) && !defined(__APPLE__)
+  // Rename the calling thread while the pool is spawned and restore the saved
+  // name afterwards (presumably so the pool threads inherit "networkTP" -
+  // confirm against the target platform).
+  const string savedName = UtilAll::getProcessName();
+  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
+#endif
+  // '<' rather than '!=' so a negative count cannot loop without end.
+  for (int i = 0; i < pullThreadNum; ++i) {
+    m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, &m_ioService));
+  }
+#if !defined(WIN32) && !defined(__APPLE__)
+  prctl(PR_SET_NAME, savedName.c_str(), 0, 0, 0);
+#endif
+  LOG_INFO(
+      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
+      "m_pullThreadNum:%d",
+      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+  // Separate thread for m_async_ioService, which hosts the async timeout timers.
+  m_async_service_thread.reset(new boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
+}
+
+// Thread body of m_async_service_thread: runs m_async_ioService until it is
+// stopped (see stopAllTcpTransportThread()).
+void TcpRemotingClient::boost_asio_work() {
+  LOG_INFO("TcpRemotingClient::boost asio async service runing");
+  // The work object keeps run() from returning once the first timer timeout
+  // callback has completed and the service has nothing else queued.
+  boost::asio::io_service::work keepAlive(m_async_ioService);
+  m_async_ioService.run();
+}
+
+// Drops every table this client holds (transports, sync and async response
+// futures, name-server addresses) and then cancels and frees all outstanding
+// timeout timers.
+TcpRemotingClient::~TcpRemotingClient() {
+  m_tcpTable.clear();
+
+  m_futureTable.clear();
+  m_asyncFutureTable.clear();
+
+  m_namesrvAddrList.clear();
+
+  removeAllTimerCallback();
+}
+
+// Shuts the client down in three steps: stop the timer service and its
+// thread, disconnect and forget every transport, then stop the worker pool
+// and release every sync future still registered in m_futureTable.
+void TcpRemotingClient::stopAllTcpTransportThread() {
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+
+  // Timer service first, so no timeout handler runs while tearing down.
+  m_async_ioService.stop();
+  m_async_service_thread->interrupt();
+  m_async_service_thread->join();
+  removeAllTimerCallback();
+
+  for (TcpMap::iterator entry = m_tcpTable.begin(); entry != m_tcpTable.end(); ++entry) {
+    entry->second->disconnect(entry->first);
+  }
+  m_tcpTable.clear();
+
+  m_ioService.stop();
+  m_threadpool.join_all();
+
+  {
+    boost::lock_guard<boost::mutex> guard(m_futureTableMutex);
+    ResMap::iterator pending = m_futureTable.begin();
+    while (pending != m_futureTable.end()) {
+      if (pending->second) {
+        pending->second->releaseThreadCondition();
+      }
+      ++pending;
+    }
+  }
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
+}
+
+// Replaces m_namesrvAddrList with the entries of |addrs|, a ';'-separated
+// list. Each entry is trimmed and kept only if UtilAll::SplitURL accepts it.
+// An empty |addrs| leaves the current list untouched, as does failing to get
+// m_namesrvlock within 10 seconds.
+void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
+  LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str());
+  if (addrs.empty()) {
+    return;
+  }
+
+  boost::unique_lock<boost::timed_mutex> guard(m_namesrvlock, boost::try_to_lock);
+  if (!guard.owns_lock() && !guard.timed_lock(boost::get_system_time() + boost::posix_time::seconds(10))) {
+    LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+    return;
+  }
+
+  // Start from an empty list; only valid entries are re-added below.
+  m_namesrvAddrList.clear();
+
+  vector<string> pieces;
+  UtilAll::Split(pieces, addrs, ";");
+  for (vector<string>::iterator piece = pieces.begin(); piece != pieces.end(); ++piece) {
+    string candidate = *piece;
+    UtilAll::Trim(candidate);
+
+    // Outputs of SplitURL; only its success/failure result is used here.
+    string hostName;
+    short portNumber;
+    if (!UtilAll::SplitURL(candidate, hostName, portNumber)) {
+      LOG_INFO("This may be invalid namer server: [%s]", candidate.c_str());
+      continue;
+    }
+    LOG_INFO("update Namesrv:%s", candidate.c_str());
+    m_namesrvAddrList.push_back(candidate);
+  }
+}
+
+// Sends |request| as a heartbeat to |addr| and waits up to 3000 ms for the
+// reply.
+//
+// Returns true only when a response arrives and its code is SUCCESS_VALUE.
+// Returns false when no transport is available, the send fails, the wait
+// times out, or the broker answers with another code. On send failure and on
+// timeout the transport is closed and the pending ResponseFuture is removed
+// from m_futureTable so it does not linger there (same as invokeSync()).
+bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& request) {
+  const int heartBeatTimeoutMs = 3000;
+
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (!pTcp) {
+    return false;
+  }
+
+  const int code = request.getCode();
+  const int opaque = request.getOpaque();
+  boost::shared_ptr<ResponseFuture> responseFuture(
+      new ResponseFuture(code, opaque, this, heartBeatTimeoutMs, false, NULL));
+  addResponseFuture(opaque, responseFuture);
+
+  if (!SendCommand(pTcp, request)) {
+    // Nothing was sent, so no reply will ever claim this future.
+    findAndDeleteResponseFuture(opaque);
+    CloseTransport(addr, pTcp);
+    return false;
+  }
+
+  responseFuture->setSendRequestOK(true);
+  unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(heartBeatTimeoutMs));
+  if (!pRsp) {
+    LOG_ERROR("wait response timeout of heartbeat, so closeTransport of addr:%s", addr.c_str());
+    // avoid responseFuture leak;
+    findAndDeleteResponseFuture(opaque);
+    CloseTransport(addr, pTcp);
+    return false;
+  }
+
+  if (pRsp->getCode() != SUCCESS_VALUE) {
+    LOG_WARN("get error response:%d of heartbeat to addr:%s", pRsp->getCode(), addr.c_str());
+    return false;
+  }
+  return true;
+}
+
+// Sends |request| to |addr| and blocks up to |timeoutMillis| for the reply.
+//
+// Returns the response command (the caller receives the raw pointer), or NULL
+// when no transport is available, the send fails, or no response arrives in
+// time. On timeout the transport is closed unless the request code is
+// GET_CONSUMER_LIST_BY_GROUP. The pending ResponseFuture is removed from
+// m_futureTable on every failure path.
+RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
+                                               RemotingCommand& request,
+                                               int timeoutMillis /* = 3000 */) {
+  // The format needs a %s, otherwise the address argument is never printed.
+  LOG_DEBUG("InvokeSync:%s", addr.c_str());
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (!pTcp) {
+    LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str());
+    return NULL;
+  }
+
+  const int code = request.getCode();
+  const int opaque = request.getOpaque();
+  boost::shared_ptr<ResponseFuture> responseFuture(
+      new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
+  addResponseFuture(opaque, responseFuture);
+
+  if (!SendCommand(pTcp, request)) {
+    // Report the real cause; the transport was obtained, the send failed.
+    LOG_DEBUG("InvokeSync [%s] Failed: Cannot Send Request.", addr.c_str());
+    // avoid responseFuture leak;
+    findAndDeleteResponseFuture(opaque);
+    CloseTransport(addr, pTcp);
+    return NULL;
+  }
+
+  responseFuture->setSendRequestOK(true);
+  RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
+  if (pRsp != NULL) {
+    return pRsp;
+  }
+
+  if (code != GET_CONSUMER_LIST_BY_GROUP) {
+    LOG_WARN(
+        "wait response timeout or get NULL response of code:%d, so "
+        "closeTransport of addr:%s",
+        code, addr.c_str());
+    CloseTransport(addr, pTcp);
+  }
+  // avoid responseFuture leak;
+  findAndDeleteResponseFuture(opaque);
+  return NULL;
+}
+
+// Sends |request| to |addr| without waiting for the reply.
+//
+// A ResponseFuture carrying |cbw|, the retry counters, the broker address and
+// a copy of the request is registered in m_asyncFutureTable. When |cbw| is
+// non-NULL a deadline timer of |timeoutMilliseconds| is armed on
+// m_async_ioService; its handler is handleAsyncPullForResponseTimeout().
+//
+// Returns false only when no transport could be obtained. A failed send
+// still returns true: the timer reports the failure through the callback.
+bool TcpRemotingClient::invokeAsync(const string& addr,
+                                    RemotingCommand& request,
+                                    AsyncCallbackWrap* cbw,
+                                    int64 timeoutMilliseconds,
+                                    int maxRetrySendTimes,
+                                    int retrySendTimes) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (!pTcp) {
+    LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
+    return false;
+  }
+
+  //<!not delete, for callback to delete;
+  const int code = request.getCode();
+  const int opaque = request.getOpaque();
+  boost::shared_ptr<ResponseFuture> responseFuture(
+      new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, cbw));
+  responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+  responseFuture->setRetrySendTimes(retrySendTimes);
+  responseFuture->setBrokerAddr(addr);
+  responseFuture->setRequestCommand(request);
+  addAsyncResponseFuture(opaque, responseFuture);
+
+  if (cbw) {
+    boost::asio::deadline_timer* t =
+        new boost::asio::deadline_timer(m_async_ioService, boost::posix_time::milliseconds(timeoutMilliseconds));
+    addTimerCallback(t, opaque);
+    // Bind the error placeholder, not a local error_code: the handler must
+    // receive the real completion status so that a cancelled timer arrives
+    // as operation_aborted instead of looking like an expired one.
+    t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout, this,
+                              boost::asio::placeholders::error, opaque));
+  }
+
+  // Even if send failed, asyncTimerThread will trigger next pull request or
+  // report send msg failed
+  if (SendCommand(pTcp, request)) {
+    LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", addr.c_str(), code, opaque);
+    responseFuture->setSendRequestOK(true);
+  }
+  return true;
+}
+
+// Fire-and-forget send: marks |request| as one-way and writes it to |addr|.
+// Nothing is registered for a reply and the send result is ignored. Does
+// nothing when no transport can be obtained.
+void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& request) {
+  //<!not need callback;
+  boost::shared_ptr<TcpTransport> transport = GetTransport(addr, true);
+  if (!transport) {
+    return;
+  }
+
+  request.markOnewayRPC();
+  LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), request.getCode());
+  SendCommand(transport, request);
+}
+
+// Dispatches on |addr|: an empty address means "the name server" and goes to
+// CreateNameserverTransport(); anything else goes to CreateTransport().
+boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& addr, bool needRespons) {
+  if (!addr.empty()) {
+    return CreateTransport(addr, needRespons);
+  }
+
+  LOG_DEBUG("GetTransport of NameServer");
+  return CreateNameserverTransport(needRespons);
+}
+
+// Returns a connected transport for |addr|: the entry already in m_tcpTable
+// when its status is e_connectSuccess, otherwise a newly created one.
+//
+// An empty shared_ptr is returned when m_tcpLock cannot be taken within
+// m_tcpTransportTryLockTimeout seconds, when the table entry is still in
+// e_connectWaitResponse, or when the new connection fails. When
+// |needRespons| is true the new transport is given static_messageReceived as
+// its read callback, otherwise NULL.
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const string& addr, bool needRespons) {
+  boost::shared_ptr<TcpTransport> created;
+  {
+    // Bounded wait for m_tcpLock so a caller is never blocked for long.
+    boost::unique_lock<boost::timed_mutex> guard(m_tcpLock, boost::try_to_lock);
+    if (!guard.owns_lock() &&
+        !guard.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
+      return boost::shared_ptr<TcpTransport>();
+    }
+
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      boost::shared_ptr<TcpTransport> existing = m_tcpTable[addr];
+      if (existing) {
+        const tcpConnectStatus state = existing->getTcpConnectStatus();
+        if (state == e_connectSuccess) {
+          return existing;
+        }
+        if (state == e_connectWaitResponse) {
+          // A connect to this address is still in flight.
+          return boost::shared_ptr<TcpTransport>();
+        }
+
+        if (state == e_connectFail) {
+          LOG_ERROR("tcpTransport with server disconnected, erase server:%s", addr.c_str());
+          existing->disconnect(addr);  // avoid coredump when connection with broker was broken
+        } else {
+          LOG_ERROR("go to fault state, erase:%s from tcpMap, and reconnect it", addr.c_str());
+        }
+        // Either way the stale entry goes and a new transport is built below.
+        m_tcpTable.erase(addr);
+      }
+    }
+
+    //<!callback;
+    READ_CALLBACK onRead = needRespons ? &TcpRemotingClient::static_messageReceived : NULL;
+
+    created.reset(new TcpTransport(this, onRead));
+    if (created->connect(addr, m_tcpConnectTimeout) != e_connectWaitResponse) {
+      LOG_WARN("can not connect to :%s", addr.c_str());
+      created->disconnect(addr);
+      return boost::shared_ptr<TcpTransport>();
+    }
+    // Even if connecting fails in the end, this entry will be erased by the
+    // next CreateTransport call for the same address.
+    m_tcpTable[addr] = created;
+  }
+
+  // m_tcpLock is released here; the wait for the connect result happens
+  // outside the critical section.
+  if (created->waitTcpConnectEvent(m_tcpConnectTimeout) != e_connectSuccess) {
+    LOG_WARN("can not connect to server:%s", addr.c_str());
+    created->disconnect(addr);
+    return boost::shared_ptr<TcpTransport>();
+  }
+
+  LOG_INFO("connect server with addr:%s success", addr.c_str());
+  return created;
+}
+
+// Returns a transport to a name server, or an empty shared_ptr when none can
+// be reached or m_namesrvlock cannot be taken within
+// m_tcpTransportTryLockTimeout seconds.
+//
+// The previously chosen address (m_namesrvAddrChoosed) is tried first. If it
+// no longer yields a transport it is forgotten and the entries of
+// m_namesrvAddrList are tried in turn, starting at m_namesrvIndex; the first
+// one that connects becomes the new chosen address.
+//
+// m_namesrvlock exists so that name-server operations are not blocked by
+// m_tcpLock. Note that |needRespons| is not consulted: GetTransport() is
+// always called with true.
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
+  LOG_DEBUG("--CreateNameserverTransport--");
+  boost::unique_lock<boost::timed_mutex> guard(m_namesrvlock, boost::try_to_lock);
+  if (!guard.owns_lock() &&
+      !guard.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+    LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+    return boost::shared_ptr<TcpTransport>();
+  }
+
+  if (!m_namesrvAddrChoosed.empty()) {
+    boost::shared_ptr<TcpTransport> current = GetTransport(m_namesrvAddrChoosed, true);
+    if (current) {
+      return current;
+    }
+    m_namesrvAddrChoosed.clear();
+  }
+
+  // At most one attempt per configured address.
+  const size_t candidates = m_namesrvAddrList.size();
+  for (size_t attempt = 0; attempt < candidates; ++attempt) {
+    unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
+    // Wrap the rolling index before it would overflow.
+    if (m_namesrvIndex == numeric_limits<unsigned int>::max()) {
+      m_namesrvIndex = 0;
+    }
+    m_namesrvIndex++;
+    LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT "", m_namesrvIndex, index,
+             m_namesrvAddrList.size());
+    boost::shared_ptr<TcpTransport> candidate = GetTransport(m_namesrvAddrList[index], true);
+    if (candidate) {
+      m_namesrvAddrChoosed = m_namesrvAddrList[index];
+      return candidate;
+    }
+  }
+
+  return boost::shared_ptr<TcpTransport>();
+}
+
+// Removes the transport for |addr| from m_tcpTable, but only if the entry is
+// still the same transport as |pTcp| (compared by getStartTime()); it is
+// disconnected first when its status is e_connectSuccess.
+//
+// An empty |addr| is forwarded to CloseNameServerTransport(). Nothing is
+// removed when m_tcpLock cannot be taken within m_tcpTransportTryLockTimeout
+// seconds, when the address is no longer in the table, or when the table
+// holds a transport created after |pTcp|.
+void TcpRemotingClient::CloseTransport(const string& addr, boost::shared_ptr<TcpTransport> pTcp) {
+  if (addr.empty()) {
+    CloseNameServerTransport(pTcp);
+    return;
+  }
+
+  boost::unique_lock<boost::timed_mutex> guard(m_tcpLock, boost::try_to_lock);
+  if (!guard.owns_lock() &&
+      !guard.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+    LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
+    return;
+  }
+  LOG_ERROR("CloseTransport of:%s", addr.c_str());
+
+  TcpMap::iterator found = m_tcpTable.find(addr);
+  if (found == m_tcpTable.end()) {
+    LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+  } else if (found->second->getStartTime() != pTcp->getStartTime()) {
+    LOG_INFO(
+        "tcpTransport with addr:%s has been closed before, and has been "
+        "created again, nothing to do",
+        addr.c_str());
+  } else {
+    LOG_WARN("closeTransport: disconnect broker:%s with state:%d", addr.c_str(),
+             found->second->getTcpConnectStatus());
+    if (found->second->getTcpConnectStatus() == e_connectSuccess) {
+      found->second->disconnect(addr);  // avoid coredump when connection with server was broken
+    }
+    LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+    m_tcpTable.erase(found);
+  }
+  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+}
+
+// Removes the transport of the currently chosen name server
+// (m_namesrvAddrChoosed) from m_tcpTable and clears the chosen address, but
+// only if the table entry is still the same transport as |pTcp| (compared by
+// getStartTime()).
+//
+// Nothing is removed when m_namesrvlock cannot be taken within
+// m_tcpTransportTryLockTimeout seconds, when the address is not in the
+// table, or when the table holds a transport created after |pTcp|.
+//
+// NOTE(review): m_tcpTable is touched here under m_namesrvlock only, while
+// CreateTransport()/CloseTransport() guard it with m_tcpLock - confirm that
+// this is intended.
+void TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> pTcp) {
+  boost::unique_lock<boost::timed_mutex> guard(m_namesrvlock, boost::try_to_lock);
+  if (!guard.owns_lock() &&
+      !guard.timed_lock(boost::get_system_time() + boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+    // Name this function in the message (it used to say CreateNameserverTransport).
+    LOG_ERROR("CloseNameServerTransport get timed_mutex timeout");
+    return;
+  }
+
+  // Copy: m_namesrvAddrChoosed is cleared below while addr is still needed.
+  const string addr = m_namesrvAddrChoosed;
+
+  TcpMap::iterator found = m_tcpTable.find(addr);
+  if (found == m_tcpTable.end()) {
+    LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable before", addr.c_str());
+    return;
+  }
+  if (found->second->getStartTime() != pTcp->getStartTime()) {
+    LOG_INFO(
+        "tcpTransport with addr:%s has been closed before, and has been "
+        "created again, nothing to do",
+        addr.c_str());
+    return;
+  }
+
+  found->second->disconnect(addr);  // avoid coredump when connection with server was broken
+  LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+  m_tcpTable.erase(found);
+  m_namesrvAddrChoosed.clear();
+}
+
+// Writes |msg| to |pTts| as one contiguous buffer: the head block followed
+// by the body block, each skipped when it has no data. Returns whatever
+// TcpTransport::sendMessage() returns.
+bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, RemotingCommand& msg) {
+  const MemoryBlock* head = msg.GetHead();
+  const MemoryBlock* body = msg.GetBody();
+
+  MemoryOutputStream frame(1024);
+  if (head->getData()) {
+    frame.write(head->getData(), head->getSize());
+  }
+  if (body->getData()) {
+    frame.write(body->getData(), body->getSize());
+  }
+
+  const int frameLen = frame.getDataSize();
+  return pTts->sendMessage(static_cast<const char*>(frame.getData()), frameLen);
+}
+
+// Read callback handed to TcpTransport (see CreateTransport()): |context| is
+// the TcpRemotingClient that created the transport. Forwards to
+// messageReceived(); a NULL context is ignored.
+void TcpRemotingClient::static_messageReceived(void* context, const MemoryBlock& mem, const string& addr) {
+  TcpRemotingClient* self = static_cast<TcpRemotingClient*>(context);
+  if (self == NULL) {
+    return;
+  }
+  self->messageReceived(mem, addr);
+}
+
+// Queues ProcessData(mem, addr) on m_ioService, so decoding and dispatch run
+// on the threads started in the constructor rather than on the caller.
+void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& addr) {
+  m_ioService.post(
+      boost::bind(&TcpRemotingClient::ProcessData, this, mem, addr));
+}
+
+// Decodes one inbound frame and dispatches it.
+//
+// A request goes to processRequestCommand(). A response is matched by opaque
+// against the async table first and the sync table second, and the matching
+// future is removed from its table. The response is dropped (and freed) when
+// no future is registered any more or when the sync future reports
+// getSyncResponseFlag(). A frame that fails to decode is logged and ignored.
+void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& addr) {
+  RemotingCommand* pRespondCmd = NULL;
+  try {
+    pRespondCmd = RemotingCommand::Decode(mem);
+  } catch (...) {
+    LOG_ERROR("processData_error");
+    return;
+  }
+
+  const int opaque = pRespondCmd->getOpaque();
+
+  //<!process self;
+  if (!pRespondCmd->isResponseType()) {
+    processRequestCommand(pRespondCmd, addr);
+    return;
+  }
+
+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (!pFuture) {
+    // Not an async request; look in the sync table.
+    pFuture = findAndDeleteResponseFuture(opaque);
+    if (!pFuture) {
+      LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", opaque);
+      deleteAndZero(pRespondCmd);
+      return;
+    }
+    if (pFuture->getSyncResponseFlag()) {
+      LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
+      deleteAndZero(pRespondCmd);
+      return;
+    }
+    LOG_DEBUG("find_response opaque:%d", opaque);
+  }
+  processResponseCommand(pRespondCmd, pFuture);
+}
+
+// Hands the response |pCmd| to its future. The ext header is set from the
+// future's request code before the response is stored. For an async future
+// that has not been answered yet, the response flag and callback status are
+// set, the timeout timer for this opaque is cancelled and the user callback
+// is executed.
+void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
+  const int code = pfuture->getRequestCode();
+  const int opaque = pCmd->getOpaque();
+  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
+            pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
+  pCmd->SetExtHeader(code);  // set head , for response use
+
+  pfuture->setResponse(pCmd);
+
+  const bool firstAsyncAnswer = pfuture->getASyncFlag() && !pfuture->getAsyncResponseFlag();
+  if (!firstAsyncAnswer) {
+    return;
+  }
+
+  pfuture->setAsyncResponseFlag();
+  pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
+  cancelTimerCallback(opaque);
+  pfuture->executeInvokeCallback();
+}
+
+// Handles a request sent by the peer at |addr|. Takes ownership of |pCmd|.
+// The processor registered for the request code is run; when the request is
+// not one-way and the processor produced a response, that response is sent
+// back through invokeOneway() with the request's opaque.
+void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const string& addr) {
+  unique_ptr<RemotingCommand> incoming(pCmd);
+  const int requestCode = incoming->getCode();
+  if (m_requestTable.find(requestCode) == m_requestTable.end()) {
+    LOG_ERROR("can_not_find request:%d processor", requestCode);
+    return;
+  }
+
+  unique_ptr<RemotingCommand> reply(m_requestTable[requestCode]->processRequest(addr, incoming.get()));
+  if (incoming->isOnewayRPC() || !reply) {
+    return;
+  }
+
+  reply->setOpaque(incoming->getOpaque());
+  reply->markResponseType();
+  reply->Encode();
+
+  invokeOneway(addr, *reply);
+}
+
+// Registers (or replaces) the sync future for |opaque| in m_futureTable,
+// under m_futureTableMutex.
+void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> guard(m_futureTableMutex);
+  m_futureTable[opaque] = pfuture;
+}
+
+// Removes the sync future for |opaque| from m_futureTable and returns it; an
+// empty shared_ptr is returned when nothing is registered.
+// Note: the table no longer holds a reference afterwards, so the caller must
+// keep the returned shared_ptr alive for as long as it uses the future.
+boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> guard(m_futureTableMutex);
+  boost::shared_ptr<ResponseFuture> removed;
+  ResMap::iterator found = m_futureTable.find(opaque);
+  if (found != m_futureTable.end()) {
+    removed = found->second;
+    m_futureTable.erase(found);
+  }
+  return removed;
+}
+
+// Completion handler of the per-request timeout timer armed in invokeAsync().
+//
+// When |e| is operation_aborted the handler only logs and returns. Otherwise
+// the async future for |opaque| is removed from its table; if it has a
+// callback and no response was received, the callback status is set to
+// timeout and the exception callback is executed. The timer registered for
+// |opaque| is freed at the end.
+void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) {
+  if (e == boost::asio::error::operation_aborted) {
+    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(),
+             e.message().data());
+    return;
+  }
+
+  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+  boost::shared_ptr<ResponseFuture> expired(findAndDeleteAsyncResponseFuture(opaque));
+  const bool hasCallback = expired && expired->getASyncFlag() && (expired->getAsyncCallbackWrap());
+  // Only report a timeout when no response has been received.
+  if (hasCallback && (expired->getAsyncResponseFlag() != true)) {
+    LOG_ERROR("no response got for opaque:%d", opaque);
+    expired->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
+    expired->executeInvokeCallbackException();
+  }
+
+  eraseTimerCallback(opaque);
+}
+
+// Registers (or replaces) the async future for |opaque| in
+// m_asyncFutureTable, under m_asyncFutureLock.
+void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> guard(m_asyncFutureLock);
+  m_asyncFutureTable[opaque] = pfuture;
+}
+
+// Removes the async future for |opaque| from m_asyncFutureTable and returns
+// it; an empty shared_ptr is returned when nothing is registered.
+// Note: the table no longer holds a reference afterwards, so the caller must
+// keep the returned shared_ptr alive for as long as it uses the future.
+boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> guard(m_asyncFutureLock);
+  boost::shared_ptr<ResponseFuture> removed;
+  if (m_asyncFutureTable.find(opaque) == m_asyncFutureTable.end()) {
+    return removed;
+  }
+
+  removed = m_asyncFutureTable[opaque];
+  m_asyncFutureTable.erase(opaque);
+  return removed;
+}
+
+// Installs |clientRemotingProcessor| as the handler for |requestCode| in
+// m_requestTable, replacing any handler registered for that code before.
+void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor) {
+  // operator[] overwrites an existing entry, so no separate erase is needed.
+  m_requestTable[requestCode] = clientRemotingProcessor;
+}
+
+// Registers timer |t| for |opaque| in m_async_timer_map; the map owns the
+// timer from here on. A timer already registered for the same opaque is
+// cancelled and deleted first.
+void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator existing = m_async_timer_map.find(opaque);
+  if (existing != m_async_timer_map.end()) {
+    // opaque is an int, so the conversion must be %d (it was %lld).
+    LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%d", opaque);
+    boost::asio::deadline_timer* old_t = existing->second;
+    old_t->cancel();
+    delete old_t;
+    m_async_timer_map.erase(existing);
+  }
+  m_async_timer_map[opaque] = t;
+}
+
+// Deletes the timer registered for |opaque| and drops its map entry, without
+// an explicit cancel(). Does nothing when no timer is registered.
+void TcpRemotingClient::eraseTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator found = m_async_timer_map.find(opaque);
+  if (found == m_async_timer_map.end()) {
+    return;
+  }
+
+  // opaque is an int, so the conversion must be %d (it was %lld).
+  LOG_DEBUG("eraseTimerCallback: opaque:%d", opaque);
+  delete found->second;
+  m_async_timer_map.erase(found);
+}
+
+// Cancels and deletes the timer registered for |opaque| and drops its map
+// entry. Does nothing when no timer is registered.
+void TcpRemotingClient::cancelTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator found = m_async_timer_map.find(opaque);
+  if (found == m_async_timer_map.end()) {
+    return;
+  }
+
+  // opaque is an int, so the conversion must be %d (it was %lld).
+  LOG_DEBUG("cancelTimerCallback: opaque:%d", opaque);
+  boost::asio::deadline_timer* t = found->second;
+  t->cancel();
+  delete t;
+  m_async_timer_map.erase(found);
+}
+
+// Cancels and deletes every timer in m_async_timer_map, then empties the map.
+void TcpRemotingClient::removeAllTimerCallback() {
+  boost::lock_guard<boost::mutex> guard(m_timerMapMutex);
+  asyncTimerMap::iterator entry = m_async_timer_map.begin();
+  while (entry != m_async_timer_map.end()) {
+    boost::asio::deadline_timer* timer = entry->second;
+    timer->cancel();
+    delete timer;
+    ++entry;
+  }
+  m_async_timer_map.clear();
+}
+
+// Forgets the pending pull request identified by |opaque|: its future is
+// removed from the async table, or failing that from the sync table, so that
+// a late answer is discarded when it arrives. The timeout timer for the
+// opaque is cancelled as well. |mq| is used for logging only.
+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+  boost::shared_ptr<ResponseFuture> dropped(findAndDeleteAsyncResponseFuture(opaque));
+  if (dropped) {
+    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+  } else {
+    dropped = findAndDeleteResponseFuture(opaque);
+    if (dropped) {
+      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    }
+  }
+
+  // delete the timeout timer for opaque for pullrequest
+  cancelTimerCallback(opaque);
+}
+
+//<!************************************************************************
+} // namespace rocketmq
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 7ecf329..701cd8a 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -53,7 +53,9 @@ TcpTransport::~TcpTransport() {
tcpConnectStatus TcpTransport::connect(const string& strServerURL, int
timeOutMillisecs /* = 3000 */) {
string hostName;
short portNumber;
+ LOG_DEBUG("connect to [%s].", strServerURL.c_str());
if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) {
+ LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
return e_connectFail;
}