This is an automated email from the ASF dual-hosted git repository.

dinglei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/rocketmq-client-cpp.git


The following commit(s) were added to refs/heads/master by this push:
     new 28c4cf0  [ISSUE #121]Support connect MQ endpoint with ACL (#133)
28c4cf0 is described below

commit 28c4cf0bd1e1b9d1f639fad6e59b7b62b243913a
Author: dinglei <[email protected]>
AuthorDate: Wed Apr 17 10:14:16 2019 +0800

    [ISSUE #121]Support connect MQ endpoint with ACL (#133)
    
    * Add function name and line in the log format
    
    * Support connect aliyun endpoint
    
    * Modify signature context to link aliyun mq
    
    * Format NameSpace file
    
    * Format file style
    
    * Update MQClientFactory.cpp
    
    * Remove unnecessary comments
---
 src/MQClientFactory.cpp             |    4 +-
 src/common/MQClient.cpp             |    3 +-
 src/common/NameSpaceUtil.cpp        |   39 +
 src/common/NameSpaceUtil.h          |   36 +
 src/log/Logging.cpp                 |   42 +-
 src/log/Logging.h                   |   63 +-
 src/protocol/CommandHeader.cpp      |    8 +-
 src/transport/TcpRemotingClient.cpp | 1397 ++++++++++++++++++-----------------
 src/transport/TcpTransport.cpp      |    2 +
 9 files changed, 849 insertions(+), 745 deletions(-)

diff --git a/src/MQClientFactory.cpp b/src/MQClientFactory.cpp
index 66cc243..cfa62dd 100644
--- a/src/MQClientFactory.cpp
+++ b/src/MQClientFactory.cpp
@@ -164,12 +164,12 @@ bool 
MQClientFactory::updateTopicRouteInfoFromNameServer(const string& topic,
       vector<QueueData>& queueDatas = pTopicRouteData->getQueueDatas();
       vector<QueueData>::iterator it = queueDatas.begin();
       for (; it != queueDatas.end(); ++it) {
-        // 读写分区个数是一致，故只做一次判断;
         int queueNums = std::min(4, it->readQueueNums);
         it->readQueueNums = queueNums;
         it->writeQueueNums = queueNums;
       }
     }
+    LOG_DEBUG("getTopicRouteInfoFromNameServer is null for topic :%s", 
topic.c_str());
   } else {
     
pTopicRouteData.reset(m_pClientAPIImpl->getTopicRouteInfoFromNameServer(topic, 
1000 * 5, session_credentials));
   }
@@ -280,7 +280,6 @@ void MQClientFactory::shutdown() {
 
   switch (m_serviceState) {
     case RUNNING: {
-      //<! stop;
       if (m_consumer_async_service_thread) {
         m_consumer_async_ioService.stop();
         m_consumer_async_service_thread->interrupt();
@@ -305,7 +304,6 @@ void MQClientFactory::shutdown() {
       break;
   }
 
-  //<!删除自己;
   MQClientManager::getInstance()->removeClientFactory(m_clientId);
 }
 
diff --git a/src/common/MQClient.cpp b/src/common/MQClient.cpp
index 6cc451c..42d77a5 100644
--- a/src/common/MQClient.cpp
+++ b/src/common/MQClient.cpp
@@ -21,6 +21,7 @@
 #include "MQClientManager.h"
 #include "TopicPublishInfo.h"
 #include "UtilAll.h"
+#include "NameSpaceUtil.h"
 
 namespace rocketmq {
 
@@ -68,7 +69,7 @@ const string& MQClient::getNamesrvAddr() const {
 }
 
 void MQClient::setNamesrvAddr(const string& namesrvAddr) {
-  m_namesrvAddr = namesrvAddr;
+  m_namesrvAddr = NameSpaceUtil::formatNameServerURL(namesrvAddr);
 }
 
 const string& MQClient::getNamesrvDomain() const {
diff --git a/src/common/NameSpaceUtil.cpp b/src/common/NameSpaceUtil.cpp
new file mode 100644
index 0000000..74f87a2
--- /dev/null
+++ b/src/common/NameSpaceUtil.cpp
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "NameSpaceUtil.h"
+#include "Logging.h"
+
+namespace rocketmq {
+
+bool NameSpaceUtil::isEndPointURL(string nameServerAddr) {
+  if (nameServerAddr.length() >= ENDPOINT_PREFIX_LENGTH && 
nameServerAddr.find(ENDPOINT_PREFIX) != string::npos) {
+    return true;
+  }
+  return false;
+}
+
+string NameSpaceUtil::formatNameServerURL(string nameServerAddr) {
+  int index = nameServerAddr.find(ENDPOINT_PREFIX);
+  if (index != string::npos) {
+    LOG_DEBUG("Get Name Server from endpoint [%s]",
+              nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, 
nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH).c_str());
+    return nameServerAddr.substr(ENDPOINT_PREFIX_LENGTH, 
nameServerAddr.length() - ENDPOINT_PREFIX_LENGTH);
+  }
+  return nameServerAddr;
+}
+}  // namespace rocketmq
diff --git a/src/common/NameSpaceUtil.h b/src/common/NameSpaceUtil.h
new file mode 100644
index 0000000..39f5da7
--- /dev/null
+++ b/src/common/NameSpaceUtil.h
@@ -0,0 +1,36 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef __NAMESPACEUTIL_H__
+#define __NAMESPACEUTIL_H__
+
+#include <string>
+
+using namespace std;
+
+static const string ENDPOINT_PREFIX = "http://";
+static const int ENDPOINT_PREFIX_LENGTH = ENDPOINT_PREFIX.length();
+namespace rocketmq {
+class NameSpaceUtil {
+ public:
+  static bool isEndPointURL(string nameServerAddr);
+
+  static string formatNameServerURL(string nameServerAddr);
+};
+
+}  // namespace rocketmq
+#endif  //__NAMESPACEUTIL_H__
diff --git a/src/log/Logging.cpp b/src/log/Logging.cpp
index 6e6a939..9ecd1d8 100644
--- a/src/log/Logging.cpp
+++ b/src/log/Logging.cpp
@@ -1,19 +1,19 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 #include "Logging.h"
 #include <boost/date_time/gregorian/gregorian.hpp>
 #include "UtilAll.h"
@@ -53,13 +53,13 @@ logAdapter::logAdapter() : m_logLevel(eLOG_LEVEL_INFO) {
                                     keywords::min_free_space = 300 * 1024 * 
1024, keywords::target = homeDir,
                                     keywords::max_size = 200 * 1024 * 1024,  
// max keep 3 log file defaultly
                                     keywords::auto_flush = true);
-  logging::core::get()->set_filter(logging::trivial::severity >= 
logging::trivial::info);
+  // logging::core::get()->set_filter(logging::trivial::severity >= 
logging::trivial::info);
+  setLogLevelInner(m_logLevel);
 
   logging::add_common_attributes();
 }
 
-void logAdapter::setLogLevel(elogLevel logLevel) {
-  m_logLevel = logLevel;
+void logAdapter::setLogLevelInner(elogLevel logLevel) {
   switch (logLevel) {
     case eLOG_LEVEL_FATAL:
       logging::core::get()->set_filter(logging::trivial::severity >= 
logging::trivial::fatal);
@@ -90,6 +90,10 @@ void logAdapter::setLogLevel(elogLevel logLevel) {
       break;
   }
 }
+void logAdapter::setLogLevel(elogLevel logLevel) {
+  m_logLevel = logLevel;
+  setLogLevelInner(logLevel);
+}
 
 elogLevel logAdapter::getLogLevel() {
   return m_logLevel;
@@ -101,4 +105,4 @@ void logAdapter::setLogFileNumAndSize(int logNum, int 
sizeOfPerFile) {
   m_logSink->locked_backend()->set_file_collector(sinks::file::make_collector(
       keywords::target = homeDir, keywords::max_size = logNum * sizeOfPerFile 
* 1024 * 1024));
 }
-}
+}  // namespace rocketmq
diff --git a/src/log/Logging.h b/src/log/Logging.h
index 5729148..dfede34 100644
--- a/src/log/Logging.h
+++ b/src/log/Logging.h
@@ -1,23 +1,24 @@
 /*
-* Licensed to the Apache Software Foundation (ASF) under one or more
-* contributor license agreements.  See the NOTICE file distributed with
-* this work for additional information regarding copyright ownership.
-* The ASF licenses this file to You under the Apache License, Version 2.0
-* (the "License"); you may not use this file except in compliance with
-* the License.  You may obtain a copy of the License at
-*
-*     http://www.apache.org/licenses/LICENSE-2.0
-*
-* Unless required by applicable law or agreed to in writing, software
-* distributed under the License is distributed on an "AS IS" BASIS,
-* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-* See the License for the specific language governing permissions and
-* limitations under the License.
-*/
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
 
 #ifndef _ALOG_ADAPTER_H_
 #define _ALOG_ADAPTER_H_
 
+#include <string.h>
 #include <boost/date_time/posix_time/posix_time_types.hpp>
 #include <boost/log/core.hpp>
 #include <boost/log/expressions.hpp>
@@ -51,6 +52,7 @@ class logAdapter {
 
  private:
   logAdapter();
+  void setLogLevelInner(elogLevel logLevel);
   elogLevel m_logLevel;
   std::string m_logFile;
   src::severity_logger<boost::log::trivial::severity_level> m_severityLogger;
@@ -74,12 +76,31 @@ class LogUtil {
     BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get();
     va_end(arg_ptr);
   }
+  static void LogMessageFull(boost::log::trivial::severity_level level,
+                             const char* file,
+                             const char* func,
+                             int line,
+                             const char* format,
+                             ...) {
+    va_list arg_ptr;
+    va_start(arg_ptr, format);
+    boost::scoped_array<char> formattedString(new char[1024]);
+    vsnprintf(formattedString.get(), 1024, format, arg_ptr);
+    // BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get() << "[" << 
file << ":" << func << ":"<< line << "]";
+    BOOST_LOG_SEV(AGENT_LOGGER, level) << formattedString.get() << "[" << func 
<< ":" << line << "]";
+    va_end(arg_ptr);
+  }
 };
 
-#define LOG_FATAL(...) LogUtil::LogMessage(boost::log::trivial::fatal, 
__LINE__, __VA_ARGS__)
-#define LOG_ERROR(...) LogUtil::LogMessage(boost::log::trivial::error, 
__LINE__, __VA_ARGS__)
-#define LOG_WARN(...) LogUtil::LogMessage(boost::log::trivial::warning, 
__LINE__, __VA_ARGS__)
-#define LOG_INFO(...) LogUtil::LogMessage(boost::log::trivial::info, __LINE__, 
__VA_ARGS__)
-#define LOG_DEBUG(...) LogUtil::LogMessage(boost::log::trivial::debug, 
__LINE__, __VA_ARGS__)
-}
+#define LOG_FATAL(...) \
+  LogUtil::LogMessageFull(boost::log::trivial::fatal, __FILE__, __FUNCTION__, 
__LINE__, __VA_ARGS__)
+#define LOG_ERROR(...) \
+  LogUtil::LogMessageFull(boost::log::trivial::error, __FILE__, __FUNCTION__, 
__LINE__, __VA_ARGS__)
+#define LOG_WARN(...) \
+  LogUtil::LogMessageFull(boost::log::trivial::warning, __FILE__, 
__FUNCTION__, __LINE__, __VA_ARGS__)
+//#define LOG_INFO(...) LogUtil::LogMessage(boost::log::trivial::info, 
__LINE__, __VA_ARGS__)
+#define LOG_INFO(...) LogUtil::LogMessageFull(boost::log::trivial::info, 
__FILE__, __FUNCTION__, __LINE__, __VA_ARGS__)
+#define LOG_DEBUG(...) \
+  LogUtil::LogMessageFull(boost::log::trivial::debug, __FILE__, __FUNCTION__, 
__LINE__, __VA_ARGS__)
+}  // namespace rocketmq
 #endif
