This is an automated email from the ASF dual-hosted git repository.

gosonzhang pushed a commit to branch tubemq-client-cpp
in repository https://gitbox.apache.org/repos/asf/incubator-tubemq.git


The following commit(s) were added to refs/heads/tubemq-client-cpp by this push:
     new 2e2b007  [TUBEMQ-275]Thread Pool & Timer (#205)
2e2b007 is described below

commit 2e2b0073a3d7323494f6c2ce4f40b61135f2ce68
Author: charlely <41224243+charl...@users.noreply.github.com>
AuthorDate: Tue Jul 14 15:33:09 2020 +0800

    [TUBEMQ-275]Thread Pool & Timer (#205)
    
    Co-authored-by: charleli <charl...@tencent.com>
---
 .../tubemq-client-cpp/CMakeLists.txt               | 33 ++++----
 tubemq-client-twins/tubemq-client-cpp/README.md    | 33 ++++++++
 .../tubemq-client-cpp/example/CMakeLists.txt       |  3 +-
 .../{src => example/executor_pool}/CMakeLists.txt  | 13 +---
 .../singleton.h => example/executor_pool/main.cc}  | 63 ++++++++--------
 .../include/tubemq/executor_pool.h                 | 88 ++++++++++++++++++++++
 .../tubemq-client-cpp/include/tubemq/singleton.h   |  5 +-
 .../tubemq-client-cpp/src/CMakeLists.txt           |  3 -
 .../tubemq-client-cpp/src/executor_pool.cc         | 87 +++++++++++++++++++++
 .../tubemq-client-cpp/third_party/README.md        | 23 ++++++
 .../tubemq-client-cpp/third_party/readme.md        |  2 -
 11 files changed, 288 insertions(+), 65 deletions(-)

diff --git a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt 
b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
index db9014a..984a28a 100644
--- a/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/CMakeLists.txt
@@ -22,6 +22,26 @@ cmake_minimum_required (VERSION 3.1)
 
 project (TubeMQ)
 
+# Compiler flags intended for the C++ targets of this project: debug info,
+# position-independent code, a set of warnings, and the C++11 standard.
+# NOTE(review): this hunk only defines the CXX_FLAGS list; nothing visible here
+# copies it into CMAKE_CXX_FLAGS (e.g. string(REPLACE ";" " " ...)), so confirm
+# that these flags, including -std=c++11, actually reach the compiler.
+set (CXX_FLAGS
+    -g
+    -fPIC
+    -Wall
+    -D__STDC_FORMAT_MACROS
+    -Wno-unused-parameter
+    -Wno-unused-function
+    -Wunused-variable
+    -Wunused-value
+    -Wshadow
+    -Wcast-qual
+    -Wcast-align
+    -Wwrite-strings
+    -Wsign-compare
+    -Winvalid-pch
+    -fms-extensions
+    -Wfloat-equal
+    -Wextra
+    -std=c++11
+    )
 
 INCLUDE_DIRECTORIES(include)
 
@@ -35,16 +55,3 @@ ADD_SUBDIRECTORY(third_party)
 ADD_SUBDIRECTORY(example)
 
 
-if (UNIX)
-    SET(CMAKE_CXX_FLAGS_DEBUG   "-O1 -g -ggdb -D_DEBUG")
-    SET(CMAKE_CXX_FLAGS_RELEASE "-O3 -g -ggdb -DNDEBUG")
-    SET(DEPENDENT_LIBRARIES log4cplus pthread)
-else (UNIX)
-    SET(DEPENDENT_LIBRARIES log4cplus)
-endif (UNIX)
-
-
-
-
-
-
diff --git a/tubemq-client-twins/tubemq-client-cpp/README.md 
b/tubemq-client-twins/tubemq-client-cpp/README.md
new file mode 100644
index 0000000..ecf0f6f
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/README.md
@@ -0,0 +1,33 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+
+# TubeMQ C++ client library
+## Requirements
+
+ * CMake
+ * [ASIO](https://github.com/chriskohlhoff/asio.git)
+ * [OpenSSL](https://github.com/openssl/openssl.git)
+ * [Protocol Buffers](https://developers.google.com/protocol-buffers/)
+ * [Log4cplus](https://github.com/log4cplus/log4cplus.git)
+ * [RapidJSON](https://github.com/Tencent/rapidjson.git)
+
+
diff --git a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt 
b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
index 1d8fcdd..9d729c2 100644
--- a/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/CMakeLists.txt
@@ -26,7 +26,8 @@ function(tubemq_add_example _name)
   set(_srcs ${ARGN})
   message (STATUS "${_name} sources: ${_srcs}")
   add_executable (${_name} ${_srcs})
-  TARGET_LINK_LIBRARIES (${_name} tubemq log4cplus pthread)
+  TARGET_LINK_LIBRARIES (${_name} tubemq ssl crypto log4cplus pthread)
 endfunction()
 
 add_subdirectory (log)
+add_subdirectory (executor_pool)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt 
b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
similarity index 67%
copy from tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
copy to 
tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
index a6e76e5..bef5bbb 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/CMakeLists.txt
@@ -18,15 +18,4 @@
 #
 
 
-cmake_minimum_required (VERSION 3.1)
-
-
-set(CMAKE_C_FLAGS "-O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing 
-fPIC")
-set(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare 
-fno-strict-aliasing -fPIC")
-
-AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS)                                       
 
-ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS})   
-TARGET_LINK_LIBRARIES (tubemq)
-
-
-
+tubemq_add_example(executor_pool main.cc)
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h 
b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
similarity index 50%
copy from tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h
copy to tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
index fb22587..8c5ca25 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h
+++ b/tubemq-client-twins/tubemq-client-cpp/example/executor_pool/main.cc
@@ -17,42 +17,39 @@
  * under the License.
  */
 
-#ifndef _TUBEMQ_SINGLETON_H
-#define _TUBEMQ_SINGLETON_H
-
-#include <assert.h>
-#include <stdlib.h>
-
-#include <mutex>
+#include <chrono>
+#include <exception>
+#include <functional>
+#include <iostream>
+#include <string>
 #include <thread>
 
-#include "tubemq/noncopyable.h"
+#include "tubemq/executor_pool.h"
 
-namespace tubemq {
+using namespace std;
+using namespace tubemq;
 
-template <typename T>
-class Singleton : noncopyable {
- public:
-  Singleton() = delete;
-  ~Singleton() = delete;
-
-  static T& instance() {
-    std::call_once(once_, Singleton::init);
-    assert(value_ != nullptr);
-    return *value_;
+// Timer completion callback used by main(): prints the bound value `a` when
+// the wait completed without an error code; does nothing otherwise.
+void handler(int a, const asio::error_code& error) {
+  if (!error) {
+    // Timer expired.
+    std::cout << "handlertimeout:" << a << endl;
   }
+}
+
+// Example: take an executor from a 4-slot pool, perform a blocking 5 s wait on
+// a steady timer, then re-arm the timer for 100 ms and wait asynchronously
+// with handler() as the completion callback.
+int main() {
+  using namespace std::placeholders;  // for _1, _2, _3...
+  ExecutorPool pool(4);
+  auto timer = pool.Get()->CreateSteadyTimer();
+  timer->expires_after(std::chrono::seconds(5));
+  std::cout << "startwait" << endl;
+  timer->wait();
+  std::cout << "endwait" << endl;
+
+  timer->expires_after(std::chrono::milliseconds(100));
+  std::cout << "startsyncwait" << endl;
+  timer->async_wait(std::bind(handler, 5, _1));
+  std::cout << "endsyncwait" << endl;
+  // Keep the process alive long enough for the asynchronous callback to run.
+  // Spelled out instead of the `5s` literal: that literal requires C++14,
+  // while the project's flag list requests -std=c++11.
+  std::this_thread::sleep_for(std::chrono::seconds(5));
+  return 0;
+}
 
- private:
-  static void init() { value_ = new T(); }
-
- private:
-  static std::once_flag once_;
-  static T* value_;
-};
-
-template <typename T>
-T* Singleton<T>::value_ = nullptr;
-
-}  // namespace tubemq
-
-#endif  // _TUBEMQ_SINGLETON_H
diff --git 
a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h 
b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
new file mode 100644
index 0000000..0dd1f66
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/executor_pool.h
@@ -0,0 +1,88 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#ifndef _TUBEMQ_EXECUTOR_POOL_
+#define _TUBEMQ_EXECUTOR_POOL_
+
+#include <stdint.h>
+#include <stdlib.h>
+
+#include <asio.hpp>
+#include <asio/ssl.hpp>
+#include <functional>
+#include <memory>
+#include <mutex>
+#include <thread>
+#include <vector>
+
+#include "tubemq/noncopyable.h"
+
+namespace tubemq {
+
+typedef std::shared_ptr<asio::ip::tcp::socket> SocketPtr;
+typedef std::shared_ptr<asio::ssl::stream<asio::ip::tcp::socket &> > 
TlsSocketPtr;
+typedef std::shared_ptr<asio::ip::tcp::resolver> TcpResolverPtr;
+typedef std::shared_ptr<asio::steady_timer> SteadyTimerPtr;
+
+// Owns one asio::io_context together with the single worker thread that runs
+// it, and acts as a factory for asio objects bound to that io_context.
+class Executor : noncopyable {
+ public:
+  // Creates the io_context and starts the worker thread.
+  Executor();
+  // Calls Close(), then detaches the worker if it is still joinable.
+  ~Executor();
+
+  // Returns a new TCP socket bound to this executor's io_context.
+  SocketPtr CreateSocket();
+  // Wraps `socket` in an SSL stream using `ctx`. The stream type holds a
+  // reference to the socket, so the socket must outlive the returned stream.
+  TlsSocketPtr CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx);
+  // Returns a new TCP resolver bound to this executor's io_context.
+  TcpResolverPtr CreateTcpResolver();
+  // Returns a new steady timer bound to this executor's io_context.
+  SteadyTimerPtr CreateSteadyTimer();
+  using func = std::function<void(void)>;
+
+  // Posts `task` to this executor's io_context.
+  void Post(func task);
+
+  // Shared handle to the underlying io_context.
+  std::shared_ptr<asio::io_context> GetIoContext() { return io_context_; }
+
+  // Close executor and exit thread
+  void Close();
+
+ private:
+  // Entry point of the worker thread.
+  void StartWorker(std::shared_ptr<asio::io_context> io_context);
+  // Declaration order matters: the constructor initializes work_ and worker_
+  // from io_context_, so io_context_ must be declared first and worker_ last.
+  std::shared_ptr<asio::io_context> io_context_;
+  using io_context_work = 
asio::executor_work_guard<asio::io_context::executor_type>;
+  io_context_work work_;
+  std::thread worker_;
+};
+
+typedef std::shared_ptr<Executor> ExecutorPtr;
+
+// Fixed-size collection of Executor slots handed out in round-robin order.
+// Slots start empty; an Executor is created the first time its slot is
+// selected by Get().
+class ExecutorPool : noncopyable {
+ public:
+  // `nthreads` is the number of executor slots in the pool.
+  explicit ExecutorPool(int nthreads = 2);
+
+  // Returns the executor of the next slot, creating it if the slot is empty.
+  ExecutorPtr Get();
+
+  // Closes every created executor and empties its slot.
+  void Close();
+
+ private:
+  typedef std::vector<ExecutorPtr> ExecutorList;
+  ExecutorList executors_;
+  // Monotonically increasing counter; Get() takes it modulo the slot count.
+  uint32_t executorIdx_;
+  // Held by Get() while it selects and lazily creates an executor.
+  std::mutex mutex_;
+  typedef std::unique_lock<std::mutex> Lock;
+};
+
+typedef std::shared_ptr<ExecutorPool> ExecutorPoolPtr;
+}  // namespace tubemq
+
+#endif  //_TUBEMQ_EXECUTOR_POOL_
diff --git a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h 
b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h
index fb22587..2761bda 100644
--- a/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h
+++ b/tubemq-client-twins/tubemq-client-cpp/include/tubemq/singleton.h
@@ -36,7 +36,7 @@ class Singleton : noncopyable {
   Singleton() = delete;
   ~Singleton() = delete;
 
-  static T& instance() {
+  static T& Instance() {
     std::call_once(once_, Singleton::init);
     assert(value_ != nullptr);
     return *value_;
@@ -51,6 +51,9 @@ class Singleton : noncopyable {
 };
 
 template <typename T>
+std::once_flag Singleton<T>::once_;
+
+template <typename T>
 T* Singleton<T>::value_ = nullptr;
 
 }  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt 
b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
index a6e76e5..5995d5c 100644
--- a/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
+++ b/tubemq-client-twins/tubemq-client-cpp/src/CMakeLists.txt
@@ -21,9 +21,6 @@
 cmake_minimum_required (VERSION 3.1)
 
 
-set(CMAKE_C_FLAGS "-O2 -g -Wall -Werror -Wsign-compare -fno-strict-aliasing 
-fPIC")
-set(CMAKE_CXX_FLAGS "-std=c++11 -O2 -g -Wall -Werror -Wsign-compare 
-fno-strict-aliasing -fPIC")
-
 AUX_SOURCE_DIRECTORY(. CURRENT_DIR_SRCS)                                       
 
 ADD_LIBRARY(tubemq STATIC ${CURRENT_DIR_SRCS})   
 TARGET_LINK_LIBRARIES (tubemq)
diff --git a/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc 
b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
new file mode 100644
index 0000000..a2536ae
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/src/executor_pool.cc
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+#include "tubemq/executor_pool.h"
+
+#include <asio.hpp>
+#include <functional>
+#include <memory>
+
+namespace tubemq {
+
+// Creates the io_context, then a work guard on it, and finally the worker
+// thread that runs it. The worker receives its own shared_ptr copy of the
+// io_context. Members are initialized in declaration order, so io_context_ and
+// work_ exist before worker_ starts running.
+Executor::Executor()
+    : io_context_(new asio::io_context()),
+      work_(asio::make_work_guard(*io_context_)),
+      worker_(std::bind(&Executor::StartWorker, this, io_context_)) {}
+
+// Stops the executor. Close() skips the join when it runs on the worker thread
+// itself; in that case the worker is still joinable here and is detached, since
+// destroying a joinable std::thread would terminate the process.
+Executor::~Executor() {
+  Close();
+  if (worker_.joinable()) {
+    worker_.detach();
+  }
+}
+
+// Worker thread entry point: runs the event loop until the io_context is
+// stopped. Uses the shared_ptr passed to the thread rather than the member, so
+// the loop does not reach back into `this` and the io_context stays alive for
+// as long as the (possibly detached) thread is running.
+void Executor::StartWorker(std::shared_ptr<asio::io_context> io_context) {
+  io_context->run();
+}
+
+// Builds a TCP socket bound to this executor's io_context.
+SocketPtr Executor::CreateSocket() {
+  return std::make_shared<asio::ip::tcp::socket>(*io_context_);
+}
+
+// Layers an SSL stream over an existing socket. The stream stores a reference
+// to *socket, so the caller must keep the socket alive while the stream is used.
+TlsSocketPtr Executor::CreateTlsSocket(SocketPtr &socket, asio::ssl::context &ctx) {
+  typedef asio::ssl::stream<asio::ip::tcp::socket &> TlsStream;
+  return TlsSocketPtr(new TlsStream(*socket, ctx));
+}
+
+// Builds a TCP resolver bound to this executor's io_context.
+TcpResolverPtr Executor::CreateTcpResolver() {
+  return std::make_shared<asio::ip::tcp::resolver>(*io_context_);
+}
+
+// Builds a steady timer bound to this executor's io_context.
+SteadyTimerPtr Executor::CreateSteadyTimer() {
+  return std::make_shared<asio::steady_timer>(*io_context_);
+}
+
+// Stops the io_context and waits for the worker thread to finish. The join is
+// skipped when Close() is running on the worker thread itself (a thread cannot
+// join itself) or when the worker is no longer joinable.
+void Executor::Close() {
+  io_context_->stop();
+  if (std::this_thread::get_id() != worker_.get_id() && worker_.joinable()) {
+    worker_.join();
+  }
+}
+
+// Hands `task` to the io_context via io_context::post().
+void Executor::Post(Executor::func task) { io_context_->post(task); }
+
+// Reserves one empty slot per requested thread; executors are created lazily
+// by Get(). A non-positive `nthreads` is clamped to 1: zero would leave the
+// pool without any slot to hand out, and a negative value would be converted
+// to a huge unsigned element count.
+ExecutorPool::ExecutorPool(int nthreads)
+    : executors_(nthreads > 0 ? nthreads : 1), executorIdx_(0), mutex_() {}
+
+// Returns the executor of the next slot in round-robin order, creating it on
+// first use. Returns an empty pointer if the pool has no slots.
+ExecutorPtr ExecutorPool::Get() {
+  Lock lock(mutex_);
+
+  // Guard against `% 0`, which is undefined behaviour for an empty pool.
+  if (executors_.empty()) {
+    return ExecutorPtr();
+  }
+
+  // Keep the index unsigned and wide enough for size(); no narrowing to int.
+  const size_t idx = executorIdx_++ % executors_.size();
+  ExecutorPtr &slot = executors_[idx];
+  if (!slot) {
+    slot = std::make_shared<Executor>();
+  }
+
+  return slot;
+}
+
+// Closes every executor that has been created and leaves all slots empty, so
+// a later Get() creates fresh executors.
+void ExecutorPool::Close() {
+  // Detach the executors from the pool under the lock, because Get() mutates
+  // executors_ under the same mutex. The slot count is preserved by swapping
+  // with a same-sized vector of empty pointers.
+  ExecutorList closing;
+  {
+    Lock lock(mutex_);
+    closing.resize(executors_.size());
+    closing.swap(executors_);
+  }
+
+  // Close outside the lock: Executor::Close() may join the worker thread, and
+  // a task on that worker calling Get() must not deadlock against us.
+  for (auto it = closing.begin(); it != closing.end(); ++it) {
+    if (*it != nullptr) {
+      (*it)->Close();
+    }
+    it->reset();
+  }
+}
+}  // namespace tubemq
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/README.md 
b/tubemq-client-twins/tubemq-client-cpp/third_party/README.md
new file mode 100644
index 0000000..6497505
--- /dev/null
+++ b/tubemq-client-twins/tubemq-client-cpp/third_party/README.md
@@ -0,0 +1,23 @@
+<!--
+
+    Licensed to the Apache Software Foundation (ASF) under one
+    or more contributor license agreements.  See the NOTICE file
+    distributed with this work for additional information
+    regarding copyright ownership.  The ASF licenses this file
+    to you under the Apache License, Version 2.0 (the
+    "License"); you may not use this file except in compliance
+    with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing,
+    software distributed under the License is distributed on an
+    "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+    KIND, either express or implied.  See the License for the
+    specific language governing permissions and limitations
+    under the License.
+
+-->
+
+# Third-party libraries
+tubemq-client-cpp depends on several third-party libraries; their source code 
is available (usually as a git submodule) in this directory.
diff --git a/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md 
b/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md
deleted file mode 100644
index e1752ca..0000000
--- a/tubemq-client-twins/tubemq-client-cpp/third_party/readme.md
+++ /dev/null
@@ -1,2 +0,0 @@
-#Third-party libraries#
-tubemq-client-cpp depends on several third-party libraries, their source code 
is available (usually as a git submodule) in this directory.
\ No newline at end of file

Reply via email to