Ostrich: use a thread pool instead of starting and stopping individual threads
Project: http://git-wip-us.apache.org/repos/asf/marmotta/repo Commit: http://git-wip-us.apache.org/repos/asf/marmotta/commit/d811ef38 Tree: http://git-wip-us.apache.org/repos/asf/marmotta/tree/d811ef38 Diff: http://git-wip-us.apache.org/repos/asf/marmotta/diff/d811ef38 Branch: refs/heads/develop Commit: d811ef382445cdccf2c23c417ae912cda16ccdf3 Parents: 76ed061 Author: Sebastian Schaffert <[email protected]> Authored: Tue Aug 23 12:30:39 2016 +0200 Committer: Sebastian Schaffert <[email protected]> Committed: Tue Aug 23 12:30:39 2016 +0200 ---------------------------------------------------------------------- .../backend/persistence/leveldb_persistence.cc | 56 ++--- .../backend/persistence/leveldb_persistence.h | 2 + libraries/ostrich/backend/util/CMakeLists.txt | 3 +- libraries/ostrich/backend/util/threadpool.h | 251 +++++++++++++++++++ 4 files changed, 283 insertions(+), 29 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.cc ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.cc b/libraries/ostrich/backend/persistence/leveldb_persistence.cc index fbb1cb2..efc1dc6 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.cc +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.cc @@ -293,7 +293,7 @@ bool Matches(const Statement& pattern, const Statement& stmt) { dbimpl::DB* buildDB(const std::string& path, const std::string& suffix, const dbimpl::Options& options) { dbimpl::DB* db; dbimpl::Status status = dbimpl::DB::Open(options, path + "/" + suffix + ".db", &db); - assert(status.ok()); + CHECK_STATUS(status); return db; } @@ -327,7 +327,7 @@ dbimpl::Options buildNsOptions() { } LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSize) - : comparator(new KeyComparator()) + : 
workers(8), comparator(new KeyComparator()) , cache(dbimpl::NewLRUCache(cacheSize)) , options(buildOptions(comparator.get(), cache.get())) , db_ns_prefix(buildDB(path, "ns_prefix", buildNsOptions())) @@ -335,23 +335,23 @@ LevelDBPersistence::LevelDBPersistence(const std::string &path, int64_t cacheSiz , db_meta(buildDB(path, "metadata", buildNsOptions())) { // Open databases in separate threads as LevelDB does a lot of computation on open. - std::vector<std::thread> openers; - openers.push_back(std::thread([&]() { + std::vector<std::future<void>> openers; + openers.push_back(workers.push([&](int id) { db_spoc.reset(buildDB(path, "spoc", *options)); })); - openers.push_back(std::thread([&]() { + openers.push_back(workers.push([&](int id) { db_cspo.reset(buildDB(path, "cspo", *options)); })); - openers.push_back(std::thread([&]() { + openers.push_back(workers.push([&](int id) { db_opsc.reset(buildDB(path, "opsc", *options)); })); - openers.push_back(std::thread([&]() { + openers.push_back(workers.push([&](int id) { db_pcos.reset(buildDB(path, "pcos", *options)); })); for (auto& t : openers) { - t.join(); + t.wait(); } CHECK_NOTNULL(db_spoc.get()); @@ -434,26 +434,26 @@ int64_t LevelDBPersistence::AddStatements(StatementIterator& it) { dbimpl::WriteBatch batch_spoc, batch_cspo, batch_opsc, batch_pcos; auto writeBatches = [&]{ - std::vector<std::thread> writers; - writers.push_back(std::thread([&]() { + std::vector<std::future<void>> writers; + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos)); batch_pcos.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc)); batch_opsc.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo)); batch_cspo.Clear(); })); - writers.push_back(std::thread([&]() { 
+ writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc)); batch_spoc.Clear(); })); for (auto& t : writers) { - t.join(); + t.wait(); } }; @@ -546,22 +546,22 @@ int64_t LevelDBPersistence::RemoveStatements(const rdf::proto::Statement& patter count = RemoveStatements(pattern, batch_spoc, batch_cspo, batch_opsc, batch_pcos); - std::vector<std::thread> writers; - writers.push_back(std::thread([&]() { + std::vector<std::future<void>> writers; + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &batch_pcos)); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &batch_opsc)); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &batch_cspo)); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &batch_spoc)); })); for (auto& t : writers) { - t.join(); + t.wait(); } DLOG(INFO) << "Removed " << count << " statements (time=" << @@ -579,34 +579,34 @@ UpdateStatistics LevelDBPersistence::Update(LevelDBPersistence::UpdateIterator & WriteBatch b_spoc, b_cspo, b_opsc, b_pcos, b_prefix, b_url; auto writeBatches = [&]{ - std::vector<std::thread> writers; - writers.push_back(std::thread([&]() { + std::vector<std::future<void>> writers; + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_pcos->Write(dbimpl::WriteOptions(), &b_pcos)); b_pcos.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_opsc->Write(dbimpl::WriteOptions(), &b_opsc)); b_opsc.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_cspo->Write(dbimpl::WriteOptions(), &b_cspo)); b_cspo.Clear(); })); - 
writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_spoc->Write(dbimpl::WriteOptions(), &b_spoc)); b_spoc.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_ns_prefix->Write(dbimpl::WriteOptions(), &b_prefix)); b_prefix.Clear(); })); - writers.push_back(std::thread([&]() { + writers.push_back(workers.push([&](int id) { CHECK_STATUS(db_ns_url->Write(dbimpl::WriteOptions(), &b_url)); b_url.Clear(); })); for (auto& t : writers) { - t.join(); + t.wait(); } }; http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/persistence/leveldb_persistence.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/persistence/leveldb_persistence.h b/libraries/ostrich/backend/persistence/leveldb_persistence.h index 9fd1924..eee80e4 100644 --- a/libraries/ostrich/backend/persistence/leveldb_persistence.h +++ b/libraries/ostrich/backend/persistence/leveldb_persistence.h @@ -39,6 +39,7 @@ namespace dbimpl = leveldb; #include "model/rdf_model.h" #include "service/sail.pb.h" #include "util/iterator.h" +#include "util/threadpool.h" namespace marmotta { namespace persistence { @@ -142,6 +143,7 @@ class LevelDBPersistence { */ int64_t Size(); private: + ctpl::thread_pool workers; std::unique_ptr<KeyComparator> comparator; std::shared_ptr<dbimpl::Cache> cache; http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/CMakeLists.txt ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/util/CMakeLists.txt b/libraries/ostrich/backend/util/CMakeLists.txt index a4ad8b3..d7e13e0 100644 --- a/libraries/ostrich/backend/util/CMakeLists.txt +++ b/libraries/ostrich/backend/util/CMakeLists.txt @@ -1,6 +1,7 @@ include_directories(.. ${CMAKE_CURRENT_BINARY_DIR}/..) 
-add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h iterator.h unique.h time_logger.cc time_logger.h) +add_library(marmotta_util murmur3.cc murmur3.h split.cc split.h + iterator.h unique.h time_logger.cc time_logger.h threadpool.h) add_library(marmotta_raptor_util raptor_util.h raptor_util.cc) target_link_libraries(marmotta_raptor_util marmotta_model ${CMAKE_THREAD_LIBS_INIT} ${RAPTOR_LIBRARY} ${GFLAGS_LIBRARY}) \ No newline at end of file http://git-wip-us.apache.org/repos/asf/marmotta/blob/d811ef38/libraries/ostrich/backend/util/threadpool.h ---------------------------------------------------------------------- diff --git a/libraries/ostrich/backend/util/threadpool.h b/libraries/ostrich/backend/util/threadpool.h new file mode 100644 index 0000000..9047aad --- /dev/null +++ b/libraries/ostrich/backend/util/threadpool.h @@ -0,0 +1,251 @@ +/********************************************************* +* +* Copyright (C) 2014 by Vitaliy Vitsentiy +* +* Licensed under the Apache License, Version 2.0 (the "License"); +* you may not use this file except in compliance with the License. +* You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. 
+*
+*********************************************************/
+
+
+#ifndef __ctpl_stl_thread_pool_H__
+#define __ctpl_stl_thread_pool_H__
+
+#include <functional>
+#include <thread>
+#include <atomic>
+#include <vector>
+#include <memory>
+#include <exception>
+#include <future>
+#include <mutex>
+#include <queue>
+#include <condition_variable>
+
+
+// thread pool to run user's functors with signature
+//      ret func(int id, other_params)
+// where id is the index of the thread that runs the functor
+// ret is some return type
+// push() returns a std::future<ret>; a task pushed after stop() is never run, so its future never becomes ready
+
+namespace ctpl {
+
+namespace detail {
+template <typename T>
+class Queue {
+ public:
+  bool push(T const & value) {
+    std::unique_lock<std::mutex> lock(this->mutex);
+    this->q.push(value);
+    return true;
+  }
+  // copies the front element into v and removes it; returns false (v untouched) if the queue is empty
+  bool pop(T & v) {
+    std::unique_lock<std::mutex> lock(this->mutex);
+    if (this->q.empty())
+      return false;
+    v = this->q.front();
+    this->q.pop();
+    return true;
+  }
+  bool empty() {
+    std::unique_lock<std::mutex> lock(this->mutex);
+    return this->q.empty();
+  }
+ private:
+  std::queue<T> q;
+  std::mutex mutex;
+};
+}
+
+class thread_pool {
+
+ public:
+
+  thread_pool() { this->init(); }
+  explicit thread_pool(int nThreads) { this->init(); this->resize(nThreads); }
+
+  // the destructor waits for all the functions in the queue to be finished
+  ~thread_pool() {
+    this->stop(true);
+  }
+
+  // get the number of running threads in the pool
+  int size() { return static_cast<int>(this->threads.size()); }
+
+  // number of idle threads
+  int n_idle() { return this->nWaiting; }
+  std::thread & get_thread(int i) { return *this->threads[i]; }
+
+  // change the number of threads in the pool
+  // should be called from one thread, otherwise be careful to not interleave, also with this->stop()
+  // nThreads must be >= 0
+  void resize(int nThreads) {
+    if (!this->isStop && !this->isDone) {
+      int oldNThreads = static_cast<int>(this->threads.size());
+      if (oldNThreads <= nThreads) {  // if the number of threads is increased
+        this->threads.resize(nThreads);
+        this->flags.resize(nThreads);
+
+        for (int i = oldNThreads; i < nThreads; ++i) {
+          this->flags[i] = std::make_shared<std::atomic<bool>>(false);
+          this->set_thread(i);
+        }
+      }
+      else {  // the number of threads is decreased
+        for (int i = oldNThreads - 1; i >= nThreads; --i) {
+          *this->flags[i] = true;  // this thread will finish
+          this->threads[i]->detach();
+        }
+        {
+          // stop the detached threads that were waiting
+          std::unique_lock<std::mutex> lock(this->mutex);
+          this->cv.notify_all();
+        }
+        this->threads.resize(nThreads);  // safe to delete because the threads are detached
+        this->flags.resize(nThreads);  // safe to delete because the threads have copies of shared_ptr of the flags, not originals
+      }
+    }
+  }
+
+  // empty the queue
+  void clear_queue() {
+    std::function<void(int id)> * _f;
+    while (this->q.pop(_f))
+      delete _f;  // empty the queue
+  }
+
+  // pops a queued task and returns a copy of it (an empty std::function if the queue is empty)
+  std::function<void(int)> pop() {
+    std::function<void(int id)> * _f = nullptr;
+    this->q.pop(_f);
+    std::unique_ptr<std::function<void(int id)>> func(_f);  // at return, delete the function even if an exception occurred
+    std::function<void(int)> f;
+    if (_f)
+      f = *_f;
+    return f;
+  }
+
+  // wait for all computing threads to finish and stop all threads
+  // may be called asynchronously to not pause the calling thread while waiting
+  // if isWait == true, all the functions in the queue are run, otherwise the queue is cleared without running the functions
+  void stop(bool isWait = false) {
+    if (!isWait) {
+      if (this->isStop)
+        return;
+      this->isStop = true;
+      for (int i = 0, n = this->size(); i < n; ++i) {
+        *this->flags[i] = true;  // command the threads to stop
+      }
+      this->clear_queue();  // empty the queue
+    }
+    else {
+      if (this->isDone || this->isStop)
+        return;
+      this->isDone = true;  // give the waiting threads a command to finish
+    }
+    {
+      std::unique_lock<std::mutex> lock(this->mutex);
+      this->cv.notify_all();  // stop all waiting threads
+    }
+    for (int i = 0; i < static_cast<int>(this->threads.size()); ++i) {  // wait for the computing threads to finish
+      if (this->threads[i]->joinable())
+        this->threads[i]->join();
+    }
+    // if there were no threads in the pool but some functors in the queue, the functors are not deleted by the threads
+    // therefore delete them here
+    this->clear_queue();
+    this->threads.clear();
+    this->flags.clear();
+  }
+
+  template<typename F, typename... Rest>
+  auto push(F && f, Rest&&... rest) ->std::future<decltype(f(0, rest...))> {
+    auto pck = std::make_shared<std::packaged_task<decltype(f(0, rest...))(int)>>(
+      std::bind(std::forward<F>(f), std::placeholders::_1, std::forward<Rest>(rest)...)
+    );
+    auto _f = new std::function<void(int id)>([pck](int id) {
+      (*pck)(id);
+    });
+    this->q.push(_f);
+    std::unique_lock<std::mutex> lock(this->mutex);
+    this->cv.notify_one();
+    return pck->get_future();
+  }
+
+  // run the user's function that accepts argument int - id of the running thread. returned value is templatized
+  // operator returns std::future, where the user can get the result and rethrow the caught exceptions
+  template<typename F>
+  auto push(F && f) ->std::future<decltype(f(0))> {
+    auto pck = std::make_shared<std::packaged_task<decltype(f(0))(int)>>(std::forward<F>(f));
+    auto _f = new std::function<void(int id)>([pck](int id) {
+      (*pck)(id);
+    });
+    this->q.push(_f);
+    std::unique_lock<std::mutex> lock(this->mutex);
+    this->cv.notify_one();
+    return pck->get_future();
+  }
+
+
+ private:
+
+  // non-copyable and non-movable
+  thread_pool(const thread_pool &) = delete;
+  thread_pool(thread_pool &&) = delete;
+  thread_pool & operator=(const thread_pool &) = delete;
+  thread_pool & operator=(thread_pool &&) = delete;
+
+  void set_thread(int i) {
+    std::shared_ptr<std::atomic<bool>> flag(this->flags[i]); // a copy of the shared ptr to the flag
+    auto f = [this, i, flag/* a copy of the shared ptr to the flag */]() {
+      std::atomic<bool> & _flag = *flag;
+      std::function<void(int id)> * _f;
+      bool isPop = this->q.pop(_f);
+      while (true) {
+        while (isPop) {  // if there is anything in the queue
+          std::unique_ptr<std::function<void(int id)>> func(_f); // at return, delete the function even if an exception occurred
+          (*_f)(i);
+          if (_flag)
+            return;  // the thread is wanted to stop, return even if the queue is not empty yet
+          else
+            isPop = this->q.pop(_f);
+        }
+        // the queue is empty here, wait for the next command
+        std::unique_lock<std::mutex> lock(this->mutex);
+        ++this->nWaiting;
+        this->cv.wait(lock, [this, &_f, &isPop, &_flag](){ isPop = this->q.pop(_f); return isPop || this->isDone || _flag; });
+        --this->nWaiting;
+        if (!isPop)
+          return;  // if the queue is empty and this->isDone == true or *flag then return
+      }
+    };
+    this->threads[i].reset(new std::thread(f));  // compiler may not support std::make_unique()
+  }
+
+  void init() { this->nWaiting = 0; this->isStop = false; this->isDone = false; }
+
+  std::vector<std::unique_ptr<std::thread>> threads;
+  std::vector<std::shared_ptr<std::atomic<bool>>> flags;
+  detail::Queue<std::function<void(int id)> *> q;
+  std::atomic<bool> isDone;
+  std::atomic<bool> isStop;
+  std::atomic<int> nWaiting;  // how many threads are waiting
+
+  std::mutex mutex;
+  std::condition_variable cv;
+};
+
+}
+
+#endif  // __ctpl_stl_thread_pool_H__
\ No newline at end of file