diff --git a/src/protocol/CommandHeader.cpp b/src/protocol/CommandHeader.cpp
index 025d5c6..4e4b0bb 100644
--- a/src/protocol/CommandHeader.cpp
+++ b/src/protocol/CommandHeader.cpp
@@ -71,11 +71,9 @@ void SendMessageRequestHeader::Encode(Json::Value& outData) {
   outData["bornTimestamp"] = UtilAll::to_string(bornTimestamp);
   outData["flag"] = flag;
   outData["properties"] = properties;
-#ifdef ONS
   outData["reconsumeTimes"] = UtilAll::to_string(reconsumeTimes);
   outData["unitMode"] = UtilAll::to_string(unitMode);
-#endif
-  outData["batch"] = batch;
+  // outData["batch"] = batch;
 }
 
 int SendMessageRequestHeader::getReconsumeTimes() {
@@ -106,11 +104,9 @@ void 
SendMessageRequestHeader::SetDeclaredFieldOfCommandHeader(map<string, strin
   requestMap.insert(pair<string, string>("bornTimestamp", 
UtilAll::to_string(bornTimestamp)));
   requestMap.insert(pair<string, string>("flag", UtilAll::to_string(flag)));
   requestMap.insert(pair<string, string>("properties", properties));
-#ifdef ONS
   requestMap.insert(pair<string, string>("reconsumeTimes", 
UtilAll::to_string(reconsumeTimes)));
   requestMap.insert(pair<string, string>("unitMode", 
UtilAll::to_string(unitMode)));
-#endif
-  requestMap.insert(pair<string, string>("batch", UtilAll::to_string(batch)));
+  // requestMap.insert(pair<string, string>("batch", 
UtilAll::to_string(batch)));
 }
 
 //<!************************************************************************
