chenBright commented on code in PR #2225: URL: https://github.com/apache/brpc/pull/2225#discussion_r1199568069
########## src/butil/containers/doubly_buffered_data_bthread.h: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Date: Mon Sep 22 22:23:13 CST 2014 + +#ifndef BUTIL_DOUBLY_BUFFERED_DATA_BTHREAD_H +#define BUTIL_DOUBLY_BUFFERED_DATA_BTHREAD_H + +#include <deque> +#include <vector> // std::vector +#include <pthread.h> +#include "butil/containers/doubly_buffered_data_internal.h" +#include "butil/scoped_lock.h" +#include "butil/thread_local.h" +#include "butil/logging.h" +#include "butil/macros.h" +#include "butil/type_traits.h" +#include "butil/errno.h" +#include "butil/atomicops.h" +#include "butil/unique_ptr.h" + +namespace butil { + +// Compared to DoublyBufferedData, DoublyBufferedDataBthread allows to +// bthread is suspended when executing query logic. Review Comment: done ########## src/butil/containers/doubly_buffered_data_bthread.h: ########## @@ -0,0 +1,409 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +// Date: Mon Sep 22 22:23:13 CST 2014 + +#ifndef BUTIL_DOUBLY_BUFFERED_DATA_BTHREAD_H +#define BUTIL_DOUBLY_BUFFERED_DATA_BTHREAD_H + +#include <deque> +#include <vector> // std::vector +#include <pthread.h> +#include "butil/containers/doubly_buffered_data_internal.h" +#include "butil/scoped_lock.h" +#include "butil/thread_local.h" +#include "butil/logging.h" +#include "butil/macros.h" +#include "butil/type_traits.h" +#include "butil/errno.h" +#include "butil/atomicops.h" +#include "butil/unique_ptr.h" + +namespace butil { + +// Compared to DoublyBufferedData, DoublyBufferedDataBthread allows to +// bthread is suspended when executing query logic. +// If bthread will not be suspended in the query logic, DoublyBufferedDataBthread +// also makes Read() almost lock-free by making Modify() *much* slower. +// If bthread will be suspended in the query logic, there is competition among +// bthreads using the same Wrapper. +// +// Read(): begin with thread-local reference count of foreground instance +// incremented by one which be protected by a thread-local mutex, then read +// the foreground instance which will not be changed before its all thread-local +// reference count become zero. At last, after the query completes, thread-local +// reference count of foreground instance will be decremented by one, and if +// it becomes zero, notifies Modify(). +// +// Modify(): Modify background instance which is not used by any Read(), flip +// foreground and background, lock thread-local mutexes one by one and wait +// until thread-local reference counts which be protected by a thread-local +// mutex become 0 to make sure all existing Read() finish and later Read() +// see new foreground, then modify background(foreground before flip) again. + +template <typename T> +class DoublyBufferedDataBthread { + friend class internal::WrapperTLSGroup<DoublyBufferedDataBthread>; + class Wrapper; +public: + typedef T DataType; + + class ScopedPtr { + friend class DoublyBufferedDataBthread; + public: + ScopedPtr() : _data(NULL), _index(0), _w(NULL) {} + ~ScopedPtr() { + if (_w) { + _w->EndRead(_index); + } + } + const T* get() const { return _data; } + const T& operator*() const { return *_data; } + const T* operator->() const { return _data; } + + private: + DISALLOW_COPY_AND_ASSIGN(ScopedPtr); + const T* _data; + int _index; + Wrapper* _w; + }; + + DoublyBufferedDataBthread(); + ~DoublyBufferedDataBthread(); + + // Put foreground instance into ptr. The instance will not be changed until + // ptr is destructed. + // Returns 0 on success, -1 otherwise. + int Read(ScopedPtr* ptr); + + // Modify background and foreground instances. fn(T&, ...) will be called + // twice. Modify() from different threads are exclusive from each other. + // NOTE: Call same series of fn to different equivalent instances should + // result in equivalent instances, otherwise foreground and background + // instance will be inconsistent. + template <typename Fn> size_t Modify(Fn& fn); + template <typename Fn, typename Arg1> size_t Modify(Fn& fn, const Arg1&); + template <typename Fn, typename Arg1, typename Arg2> + size_t Modify(Fn& fn, const Arg1&, const Arg2&); + + // fn(T& background, const T& foreground, ...) will be called to background + // and foreground instances respectively. + template <typename Fn> size_t ModifyWithForeground(Fn& fn); + template <typename Fn, typename Arg1> + size_t ModifyWithForeground(Fn& fn, const Arg1&); + template <typename Fn, typename Arg1, typename Arg2> + size_t ModifyWithForeground(Fn& fn, const Arg1&, const Arg2&); + +private: + + const T* UnsafeRead(int& index) const { + index = _index.load(butil::memory_order_acquire); + return _data + index; + } + + Wrapper* AddWrapper(Wrapper*); + void RemoveWrapper(Wrapper*); + + // Foreground and background void. + T _data[2]; + + // Index of foreground instance. + butil::atomic<int> _index; + + // Key to access thread-local wrappers. + internal::WrapperTLSId _wrapper_key; + + // All thread-local instances. + std::vector<Wrapper*> _wrappers; + + // Sequence access to _wrappers. + pthread_mutex_t _wrappers_mutex{}; + + // Sequence modifications. + pthread_mutex_t _modify_mutex{}; +}; + +template <typename T> +class DoublyBufferedDataBthread<T>::Wrapper { +friend class DoublyBufferedDataBthread; +public: + explicit Wrapper() + : _control(NULL) + , _ref0(0) + , _ref1(0) + , _modify_wait(false) { + pthread_mutex_init(&_mutex, NULL); + pthread_cond_init(&_cond0, NULL); + pthread_cond_init(&_cond1, NULL); + } + + ~Wrapper() { + if (_control != NULL) { + _control->RemoveWrapper(this); + } + + WaitReadDone(0); + WaitReadDone(1); + + pthread_mutex_destroy(&_mutex); + pthread_cond_destroy(&_cond0); + pthread_cond_destroy(&_cond1); + } + + // _mutex will be locked by the calling pthread and DoublyBufferedDataBthread. + // Most of the time, no modifications are done, so the mutex is + // uncontended and fast. + inline void BeginRead() { + pthread_mutex_lock(&_mutex); + } + + // _mutex will be unlocked by the calling pthread and DoublyBufferedDataBthread. + inline void BeginReadRelease() { + pthread_mutex_unlock(&_mutex); + } + + // Thread-local reference count which be protected by _mutex + // will be decremented by one. + inline void EndRead(int index) { + BAIDU_SCOPED_LOCK(_mutex); + SubRef(index); + SignalReadCond(index); + } + + inline void WaitReadDone(int index) { + BAIDU_SCOPED_LOCK(_mutex); + if (index != 0 && index != 1) { + return; + } + int& ref = index == 0 ? _ref0 : _ref1; + while (ref != 0) { + _modify_wait = true; + if (index == 0) { + pthread_cond_wait(&_cond0, &_mutex); + } else if (index == 1) { + pthread_cond_wait(&_cond1, &_mutex); + } + } + _modify_wait = false; + } + + inline void SignalReadCond(int index) { + if (index == 0 && _ref0 == 0) { + pthread_cond_signal(&_cond0); + } else if (index == 1 && _ref1 == 0) { + pthread_cond_signal(&_cond1); + } + } + + void AddRef(int index) { + if (index == 0) { + ++_ref0; + } else { + ++_ref1; + } + } + + void SubRef(int index) { + if (index == 0) { + --_ref0; + } else { + --_ref1; + } + } + +private: + DoublyBufferedDataBthread* _control; + pthread_mutex_t _mutex{}; + pthread_cond_t _cond0{}; // Cond for _ref0. + pthread_cond_t _cond1{}; // Cond for _ref1. + int _ref0; // Reference count for _data[0]. Review Comment: done -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org --------------------------------------------------------------------- To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org For additional commands, e-mail: dev-h...@brpc.apache.org