diff --git a/src/transport/TcpRemotingClient.cpp 
b/src/transport/TcpRemotingClient.cpp
index 26060c1..e72af1d 100644
--- a/src/transport/TcpRemotingClient.cpp
+++ b/src/transport/TcpRemotingClient.cpp
@@ -1,695 +1,702 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-#include "TcpRemotingClient.h"
-#include <stddef.h>
-#if !defined(WIN32) && !defined(__APPLE__)
-#include <sys/prctl.h>
-#endif
-#include "Logging.h"
-#include "MemoryOutputStream.h"
-#include "TopAddressing.h"
-#include "UtilAll.h"
-
-namespace rocketmq {
-
-//<!************************************************************************
-TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t 
tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
-    : m_pullThreadNum(pullThreadNum),
-      m_tcpConnectTimeout(tcpConnectTimeout),
-      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
-      m_namesrvIndex(0),
-      m_ioServiceWork(m_ioService) {
-#if !defined(WIN32) && !defined(__APPLE__)
-  string taskName = UtilAll::getProcessName();
-  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
-#endif
-  for (int i = 0; i != pullThreadNum; ++i) {
-    m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, 
&m_ioService));
-  }
-#if !defined(WIN32) && !defined(__APPLE__)
-  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
-#endif
-  LOG_INFO(
-      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
-      "m_pullThreadNum:%d",
-      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
-  m_async_service_thread.reset(new 
boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
-}
-
-void TcpRemotingClient::boost_asio_work() {
-  LOG_INFO("TcpRemotingClient::boost asio async service runing");
-  boost::asio::io_service::work work(m_async_ioService);  // avoid async io
-                                                          // service stops 
after
-                                                          // first timer 
timeout
-                                                          // callback
-  m_async_ioService.run();
-}
-
-TcpRemotingClient::~TcpRemotingClient() {
-  m_tcpTable.clear();
-  m_futureTable.clear();
-  m_asyncFutureTable.clear();
-  m_namesrvAddrList.clear();
-  removeAllTimerCallback();
-}
-
-void TcpRemotingClient::stopAllTcpTransportThread() {
-  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
-  m_async_ioService.stop();
-  m_async_service_thread->interrupt();
-  m_async_service_thread->join();
-  removeAllTimerCallback();
-
-  {
-    TcpMap::iterator it = m_tcpTable.begin();
-    for (; it != m_tcpTable.end(); ++it) {
-      it->second->disconnect(it->first);
-    }
-    m_tcpTable.clear();
-  }
-
-  m_ioService.stop();
-  m_threadpool.join_all();
-
-  {
-    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-    for (ResMap::iterator it = m_futureTable.begin(); it != 
m_futureTable.end(); ++it) {
-      if (it->second)
-        it->second->releaseThreadCondition();
-    }
-  }
-  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
-}
-
-void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
-  if (!addrs.empty()) {
-    boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
-    if (!lock.owns_lock()) {
-      if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(10))) {
-        LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
-        return;
-      }
-    }
-    // clear first;
-    m_namesrvAddrList.clear();
-
-    vector<string> out;
-    UtilAll::Split(out, addrs, ";");
-    for (size_t i = 0; i < out.size(); i++) {
-      string addr = out[i];
-      UtilAll::Trim(addr);
-
-      string hostName;
-      short portNumber;
-      if (UtilAll::SplitURL(addr, hostName, portNumber)) {
-        LOG_INFO("update Namesrv:%s", addr.c_str());
-        m_namesrvAddrList.push_back(addr);
-      }
-    }
-    out.clear();
-  }
-}
-
-bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& 
request) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, 
opaque, this, 3000, false, NULL));
-    addResponseFuture(opaque, responseFuture);
-    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
-    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
-
-    if (SendCommand(pTcp, request)) {
-      responseFuture->setSendRequestOK(true);
-      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
-      if (pRsp == NULL) {
-        LOG_ERROR("wait response timeout of heartbeat, so closeTransport of 
addr:%s", addr.c_str());
-        CloseTransport(addr, pTcp);
-        return false;
-      } else if (pRsp->getCode() == SUCCESS_VALUE) {
-        return true;
-      } else {
-        LOG_WARN("get error response:%d of heartbeat to addr:%s", 
pRsp->getCode(), addr.c_str());
-        return false;
-      }
-    } else {
-      CloseTransport(addr, pTcp);
-    }
-  }
-  return false;
-}
-
-RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
-                                               RemotingCommand& request,
-                                               int timeoutMillis /* = 3000 */) 
{
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
-    addResponseFuture(opaque, responseFuture);
-
-    if (SendCommand(pTcp, request)) {
-      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
-      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
-      responseFuture->setSendRequestOK(true);
-      RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
-      if (pRsp == NULL) {
-        if (code != GET_CONSUMER_LIST_BY_GROUP) {
-          LOG_WARN(
-              "wait response timeout or get NULL response of code:%d, so "
-              "closeTransport of addr:%s",
-              code, addr.c_str());
-          CloseTransport(addr, pTcp);
-        }
-        // avoid responseFuture leak;
-        findAndDeleteResponseFuture(opaque);
-        return NULL;
-      } else {
-        return pRsp;
-      }
-    } else {
-      // avoid responseFuture leak;
-      findAndDeleteResponseFuture(opaque);
-      CloseTransport(addr, pTcp);
-    }
-  }
-  return NULL;
-}
-
-bool TcpRemotingClient::invokeAsync(const string& addr,
-                                    RemotingCommand& request,
-                                    AsyncCallbackWrap* cbw,
-                                    int64 timeoutMilliseconds,
-                                    int maxRetrySendTimes,
-                                    int retrySendTimes) {
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    //<!not delete, for callback to delete;
-    int code = request.getCode();
-    int opaque = request.getOpaque();
-    boost::shared_ptr<ResponseFuture> responseFuture(
-        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, 
cbw));
-    responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
-    responseFuture->setRetrySendTimes(retrySendTimes);
-    responseFuture->setBrokerAddr(addr);
-    responseFuture->setRequestCommand(request);
-    addAsyncResponseFuture(opaque, responseFuture);
-    if (cbw) {
-      boost::asio::deadline_timer* t =
-          new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(timeoutMilliseconds));
-      addTimerCallback(t, opaque);
-      boost::system::error_code e;
-      
t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout,
 this, e, opaque));
-    }
-
-    if (SendCommand(pTcp, request))  // Even if send failed, asyncTimerThread
-                                     // will trigger next pull request or 
report
-                                     // send msg failed
-    {
-      LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", 
addr.c_str(), code, opaque);
-      responseFuture->setSendRequestOK(true);
-    }
-    return true;
-  }
-  LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
-  return false;
-}
-
-void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& 
request) {
-  //<!not need callback;
-  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
-  if (pTcp != NULL) {
-    request.markOnewayRPC();
-    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), 
request.getCode());
-    SendCommand(pTcp, request);
-  }
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& 
addr, bool needRespons) {
-  if (addr.empty())
-    return CreateNameserverTransport(needRespons);
-
-  return CreateTransport(addr, needRespons);
-}
-
-boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const 
string& addr, bool needRespons) {
-  boost::shared_ptr<TcpTransport> tts;
-  {
-    // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
-    // long
-    // time, if could not get m_tcpLock, return NULL
-    bool bGetMutex = false;
-    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
-    if (!lock.owns_lock()) {
-      if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
-        boost::shared_ptr<TcpTransport> pTcp;
-        return pTcp;
-      } else {
-        bGetMutex = true;
-      }
-    } else {
-      bGetMutex = true;
-    }
-    if (bGetMutex) {
-      if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-        boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
-        boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
-        if (tcp) {
-          tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
-          if (connectStatus == e_connectWaitResponse) {
-            boost::shared_ptr<TcpTransport> pTcp;
-            return pTcp;
-          } else if (connectStatus == e_connectFail) {
-            LOG_ERROR("tcpTransport with server disconnected, erase 
server:%s", addr.c_str());
-            tcp->disconnect(addr);  // avoid coredump when connection with 
broker was broken
-            m_tcpTable.erase(addr);
-          } else if (connectStatus == e_connectSuccess) {
-            return tcp;
-          } else {
-            LOG_ERROR(
-                "go to fault state, erase:%s from tcpMap, and reconnect "
-                "it",
-                addr.c_str());
-            m_tcpTable.erase(addr);
-          }
-        }
-      }
-
-      //<!callback;
-      READ_CALLBACK callback = needRespons ? 
&TcpRemotingClient::static_messageReceived : NULL;
-
-      tts.reset(new TcpTransport(this, callback));
-      tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
-      if (connectStatus != e_connectWaitResponse) {
-        LOG_WARN("can not connect to :%s", addr.c_str());
-        tts->disconnect(addr);
-        boost::shared_ptr<TcpTransport> pTcp;
-        return pTcp;
-      } else {
-        m_tcpTable[addr] = tts;  // even if connecting failed finally, this
-                                 // server transport will be erased by next
-                                 // CreateTransport
-      }
-    } else {
-      LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
-      boost::shared_ptr<TcpTransport> pTcp;
-      return pTcp;
-    }
-  }
-
-  tcpConnectStatus connectStatus = 
tts->waitTcpConnectEvent(m_tcpConnectTimeout);
-  if (connectStatus != e_connectSuccess) {
-    LOG_WARN("can not connect to server:%s", addr.c_str());
-    tts->disconnect(addr);
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  } else {
-    LOG_INFO("connect server with addr:%s success", addr.c_str());
-    return tts;
-  }
-}
-
-boost::shared_ptr<TcpTransport> 
TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
-  // m_namesrvLock was added to avoid operation of nameServer was blocked by
-  // m_tcpLock, it was used by single Thread mostly, so no performance impact
-  // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
-  // time, if could not get m_namesrvlock, return NULL
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      boost::shared_ptr<TcpTransport> pTcp;
-      return pTcp;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-
-  if (bGetMutex) {
-    if (!m_namesrvAddrChoosed.empty()) {
-      boost::shared_ptr<TcpTransport> pTcp = 
GetTransport(m_namesrvAddrChoosed, true);
-      if (pTcp)
-        return pTcp;
-      else
-        m_namesrvAddrChoosed.clear();
-    }
-
-    vector<string>::iterator itp = m_namesrvAddrList.begin();
-    for (; itp != m_namesrvAddrList.end(); ++itp) {
-      unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
-      if (m_namesrvIndex == numeric_limits<unsigned int>::max())
-        m_namesrvIndex = 0;
-      m_namesrvIndex++;
-      LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT 
"", m_namesrvIndex, index,
-               m_namesrvAddrList.size());
-      boost::shared_ptr<TcpTransport> pTcp = 
GetTransport(m_namesrvAddrList[index], true);
-      if (pTcp) {
-        m_namesrvAddrChoosed = m_namesrvAddrList[index];
-        return pTcp;
-      }
-    }
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  } else {
-    LOG_WARN("get nameServer tcpTransport mutex failed");
-    boost::shared_ptr<TcpTransport> pTcp;
-    return pTcp;
-  }
-}
-
-void TcpRemotingClient::CloseTransport(const string& addr, 
boost::shared_ptr<TcpTransport> pTcp) {
-  if (addr.empty()) {
-    return CloseNameServerTransport(pTcp);
-  }
-
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
-      return;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-  LOG_ERROR("CloseTransport of:%s", addr.c_str());
-  if (bGetMutex) {
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable 
before", addr.c_str());
-      removeItemFromTable = false;
-    }
-
-    if (removeItemFromTable == true) {
-      LOG_WARN("closeTransport: disconnect broker:%s with state:%d", 
addr.c_str(),
-               m_tcpTable[addr]->getTcpConnectStatus());
-      if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
-        m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection 
with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
-    }
-  } else {
-    LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
-    return;
-  }
-  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
-}
-
-void 
TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> 
pTcp) {
-  bool bGetMutex = false;
-  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
-  if (!lock.owns_lock()) {
-    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
-      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
-      return;
-    } else {
-      bGetMutex = true;
-    }
-  } else {
-    bGetMutex = true;
-  }
-  if (bGetMutex) {
-    string addr = m_namesrvAddrChoosed;
-    bool removeItemFromTable = true;
-    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
-      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
-        LOG_INFO(
-            "tcpTransport with addr:%s has been closed before, and has been "
-            "created again, nothing to do",
-            addr.c_str());
-        removeItemFromTable = false;
-      }
-    } else {
-      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable 
before", addr.c_str());
-      removeItemFromTable = false;
-    }
-
-    if (removeItemFromTable == true) {
-      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection 
with server was broken
-      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
-      m_tcpTable.erase(addr);
-      m_namesrvAddrChoosed.clear();
-    }
-  } else {
-    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", 
m_namesrvAddrChoosed.c_str());
-    return;
-  }
-}
-
-bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, 
RemotingCommand& msg) {
-  const MemoryBlock* phead = msg.GetHead();
-  const MemoryBlock* pbody = msg.GetBody();
-
-  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
-  if (phead->getData()) {
-    result->write(phead->getData(), phead->getSize());
-  }
-  if (pbody->getData()) {
-    result->write(pbody->getData(), pbody->getSize());
-  }
-  const char* pData = static_cast<const char*>(result->getData());
-  int len = result->getDataSize();
-  return pTts->sendMessage(pData, len);
-}
-
-void TcpRemotingClient::static_messageReceived(void* context, const 
MemoryBlock& mem, const string& addr) {
-  TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
-  if (pTcpRemotingClient)
-    pTcpRemotingClient->messageReceived(mem, addr);
-}
-
-void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& 
addr) {
-  m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, 
addr));
-}
-
-void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& 
addr) {
-  RemotingCommand* pRespondCmd = NULL;
-  try {
-    pRespondCmd = RemotingCommand::Decode(mem);
-  } catch (...) {
-    LOG_ERROR("processData_error");
-    return;
-  }
-
-  int opaque = pRespondCmd->getOpaque();
-
-  //<!process self;
-  if (pRespondCmd->isResponseType()) {
-    boost::shared_ptr<ResponseFuture> 
pFuture(findAndDeleteAsyncResponseFuture(opaque));
-    if (!pFuture) {
-      pFuture = findAndDeleteResponseFuture(opaque);
-      if (pFuture) {
-        if (pFuture->getSyncResponseFlag()) {
-          LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
-          deleteAndZero(pRespondCmd);
-          return;
-        }
-        LOG_DEBUG("find_response opaque:%d", opaque);
-      } else {
-        LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", 
opaque);
-        deleteAndZero(pRespondCmd);
-        return;
-      }
-    }
-    processResponseCommand(pRespondCmd, pFuture);
-  } else {
-    processRequestCommand(pRespondCmd, addr);
-  }
-}
-
-void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, 
boost::shared_ptr<ResponseFuture> pfuture) {
-  int code = pfuture->getRequestCode();
-  int opaque = pCmd->getOpaque();
-  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, 
retrySendTimes:%d", code, opaque,
-            pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
-  pCmd->SetExtHeader(code);  // set head , for response use
-
-  pfuture->setResponse(pCmd);
-
-  if (pfuture->getASyncFlag()) {
-    if (!pfuture->getAsyncResponseFlag()) {
-      pfuture->setAsyncResponseFlag();
-      pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
-      cancelTimerCallback(opaque);
-      pfuture->executeInvokeCallback();
-    }
-  }
-}
-
-void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const 
string& addr) {
-  unique_ptr<RemotingCommand> pRequestCommand(pCmd);
-  int requestCode = pRequestCommand->getCode();
-  if (m_requestTable.find(requestCode) == m_requestTable.end()) {
-    LOG_ERROR("can_not_find request:%d processor", requestCode);
-  } else {
-    unique_ptr<RemotingCommand> 
pResponse(m_requestTable[requestCode]->processRequest(addr, 
pRequestCommand.get()));
-    if (!pRequestCommand->isOnewayRPC()) {
-      if (pResponse) {
-        pResponse->setOpaque(pRequestCommand->getOpaque());
-        pResponse->markResponseType();
-        pResponse->Encode();
-
-        invokeOneway(addr, *pResponse);
-      }
-    }
-  }
-}
-
-void TcpRemotingClient::addResponseFuture(int opaque, 
boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  m_futureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_syncFutureTable[opaque] will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> 
TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
-  if (m_futureTable.find(opaque) != m_futureTable.end()) {
-    pResponseFuture = m_futureTable[opaque];
-    m_futureTable.erase(opaque);
-  }
-  return pResponseFuture;
-}
-
-void TcpRemotingClient::handleAsyncPullForResponseTimeout(const 
boost::system::error_code& e, int opaque) {
-  if (e == boost::asio::error::operation_aborted) {
-    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, 
msg:%s", opaque, e.value(),
-             e.message().data());
-    return;
-  }
-
-  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", 
opaque, e.value(), e.message().data());
-  boost::shared_ptr<ResponseFuture> 
pFuture(findAndDeleteAsyncResponseFuture(opaque));
-  if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) 
{
-    if ((pFuture->getAsyncResponseFlag() != true))  // if no response 
received, then check timeout or not
-    {
-      LOG_ERROR("no response got for opaque:%d", opaque);
-      pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
-      pFuture->executeInvokeCallbackException();
-    }
-  }
-
-  eraseTimerCallback(opaque);
-}
-
-void TcpRemotingClient::addAsyncResponseFuture(int opaque, 
boost::shared_ptr<ResponseFuture> pfuture) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  m_asyncFutureTable[opaque] = pfuture;
-}
-
-// Note: after call this function, shared_ptr of m_asyncFutureTable[opaque] 
will
-// be erased, so caller must ensure the life cycle of returned shared_ptr;
-boost::shared_ptr<ResponseFuture> 
TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
-  boost::shared_ptr<ResponseFuture> pResponseFuture;
-  if (m_asyncFutureTable.find(opaque) != m_asyncFutureTable.end()) {
-    pResponseFuture = m_asyncFutureTable[opaque];
-    m_asyncFutureTable.erase(opaque);
-  }
-
-  return pResponseFuture;
-}
-
-void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, 
ClientRemotingProcessor* clientRemotingProcessor) {
-  if (m_requestTable.find(requestCode) != m_requestTable.end())
-    m_requestTable.erase(requestCode);
-  m_requestTable[requestCode] = clientRemotingProcessor;
-}
-
-void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int 
opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%lld", opaque);
-    boost::asio::deadline_timer* old_t = m_async_timer_map[opaque];
-    old_t->cancel();
-    delete old_t;
-    old_t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-  m_async_timer_map[opaque] = t;
-}
-
-void TcpRemotingClient::eraseTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("eraseTimerCallback: opaque:%lld", opaque);
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
-    delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-}
-
-void TcpRemotingClient::cancelTimerCallback(int opaque) {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  if (m_async_timer_map.find(opaque) != m_async_timer_map.end()) {
-    LOG_DEBUG("cancelTimerCallback: opaque:%lld", opaque);
-    boost::asio::deadline_timer* t = m_async_timer_map[opaque];
-    t->cancel();
-    delete t;
-    t = NULL;
-    m_async_timer_map.erase(opaque);
-  }
-}
-
-void TcpRemotingClient::removeAllTimerCallback() {
-  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
-  for (asyncTimerMap::iterator it = m_async_timer_map.begin(); it != 
m_async_timer_map.end(); ++it) {
-    boost::asio::deadline_timer* t = it->second;
-    t->cancel();
-    delete t;
-    t = NULL;
-  }
-  m_async_timer_map.clear();
-}
-
-void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& 
mq, int opaque) {
-  // delete the map record of opaque<->ResponseFuture, so the answer for the 
pull request will discard when receive it
-  // later
-  boost::shared_ptr<ResponseFuture> 
pFuture(findAndDeleteAsyncResponseFuture(opaque));
-  if (!pFuture) {
-    pFuture = findAndDeleteResponseFuture(opaque);
-    if (pFuture) {
-      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", 
opaque, mq.toString().data());
-    }
-  } else {
-    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", 
opaque, mq.toString().data());
-  }
-  // delete the timeout timer for opaque for pullrequest
-  cancelTimerCallback(opaque);
-}
-
-//<!************************************************************************
-}  //<!end namespace;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+#include "TcpRemotingClient.h"
+#include <stddef.h>
+#if !defined(WIN32) && !defined(__APPLE__)
+#include <sys/prctl.h>
+#endif
+#include "Logging.h"
+#include "MemoryOutputStream.h"
+#include "TopAddressing.h"
+#include "UtilAll.h"
+
+namespace rocketmq {
+
+//<!************************************************************************
+TcpRemotingClient::TcpRemotingClient(int pullThreadNum, uint64_t 
tcpConnectTimeout, uint64_t tcpTransportTryLockTimeout)
+    : m_pullThreadNum(pullThreadNum),
+      m_tcpConnectTimeout(tcpConnectTimeout),
+      m_tcpTransportTryLockTimeout(tcpTransportTryLockTimeout),
+      m_namesrvIndex(0),
+      m_ioServiceWork(m_ioService) {
+#if !defined(WIN32) && !defined(__APPLE__)
+  string taskName = UtilAll::getProcessName();
+  prctl(PR_SET_NAME, "networkTP", 0, 0, 0);
+#endif
+  for (int i = 0; i != pullThreadNum; ++i) {
+    m_threadpool.create_thread(boost::bind(&boost::asio::io_service::run, 
&m_ioService));
+  }
+#if !defined(WIN32) && !defined(__APPLE__)
+  prctl(PR_SET_NAME, taskName.c_str(), 0, 0, 0);
+#endif
+  LOG_INFO(
+      "m_tcpConnectTimeout:%ju, m_tcpTransportTryLockTimeout:%ju, "
+      "m_pullThreadNum:%d",
+      m_tcpConnectTimeout, m_tcpTransportTryLockTimeout, m_pullThreadNum);
+  m_async_service_thread.reset(new 
boost::thread(boost::bind(&TcpRemotingClient::boost_asio_work, this)));
+}
+
+void TcpRemotingClient::boost_asio_work() {
+  LOG_INFO("TcpRemotingClient::boost asio async service runing");
+  boost::asio::io_service::work work(m_async_ioService);  // avoid async io
+                                                          // service stops 
after
+                                                          // first timer 
timeout
+                                                          // callback
+  m_async_ioService.run();
+}
+
+TcpRemotingClient::~TcpRemotingClient() {
+  m_tcpTable.clear();
+  m_futureTable.clear();
+  m_asyncFutureTable.clear();
+  m_namesrvAddrList.clear();
+  removeAllTimerCallback();
+}
+
+void TcpRemotingClient::stopAllTcpTransportThread() {
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread Begin");
+  m_async_ioService.stop();
+  m_async_service_thread->interrupt();
+  m_async_service_thread->join();
+  removeAllTimerCallback();
+
+  {
+    TcpMap::iterator it = m_tcpTable.begin();
+    for (; it != m_tcpTable.end(); ++it) {
+      it->second->disconnect(it->first);
+    }
+    m_tcpTable.clear();
+  }
+
+  m_ioService.stop();
+  m_threadpool.join_all();
+
+  {
+    boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+    for (ResMap::iterator it = m_futureTable.begin(); it != 
m_futureTable.end(); ++it) {
+      if (it->second)
+        it->second->releaseThreadCondition();
+    }
+  }
+  LOG_DEBUG("TcpRemotingClient::stopAllTcpTransportThread End");
+}
+
+void TcpRemotingClient::updateNameServerAddressList(const string& addrs) {
+  LOG_INFO("updateNameServerAddressList: [%s]", addrs.c_str());
+  if (!addrs.empty()) {
+    boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
+    if (!lock.owns_lock()) {
+      if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(10))) {
+        LOG_ERROR("updateNameServerAddressList get timed_mutex timeout");
+        return;
+      }
+    }
+    // clear first;
+    m_namesrvAddrList.clear();
+
+    vector<string> out;
+    UtilAll::Split(out, addrs, ";");
+    for (size_t i = 0; i < out.size(); i++) {
+      string addr = out[i];
+      UtilAll::Trim(addr);
+
+      string hostName;
+      short portNumber;
+      if (UtilAll::SplitURL(addr, hostName, portNumber)) {
+        LOG_INFO("update Namesrv:%s", addr.c_str());
+        m_namesrvAddrList.push_back(addr);
+      } else {
+        LOG_INFO("This may be invalid namer server: [%s]", addr.c_str());
+      }
+    }
+    out.clear();
+  }
+}
+
+bool TcpRemotingClient::invokeHeartBeat(const string& addr, RemotingCommand& 
request) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(new ResponseFuture(code, 
opaque, this, 3000, false, NULL));
+    addResponseFuture(opaque, responseFuture);
+    // LOG_INFO("invokeHeartbeat success, addr:%s, code:%d, opaque:%d,
+    // timeoutms:%d", addr.c_str(), code, opaque, 3000);
+
+    if (SendCommand(pTcp, request)) {
+      responseFuture->setSendRequestOK(true);
+      unique_ptr<RemotingCommand> pRsp(responseFuture->waitResponse(3000));
+      if (pRsp == NULL) {
+        LOG_ERROR("wait response timeout of heartbeat, so closeTransport of 
addr:%s", addr.c_str());
+        CloseTransport(addr, pTcp);
+        return false;
+      } else if (pRsp->getCode() == SUCCESS_VALUE) {
+        return true;
+      } else {
+        LOG_WARN("get error response:%d of heartbeat to addr:%s", 
pRsp->getCode(), addr.c_str());
+        return false;
+      }
+    } else {
+      CloseTransport(addr, pTcp);
+    }
+  }
+  return false;
+}
+
+RemotingCommand* TcpRemotingClient::invokeSync(const string& addr,
+                                               RemotingCommand& request,
+                                               int timeoutMillis /* = 3000 */) 
{
+  LOG_DEBUG("InvokeSync:", addr.c_str());
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMillis, false, NULL));
+    addResponseFuture(opaque, responseFuture);
+
+    if (SendCommand(pTcp, request)) {
+      // LOG_INFO("invokeSync success, addr:%s, code:%d, opaque:%d,
+      // timeoutms:%d", addr.c_str(), code, opaque, timeoutMillis);
+      responseFuture->setSendRequestOK(true);
+      RemotingCommand* pRsp = responseFuture->waitResponse(timeoutMillis);
+      if (pRsp == NULL) {
+        if (code != GET_CONSUMER_LIST_BY_GROUP) {
+          LOG_WARN(
+              "wait response timeout or get NULL response of code:%d, so "
+              "closeTransport of addr:%s",
+              code, addr.c_str());
+          CloseTransport(addr, pTcp);
+        }
+        // avoid responseFuture leak;
+        findAndDeleteResponseFuture(opaque);
+        return NULL;
+      } else {
+        return pRsp;
+      }
+    } else {
+      // avoid responseFuture leak;
+      findAndDeleteResponseFuture(opaque);
+      CloseTransport(addr, pTcp);
+    }
+  }
+  LOG_DEBUG("InvokeSync [%s] Failed: Cannot Get Transport.", addr.c_str());
+  return NULL;
+}
+
+bool TcpRemotingClient::invokeAsync(const string& addr,
+                                    RemotingCommand& request,
+                                    AsyncCallbackWrap* cbw,
+                                    int64 timeoutMilliseconds,
+                                    int maxRetrySendTimes,
+                                    int retrySendTimes) {
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    //<!not delete, for callback to delete;
+    int code = request.getCode();
+    int opaque = request.getOpaque();
+    boost::shared_ptr<ResponseFuture> responseFuture(
+        new ResponseFuture(code, opaque, this, timeoutMilliseconds, true, 
cbw));
+    responseFuture->setMaxRetrySendTimes(maxRetrySendTimes);
+    responseFuture->setRetrySendTimes(retrySendTimes);
+    responseFuture->setBrokerAddr(addr);
+    responseFuture->setRequestCommand(request);
+    addAsyncResponseFuture(opaque, responseFuture);
+    if (cbw) {
+      boost::asio::deadline_timer* t =
+          new boost::asio::deadline_timer(m_async_ioService, 
boost::posix_time::milliseconds(timeoutMilliseconds));
+      addTimerCallback(t, opaque);
+      boost::system::error_code e;
+      
t->async_wait(boost::bind(&TcpRemotingClient::handleAsyncPullForResponseTimeout,
 this, e, opaque));
+    }
+
+    if (SendCommand(pTcp, request))  // Even if send failed, asyncTimerThread
+                                     // will trigger next pull request or 
report
+                                     // send msg failed
+    {
+      LOG_DEBUG("invokeAsync success, addr:%s, code:%d, opaque:%d", 
addr.c_str(), code, opaque);
+      responseFuture->setSendRequestOK(true);
+    }
+    return true;
+  }
+  LOG_ERROR("invokeAsync failed of addr:%s", addr.c_str());
+  return false;
+}
+
+void TcpRemotingClient::invokeOneway(const string& addr, RemotingCommand& 
request) {
+  //<!not need callback;
+  boost::shared_ptr<TcpTransport> pTcp = GetTransport(addr, true);
+  if (pTcp != NULL) {
+    request.markOnewayRPC();
+    LOG_DEBUG("invokeOneway success, addr:%s, code:%d", addr.c_str(), 
request.getCode());
+    SendCommand(pTcp, request);
+  }
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::GetTransport(const string& 
addr, bool needRespons) {
+  if (addr.empty()) {
+    LOG_DEBUG("GetTransport of NameServer");
+    return CreateNameserverTransport(needRespons);
+  }
+  return CreateTransport(addr, needRespons);
+}
+
+boost::shared_ptr<TcpTransport> TcpRemotingClient::CreateTransport(const 
string& addr, bool needRespons) {
+  boost::shared_ptr<TcpTransport> tts;
+  {
+    // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking
+    // long
+    // time, if could not get m_tcpLock, return NULL
+    bool bGetMutex = false;
+    boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+    if (!lock.owns_lock()) {
+      if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+        LOG_ERROR("GetTransport of:%s get timed_mutex timeout", addr.c_str());
+        boost::shared_ptr<TcpTransport> pTcp;
+        return pTcp;
+      } else {
+        bGetMutex = true;
+      }
+    } else {
+      bGetMutex = true;
+    }
+    if (bGetMutex) {
+      if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+        boost::weak_ptr<TcpTransport> weakPtcp(m_tcpTable[addr]);
+        boost::shared_ptr<TcpTransport> tcp = weakPtcp.lock();
+        if (tcp) {
+          tcpConnectStatus connectStatus = tcp->getTcpConnectStatus();
+          if (connectStatus == e_connectWaitResponse) {
+            boost::shared_ptr<TcpTransport> pTcp;
+            return pTcp;
+          } else if (connectStatus == e_connectFail) {
+            LOG_ERROR("tcpTransport with server disconnected, erase 
server:%s", addr.c_str());
+            tcp->disconnect(addr);  // avoid coredump when connection with 
broker was broken
+            m_tcpTable.erase(addr);
+          } else if (connectStatus == e_connectSuccess) {
+            return tcp;
+          } else {
+            LOG_ERROR(
+                "go to fault state, erase:%s from tcpMap, and reconnect "
+                "it",
+                addr.c_str());
+            m_tcpTable.erase(addr);
+          }
+        }
+      }
+
+      //<!callback;
+      READ_CALLBACK callback = needRespons ? 
&TcpRemotingClient::static_messageReceived : NULL;
+
+      tts.reset(new TcpTransport(this, callback));
+      tcpConnectStatus connectStatus = tts->connect(addr, m_tcpConnectTimeout);
+      if (connectStatus != e_connectWaitResponse) {
+        LOG_WARN("can not connect to :%s", addr.c_str());
+        tts->disconnect(addr);
+        boost::shared_ptr<TcpTransport> pTcp;
+        return pTcp;
+      } else {
+        m_tcpTable[addr] = tts;  // even if connecting failed finally, this
+                                 // server transport will be erased by next
+                                 // CreateTransport
+      }
+    } else {
+      LOG_WARN("get tcpTransport mutex failed :%s", addr.c_str());
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    }
+  }
+
+  tcpConnectStatus connectStatus = 
tts->waitTcpConnectEvent(m_tcpConnectTimeout);
+  if (connectStatus != e_connectSuccess) {
+    LOG_WARN("can not connect to server:%s", addr.c_str());
+    tts->disconnect(addr);
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  } else {
+    LOG_INFO("connect server with addr:%s success", addr.c_str());
+    return tts;
+  }
+}
+
+boost::shared_ptr<TcpTransport> 
TcpRemotingClient::CreateNameserverTransport(bool needRespons) {
+  // m_namesrvLock was added to avoid operation of nameServer was blocked by
+  // m_tcpLock, it was used by single Thread mostly, so no performance impact
+  // try get m_tcpLock util m_tcpTransportTryLockTimeout to avoid blocking long
+  // time, if could not get m_namesrvlock, return NULL
+  LOG_DEBUG("--CreateNameserverTransport--");
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+      boost::shared_ptr<TcpTransport> pTcp;
+      return pTcp;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+
+  if (bGetMutex) {
+    if (!m_namesrvAddrChoosed.empty()) {
+      boost::shared_ptr<TcpTransport> pTcp = 
GetTransport(m_namesrvAddrChoosed, true);
+      if (pTcp)
+        return pTcp;
+      else
+        m_namesrvAddrChoosed.clear();
+    }
+
+    vector<string>::iterator itp = m_namesrvAddrList.begin();
+    for (; itp != m_namesrvAddrList.end(); ++itp) {
+      unsigned int index = m_namesrvIndex % m_namesrvAddrList.size();
+      if (m_namesrvIndex == numeric_limits<unsigned int>::max())
+        m_namesrvIndex = 0;
+      m_namesrvIndex++;
+      LOG_INFO("namesrvIndex is:%d, index:%d, namesrvaddrlist size:" SIZET_FMT 
"", m_namesrvIndex, index,
+               m_namesrvAddrList.size());
+      boost::shared_ptr<TcpTransport> pTcp = 
GetTransport(m_namesrvAddrList[index], true);
+      if (pTcp) {
+        m_namesrvAddrChoosed = m_namesrvAddrList[index];
+        return pTcp;
+      }
+    }
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  } else {
+    LOG_WARN("get nameServer tcpTransport mutex failed");
+    boost::shared_ptr<TcpTransport> pTcp;
+    return pTcp;
+  }
+}
+
+void TcpRemotingClient::CloseTransport(const string& addr, 
boost::shared_ptr<TcpTransport> pTcp) {
+  if (addr.empty()) {
+    return CloseNameServerTransport(pTcp);
+  }
+
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_tcpLock, boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CloseTransport of:%s get timed_mutex timeout", addr.c_str());
+      return;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+  LOG_ERROR("CloseTransport of:%s", addr.c_str());
+  if (bGetMutex) {
+    bool removeItemFromTable = true;
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+        LOG_INFO(
+            "tcpTransport with addr:%s has been closed before, and has been "
+            "created again, nothing to do",
+            addr.c_str());
+        removeItemFromTable = false;
+      }
+    } else {
+      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable 
before", addr.c_str());
+      removeItemFromTable = false;
+    }
+
+    if (removeItemFromTable == true) {
+      LOG_WARN("closeTransport: disconnect broker:%s with state:%d", 
addr.c_str(),
+               m_tcpTable[addr]->getTcpConnectStatus());
+      if (m_tcpTable[addr]->getTcpConnectStatus() == e_connectSuccess)
+        m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection 
with server was broken
+      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+      m_tcpTable.erase(addr);
+    }
+  } else {
+    LOG_WARN("CloseTransport::get tcpTransport mutex failed:%s", addr.c_str());
+    return;
+  }
+  LOG_ERROR("CloseTransport of:%s end", addr.c_str());
+}
+
+void 
TcpRemotingClient::CloseNameServerTransport(boost::shared_ptr<TcpTransport> 
pTcp) {
+  bool bGetMutex = false;
+  boost::unique_lock<boost::timed_mutex> lock(m_namesrvlock, 
boost::try_to_lock);
+  if (!lock.owns_lock()) {
+    if (!lock.timed_lock(boost::get_system_time() + 
boost::posix_time::seconds(m_tcpTransportTryLockTimeout))) {
+      LOG_ERROR("CreateNameserverTransport get timed_mutex timeout");
+      return;
+    } else {
+      bGetMutex = true;
+    }
+  } else {
+    bGetMutex = true;
+  }
+  if (bGetMutex) {
+    string addr = m_namesrvAddrChoosed;
+    bool removeItemFromTable = true;
+    if (m_tcpTable.find(addr) != m_tcpTable.end()) {
+      if (m_tcpTable[addr]->getStartTime() != pTcp->getStartTime()) {
+        LOG_INFO(
+            "tcpTransport with addr:%s has been closed before, and has been "
+            "created again, nothing to do",
+            addr.c_str());
+        removeItemFromTable = false;
+      }
+    } else {
+      LOG_INFO("tcpTransport with addr:%s had been removed from tcpTable 
before", addr.c_str());
+      removeItemFromTable = false;
+    }
+
+    if (removeItemFromTable == true) {
+      m_tcpTable[addr]->disconnect(addr);  // avoid coredump when connection 
with server was broken
+      LOG_WARN("closeTransport: erase broker: %s", addr.c_str());
+      m_tcpTable.erase(addr);
+      m_namesrvAddrChoosed.clear();
+    }
+  } else {
+    LOG_WARN("CloseNameServerTransport::get tcpTransport mutex failed:%s", 
m_namesrvAddrChoosed.c_str());
+    return;
+  }
+}
+
+bool TcpRemotingClient::SendCommand(boost::shared_ptr<TcpTransport> pTts, 
RemotingCommand& msg) {
+  const MemoryBlock* phead = msg.GetHead();
+  const MemoryBlock* pbody = msg.GetBody();
+
+  unique_ptr<MemoryOutputStream> result(new MemoryOutputStream(1024));
+  if (phead->getData()) {
+    result->write(phead->getData(), phead->getSize());
+  }
+  if (pbody->getData()) {
+    result->write(pbody->getData(), pbody->getSize());
+  }
+  const char* pData = static_cast<const char*>(result->getData());
+  int len = result->getDataSize();
+  return pTts->sendMessage(pData, len);
+}
+
+void TcpRemotingClient::static_messageReceived(void* context, const 
MemoryBlock& mem, const string& addr) {
+  TcpRemotingClient* pTcpRemotingClient = (TcpRemotingClient*)context;
+  if (pTcpRemotingClient)
+    pTcpRemotingClient->messageReceived(mem, addr);
+}
+
+void TcpRemotingClient::messageReceived(const MemoryBlock& mem, const string& 
addr) {
+  m_ioService.post(boost::bind(&TcpRemotingClient::ProcessData, this, mem, 
addr));
+}
+
+void TcpRemotingClient::ProcessData(const MemoryBlock& mem, const string& 
addr) {
+  RemotingCommand* pRespondCmd = NULL;
+  try {
+    pRespondCmd = RemotingCommand::Decode(mem);
+  } catch (...) {
+    LOG_ERROR("processData_error");
+    return;
+  }
+
+  int opaque = pRespondCmd->getOpaque();
+
+  //<!process self;
+  if (pRespondCmd->isResponseType()) {
+    boost::shared_ptr<ResponseFuture> 
pFuture(findAndDeleteAsyncResponseFuture(opaque));
+    if (!pFuture) {
+      pFuture = findAndDeleteResponseFuture(opaque);
+      if (pFuture) {
+        if (pFuture->getSyncResponseFlag()) {
+          LOG_WARN("waitResponse already timeout of opaque:%d", opaque);
+          deleteAndZero(pRespondCmd);
+          return;
+        }
+        LOG_DEBUG("find_response opaque:%d", opaque);
+      } else {
+        LOG_DEBUG("responseFuture was deleted by timeout of opaque:%d", 
opaque);
+        deleteAndZero(pRespondCmd);
+        return;
+      }
+    }
+    processResponseCommand(pRespondCmd, pFuture);
+  } else {
+    processRequestCommand(pRespondCmd, addr);
+  }
+}
+
+// Delivers a decoded response to the future that was waiting for it.
+//   pCmd    - the response command; passed to pfuture->setResponse().
+//   pfuture - the future looked up for pCmd's opaque; dereferenced
+//             unconditionally, so it must not be empty.
+// For futures flagged as async, the invoke callback is executed unless the
+// async-response flag was already set; the timeout timer registered for the
+// opaque is cancelled first.
+void TcpRemotingClient::processResponseCommand(RemotingCommand* pCmd, boost::shared_ptr<ResponseFuture> pfuture) {
+  int code = pfuture->getRequestCode();
+  int opaque = pCmd->getOpaque();
+  LOG_DEBUG("processResponseCommand, code:%d,opaque:%d, maxRetryTimes:%d, retrySendTimes:%d", code, opaque,
+            pfuture->getMaxRetrySendTimes(), pfuture->getRetrySendTimes());
+  pCmd->SetExtHeader(code);  // set head , for response use
+
+  pfuture->setResponse(pCmd);
+
+  // Nothing more to do for sync futures or for async futures already answered.
+  if (!pfuture->getASyncFlag() || pfuture->getAsyncResponseFlag()) {
+    return;
+  }
+
+  pfuture->setAsyncResponseFlag();
+  pfuture->setAsyncCallBackStatus(asyncCallBackStatus_response);
+  cancelTimerCallback(opaque);
+  pfuture->executeInvokeCallback();
+}
+
+// Handles a command that is not a response by routing it to the processor
+// registered for its request code.
+//   pCmd - the request; this function takes ownership and frees it on return.
+//   addr - peer address, passed to the processor and used for the reply.
+// When the request is not one-way and the processor produced a response, the
+// response is stamped with the request's opaque, marked as a response, encoded
+// and sent back with invokeOneway().
+void TcpRemotingClient::processRequestCommand(RemotingCommand* pCmd, const string& addr) {
+  unique_ptr<RemotingCommand> pRequestCommand(pCmd);
+  const int requestCode = pRequestCommand->getCode();
+
+  auto handler = m_requestTable.find(requestCode);
+  if (handler == m_requestTable.end()) {
+    LOG_ERROR("can_not_find request:%d processor", requestCode);
+    return;
+  }
+
+  unique_ptr<RemotingCommand> pResponse(handler->second->processRequest(addr, pRequestCommand.get()));
+  if (pRequestCommand->isOnewayRPC() || !pResponse) {
+    return;
+  }
+
+  pResponse->setOpaque(pRequestCommand->getOpaque());
+  pResponse->markResponseType();
+  pResponse->Encode();
+
+  invokeOneway(addr, *pResponse);
+}
+
+// Stores pfuture under opaque in m_futureTable, overwriting any entry already
+// present for that opaque. The table is accessed under m_futureTableMutex.
+void TcpRemotingClient::addResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> guard(m_futureTableMutex);
+  m_futureTable[opaque] = pfuture;
+}
+
+// Removes and returns the future stored under opaque in m_futureTable.
+// Returns an empty shared_ptr when no entry exists for opaque.
+// Note: after this call the table no longer holds a reference to the future,
+// so the caller must keep the returned shared_ptr alive for as long as it
+// needs the object.
+boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_futureTableMutex);
+  boost::shared_ptr<ResponseFuture> pResponseFuture;
+  // Single lookup: reuse the iterator for both the read and the erase.
+  auto it = m_futureTable.find(opaque);
+  if (it != m_futureTable.end()) {
+    pResponseFuture = it->second;
+    m_futureTable.erase(it);
+  }
+  return pResponseFuture;
+}
+
+// Timer handler for an async request identified by opaque.
+//   e      - completion status of the timer wait; operation_aborted means the
+//            timer was cancelled, in which case nothing is done beyond logging.
+//   opaque - id of the request whose timer fired.
+// On a real expiry the async future is removed from its table; if it is an
+// async future with a callback wrapper and no response has been flagged, the
+// exception callback is executed with timeout status. The timer entry for
+// opaque is then erased.
+void TcpRemotingClient::handleAsyncPullForResponseTimeout(const boost::system::error_code& e, int opaque) {
+  if (e == boost::asio::error::operation_aborted) {
+    LOG_INFO("handleAsyncPullForResponseTimeout aborted opaque:%d, e_code:%d, msg:%s", opaque, e.value(),
+             e.message().data());
+    return;
+  }
+
+  LOG_DEBUG("handleAsyncPullForResponseTimeout opaque:%d, e_code:%d, msg:%s", opaque, e.value(), e.message().data());
+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (pFuture && pFuture->getASyncFlag() && (pFuture->getAsyncCallbackWrap())) {
+    // If no response was received, treat the request as timed out.
+    if ((pFuture->getAsyncResponseFlag() != true)) {
+      LOG_ERROR("no response got for opaque:%d", opaque);
+      pFuture->setAsyncCallBackStatus(asyncCallBackStatus_timeout);
+      pFuture->executeInvokeCallbackException();
+    }
+  }
+
+  eraseTimerCallback(opaque);
+}
+
+// Stores pfuture under opaque in m_asyncFutureTable, overwriting any entry
+// already present for that opaque. The table is accessed under m_asyncFutureLock.
+void TcpRemotingClient::addAsyncResponseFuture(int opaque, boost::shared_ptr<ResponseFuture> pfuture) {
+  boost::lock_guard<boost::mutex> guard(m_asyncFutureLock);
+  m_asyncFutureTable[opaque] = pfuture;
+}
+
+// Removes and returns the future stored under opaque in m_asyncFutureTable.
+// Returns an empty shared_ptr when no entry exists for opaque.
+// Note: after this call the table no longer holds a reference to the future,
+// so the caller must keep the returned shared_ptr alive for as long as it
+// needs the object.
+boost::shared_ptr<ResponseFuture> TcpRemotingClient::findAndDeleteAsyncResponseFuture(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_asyncFutureLock);
+  boost::shared_ptr<ResponseFuture> pResponseFuture;
+  // Single lookup: reuse the iterator for both the read and the erase.
+  auto it = m_asyncFutureTable.find(opaque);
+  if (it != m_asyncFutureTable.end()) {
+    pResponseFuture = it->second;
+    m_asyncFutureTable.erase(it);
+  }
+
+  return pResponseFuture;
+}
+
+// Associates clientRemotingProcessor with requestCode in m_requestTable.
+// A processor previously registered for the same code is replaced; the old
+// pointer is dropped from the table but not deleted here.
+void TcpRemotingClient::registerProcessor(MQRequestCode requestCode, ClientRemotingProcessor* clientRemotingProcessor) {
+  // operator[] assignment overwrites an existing entry, so no prior erase is needed.
+  m_requestTable[requestCode] = clientRemotingProcessor;
+}
+
+// Registers timer t as the timeout timer for opaque in m_async_timer_map.
+//   t      - heap-allocated timer; the map's other accessors delete it later.
+//   opaque - request id used as the map key.
+// If a timer is already registered for opaque it is cancelled and deleted
+// before t takes its place.
+void TcpRemotingClient::addTimerCallback(boost::asio::deadline_timer* t, int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator it = m_async_timer_map.find(opaque);
+  if (it != m_async_timer_map.end()) {
+    // opaque is an int, so it is logged with %d.
+    LOG_DEBUG("addTimerCallback:erase timerCallback opaque:%d", opaque);
+    boost::asio::deadline_timer* old_t = it->second;
+    old_t->cancel();
+    delete old_t;
+    m_async_timer_map.erase(it);
+  }
+  m_async_timer_map[opaque] = t;
+}
+
+// Deletes the timer registered for opaque, if any, and removes its entry from
+// m_async_timer_map. Unlike cancelTimerCallback(), cancel() is not called
+// explicitly before the timer is deleted.
+void TcpRemotingClient::eraseTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator it = m_async_timer_map.find(opaque);
+  if (it != m_async_timer_map.end()) {
+    // opaque is an int, so it is logged with %d.
+    LOG_DEBUG("eraseTimerCallback: opaque:%d", opaque);
+    delete it->second;
+    m_async_timer_map.erase(it);
+  }
+}
+
+// Cancels and deletes the timer registered for opaque, if any, and removes its
+// entry from m_async_timer_map. Does nothing when no timer is registered.
+void TcpRemotingClient::cancelTimerCallback(int opaque) {
+  boost::lock_guard<boost::mutex> lock(m_timerMapMutex);
+  asyncTimerMap::iterator it = m_async_timer_map.find(opaque);
+  if (it != m_async_timer_map.end()) {
+    // opaque is an int, so it is logged with %d.
+    LOG_DEBUG("cancelTimerCallback: opaque:%d", opaque);
+    boost::asio::deadline_timer* t = it->second;
+    t->cancel();
+    delete t;
+    m_async_timer_map.erase(it);
+  }
+}
+
+// Cancels and deletes every timer held in m_async_timer_map, then empties the
+// map. The whole sweep runs under m_timerMapMutex.
+void TcpRemotingClient::removeAllTimerCallback() {
+  boost::lock_guard<boost::mutex> guard(m_timerMapMutex);
+  asyncTimerMap::iterator cur = m_async_timer_map.begin();
+  while (cur != m_async_timer_map.end()) {
+    boost::asio::deadline_timer* timer = cur->second;
+    timer->cancel();
+    delete timer;
+    ++cur;
+  }
+  m_async_timer_map.clear();
+}
+
+// Forgets the pending pull request identified by opaque.
+//   mq     - the message queue the request belonged to; used only for logging.
+//   opaque - id of the request to drop.
+// The opaque<->ResponseFuture record is removed (async table first, then the
+// sync table), so the answer to that pull request is discarded when it
+// arrives later. The timeout timer registered for opaque is cancelled as well.
+void TcpRemotingClient::deleteOpaqueForDropPullRequest(const MQMessageQueue& mq, int opaque) {
+  boost::shared_ptr<ResponseFuture> pFuture(findAndDeleteAsyncResponseFuture(opaque));
+  if (pFuture) {
+    LOG_DEBUG("succ deleted the async pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+  } else {
+    pFuture = findAndDeleteResponseFuture(opaque);
+    if (pFuture) {
+      LOG_DEBUG("succ deleted the sync pullrequest for opaque:%d, mq:%s", opaque, mq.toString().data());
+    }
+  }
+  // delete the timeout timer for opaque for pullrequest
+  cancelTimerCallback(opaque);
+}
+
+//<!************************************************************************
+}  // namespace rocketmq
diff --git a/src/transport/TcpTransport.cpp b/src/transport/TcpTransport.cpp
index 7ecf329..701cd8a 100644
--- a/src/transport/TcpTransport.cpp
+++ b/src/transport/TcpTransport.cpp
@@ -53,7 +53,9 @@ TcpTransport::~TcpTransport() {
 tcpConnectStatus TcpTransport::connect(const string& strServerURL, int 
timeOutMillisecs /* = 3000 */) {
   string hostName;
   short portNumber;
+  LOG_DEBUG("connect to [%s].", strServerURL.c_str());
   if (!UtilAll::SplitURL(strServerURL, hostName, portNumber)) {
+    LOG_INFO("connect to [%s] failed, Invalid url.", strServerURL.c_str());
     return e_connectFail;
   }
 

Reply via email to