This is an automated email from the ASF dual-hosted git repository.
wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git
The following commit(s) were added to refs/heads/master by this push:
new 122355a2 EventDispatcher supports various IO types (#2560)
122355a2 is described below
commit 122355a29e2dd3341ad6f47e17682b482c45b7e4
Author: Bright Chen <[email protected]>
AuthorDate: Mon Jun 3 16:29:47 2024 +0800
EventDispatcher supports various IO types (#2560)
---
src/brpc/acceptor.cpp | 2 +-
src/brpc/details/health_check.cpp | 3 +-
src/brpc/event_dispatcher.cpp | 18 +
src/brpc/event_dispatcher.h | 211 ++++++++++-
src/brpc/event_dispatcher_epoll.cpp | 85 +++--
src/brpc/event_dispatcher_kqueue.cpp | 82 ++---
src/brpc/socket.cpp | 592 ++++++++++++++----------------
src/brpc/socket.h | 107 ++----
src/brpc/socket_id.h | 16 +-
src/brpc/socket_inl.h | 193 +---------
src/brpc/versioned_ref_with_id.h | 624 ++++++++++++++++++++++++++++++++
src/butil/memory/scope_guard.h | 13 +-
src/butil/type_traits.h | 36 +-
test/brpc_event_dispatcher_unittest.cpp | 266 +++++++++++++-
test/brpc_load_balancer_unittest.cpp | 2 +-
test/brpc_redis_unittest.cpp | 4 +-
16 files changed, 1531 insertions(+), 723 deletions(-)
diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp
index d2657258..e8e6dcbe 100644
--- a/src/brpc/acceptor.cpp
+++ b/src/brpc/acceptor.cpp
@@ -321,7 +321,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket*
acception) {
// Always add this socket into `_socket_map' whether it
// has been `SetFailed' or not, whether `Acceptor' is
// running or not. Otherwise, `Acceptor::BeforeRecycle'
- // may be called (inside Socket::OnRecycle) after `Acceptor'
+ // may be called (inside Socket::BeforeRecycled) after
`Acceptor'
// has been destroyed
am->_socket_map.insert(socket_id, ConnectStatistics());
}
diff --git a/src/brpc/details/health_check.cpp
b/src/brpc/details/health_check.cpp
index ea6e7874..81008255 100644
--- a/src/brpc/details/health_check.cpp
+++ b/src/brpc/details/health_check.cpp
@@ -210,7 +210,8 @@ bool HealthCheckTask::OnTriggeringTask(timespec*
next_abstime) {
ptr->_ninflight_app_health_check.fetch_add(
1, butil::memory_order_relaxed);
}
- ptr->Revive();
+ // See comments above.
+ ptr->Revive(2/*note*/);
ptr->_hc_count = 0;
if (!FLAGS_health_check_path.empty()) {
HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index 689e80da..53495ea6 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -71,6 +71,24 @@ EventDispatcher& GetGlobalEventDispatcher(int fd,
bthread_tag_t tag) {
return g_edisp[tag * FLAGS_event_dispatcher_num + index];
}
+int IOEventData::OnCreated(const IOEventDataOptions& options) {
+ if (!options.input_cb) {
+ LOG(ERROR) << "Invalid input_cb=NULL";
+ return -1;
+ }
+ if (!options.output_cb) {
+ LOG(ERROR) << "Invalid output_cb=NULL";
+ return -1;
+ }
+
+ _options = options;
+ return 0;
+}
+
+void IOEventData::BeforeRecycled() {
+ _options = { NULL, NULL, NULL };
+}
+
} // namespace brpc
#if defined(OS_LINUX)
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 0de74df6..51c404e2 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -21,16 +21,78 @@
#include "butil/macros.h" // DISALLOW_COPY_AND_ASSIGN
#include "bthread/types.h" // bthread_t, bthread_attr_t
-#include "brpc/socket.h" // Socket, SocketId
+#include "brpc/versioned_ref_with_id.h"
namespace brpc {
+// Unique identifier of an IOEventData.
+// Users shall store IOEventDataId instead of IOEventData and call
IOEventData::Address()
+// to convert the identifier to a unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed IOEventData will not be recycled.
+typedef VRefId IOEventDataId;
+
+const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID;
+
+class IOEventData;
+
+typedef VersionedRefWithIdUniquePtr<IOEventData> EventDataUniquePtr;
+
+// User callback type of input event and output event.
+typedef int (*InputEventCallback) (void* id, uint32_t events,
+ const bthread_attr_t& thread_attr);
+typedef InputEventCallback OutputEventCallback;
+
+struct IOEventDataOptions {
+ // Callback for input event.
+ InputEventCallback input_cb;
+ // Callback for output event.
+ OutputEventCallback output_cb;
+ // User data.
+ void* user_data;
+};
+
+// EventDispatcher finds IOEventData by IOEventDataId which is
+// stored in epoll/kqueue data, and calls input/output event callback,
+// so EventDispatcher supports various IO types, such as socket,
+// pipe, eventfd, timerfd, etc.
+class IOEventData : public VersionedRefWithId<IOEventData> {
+public:
+ explicit IOEventData(Forbidden f)
+ : VersionedRefWithId<IOEventData>(f)
+ , _options{ NULL, NULL, NULL } {}
+
+ DISALLOW_COPY_AND_ASSIGN(IOEventData);
+
+ int CallInputEventCallback(uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ return _options.input_cb(_options.user_data, events, thread_attr);
+ }
+
+ int CallOutputEventCallback(uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ return _options.output_cb(_options.user_data, events, thread_attr);
+ }
+
+private:
+friend class VersionedRefWithId<IOEventData>;
+
+ int OnCreated(const IOEventDataOptions& options);
+ void BeforeRecycled();
+
+ IOEventDataOptions _options;
+};
+
+namespace rdma {
+class RdmaEndpoint;
+}
+
// Dispatch edge-triggered events of file descriptors to consumers
// running in separate bthreads.
class EventDispatcher {
friend class Socket;
friend class rdma::RdmaEndpoint;
+template <typename T> friend class IOEvent;
public:
EventDispatcher();
@@ -40,7 +102,7 @@ public:
// Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
// create bthreads running user callbacks.
// Returns 0 on success, -1 otherwise.
- virtual int Start(const bthread_attr_t* consumer_thread_attr);
+ virtual int Start(const bthread_attr_t* thread_attr);
// True iff this dispatcher is running in a bthread
bool Running() const;
@@ -57,19 +119,19 @@ public:
// When the file descriptor is removed from internal epoll, the Socket
// will be dereferenced once additionally.
// Returns 0 on success, -1 otherwise.
- int AddConsumer(SocketId socket_id, int fd);
+ int AddConsumer(IOEventDataId event_data_id, int fd);
// Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. When event arrives,
// `Socket::HandleEpollOut' will be called with `socket_id'
// Returns 0 on success, -1 otherwise and errno is set
- int RegisterEvent(SocketId socket_id, int fd, bool pollin);
+ int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin);
// Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
// will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
// Returns 0 on success, -1 otherwise and errno is set
- int UnregisterEvent(SocketId socket_id, int fd, bool pollin);
+ int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin);
private:
DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
@@ -83,8 +145,33 @@ private:
// Remove the file descriptor `fd' from epoll.
int RemoveConsumer(int fd);
- // The epoll to watch events.
- int _epfd;
+ // Call user callback of input event and output event.
+ template<bool IsInputEvent>
+ static int OnEvent(IOEventDataId event_data_id, uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ EventDataUniquePtr data;
+ if (IOEventData::Address(event_data_id, &data) != 0) {
+ return -1;
+ }
+ return IsInputEvent ?
+ data->CallInputEventCallback(events, thread_attr) :
+ data->CallOutputEventCallback(events, thread_attr);
+ }
+
+ static int CallInputEventCallback(IOEventDataId event_data_id,
+ uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ return OnEvent<true>(event_data_id, events, thread_attr);
+ }
+
+ static int CallOutputEventCallback(IOEventDataId event_data_id,
+ uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ return OnEvent<false>(event_data_id, events, thread_attr);
+ }
+
+ // The epoll/kqueue fd to watch events.
+ int _event_dispatcher_fd;
// false unless Stop() is called.
volatile bool _stop;
@@ -93,7 +180,7 @@ private:
bthread_t _tid;
// The attribute of bthreads calling user callbacks.
- bthread_attr_t _consumer_thread_attr;
+ bthread_attr_t _thread_attr;
// Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
int _wakeup_fds[2];
@@ -101,6 +188,114 @@ private:
EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
+// IOEvent class manages the IO events of a file descriptor conveniently.
+template <typename T>
+class IOEvent {
+public:
+ IOEvent()
+ : _init(false)
+ , _event_data_id(INVALID_IO_EVENT_DATA_ID)
+ , _bthread_tag(bthread_self_tag()) {}
+
+ ~IOEvent() { Reset(); }
+
+ DISALLOW_COPY_AND_ASSIGN(IOEvent);
+
+ int Init(void* user_data) {
+ if (_init) {
+ LOG(WARNING) << "IOEvent has been initialized";
+ return 0;
+ }
+ IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data };
+ if (IOEventData::Create(&_event_data_id, options) != 0) {
+ LOG(ERROR) << "Fail to create EventData";
+ return -1;
+ }
+ _init = true;
+ return 0;
+ }
+
+ void Reset() {
+ if (_init) {
+ IOEventData::SetFailedById(_event_data_id);
+ _init = false;
+ }
+ }
+
+ // See comments of `EventDispatcher::AddConsumer'.
+ int AddConsumer(int fd) {
+ if (!_init) {
+ LOG(ERROR) << "IOEvent has not been initialized";
+ return -1;
+ }
+ return GetGlobalEventDispatcher(fd, _bthread_tag)
+ .AddConsumer(_event_data_id, fd);
+ }
+
+ // See comments of `EventDispatcher::RemoveConsumer'.
+ int RemoveConsumer(int fd) {
+ if (!_init) {
+ LOG(ERROR) << "IOEvent has not been initialized";
+ return -1;
+ }
+ return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd);
+ }
+
+ // See comments of `EventDispatcher::RegisterEvent'.
+ int RegisterEvent(int fd, bool pollin) {
+ if (!_init) {
+ LOG(ERROR) << "IOEvent has not been initialized";
+ return -1;
+ }
+ return GetGlobalEventDispatcher(fd, _bthread_tag)
+ .RegisterEvent(_event_data_id, fd, pollin);
+ }
+
+ // See comments of `EventDispatcher::UnregisterEvent'.
+ int UnregisterEvent(int fd, bool pollin) {
+ if (!_init) {
+ LOG(ERROR) << "IOEvent has not been initialized";
+ return -1;
+ }
+ return GetGlobalEventDispatcher(fd, _bthread_tag)
+ .UnregisterEvent(_event_data_id, fd, pollin);
+ }
+
+ void set_bthread_tag(bthread_tag_t bthread_tag) {
+ _bthread_tag = bthread_tag;
+ }
+ bthread_tag_t bthread_tag() const {
+ return _bthread_tag;
+ }
+
+private:
+ // Generic callback to handle input event.
+ static int OnInputEvent(void* user_data, uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ static_assert(
+ butil::is_result_int<decltype(&T::OnInputEvent),
+ void*, uint32_t,
+ bthread_attr_t>::value,
+ "T::OnInputEvent signature mismatch");
+ return T::OnInputEvent(user_data, events, thread_attr);
+ }
+
+ // Generic callback to handle output event.
+ static int OnOutputEvent(void* user_data, uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ static_assert(
+ butil::is_result_int<decltype(&T::OnOutputEvent),
+ void*, uint32_t,
+ bthread_attr_t>::value,
+            "T::OnOutputEvent signature mismatch");
+ return T::OnOutputEvent(user_data, events, thread_attr);
+ }
+
+ bool _init;
+ IOEventDataId _event_data_id;
+ bthread_tag_t _bthread_tag;
+};
+
} // namespace brpc
diff --git a/src/brpc/event_dispatcher_epoll.cpp
b/src/brpc/event_dispatcher_epoll.cpp
index 1ac7647d..64717b16 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -23,17 +23,16 @@
namespace brpc {
EventDispatcher::EventDispatcher()
- : _epfd(-1)
+ : _event_dispatcher_fd(-1)
, _stop(false)
, _tid(0)
- , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
- _epfd = epoll_create(1024 * 1024);
- if (_epfd < 0) {
+ , _thread_attr(BTHREAD_ATTR_NORMAL) {
+ _event_dispatcher_fd = epoll_create(1024 * 1024);
+ if (_event_dispatcher_fd < 0) {
PLOG(FATAL) << "Fail to create epoll";
return;
}
- CHECK_EQ(0, butil::make_close_on_exec(_epfd));
+ CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd));
_wakeup_fds[0] = -1;
_wakeup_fds[1] = -1;
@@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher()
EventDispatcher::~EventDispatcher() {
Stop();
Join();
- if (_epfd >= 0) {
- close(_epfd);
- _epfd = -1;
+ if (_event_dispatcher_fd >= 0) {
+ close(_event_dispatcher_fd);
+ _event_dispatcher_fd = -1;
}
if (_wakeup_fds[0] > 0) {
close(_wakeup_fds[0]);
@@ -57,7 +56,7 @@ EventDispatcher::~EventDispatcher() {
}
int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
- if (_epfd < 0) {
+ if (_event_dispatcher_fd < 0) {
LOG(FATAL) << "epoll was not created";
return -1;
}
@@ -68,22 +67,21 @@ int EventDispatcher::Start(const bthread_attr_t*
consumer_thread_attr) {
return -1;
}
- // Set _consumer_thread_attr before creating epoll thread to make sure
+ // Set _thread_attr before creating epoll thread to make sure
// everything seems sane to the thread.
- _consumer_thread_attr = (consumer_thread_attr ?
- *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
+ _thread_attr = consumer_thread_attr ?
+ *consumer_thread_attr : BTHREAD_ATTR_NORMAL;
- //_consumer_thread_attr is used in StartInputEvent(), assign flag
NEVER_QUIT to it will cause new bthread
+ //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it
will cause new bthread
// that created by epoll_wait() never to quit.
- bthread_attr_t epoll_thread_attr = _consumer_thread_attr |
BTHREAD_NEVER_QUIT;
+ bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
// when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
// is also a potential issue for consumer threads, using the same attr
// should be a reasonable solution.
- int rc = bthread_start_background(
- &_tid, &epoll_thread_attr, RunThis, this);
+ int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis,
this);
if (rc) {
LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
return -1;
@@ -92,15 +90,15 @@ int EventDispatcher::Start(const bthread_attr_t*
consumer_thread_attr) {
}
bool EventDispatcher::Running() const {
- return !_stop && _epfd >= 0 && _tid != 0;
+ return !_stop && _event_dispatcher_fd >= 0 && _tid != 0;
}
void EventDispatcher::Stop() {
_stop = true;
- if (_epfd >= 0) {
+ if (_event_dispatcher_fd >= 0) {
epoll_event evt = { EPOLLOUT, { NULL } };
- epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
+ epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
}
}
@@ -111,62 +109,62 @@ void EventDispatcher::Join() {
}
}
-int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) {
- if (_epfd < 0) {
+int EventDispatcher::RegisterEvent(IOEventDataId event_data_id,
+ int fd, bool pollin) {
+ if (_event_dispatcher_fd < 0) {
errno = EINVAL;
return -1;
}
epoll_event evt;
- evt.data.u64 = socket_id;
+ evt.data.u64 = event_data_id;
evt.events = EPOLLOUT | EPOLLET;
#ifdef BRPC_SOCKET_HAS_EOF
evt.events |= has_epollrdhup;
#endif
if (pollin) {
evt.events |= EPOLLIN;
- if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
+ if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt) < 0) {
// This fd has been removed from epoll via `RemoveConsumer',
// in which case errno will be ENOENT
return -1;
}
} else {
- if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
+ if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt) < 0) {
return -1;
}
}
return 0;
}
-int EventDispatcher::UnregisterEvent(SocketId socket_id,
- int fd, bool pollin) {
+int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id,
+ int fd, bool pollin) {
if (pollin) {
epoll_event evt;
- evt.data.u64 = socket_id;
+ evt.data.u64 = event_data_id;
evt.events = EPOLLIN | EPOLLET;
#ifdef BRPC_SOCKET_HAS_EOF
evt.events |= has_epollrdhup;
#endif
- return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
+ return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt);
} else {
- return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
+ return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL);
}
return -1;
}
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
- if (_epfd < 0) {
+int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
+ if (_event_dispatcher_fd < 0) {
errno = EINVAL;
return -1;
}
epoll_event evt;
+ evt.data.u64 = event_data_id;
evt.events = EPOLLIN | EPOLLET;
- evt.data.u64 = socket_id;
#ifdef BRPC_SOCKET_HAS_EOF
evt.events |= has_epollrdhup;
#endif
- return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
- return -1;
+ return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt);
}
int EventDispatcher::RemoveConsumer(int fd) {
@@ -180,8 +178,8 @@ int EventDispatcher::RemoveConsumer(int fd) {
// from epoll again! If the fd was level-triggered and there's data left,
// epoll_wait will keep returning events of the fd continuously, making
// program abnormal.
- if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
- PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
+ if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
+ PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" <<
_event_dispatcher_fd;
return -1;
}
return 0;
@@ -197,12 +195,12 @@ void EventDispatcher::Run() {
epoll_event e[32];
#ifdef BRPC_ADDITIONAL_EPOLL
// Performance downgrades in examples.
- int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
+ int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), 0);
if (n == 0) {
- n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
+ n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
}
#else
- const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
+ const int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
#endif
if (_stop) {
// epoll_ctl/epoll_wait should have some sort of memory fencing
@@ -215,7 +213,7 @@ void EventDispatcher::Run() {
// We've checked _stop, no wake-up will be missed.
continue;
}
- PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
+ PLOG(FATAL) << "Fail to epoll_wait epfd=" << _event_dispatcher_fd;
break;
}
for (int i = 0; i < n; ++i) {
@@ -225,14 +223,13 @@ void EventDispatcher::Run() {
#endif
) {
// We don't care about the return value.
- Socket::StartInputEvent(e[i].data.u64, e[i].events,
- _consumer_thread_attr);
+ CallInputEventCallback(e[i].data.u64, e[i].events,
_thread_attr);
}
}
for (int i = 0; i < n; ++i) {
if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
// We don't care about the return value.
- Socket::HandleEpollOut(e[i].data.u64);
+ CallOutputEventCallback(e[i].data.u64, e[i].events,
_thread_attr);
}
}
}
diff --git a/src/brpc/event_dispatcher_kqueue.cpp
b/src/brpc/event_dispatcher_kqueue.cpp
index fa52a204..97ad29bb 100644
--- a/src/brpc/event_dispatcher_kqueue.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -23,17 +23,16 @@
namespace brpc {
EventDispatcher::EventDispatcher()
- : _epfd(-1)
+ : _event_dispatcher_fd(-1)
, _stop(false)
, _tid(0)
- , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
- _epfd = kqueue();
- if (_epfd < 0) {
+ , _thread_attr(BTHREAD_ATTR_NORMAL) {
+ _event_dispatcher_fd = kqueue();
+ if (_event_dispatcher_fd < 0) {
PLOG(FATAL) << "Fail to create kqueue";
return;
}
- CHECK_EQ(0, butil::make_close_on_exec(_epfd));
+ CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd));
_wakeup_fds[0] = -1;
_wakeup_fds[1] = -1;
@@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher()
EventDispatcher::~EventDispatcher() {
Stop();
Join();
- if (_epfd >= 0) {
- close(_epfd);
- _epfd = -1;
+ if (_event_dispatcher_fd >= 0) {
+ close(_event_dispatcher_fd);
+ _event_dispatcher_fd = -1;
}
if (_wakeup_fds[0] > 0) {
close(_wakeup_fds[0]);
@@ -56,8 +55,8 @@ EventDispatcher::~EventDispatcher() {
}
}
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
- if (_epfd < 0) {
+int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
+ if (_event_dispatcher_fd < 0) {
LOG(FATAL) << "kqueue was not created";
return -1;
}
@@ -68,14 +67,13 @@ int EventDispatcher::Start(const bthread_attr_t*
consumer_thread_attr) {
return -1;
}
- // Set _consumer_thread_attr before creating kqueue thread to make sure
+ // Set _thread_attr before creating kqueue thread to make sure
// everything seems sane to the thread.
- _consumer_thread_attr = (consumer_thread_attr ?
- *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
+ _thread_attr = (thread_attr ? *thread_attr : BTHREAD_ATTR_NORMAL);
- //_consumer_thread_attr is used in StartInputEvent(), assign flag
NEVER_QUIT to it will cause new bthread
+ //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it
will cause new bthread
// that created by kevent() never to quit.
- bthread_attr_t kqueue_thread_attr = _consumer_thread_attr |
BTHREAD_NEVER_QUIT;
+ bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
// Polling thread uses the same attr for consumer threads (NORMAL right
// now). Previously, we used small stack (32KB) which may be overflowed
@@ -92,17 +90,17 @@ int EventDispatcher::Start(const bthread_attr_t*
consumer_thread_attr) {
}
bool EventDispatcher::Running() const {
- return !_stop && _epfd >= 0 && _tid != 0;
+ return !_stop && _event_dispatcher_fd >= 0 && _tid != 0;
}
void EventDispatcher::Stop() {
_stop = true;
- if (_epfd >= 0) {
+ if (_event_dispatcher_fd >= 0) {
struct kevent kqueue_event;
EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
0, 0, NULL);
- kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
+ kevent(_event_dispatcher_fd, &kqueue_event, 1, NULL, 0, NULL);
}
}
@@ -113,8 +111,9 @@ void EventDispatcher::Join() {
}
}
-int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) {
- if (_epfd < 0) {
+int EventDispatcher::RegisterEvent(IOEventDataId event_data_id,
+ int fd, bool pollin) {
+ if (_event_dispatcher_fd < 0) {
errno = EINVAL;
return -1;
}
@@ -122,44 +121,44 @@ int EventDispatcher::RegisterEvent(SocketId socket_id,
int fd, bool pollin) {
struct kevent evt;
//TODO(zhujiashun): add EV_EOF
EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+ 0, 0, (void*)event_data_id);
+ if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
return -1;
}
if (pollin) {
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+ 0, 0, (void*)event_data_id);
+ if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
return -1;
}
}
return 0;
}
-int EventDispatcher::UnregisterEvent(SocketId socket_id,
- int fd, bool pollin) {
+int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id,
+ int fd, bool pollin) {
struct kevent evt;
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+ if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
return -1;
}
if (pollin) {
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
+ 0, 0, (void*)event_data_id);
+ return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
}
return 0;
}
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
- if (_epfd < 0) {
+int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
+ if (_event_dispatcher_fd < 0) {
errno = EINVAL;
return -1;
}
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
- 0, 0, (void*)socket_id);
- return kevent(_epfd, &evt, 1, NULL, 0, NULL);
+ 0, 0, (void*)event_data_id);
+ return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
}
int EventDispatcher::RemoveConsumer(int fd) {
@@ -175,9 +174,9 @@ int EventDispatcher::RemoveConsumer(int fd) {
// program abnormal.
struct kevent evt;
EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
+ kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
- kevent(_epfd, &evt, 1, NULL, 0, NULL);
+ kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
return 0;
}
@@ -189,7 +188,7 @@ void* EventDispatcher::RunThis(void* arg) {
void EventDispatcher::Run() {
while (!_stop) {
struct kevent e[32];
- int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
+ int n = kevent(_event_dispatcher_fd, NULL, 0, e, ARRAY_SIZE(e), NULL);
if (_stop) {
// EV_SET/kevent should have some sort of memory fencing
// guaranteeing that we(after kevent) see _stop set before
@@ -201,20 +200,21 @@ void EventDispatcher::Run() {
// We've checked _stop, no wake-up will be missed.
continue;
}
- PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
+ PLOG(FATAL) << "Fail to kqueue epfd=" << _event_dispatcher_fd;
break;
}
for (int i = 0; i < n; ++i) {
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
// We don't care about the return value.
- Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
- _consumer_thread_attr);
+ CallInputEventCallback((IOEventDataId)e[i].udata,
+ e[i].filter, _thread_attr);
}
}
for (int i = 0; i < n; ++i) {
if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
// We don't care about the return value.
- Socket::HandleEpollOut((SocketId)e[i].udata);
+ CallOutputEventCallback((IOEventDataId)e[i].udata,
+ e[i].filter, _thread_attr);
}
}
}
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 447bac5f..85aa1507 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -35,6 +35,7 @@
#include "butil/logging.h" // CHECK
#include "butil/macros.h"
#include "butil/class_name.h" // butil::class_name
+#include "butil/memory/scope_guard.h"
#include "brpc/log.h"
#include "brpc/reloadable_flags.h" // BRPC_VALIDATE_GFLAG
#include "brpc/errno.pb.h"
@@ -447,9 +448,9 @@ public:
static const uint64_t AUTH_FLAG = (1ul << 32);
-Socket::Socket(Forbidden)
+Socket::Socket(Forbidden f)
// must be even because Address() relies on evenness of version
- : _versioned_ref(0)
+ : VersionedRefWithId<Socket>(f)
, _shared_part(NULL)
, _nevent(0)
, _keytable_pool(NULL)
@@ -459,7 +460,6 @@ Socket::Socket(Forbidden)
, _on_edge_triggered_events(NULL)
, _user(NULL)
, _conn(NULL)
- , _this_id(0)
, _preferred_index(-1)
, _hc_count(0)
, _last_msg_size(0)
@@ -483,7 +483,6 @@ Socket::Socket(Forbidden)
, _overcrowded(false)
, _fail_me_at_server_stop(false)
, _logoff_flag(false)
- , _additional_ref_status(REF_USING)
, _error_code(0)
, _pipeline_q(NULL)
, _last_writetime_us(0)
@@ -494,8 +493,7 @@ Socket::Socket(Forbidden)
, _stream_set(NULL)
, _total_streams_unconsumed_size(0)
, _ninflight_app_health_check(0)
- , _http_request_method(HTTP_METHOD_GET)
-{
+ , _http_request_method(HTTP_METHOD_GET) {
CreateVarsOnce();
pthread_mutex_init(&_id_wait_list_mutex, NULL);
_epollout_butex = bthread::butex_create_checked<butil::atomic<int> >();
@@ -621,7 +619,7 @@ int Socket::ResetFileDescriptor(int fd) {
EnableKeepaliveIfNeeded(fd);
if (_on_edge_triggered_events) {
- if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd)
!= 0) {
+ if (_io_event.AddConsumer(fd) != 0) {
PLOG(ERROR) << "Fail to add SocketId=" << id()
<< " into EventDispatcher";
_fd.store(-1, butil::memory_order_release);
@@ -698,116 +696,260 @@ void Socket::EnableKeepaliveIfNeeded(int fd) {
// version: from version part of _versioned_nref, must be an EVEN number.
// slot: designated by ResourcePool.
int Socket::Create(const SocketOptions& options, SocketId* id) {
- butil::ResourceId<Socket> slot;
- Socket* const m = butil::get_resource(&slot, Forbidden());
- if (m == NULL) {
- LOG(FATAL) << "Fail to get_resource<Socket>";
+ return VersionedRefWithId<Socket>::Create(id, options);
+}
+
+int Socket::OnCreated(const SocketOptions& options) {
+ if (_io_event.Init((void*)id()) != 0) {
+ LOG(ERROR) << "Fail to init IOEvent";
+ SetFailed(ENOMEM, "%s", "Fail to init IOEvent");
return -1;
}
+ _io_event.set_bthread_tag(options.bthread_tag);
+ auto guard = butil::MakeScopeGuard([this] {
+ _io_event.Reset();
+ });
+
g_vars->nsocket << 1;
- CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
- m->_nevent.store(0, butil::memory_order_relaxed);
- m->_keytable_pool = options.keytable_pool;
- m->_tos = 0;
- m->_remote_side = options.remote_side;
- m->_on_edge_triggered_events = options.on_edge_triggered_events;
- m->_user = options.user;
- m->_conn = options.conn;
- m->_app_connect = options.app_connect;
- // nref can be non-zero due to concurrent AddressSocket().
- // _this_id will only be used in destructor/Destroy of referenced
- // slots, which is safe and properly fenced. Although it's better
- // to put the id into SocketUniquePtr.
- m->_this_id = MakeSocketId(
- VersionOfVRef(m->_versioned_ref.fetch_add(
- 1, butil::memory_order_release)), slot);
- m->_preferred_index = -1;
- m->_hc_count = 0;
- CHECK(m->_read_buf.empty());
+ CHECK(NULL == _shared_part.load(butil::memory_order_relaxed));
+ _nevent.store(0, butil::memory_order_relaxed);
+ _keytable_pool = options.keytable_pool;
+ _tos = 0;
+ _remote_side = options.remote_side;
+ _on_edge_triggered_events = options.on_edge_triggered_events;
+ _user = options.user;
+ _conn = options.conn;
+ _app_connect = options.app_connect;
+ _preferred_index = -1;
+ _hc_count = 0;
+ CHECK(_read_buf.empty());
const int64_t cpuwide_now = butil::cpuwide_time_us();
- m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
- m->reset_parsing_context(options.initial_parsing_context);
- m->_correlation_id = 0;
- m->_health_check_interval_s = options.health_check_interval_s;
- m->_is_hc_related_ref_held = false;
- m->_hc_started.store(false, butil::memory_order_relaxed);
- m->_ninprocess.store(1, butil::memory_order_relaxed);
- m->_auth_flag_error.store(0, butil::memory_order_relaxed);
- const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
+ _last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
+ reset_parsing_context(options.initial_parsing_context);
+ _correlation_id = 0;
+ _health_check_interval_s = options.health_check_interval_s;
+ _is_hc_related_ref_held = false;
+ _hc_started.store(false, butil::memory_order_relaxed);
+ _ninprocess.store(1, butil::memory_order_relaxed);
+ _auth_flag_error.store(0, butil::memory_order_relaxed);
+ const int rc2 = bthread_id_create(&_auth_id, NULL, NULL);
if (rc2) {
LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
- m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
+ SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
return -1;
}
- m->_force_ssl = options.force_ssl;
+ _force_ssl = options.force_ssl;
// Disable SSL check if there is no SSL context
- m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
- m->_ssl_session = NULL;
- m->_ssl_ctx = options.initial_ssl_ctx;
+ _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
+ _ssl_session = NULL;
+ _ssl_ctx = options.initial_ssl_ctx;
#if BRPC_WITH_RDMA
- CHECK(m->_rdma_ep == NULL);
+ CHECK(_rdma_ep == NULL);
if (options.use_rdma) {
- m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
- if (!m->_rdma_ep) {
+ _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
+ if (!_rdma_ep) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to create RdmaEndpoint";
- m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+ SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
berror(saved_errno));
return -1;
}
- m->_rdma_state = RDMA_UNKNOWN;
+ _rdma_state = RDMA_UNKNOWN;
} else {
- m->_rdma_state = RDMA_OFF;
+ _rdma_state = RDMA_OFF;
}
#endif
- m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
- m->_controller_released_socket.store(false, butil::memory_order_relaxed);
- m->_overcrowded = false;
- // May be non-zero for RTMP connections.
- m->_fail_me_at_server_stop = false;
- m->_logoff_flag.store(false, butil::memory_order_relaxed);
- m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
- m->_error_code = 0;
- m->_error_text.clear();
- m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
- m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
- m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
+ _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
+ _controller_released_socket.store(false, butil::memory_order_relaxed);
+ _overcrowded = false;
+ // May be non-zero for RTMP connections.
+ _fail_me_at_server_stop = false;
+ _logoff_flag.store(false, butil::memory_order_relaxed);
+ _error_code = 0;
+ _agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
+ _total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
+ _ninflight_app_health_check.store(0, butil::memory_order_relaxed);
// NOTE: last two params are useless in bthread > r32787
- const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
+ const int rc = bthread_id_list_init(&_id_wait_list, 512, 512);
if (rc) {
LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
- m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
+ SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
return -1;
}
- m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
- m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
- m->_keepalive_options = options.keepalive_options;
- m->_bthread_tag = options.bthread_tag;
- CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
- m->_is_write_shutdown = false;
- // Must be last one! Internal fields of this Socket may be access
+ _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
+ _unwritten_bytes.store(0, butil::memory_order_relaxed);
+ _keepalive_options = options.keepalive_options;
+ CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
+ _is_write_shutdown = false;
+ // Must be the last one! Internal fields of this Socket may be accessed
// just after calling ResetFileDescriptor.
- if (m->ResetFileDescriptor(options.fd) != 0) {
+ if (ResetFileDescriptor(options.fd) != 0) {
const int saved_errno = errno;
PLOG(ERROR) << "Fail to ResetFileDescriptor";
- m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
+ SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
berror(saved_errno));
return -1;
}
- *id = m->_this_id;
+ guard.dismiss();
+
return 0;
}
+void Socket::BeforeRecycled() {
+ const bool create_by_connect = CreatedByConnect();
+ if (_app_connect) {
+ std::shared_ptr<AppConnect> tmp;
+ _app_connect.swap(tmp);
+ tmp->StopConnect(this);
+ }
+ if (_conn) {
+ SocketConnection* const saved_conn = _conn;
+ _conn = NULL;
+ saved_conn->BeforeRecycle(this);
+ }
+ if (_user) {
+ SocketUser* const saved_user = _user;
+ _user = NULL;
+ saved_user->BeforeRecycle(this);
+ }
+ SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
+ if (sp) {
+ sp->RemoveRefManually();
+ }
+
+ const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
+ if (ValidFileDescriptor(prev_fd)) {
+ if (_on_edge_triggered_events != NULL) {
+ _io_event.RemoveConsumer(prev_fd);
+ }
+ close(prev_fd);
+ if (create_by_connect) {
+ g_vars->channel_conn << -1;
+ }
+ }
+ _io_event.Reset();
+
+#if BRPC_WITH_RDMA
+ if (_rdma_ep) {
+ delete _rdma_ep;
+ _rdma_ep = NULL;
+ _rdma_state = RDMA_UNKNOWN;
+ }
+#endif
+
+ reset_parsing_context(NULL);
+ _read_buf.clear();
+
+ _auth_flag_error.store(0, butil::memory_order_relaxed);
+ bthread_id_error(_auth_id, 0);
+
+ bthread_id_list_destroy(&_id_wait_list);
+
+ if (_ssl_session) {
+ SSL_free(_ssl_session);
+ _ssl_session = NULL;
+ }
+
+ _ssl_ctx = NULL;
+
+ delete _pipeline_q;
+ _pipeline_q = NULL;
+
+ delete _auth_context;
+ _auth_context = NULL;
+
+ delete _stream_set;
+ _stream_set = NULL;
+
+ const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
+ if (asid != INVALID_SOCKET_ID) {
+ SocketUniquePtr ptr;
+ if (Socket::Address(asid, &ptr) == 0) {
+ ptr->ReleaseAdditionalReference();
+ }
+ }
+ g_vars->nsocket << -1;
+}
+
+void Socket::OnFailed(int error_code, const std::string& error_text) {
+ // Update _error_text
+ pthread_mutex_lock(&_id_wait_list_mutex);
+ _error_code = error_code;
+ _error_text = error_text;
+ pthread_mutex_unlock(&_id_wait_list_mutex);
+
+ // Do health-checking even if we're not connected before, needed
+ // by Channel to revive never-connected socket when server side
+ // comes online.
+ if (HCEnabled()) {
+ bool expect = false;
+ if (_hc_started.compare_exchange_strong(expect,
+ true,
+ butil::memory_order_relaxed,
+ butil::memory_order_relaxed)) {
+ GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
+ StartHealthCheck(id(),
+ GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+ } else {
+ // No need to run two health checks at the same time.
+ RPC_VLOG << "There is already a health checking running "
+ "for SocketId=" << id();
+ }
+ }
+ // Wake up all threads waiting on EPOLLOUT when closing fd
+ _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
+ bthread::butex_wake_all(_epollout_butex);
+
+ // Wake up all unresponded RPC.
+ CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
+ &_id_wait_list, error_code, error_text,
+ &_id_wait_list_mutex));
+ ResetAllStreams(error_code, error_text);
+ // _app_connect shouldn't be set to NULL in SetFailed, otherwise
+ // health checking would never be supported.
+ // FIXME: Design a better interface for AppConnect
+ // if (_app_connect) {
+ // AppConnect* const saved_app_connect = _app_connect;
+ // _app_connect = NULL;
+ // saved_app_connect->StopConnect(this);
+ // }
+}
+
+void Socket::AfterRevived() {
+ if (_user) {
+ _user->AfterRevived(this);
+ } else {
+ LOG(INFO) << "Revived " << description() << " (Connectable)";
+ }
+}
+
+std::string Socket::OnDescription() const {
+ // NOTE: The output of `description()' should be consistent with
operator<<()
+ std::string result;
+ result.reserve(64);
+ const int saved_fd = fd();
+ if (saved_fd >= 0) {
+ butil::string_appendf(&result, "fd=%d", saved_fd);
+ }
+ butil::string_appendf(&result, " addr=%s",
+ butil::endpoint2str(remote_side()).c_str());
+ const int local_port = local_side().port;
+ if (local_port > 0) {
+ butil::string_appendf(&result, ":%d", local_port);
+ }
+ return result;
+}
+
int Socket::WaitAndReset(int32_t expected_nref) {
- const uint32_t id_ver = VersionOfSocketId(_this_id);
+ const uint32_t id_ver = VersionOfVRefId(id());
uint64_t vref;
// Wait until nref == expected_nref.
- while (1) {
+ while (true) {
// The acquire fence pairs with release fence in Dereference to avoid
// inconsistent states to be seen by others.
- vref = _versioned_ref.load(butil::memory_order_acquire);
+ vref = versioned_ref();
if (VersionOfVRef(vref) != id_ver + 1) {
- LOG(WARNING) << "SocketId=" << _this_id << " is already alive or
recycled";
+ LOG(WARNING) << "SocketId=" << id() << " is already alive or
recycled";
return -1;
}
if (NRefOfVRef(vref) > expected_nref) {
@@ -816,7 +958,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
return -1;
}
} else if (NRefOfVRef(vref) < expected_nref) {
- RPC_VLOG << "SocketId=" << _this_id
+ RPC_VLOG << "SocketId=" << id()
<< " was abandoned during health checking";
return -1;
} else {
@@ -824,7 +966,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
// so no need to do health checking.
if (!_is_hc_related_ref_held) {
RPC_VLOG << "Nobody holds a health-checking-related reference"
- << " for SocketId=" << _this_id;
+ << " for SocketId=" << id();
return -1;
}
@@ -836,7 +978,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
if (ValidFileDescriptor(prev_fd)) {
if (_on_edge_triggered_events != NULL) {
- GetGlobalEventDispatcher(prev_fd,
_bthread_tag).RemoveConsumer(prev_fd);
+ _io_event.RemoveConsumer(prev_fd);
}
close(prev_fd);
if (CreatedByConnect()) {
@@ -892,59 +1034,6 @@ int Socket::WaitAndReset(int32_t expected_nref) {
return 0;
}
-// We don't care about the return value of Revive.
-void Socket::Revive() {
- const uint32_t id_ver = VersionOfSocketId(_this_id);
- uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
- _additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed);
- while (1) {
- CHECK_EQ(id_ver + 1, VersionOfVRef(vref));
-
- int32_t nref = NRefOfVRef(vref);
- if (nref <= 1) {
- // Set status to REF_RECYLED since no one uses this socket
- _additional_ref_status.store(REF_RECYCLED,
butil::memory_order_relaxed);
- CHECK_EQ(1, nref);
- LOG(WARNING) << *this << " was abandoned during revival";
- return;
- }
- // +1 is the additional ref added in Create(). TODO(gejun): we should
- // remove this additional nref someday.
- if (_versioned_ref.compare_exchange_weak(
- vref, MakeVRef(id_ver, nref + 1/*note*/),
- butil::memory_order_release,
- butil::memory_order_relaxed)) {
- // Set status to REF_USING since we add additional ref again
- _additional_ref_status.store(REF_USING,
butil::memory_order_relaxed);
- if (_user) {
- _user->AfterRevived(this);
- } else {
- LOG(INFO) << "Revived " << *this << " (Connectable)";
- }
- return;
- }
- }
-}
-
-int Socket::ReleaseAdditionalReference() {
- do {
- AdditionalRefStatus expect = REF_USING;
- if (_additional_ref_status.compare_exchange_strong(
- expect,
- REF_RECYCLED,
- butil::memory_order_relaxed,
- butil::memory_order_relaxed)) {
- return Dereference();
- }
-
- if (expect == REF_REVIVING) { // sched_yield to wait until status is
not REF_REVIVING
- sched_yield();
- } else {
- return -1; // REF_RECYCLED
- }
- } while (1);
-}
-
void Socket::AddRecentError() {
SharedPart* sp = GetSharedPart();
if (sp) {
@@ -968,87 +1057,6 @@ int Socket::isolated_times() const {
return 0;
}
-int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
- if (error_code == 0) {
- CHECK(false) << "error_code is 0";
- error_code = EFAILEDSOCKET;
- }
- const uint32_t id_ver = VersionOfSocketId(_this_id);
- uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
- for (;;) { // need iteration to retry compare_exchange_strong
- if (VersionOfVRef(vref) != id_ver) {
- return -1;
- }
- // Try to set version=id_ver+1 (to make later Address() return NULL),
- // retry on fail.
- if (_versioned_ref.compare_exchange_strong(
- vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
- butil::memory_order_release,
- butil::memory_order_relaxed)) {
- // Update _error_text
- std::string error_text;
- if (error_fmt != NULL) {
- va_list ap;
- va_start(ap, error_fmt);
- butil::string_vprintf(&error_text, error_fmt, ap);
- va_end(ap);
- }
- pthread_mutex_lock(&_id_wait_list_mutex);
- _error_code = error_code;
- _error_text = error_text;
- pthread_mutex_unlock(&_id_wait_list_mutex);
-
- // Do health-checking even if we're not connected before, needed
- // by Channel to revive never-connected socket when server side
- // comes online.
- if (HCEnabled()) {
- bool expect = false;
- if (_hc_started.compare_exchange_strong(expect,
- true,
-
butil::memory_order_relaxed,
-
butil::memory_order_relaxed)) {
- GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
- StartHealthCheck(id(),
-
GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
- } else {
- // No need to run 2 health checking at the same time.
- RPC_VLOG << "There is already a health checking running "
- "for SocketId=" << _this_id;
- }
- }
- // Wake up all threads waiting on EPOLLOUT when closing fd
- _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
- bthread::butex_wake_all(_epollout_butex);
-
- // Wake up all unresponded RPC.
- CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
- &_id_wait_list, error_code, error_text,
- &_id_wait_list_mutex));
-
- ResetAllStreams(error_code, error_text);
- // _app_connect shouldn't be set to NULL in SetFailed otherwise
- // HC is always not supported.
- // FIXME: Design a better interface for AppConnect
- // if (_app_connect) {
- // AppConnect* const saved_app_connect = _app_connect;
- // _app_connect = NULL;
- // saved_app_connect->StopConnect(this);
- // }
-
- // Deref additionally which is added at creation so that this
- // Socket's reference will hit 0(recycle) when no one addresses it.
- ReleaseAdditionalReference();
- // NOTE: This Socket may be recycled at this point, don't
- // touch anything.
- return 0;
- }
- }
-}
-
-int Socket::SetFailed() {
- return SetFailed(EFAILEDSOCKET, NULL);
-}
-
void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code,
latency_us)) {
if (SetFailed(main_socket_id()) == 0) {
@@ -1075,12 +1083,29 @@ int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
return ReleaseAdditionalReference();
}
+
+int Socket::SetFailed() {
+ return SetFailed(EFAILEDSOCKET, NULL);
+}
+
+int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
+ std::string error_text;
+ if (error_fmt != NULL) {
+ va_list ap;
+ va_start(ap, error_fmt);
+ butil::string_vprintf(&error_text, error_fmt, ap);
+ va_end(ap);
+ }
+ return VersionedRefWithId<Socket>::SetFailed(error_code, error_text);
+}
+
int Socket::SetFailed(SocketId id) {
SocketUniquePtr ptr;
if (Address(id, &ptr) != 0) {
return -1;
}
- return ptr->SetFailed();
+
+ return ptr->SetFailed(EFAILEDSOCKET, NULL);
}
void Socket::NotifyOnFailed(bthread_id_t id) {
@@ -1101,16 +1126,16 @@ void Socket::NotifyOnFailed(bthread_id_t id) {
// For unit-test.
int Socket::Status(SocketId id, int32_t* nref) {
- const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
+ const butil::ResourceId<Socket> slot = SlotOfVRefId<Socket>(id);
Socket* const m = address_resource(slot);
if (m != NULL) {
- const uint64_t vref =
m->_versioned_ref.load(butil::memory_order_relaxed);
- if (VersionOfVRef(vref) == VersionOfSocketId(id)) {
+ const uint64_t vref = m->versioned_ref();
+ if (VersionOfVRef(vref) == VersionOfVRefId(id)) {
if (nref) {
*nref = NRefOfVRef(vref);
}
return 0;
- } else if (VersionOfVRef(vref) == VersionOfSocketId(id) + 1) {
+ } else if (VersionOfVRef(vref) == VersionOfVRefId(id) + 1) {
if (nref) {
*nref = NRefOfVRef(vref);
}
@@ -1120,81 +1145,6 @@ int Socket::Status(SocketId id, int32_t* nref) {
return -1;
}
-void Socket::OnRecycle() {
- const bool create_by_connect = CreatedByConnect();
- if (_app_connect) {
- std::shared_ptr<AppConnect> tmp;
- _app_connect.swap(tmp);
- tmp->StopConnect(this);
- }
- if (_conn) {
- SocketConnection* const saved_conn = _conn;
- _conn = NULL;
- saved_conn->BeforeRecycle(this);
- }
- if (_user) {
- SocketUser* const saved_user = _user;
- _user = NULL;
- saved_user->BeforeRecycle(this);
- }
- SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
- if (sp) {
- sp->RemoveRefManually();
- }
- const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
- if (ValidFileDescriptor(prev_fd)) {
- if (_on_edge_triggered_events != NULL) {
- GetGlobalEventDispatcher(prev_fd,
_bthread_tag).RemoveConsumer(prev_fd);
- }
- close(prev_fd);
- if (create_by_connect) {
- g_vars->channel_conn << -1;
- }
- }
-
-#if BRPC_WITH_RDMA
- if (_rdma_ep) {
- delete _rdma_ep;
- _rdma_ep = NULL;
- _rdma_state = RDMA_UNKNOWN;
- }
-#endif
-
- reset_parsing_context(NULL);
- _read_buf.clear();
-
- _auth_flag_error.store(0, butil::memory_order_relaxed);
- bthread_id_error(_auth_id, 0);
-
- bthread_id_list_destroy(&_id_wait_list);
-
- if (_ssl_session) {
- SSL_free(_ssl_session);
- _ssl_session = NULL;
- }
-
- _ssl_ctx = NULL;
-
- delete _pipeline_q;
- _pipeline_q = NULL;
-
- delete _auth_context;
- _auth_context = NULL;
-
- delete _stream_set;
- _stream_set = NULL;
-
- const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
- if (asid != INVALID_SOCKET_ID) {
- SocketUniquePtr ptr;
- if (Socket::Address(asid, &ptr) == 0) {
- ptr->ReleaseAdditionalReference();
- }
- }
-
- g_vars->nsocket << -1;
-}
-
void* Socket::ProcessEvent(void* arg) {
// the enclosed Socket is valid and free to access inside this function.
SocketUniquePtr s(static_cast<Socket*>(arg));
@@ -1272,8 +1222,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const
timespec* abstime) {
// Do not need to check addressable since it will be called by
// health checker which called `SetFailed' before
const int expected_val =
_epollout_butex->load(butil::memory_order_relaxed);
- EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _bthread_tag);
- if (edisp.RegisterEvent(id(), fd, pollin) != 0) {
+ if (_io_event.RegisterEvent(fd, pollin) != 0) {
return -1;
}
@@ -1285,7 +1234,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const
timespec* abstime) {
}
// Ignore return value since `fd' might have been removed
// by `RemoveConsumer' in `SetFailed'
- butil::ignore_result(edisp.UnregisterEvent(id(), fd, pollin));
+ butil::ignore_result(_io_event.UnregisterEvent(fd, pollin));
errno = saved_errno;
// Could be writable or spurious wakeup (by former epollout)
return rc;
@@ -1333,7 +1282,7 @@ int Socket::Connect(const timespec* abstime,
// be added into epoll device soon
SocketId connect_id;
SocketOptions options;
- options.bthread_tag = _bthread_tag;
+ options.bthread_tag = _io_event.bthread_tag();
options.user = req;
if (Socket::Create(options, &connect_id) != 0) {
LOG(FATAL) << "Fail to create Socket";
@@ -1348,8 +1297,7 @@ int Socket::Connect(const timespec* abstime,
// Add `sockfd' into epoll so that `HandleEpollOutRequest' will
// be called with `req' when epoll event reaches
- if (GetGlobalEventDispatcher(sockfd,
_bthread_tag).RegisterEvent(connect_id, sockfd, false) !=
- 0) {
+ if (s->_io_event.RegisterEvent(sockfd, false) != 0) {
const int saved_errno = errno;
PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
@@ -1417,7 +1365,7 @@ int Socket::ConnectIfNot(const timespec* abstime,
WriteRequest* req) {
return 0;
}
// Set tag for client side socket
- _bthread_tag = bthread_self_tag();
+ _io_event.set_bthread_tag(bthread_self_tag());
// Have to hold a reference for `req'
SocketUniquePtr s;
ReAddress(&s);
@@ -1440,7 +1388,9 @@ void Socket::WakeAsEpollOut() {
bthread::butex_wake_except(_epollout_butex, 0);
}
-int Socket::HandleEpollOut(SocketId id) {
+int Socket::OnOutputEvent(void* user_data, uint32_t,
+ const bthread_attr_t&) {
+ auto id = reinterpret_cast<SocketId>(user_data);
SocketUniquePtr s;
// Since Sockets might have been `SetFailed' before they were
// added into epoll, these sockets miss the signal inside
@@ -1486,7 +1436,7 @@ int Socket::HandleEpollOutRequest(int error_code,
EpollOutRequest* req) {
}
// We've got the right to call user callback
// The timer will be removed inside destructor of EpollOutRequest
- GetGlobalEventDispatcher(req->fd, _bthread_tag).UnregisterEvent(id(),
req->fd, false);
+ butil::ignore_result(_io_event.UnregisterEvent(req->fd, false));
return req->on_epollout_event(req->fd, error_code, req->data);
}
@@ -2229,8 +2179,9 @@ AuthContext* Socket::mutable_auth_context() {
return _auth_context;
}
-int Socket::StartInputEvent(SocketId id, uint32_t events,
- const bthread_attr_t& thread_attr) {
+int Socket::OnInputEvent(void* user_data, uint32_t events,
+ const bthread_attr_t& thread_attr) {
+ auto id = reinterpret_cast<SocketId>(user_data);
SocketUniquePtr s;
if (Address(id, &s) < 0) {
return -1;
@@ -2324,7 +2275,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
// }
os << "# This is a broken Socket\n";
}
- const uint64_t vref =
ptr->_versioned_ref.load(butil::memory_order_relaxed);
+ const uint64_t vref = ptr->versioned_ref();
size_t npipelined = 0;
size_t idsizes[4];
size_t nidsize = 0;
@@ -2384,7 +2335,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nlocal_side=" << ptr->_local_side
<< "\non_et_events=" << (void*)ptr->_on_edge_triggered_events
<< "\nuser=" << ShowObject(ptr->_user)
- << "\nthis_id=" << ptr->_this_id
+ << "\nthis_id=" << ptr->id()
<< "\npreferred_index=" << preferred_index;
InputMessenger* messenger = dynamic_cast<InputMessenger*>(ptr->user());
if (messenger != NULL) {
@@ -2429,7 +2380,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
<< "\nauth_context=" << ptr->_auth_context
<< "\nlogoff_flag=" <<
ptr->_logoff_flag.load(butil::memory_order_relaxed)
<< "\n_additional_ref_status="
- << ptr->_additional_ref_status.load(butil::memory_order_relaxed)
+ << ptr->additional_ref_status()
<< "\ntotal_streams_buffer_size="
<< ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
<< "\nninflight_app_health_check="
@@ -2570,7 +2521,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
ptr->_rdma_ep->DebugInfo(os);
}
#endif
- { os << "\nbthread_tag=" << ptr->_bthread_tag; }
+ { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); }
}
int Socket::CheckHealth() {
@@ -2620,7 +2571,7 @@ void Socket::ResetAllStreams(int error_code, const
std::string& error_text) {
if (_stream_set != NULL) {
// Not delete _stream_set because there are likely more streams added
// after reviving if the Socket is still in use, or it is to be
deleted in
- // OnRecycle()
+ // BeforeRecycled()
saved_stream_set.swap(*_stream_set);
}
_stream_mutex.unlock();
@@ -3000,25 +2951,6 @@ void Socket::OnProgressiveReadCompleted() {
}
}
-std::string Socket::description() const {
- // NOTE: The output should be consistent with operator<<()
- std::string result;
- result.reserve(64);
- butil::string_appendf(&result, "Socket{id=%" PRIu64, id());
- const int saved_fd = fd();
- if (saved_fd >= 0) {
- butil::string_appendf(&result, " fd=%d", saved_fd);
- }
- butil::string_appendf(&result, " addr=%s",
- butil::endpoint2str(remote_side()).c_str());
- const int local_port = local_side().port;
- if (local_port > 0) {
- butil::string_appendf(&result, ":%d", local_port);
- }
- butil::string_appendf(&result, "} (0x%p)", this);
- return result;
-}
-
SocketSSLContext::SocketSSLContext()
: raw_ctx(NULL)
{}
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 97ce5685..6d9c8f11 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -38,7 +38,9 @@
#include "brpc/socket_id.h" // SocketId
#include "brpc/socket_message.h" // SocketMessagePtr
#include "bvar/bvar.h"
-#include "http_method.h"
+#include "brpc/http_method.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/versioned_ref_with_id.h"
namespace brpc {
namespace policy {
@@ -282,7 +284,7 @@ struct SocketOptions {
// Abstractions on reading from and writing into file descriptors.
// NOTE: accessed by multiple threads(frequently), align it by cacheline.
-class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket {
+class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket : public
VersionedRefWithId<Socket> {
friend class EventDispatcher;
friend class InputMessenger;
friend class Acceptor;
@@ -299,16 +301,18 @@ friend class HealthCheckTask;
friend class OnAppHealthCheckDone;
friend class HealthCheckManager;
friend class policy::H2GlobalStreamCreator;
+friend class VersionedRefWithId<Socket>;
+friend class IOEvent<Socket>;
+friend void DereferenceSocket(Socket*);
class SharedPart;
- struct Forbidden {};
struct WriteRequest;
public:
const static int STREAM_FAKE_FD = INT_MAX;
// NOTE: User cannot create Socket from constructor. Use Create()
// instead. It's public just because of requirement of ResourcePool.
- Socket(Forbidden);
- ~Socket();
+ explicit Socket(Forbidden);
+ ~Socket() override;
// Write `msg' into this Socket and clear it. The `msg' should be an
// intact request or response. To prevent messages from interleaving
@@ -419,9 +423,6 @@ public:
// After health checking is complete, set _hc_started to false.
void AfterHCCompleted() { _hc_started.store(false,
butil::memory_order_relaxed); }
- // The unique identifier.
- SocketId id() const { return _this_id; }
-
// `user' parameter passed to Create().
SocketUser* user() const { return _user; }
@@ -446,24 +447,9 @@ public:
AuthContext* mutable_auth_context();
// Create a Socket according to `options', put the identifier into `id'.
- // Returns 0 on sucess, -1 otherwise.
+ // Returns 0 on success, -1 otherwise.
static int Create(const SocketOptions& options, SocketId* id);
- // Place the Socket associated with identifier `id' into unique_ptr `ptr',
- // which will be released automatically when out of scope (w/o explicit
- // std::move). User can still access `ptr' after calling ptr->SetFailed()
- // before release of `ptr'.
- // This function is wait-free.
- // Returns 0 on success, -1 when the Socket was SetFailed().
- static int Address(SocketId id, SocketUniquePtr* ptr);
-
- // Re-address current socket into `ptr'.
- // Always succeed even if this socket is failed.
- void ReAddress(SocketUniquePtr* ptr);
-
- // Returns 0 on success, 1 on failed socket, -1 on recycled.
- static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
-
// Mark this Socket or the Socket associated with `id' as failed.
// Any later Address() of the identifier shall return NULL unless the
// Socket was revivied by StartHealthCheck. The Socket is NOT recycled
@@ -486,20 +472,10 @@ public:
void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
- bool Failed() const;
-
- bool DidReleaseAdditionalRereference() const {
- return _additional_ref_status.load(butil::memory_order_relaxed) ==
REF_RECYCLED;
- }
-
// Notify `id' object (by calling bthread_id_error) when this Socket
// has been `SetFailed'. If it already has, notify `id' immediately
void NotifyOnFailed(bthread_id_t id);
- // Release the additional reference which added inside `Create'
- // before so that `Socket' will be recycled automatically once
- // on one is addressing it.
- int ReleaseAdditionalReference();
// `ReleaseAdditionalReference' this Socket iff it has no data
// transmission during the last `idle_seconds'
int ReleaseReferenceIfIdle(int idle_seconds);
@@ -515,8 +491,8 @@ public:
// Start to process edge-triggered events from the fd.
// This function does not block caller.
- static int StartInputEvent(SocketId id, uint32_t events,
- const bthread_attr_t& thread_attr);
+ static int OnInputEvent(void* user_data, uint32_t events,
+ const bthread_attr_t& thread_attr);
static const int PROGRESS_INIT = 1;
bool MoreReadEvents(int* progress);
@@ -654,9 +630,6 @@ public:
_last_writetime_us.load(butil::memory_order_relaxed));
}
- // A brief description of this socket, consistent with os << *this
- std::string description() const;
-
// Returns true if the remote side is overcrowded.
bool is_overcrowded() const { return _overcrowded; }
@@ -678,8 +651,20 @@ private:
int ConductError(bthread_id_t);
int StartWrite(WriteRequest*, const WriteOptions&);
- int Dereference();
-friend void DereferenceSocket(Socket*);
+ // Create a Socket according to `options', put the identifier into `id'.
+ // Returns 0 on success, -1 otherwise.
+ int OnCreated(const SocketOptions& options);
+
+ // Called before returning to pool.
+ void BeforeRecycled();
+
+ void OnFailed(int error_code, const std::string& error_text);
+
+ // Make this socket addressable again.
+ void AfterRevived();
+
+ std::string OnDescription() const;
+
static int Status(SocketId, int32_t* nref = NULL); // for unit-test.
@@ -701,9 +686,6 @@ friend void DereferenceSocket(Socket*);
// success, -1 otherwise and errno is set
ssize_t DoWrite(WriteRequest* req);
- // Called before returning to pool.
- void OnRecycle();
-
// [Not thread-safe] Wait for EPOLLOUT event on `fd'. If `pollin' is
// true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
// be used instead of EPOLL_CTL_ADD. Note that spurious wakeups may
@@ -735,9 +717,6 @@ friend void DereferenceSocket(Socket*);
// Wait until nref hits `expected_nref' and reset some internal resources.
int WaitAndReset(int32_t expected_nref);
- // Make this socket addressable again.
- void Revive();
-
static void* ProcessEvent(void*);
static void* KeepWrite(void*);
@@ -755,8 +734,9 @@ friend void DereferenceSocket(Socket*);
// Try to wake socket just like epollout has arrived
void WakeAsEpollOut();
- // Generic callback for Socket to handle epollout event
- static int HandleEpollOut(SocketId socket_id);
+ // Generic callback for Socket to handle output event.
+ static int OnOutputEvent(void* user_data, uint32_t,
+ const bthread_attr_t&);
class EpollOutRequest;
// Callback to handle epollout event whose request data
@@ -811,16 +791,6 @@ friend void DereferenceSocket(Socket*);
void CancelUnwrittenBytes(size_t bytes);
private:
- // unsigned 32-bit version + signed 32-bit referenced-count.
- // Meaning of version:
- // * Created version: no SetFailed() is called on the Socket yet. Must be
- // same evenness with initial _versioned_ref because during lifetime of
- // a Socket on the slot, the version is added with 1 twice. This is
- // also the version encoded in SocketId.
- // * Failed version: = created version + 1, SetFailed()-ed but returned.
- // * Other versions: the socket is already recycled.
- butil::atomic<uint64_t> _versioned_ref;
-
// In/Out bytes/messages, SocketPool etc
// _shared_part is shared by a main socket and all its pooled sockets.
// Can't use intrusive_ptr because the creation is based on optimistic
@@ -838,7 +808,6 @@ private:
// [ Set in ResetFileDescriptor ]
butil::atomic<int> _fd; // -1 when not connected.
- bthread_tag_t _bthread_tag; // bthread tag of this socket
int _tos; // Type of service which is actually only 8bits.
int64_t _reset_fd_real_us; // When _fd was reset, in microseconds.
@@ -864,8 +833,7 @@ private:
// Initialized by SocketOptions.app_connect.
std::shared_ptr<AppConnect> _app_connect;
- // Identifier of this Socket in ResourcePool
- SocketId _this_id;
+ IOEvent<Socket> _io_event;
// last chosen index of the protocol as a heuristic value to avoid
// iterating all protocol handlers each time.
@@ -953,21 +921,6 @@ private:
// Set by SetLogOff
butil::atomic<bool> _logoff_flag;
- // Status flag used to mark that
- enum AdditionalRefStatus {
- REF_USING, // additional reference has been increased
- REF_REVIVING, // additional reference is increasing
- REF_RECYCLED // additional reference has been decreased
- };
-
- // Indicates whether additional reference has increased,
- // decreased, or is increasing.
- // additional ref status:
- // `Socket'、`Create': REF_USING
- // `SetFailed': REF_USING -> REF_RECYCLED
- // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING
- butil::atomic<AdditionalRefStatus> _additional_ref_status;
-
// Concrete error information from SetFailed()
// Accesses to these 2 fields(especially _error_text) must be protected
// by _id_wait_list_mutex
diff --git a/src/brpc/socket_id.h b/src/brpc/socket_id.h
index 72043247..e000c98b 100644
--- a/src/brpc/socket_id.h
+++ b/src/brpc/socket_id.h
@@ -22,9 +22,7 @@
// To brpc developers: This is a header included by user, don't depend
// on internal structures, use opaque pointers instead.
-#include <stdint.h> // uint64_t
-#include "butil/unique_ptr.h" // std::unique_ptr
-
+#include "brpc/versioned_ref_with_id.h"
namespace brpc {
@@ -32,21 +30,25 @@ namespace brpc {
// Users shall store SocketId instead of Sockets and call Socket::Address()
// to convert the identifier to an unique_ptr at each access. Whenever a
// unique_ptr is not destructed, the enclosed Socket will not be recycled.
-typedef uint64_t SocketId;
+typedef VRefId SocketId;
-const SocketId INVALID_SOCKET_ID = (SocketId)-1;
+const SocketId INVALID_SOCKET_ID = INVALID_VREF_ID;
class Socket;
extern void DereferenceSocket(Socket*);
-struct SocketDeleter {
+// Explicit (full) template specialization to ignore compiler error,
+// because Socket is an incomplete type where only this header is included.
+template<>
+struct VersionedRefWithIdDeleter<Socket> {
void operator()(Socket* m) const {
DereferenceSocket(m);
}
};
-typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;
+typedef VersionedRefWithIdUniquePtr<Socket> SocketUniquePtr;
+
} // namespace brpc
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index 6af9e8f1..a8ff3ce8 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -23,35 +23,6 @@
namespace brpc {
-// Utility functions to combine and extract SocketId.
-BUTIL_FORCE_INLINE SocketId
-MakeSocketId(uint32_t version, butil::ResourceId<Socket> slot) {
- return SocketId((((uint64_t)version) << 32) | slot.value);
-}
-
-BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
- butil::ResourceId<Socket> id = { (sid & 0xFFFFFFFFul) };
- return id;
-}
-
-BUTIL_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) {
- return (uint32_t)(sid >> 32);
-}
-
-// Utility functions to combine and extract Socket::_versioned_ref
-BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
- return (uint32_t)(vref >> 32);
-}
-
-BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
- return (int32_t)(vref & 0xFFFFFFFFul);
-}
-
-BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
- // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
- return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
-}
-
inline SocketOptions::SocketOptions()
: fd(-1)
, user(NULL)
@@ -63,169 +34,7 @@ inline SocketOptions::SocketOptions()
, conn(NULL)
, app_connect(NULL)
, initial_parsing_context(NULL)
- , bthread_tag(BTHREAD_TAG_DEFAULT)
-{}
-
-inline int Socket::Dereference() {
- const SocketId id = _this_id;
- const uint64_t vref = _versioned_ref.fetch_sub(
- 1, butil::memory_order_release);
- const int32_t nref = NRefOfVRef(vref);
- if (nref > 1) {
- return 0;
- }
- if (__builtin_expect(nref == 1, 1)) {
- const uint32_t ver = VersionOfVRef(vref);
- const uint32_t id_ver = VersionOfSocketId(id);
- // Besides first successful SetFailed() adds 1 to version, one of
- // those dereferencing nref from 1->0 adds another 1 to version.
- // Notice "one of those": The wait-free Address() may make ref of a
- // version-unmatched slot change from 1 to 0 for mutiple times, we
- // have to use version as a guard variable to prevent returning the
- // Socket to pool more than once.
- //
- // Note: `ver == id_ver' means this socket has been `SetRecycle'
- // before rather than `SetFailed'; `ver == ide_ver+1' means we
- // had `SetFailed' this socket before. We should destroy the
- // socket under both situation
- if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
- // sees nref:1->0, try to set version=id_ver+2,--nref.
- // No retry: if version changes, the slot is already returned by
- // another one who sees nref:1->0 concurrently; if nref changes,
- // which must be non-zero, the slot will be returned when
- // nref changes from 1->0 again.
- // Example:
- // SetFailed(): --nref, sees nref:1->0 (1)
- // try to set version=id_ver+2 (2)
- // Address(): ++nref, unmatched version (3)
- // --nref, sees nref:1->0 (4)
- // try to set version=id_ver+2 (5)
- // 1,2,3,4,5 or 1,3,4,2,5:
- // SetFailed() succeeds, Address() fails at (5).
- // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
- // returned by (5) of Address()
- // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
- // returned by (5) of Address().
- uint64_t expected_vref = vref - 1;
- if (_versioned_ref.compare_exchange_strong(
- expected_vref, MakeVRef(id_ver + 2, 0),
- butil::memory_order_acquire,
- butil::memory_order_relaxed)) {
- OnRecycle();
- return_resource(SlotOfSocketId(id));
- return 1;
- }
- return 0;
- }
- LOG(FATAL) << "Invalid SocketId=" << id;
- return -1;
- }
- LOG(FATAL) << "Over dereferenced SocketId=" << id;
- return -1;
-}
-
-inline int Socket::Address(SocketId id, SocketUniquePtr* ptr) {
- const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
- Socket* const m = address_resource(slot);
- if (__builtin_expect(m != NULL, 1)) {
- // acquire fence makes sure this thread sees latest changes before
- // Dereference() or Revive().
- const uint64_t vref1 = m->_versioned_ref.fetch_add(
- 1, butil::memory_order_acquire);
- const uint32_t ver1 = VersionOfVRef(vref1);
- if (ver1 == VersionOfSocketId(id)) {
- ptr->reset(m);
- return 0;
- }
-
- const uint64_t vref2 = m->_versioned_ref.fetch_sub(
- 1, butil::memory_order_release);
- const int32_t nref = NRefOfVRef(vref2);
- if (nref > 1) {
- return -1;
- } else if (__builtin_expect(nref == 1, 1)) {
- const uint32_t ver2 = VersionOfVRef(vref2);
- if ((ver2 & 1)) {
- if (ver1 == ver2 || ver1 + 1 == ver2) {
- uint64_t expected_vref = vref2 - 1;
- if (m->_versioned_ref.compare_exchange_strong(
- expected_vref, MakeVRef(ver2 + 1, 0),
- butil::memory_order_acquire,
- butil::memory_order_relaxed)) {
- m->OnRecycle();
- return_resource(SlotOfSocketId(id));
- }
- } else {
- CHECK(false) << "ref-version=" << ver1
- << " unref-version=" << ver2;
- }
- } else {
- CHECK_EQ(ver1, ver2);
- // Addressed a free slot.
- }
- } else {
- CHECK(false) << "Over dereferenced SocketId=" << id;
- }
- }
- return -1;
-}
-
-inline void Socket::ReAddress(SocketUniquePtr* ptr) {
- _versioned_ref.fetch_add(1, butil::memory_order_acquire);
- ptr->reset(this);
-}
-
-inline int Socket::AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr) {
- const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
- Socket* const m = address_resource(slot);
- if (__builtin_expect(m != NULL, 1)) {
- const uint64_t vref1 = m->_versioned_ref.fetch_add(
- 1, butil::memory_order_acquire);
- const uint32_t ver1 = VersionOfVRef(vref1);
- if (ver1 == VersionOfSocketId(id)) {
- ptr->reset(m);
- return 0;
- }
- if (ver1 == VersionOfSocketId(id) + 1) {
- ptr->reset(m);
- return 1;
- }
-
- const uint64_t vref2 = m->_versioned_ref.fetch_sub(
- 1, butil::memory_order_release);
- const int32_t nref = NRefOfVRef(vref2);
- if (nref > 1) {
- return -1;
- } else if (__builtin_expect(nref == 1, 1)) {
- const uint32_t ver2 = VersionOfVRef(vref2);
- if ((ver2 & 1)) {
- if (ver1 == ver2 || ver1 + 1 == ver2) {
- uint64_t expected_vref = vref2 - 1;
- if (m->_versioned_ref.compare_exchange_strong(
- expected_vref, MakeVRef(ver2 + 1, 0),
- butil::memory_order_acquire,
- butil::memory_order_relaxed)) {
- m->OnRecycle();
- return_resource(slot);
- }
- } else {
- CHECK(false) << "ref-version=" << ver1
- << " unref-version=" << ver2;
- }
- } else {
- // Addressed a free slot.
- }
- } else {
- CHECK(false) << "Over dereferenced SocketId=" << id;
- }
- }
- return -1;
-}
-
-inline bool Socket::Failed() const {
- return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed))
- != VersionOfSocketId(_this_id);
-}
+ , bthread_tag(BTHREAD_TAG_DEFAULT) {}
inline bool Socket::MoreReadEvents(int* progress) {
// Fail to CAS means that new events arrived.
diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h
new file mode 100644
index 00000000..38141f3d
--- /dev/null
+++ b/src/brpc/versioned_ref_with_id.h
@@ -0,0 +1,624 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_VERSIONED_REF_WITH_ID_H
+#define BRPC_VERSIONED_REF_WITH_ID_H
+
+#include <memory>
+#include "butil/resource_pool.h"
+#include "butil/class_name.h"
+#include "butil/logging.h"
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+
+namespace brpc {
+
+// Unique identifier of a T object.
+typedef uint64_t VRefId;
+
+const VRefId INVALID_VREF_ID = (VRefId)-1;
+
+template<typename T>
+class VersionedRefWithId;
+
+template<typename T>
+void DereferenceVersionedRefWithId(T* r);
+
+template<typename T>
+struct VersionedRefWithIdDeleter {
+ void operator()(T* r) const {
+ DereferenceVersionedRefWithId(r);
+ }
+};
+
+template<typename T>
+using VersionedRefWithIdUniquePtr =
+ std::unique_ptr<T, VersionedRefWithIdDeleter<T>>;
+
+// Utility functions to combine and extract VRefId.
+template <typename T>
+BUTIL_FORCE_INLINE VRefId MakeVRefId(uint32_t version,
+ butil::ResourceId<T> slot) {
+ return VRefId((((uint64_t)version) << 32) | slot.value);
+}
+
+template<typename T>
+BUTIL_FORCE_INLINE butil::ResourceId<T> SlotOfVRefId(VRefId vref_id) {
+ return { (vref_id & 0xFFFFFFFFul) };
+}
+
+BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) {
+ return (uint32_t)(vref_id >> 32);
+}
+
+// Utility functions to combine and extract _versioned_ref
+BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
+ return (uint32_t)(vref >> 32);
+}
+
+BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
+ return (int32_t)(vref & 0xFFFFFFFFul);
+}
+
+BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
+ // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
+ return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
+}
+
+
+template <typename Ret>
+typename std::enable_if<!butil::is_void<Ret>::value, Ret>::type ReturnEmpty() {
+ return Ret{};
+}
+
+template <typename Ret>
+typename std::enable_if<butil::is_void<Ret>::value, Ret>::type ReturnEmpty() {}
+
+// Call func_name of class_type if class_type implements func_name,
+// otherwise call default function.
+#define WRAPPER_OF(class_type, func_name, return_type)
\
+ struct func_name ## Wrapper {
\
+ template<typename V, typename... Args>
\
+ static auto Test(int) -> decltype(
\
+ std::declval<V>().func_name(std::declval<Args>()...),
std::true_type()); \
+ template<typename>
\
+ static auto Test(...) -> std::false_type;
\
+
\
+ template<typename... Args>
\
+ typename std::enable_if<decltype(
\
+ Test<class_type, Args...>(0))::value, return_type>::type
\
+ Call(class_type* obj, Args... args) {
\
+ BAIDU_CASSERT((butil::is_result_same<
\
+ return_type, decltype(&T::func_name), T,
Args...>::value), \
+ "Params or return type mismatch");
\
+ return obj->func_name(std::forward<Args>(args)...);
\
+ }
\
+
\
+ template<typename... Args>
\
+ typename std::enable_if<!decltype(
\
+ Test<class_type, Args...>(0))::value, return_type>::type
\
+ Call(class_type* obj, Args... args) {
\
+ return ReturnEmpty<return_type>();
\
+ }
\
+ }
+
+#define WRAPPER_CALL(func_name, obj, ...) func_name ## Wrapper().Call(obj, ##
__VA_ARGS__)
+
+// VersionedRefWithId is an efficient data structure, which can be find
+// in O(1)-time by VRefId.
+// Users shall call VersionedRefWithId::Create() to create T,
+// store VRefId instead of T and call VersionedRefWithId::Address()
+// to convert the identifier to an unique_ptr at each access. Whenever
+// a unique_ptr is not destructed, the enclosed T will not be recycled.
+//
+// CRTP
+// Derived classes implement 6 functions :
+// 1. (required) int OnCreated(Args... args) :
+// Will be called in Create() to initialize T when T is created
successfully.
+// If initialization fails, return non-zero. VersionedRefWithId will be
`SetFailed'
+// and Create() returns non-zero.
+// 2. (required) void BeforeRecycled() :
+// Will be called in Dereference() before T is recycled.
+// 3. (optional) void OnFailed(Args... args) :
+// Will be called in SetFailed() when VersionedRefWithId is set failed
successfully.
+// 4. (optional) void BeforeAdditionalRefReleased() :
+// Will be called in ReleaseAdditionalReference() before additional ref is
released.
+// 5. (optional) void AfterRevived() :
+// Will be called in Revive() When VersionedRefWithId is revived.
+// 6. (optional) std::string OnDescription() const :
+// Will be called in description().
+//
+// Example usage:
+//
+// class UserData : public brpc::VersionedRefWithId<UserData> {
+// public:
+// explicit UserData(Forbidden f)
+// : brpc::VersionedRefWithId<UserData>(f)
+// , _count(0) {}
+
+// void Add(int c) {
+// _count.fetch_add(c, butil::memory_order_relaxed);
+// }
+// void Sub(int c) {
+// _count.fetch_sub(c, butil::memory_order_relaxed);
+// }
+// private:
+// friend class brpc::VersionedRefWithId<UserData>;
+//
+// int OnCreated() {
+// _count.store(1, butil::memory_order_relaxed);
+// return 0;
+// }
+// void OnFailed(int error_code, const std::string& error_text) {
+// _count.fetch_sub(1, butil::memory_order_relaxed);
+// }
+// void BeforeRecycled() {
+// _count.store(0, butil::memory_order_relaxed);
+// }
+//
+// butil::atomic<int> _count;
+// };
+//
+// typedef brpc::VRefId UserDataId;
+// const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID;
+// typedef brpc::VersionedRefWithIdUniquePtr<UserData> UserDataUniquePtr;
+//
+// And to call methods on UserData:
+// UserDataId id;
+// if (UserData::Create(&id) != 0) {
+// LOG(ERROR) << "Fail to create UserData";
+// return;
+// }
+// UserDataUniquePtr user_data;
+// if (UserData::Address(id, &user_data) != 0) {
+// LOG(ERROR) << "Fail to address UserDataId=" << id;
+// return;
+// }
+// user_data->Add(10);
+// user_data->SetFailed();
+// UserData::SetFailedById(id);
+//
+template<typename T>
+class VersionedRefWithId {
+protected:
+ struct Forbidden {};
+
+public:
+ explicit VersionedRefWithId(Forbidden)
+ // Must be even because Address() relies on evenness of version.
+ : _versioned_ref(0)
+ , _this_id(0)
+ , _additional_ref_status(ADDITIONAL_REF_USING) {}
+
+ virtual ~VersionedRefWithId() = default;
+ DISALLOW_COPY_AND_ASSIGN(VersionedRefWithId);
+
+ // Create a VersionedRefWithId, put the identifier into `id'.
+ // `args' will be passed to OnCreated() directly.
+ // Returns 0 on success, -1 otherwise.
+ template<typename ... Args>
+ static int Create(VRefId* id, Args... args);
+
+ // Place the VersionedRefWithId associated with identifier `id' into
+ // unique_ptr `ptr', which will be released automatically when out
+ // of scope (w/o explicit std::move). User can still access `ptr'
+ // after calling ptr->SetFailed() before release of `ptr'.
+ // This function is wait-free.
+ // Returns 0 on success, -1 when the Socket was SetFailed().
+ static int Address(VRefId id, VersionedRefWithIdUniquePtr<T>* ptr);
+
+ // Returns 0 on success, 1 on failed socket, -1 on recycled.
+ static int AddressFailedAsWell(VRefId id, VersionedRefWithIdUniquePtr<T>*
ptr);
+
+ // Re-address current VersionedRefWithId into `ptr'.
+ // Always succeed even if this socket is failed.
+ void ReAddress(VersionedRefWithIdUniquePtr<T>* ptr);
+
+ // Returns signed 32-bit referenced-count.
+ int32_t nref() const {
+ return NRefOfVRef(_versioned_ref.load(butil::memory_order_relaxed));
+ }
+
+ // Mark this VersionedRefWithId or the VersionedRefWithId associated
+ // with `id' as failed.
+ // Any later Address() of the identifier shall return NULL. The
+ // VersionedRefWithId is NOT recycled after calling this function,
+ // instead it will be recycled when no one references it. Internal
+ // fields of the Socket are still accessible after calling this
+ // function. Calling SetFailed() of a VersionedRefWithId more than
+ // once is OK.
+ // T::OnFailed() will be called when SetFailed() successfully.
+ // This function is lock-free.
+ // Returns -1 when the Socket was already SetFailed(), 0 otherwise.
+ template<typename... Args>
+ static int SetFailedById(VRefId id, Args... args);
+
+ template<typename... Args>
+ int SetFailed(Args... args);
+
+ bool Failed() const {
+ return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed))
+ != VersionOfVRefId(_this_id);
+ }
+
+ // Release the additional reference which added inside `Create'
+ // before so that `VersionedRefWithId' will be recycled automatically
+ // once no one is addressing it.
+ int ReleaseAdditionalReference();
+
+ VRefId id() const { return _this_id; }
+
+ // A brief description.
+ std::string description() const;
+
+protected:
+friend void DereferenceVersionedRefWithId<>(T* r);
+
+ // Status flag used to mark the state of the additional reference.
+ enum AdditionalRefStatus {
+ // 1. Additional reference has been increased;
+ ADDITIONAL_REF_USING,
+ // 2. Additional reference is increasing;
+ ADDITIONAL_REF_REVIVING,
+ // 3. Additional reference has been decreased.
+ ADDITIONAL_REF_RECYCLED
+ };
+
+ AdditionalRefStatus additional_ref_status() const {
+ return _additional_ref_status.load(butil::memory_order_relaxed);
+ }
+
+ uint64_t versioned_ref() const {
+ // The acquire fence pairs with release fence in Dereference to avoid
+ // inconsistent states to be seen by others.
+ return _versioned_ref.load(butil::memory_order_acquire);
+ }
+
+ template<typename... Args>
+ int SetFailedImpl(Args... args);
+
+ // Release the reference. If no one is addressing this VersionedRefWithId,
+ // it will be recycled automatically and T::BeforeRecycled() will be
called.
+ int Dereference();
+
+ // Make this socket addressable again.
+ // If nref is less than `at_least_nref', VersionedRefWithId was
+ // abandoned during revival and cannot be revived.
+ void Revive(int32_t at_least_nref);
+
+private:
+ typedef butil::ResourceId<T> resource_id_t;
+
+ // 1. When `failed_as_well=true', returns 0 on success,
+ // 1 on failed socket, -1 on recycled.
+ // 2. When `failed_as_well=false', returns 0 on success,
+ // -1 when the Socket was SetFailed().
+ static int AddressImpl(VRefId id, bool failed_as_well,
+ VersionedRefWithIdUniquePtr<T>* ptr);
+
+ // Callback wrapper of Derived classes.
+ WRAPPER_OF(T, OnFailed, void);
+ WRAPPER_OF(T, BeforeAdditionalRefReleased, void);
+ WRAPPER_OF(T, AfterRevived, void);
+ WRAPPER_OF(T, OnDescription, std::string);
+
+ // unsigned 32-bit version + signed 32-bit referenced-count.
+ // Meaning of version:
+ // * Created version: no SetFailed() is called on the VersionedRefWithId
yet.
+ // Must be same evenness with initial _versioned_ref because during
lifetime
+ // of a VersionedRefWithId on the slot, the version is added with 1
twice.
+ // This is also the version encoded in VRefId.
+ // * Failed version: = created version + 1, SetFailed()-ed but returned.
+ // * Other versions: the socket is already recycled.
+ butil::atomic<uint64_t> BAIDU_CACHELINE_ALIGNMENT _versioned_ref;
+ // The unique identifier.
+ VRefId _this_id;
+ // Indicates whether additional reference has increased,
+ // decreased, or is increasing.
+ // additional ref status:
+ // `Socket', `Create': ADDITIONAL_REF_USING
+ // `SetFailed': ADDITIONAL_REF_USING -> ADDITIONAL_REF_RECYCLED
+ // `Revive': ADDITIONAL_REF_RECYCLED -> ADDITIONAL_REF_REVIVING -> ADDITIONAL_REF_USING
+ butil::atomic<AdditionalRefStatus> _additional_ref_status;
+};
+
+template<typename T>
+void DereferenceVersionedRefWithId(T* r) {
+ if (r) {
+ static_cast<VersionedRefWithId<T>*>(r)->Dereference();
+ }
+}
+
+template <typename T>
+template<typename ... Args>
+int VersionedRefWithId<T>::Create(VRefId* id, Args... args) {
+ resource_id_t slot;
+ T* const t = butil::get_resource(&slot, Forbidden());
+ if (t == NULL) {
+ LOG(FATAL) << "Fail to get_resource<"
+ << butil::class_name_str<T>() << ">";
+ return -1;
+ }
+ // nref can be non-zero due to concurrent Address().
+ // _this_id will only be used in destructor/Destroy of referenced
+ // slots, which is safe and properly fenced. Although it's better
+ // to put the id into VersionedRefWithIdUniquePtr.
+ VersionedRefWithId<T>* const vref_with_id = t;
+ vref_with_id->_this_id = MakeVRefId<T>(
+ VersionOfVRef(vref_with_id->_versioned_ref.fetch_add(
+ 1, butil::memory_order_release)), slot);
+ vref_with_id->_additional_ref_status.store(
+ ADDITIONAL_REF_USING, butil::memory_order_relaxed);
+ BAIDU_CASSERT((butil::is_result_int<decltype(&T::OnCreated), T,
Args...>::value),
+ "T::OnCreated must accept Args params and return int");
+ // At last, call T::OnCreated() to initialize the T object.
+ if (t->OnCreated(std::forward<Args>(args)...) != 0) {
+ vref_with_id->SetFailed();
+ // NOTE: This object may be recycled at this point,
+ // don't touch anything.
+ return -1;
+ }
+ *id = vref_with_id->_this_id;
+ return 0;
+}
+
+template<typename T>
+int VersionedRefWithId<T>::Address(
+ VRefId id, VersionedRefWithIdUniquePtr<T>* ptr) {
+ return AddressImpl(id, false, ptr);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::AddressFailedAsWell(
+ VRefId id, VersionedRefWithIdUniquePtr<T>* ptr) {
+ return AddressImpl(id, true, ptr);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::AddressImpl(
+ VRefId id, bool failed_as_well, VersionedRefWithIdUniquePtr<T>* ptr) {
+ const resource_id_t slot = SlotOfVRefId<T>(id);
+ T* const t = address_resource(slot);
+ if (__builtin_expect(t != NULL, 1)) {
+ // acquire fence makes sure this thread sees latest changes before
+ // Dereference() or Revive().
+ VersionedRefWithId<T>* const vref_with_id = t;
+ const uint64_t vref1 = vref_with_id->_versioned_ref.fetch_add(
+ 1, butil::memory_order_acquire);
+ const uint32_t ver1 = VersionOfVRef(vref1);
+ if (ver1 == VersionOfVRefId(id)) {
+ ptr->reset(t);
+ return 0;
+ }
+ if (failed_as_well && ver1 == VersionOfVRefId(id) + 1) {
+ ptr->reset(t);
+ return 1;
+ }
+
+ const uint64_t vref2 = vref_with_id->_versioned_ref.fetch_sub(
+ 1, butil::memory_order_release);
+ const int32_t nref = NRefOfVRef(vref2);
+ if (nref > 1) {
+ return -1;
+ } else if (__builtin_expect(nref == 1, 1)) {
+ const uint32_t ver2 = VersionOfVRef(vref2);
+ if ((ver2 & 1)) {
+ if (ver1 == ver2 || ver1 + 1 == ver2) {
+ uint64_t expected_vref = vref2 - 1;
+ if (vref_with_id->_versioned_ref.compare_exchange_strong(
+ expected_vref, MakeVRef(ver2 + 1, 0),
+ butil::memory_order_acquire,
+ butil::memory_order_relaxed)) {
+ BAIDU_CASSERT((butil::is_result_void<
+ decltype(&T::BeforeRecycled),
T>::value),
+ "T::BeforeRecycled must accept Args
params"
+ " and return void");
+ t->BeforeRecycled();
+ return_resource(slot);
+ }
+ } else {
+ CHECK(false) << "ref-version=" << ver1
+ << " unref-version=" << ver2;
+ }
+ } else {
+ // Addressed a free slot.
+ }
+ } else {
+ CHECK(false) << "Over dereferenced SocketId=" << id;
+ }
+ }
+ return -1;
+}
+
+template<typename T>
+void VersionedRefWithId<T>::ReAddress(VersionedRefWithIdUniquePtr<T>* ptr) {
+ _versioned_ref.fetch_add(1, butil::memory_order_acquire);
+ ptr->reset(static_cast<T*>(this));
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailedById(VRefId id, Args... args) {
+ VersionedRefWithIdUniquePtr<T> ptr;
+ if (Address(id, &ptr) != 0) {
+ return -1;
+ }
+ return ptr->SetFailed(std::forward<Args>(args)...);
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailed(Args... args) {
+ return SetFailedImpl(std::forward<Args>(args)...);
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailedImpl(Args... args) {
+ const uint32_t id_ver = VersionOfVRefId(_this_id);
+ uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+ for (;;) {
+ if (VersionOfVRef(vref) != id_ver) {
+ return -1;
+ }
+ // Try to set version=id_ver+1 (to make later address() return NULL),
+ // retry on fail.
+ if (_versioned_ref.compare_exchange_strong(
+ vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
+ butil::memory_order_release,
+ butil::memory_order_relaxed)) {
+ // Call T::OnFailed() to notify the failure of T.
+ WRAPPER_CALL(OnFailed, static_cast<T*>(this),
std::forward<Args>(args)...);
+ // Deref additionally which is added at creation so that this
+ // queue's reference will hit 0(recycle) when no one addresses it.
+ ReleaseAdditionalReference();
+ // NOTE: This object may be recycled at this point, don't
+ // touch anything.
+ return 0;
+ }
+ }
+}
+
+template<typename T>
+int VersionedRefWithId<T>::ReleaseAdditionalReference() {
+ do {
+ AdditionalRefStatus expect = ADDITIONAL_REF_USING;
+ if (_additional_ref_status.compare_exchange_strong(
+ expect, ADDITIONAL_REF_RECYCLED,
+ butil::memory_order_relaxed,
+ butil::memory_order_relaxed)) {
+ BeforeAdditionalRefReleasedWrapper();
+ WRAPPER_CALL(BeforeAdditionalRefReleased, static_cast<T*>(this));
+ return Dereference();
+ }
+
+ if (expect == ADDITIONAL_REF_REVIVING) {
+ // sched_yield to wait until status is not ADDITIONAL_REF_REVIVING.
+ sched_yield();
+ } else {
+ return -1; // REF_RECYCLED
+ }
+ } while (true);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::Dereference() {
+ const VRefId id = _this_id;
+ const uint64_t vref = _versioned_ref.fetch_sub(
+ 1, butil::memory_order_release);
+ const int32_t nref = NRefOfVRef(vref);
+ if (nref > 1) {
+ return 0;
+ }
+ if (__builtin_expect(nref == 1, 1)) {
+ const uint32_t ver = VersionOfVRef(vref);
+ const uint32_t id_ver = VersionOfVRefId(id);
+ // Besides first successful SetFailed() adds 1 to version, one of
+ // those dereferencing nref from 1->0 adds another 1 to version.
+ // Notice "one of those": The wait-free Address() may make ref of a
+ // version-unmatched slot change from 1 to 0 for multiple times, we
+ // have to use version as a guard variable to prevent returning the
+ // VersionedRefWithId to pool more than once.
+ //
+ // Note: `ver == id_ver' means this VersionedRefWithId has been
`SetRecycle'
+ // before rather than `SetFailed'; `ver == id_ver+1' means we
+ // had `SetFailed' this socket before. We should destroy the
+ // socket under both situations
+ if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
+ // sees nref:1->0, try to set version=id_ver+2,--nref.
+ // No retry: if version changes, the slot is already returned by
+ // another one who sees nref:1->0 concurrently; if nref changes,
+ // which must be non-zero, the slot will be returned when
+ // nref changes from 1->0 again.
+ // Example:
+ // SetFailed(): --nref, sees nref:1->0 (1)
+ // try to set version=id_ver+2 (2)
+ // Address(): ++nref, unmatched version (3)
+ // --nref, sees nref:1->0 (4)
+ // try to set version=id_ver+2 (5)
+ // 1,2,3,4,5 or 1,3,4,2,5:
+ // SetFailed() succeeds, Address() fails at (5).
+ // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
+ // returned by (5) of Address()
+ // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
+ // returned by (5) of Address().
+ uint64_t expected_vref = vref - 1;
+ if (_versioned_ref.compare_exchange_strong(
+ expected_vref, MakeVRef(id_ver + 2, 0),
+ butil::memory_order_acquire,
+ butil::memory_order_relaxed)) {
+ static_cast<T*>(this)->BeforeRecycled();
+ return_resource(SlotOfVRefId<T>(id));
+ return 1;
+ }
+ return 0;
+ }
+ LOG(FATAL) << "Invalid VRefId=" << id;
+ return -1;
+ }
+ LOG(FATAL) << "Over dereferenced VRefId=" << id;
+ return -1;
+}
+
+template<typename T>
+void VersionedRefWithId<T>::Revive(int32_t at_least_nref) {
+ const uint32_t id_ver = VersionOfVRefId(_this_id);
+ uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+ _additional_ref_status.store(
+ ADDITIONAL_REF_REVIVING, butil::memory_order_relaxed);
+ while (true) {
+ CHECK_EQ(id_ver + 1, VersionOfVRef(vref)) << "id=" << id();
+
+ int32_t nref = NRefOfVRef(vref);
+ if (nref < at_least_nref) {
+ // Set the status to ADDITIONAL_REF_RECYCLED since no one uses this socket
+ _additional_ref_status.store(
+ ADDITIONAL_REF_RECYCLED, butil::memory_order_relaxed);
+ CHECK_EQ(1, nref);
+ LOG(WARNING) << description() << " was abandoned during revival";
+ return;
+ }
+ // +1 is the additional ref added in Create(). TODO(gejun): we should
+ // remove this additional nref someday.
+ if (_versioned_ref.compare_exchange_weak(
+ vref, MakeVRef(id_ver, nref + 1/*note*/),
+ butil::memory_order_release,
+ butil::memory_order_relaxed)) {
+ // Set the status to ADDITIONAL_REF_USING since we add additional ref again
+ _additional_ref_status.store(
+ ADDITIONAL_REF_USING, butil::memory_order_relaxed);
+ WRAPPER_CALL(AfterRevived, static_cast<T*>(this));
+ return;
+ }
+ }
+}
+
+template<typename T>
+std::string VersionedRefWithId<T>::description() const {
+ std::string result;
+ result.reserve(128);
+ butil::string_appendf(&result, "Socket{id=%" PRIu64, id());
+ result.append(WRAPPER_CALL(
+ OnDescription, const_cast<T*>(static_cast<const T*>(this))));
+ butil::string_appendf(&result, "} (0x%p)", this);
+ return result;
+}
+
+} // namespace brpc
+
+#endif // BRPC_VERSIONED_REF_WITH_ID_H
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index 5ed65a4c..ec662b46 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -18,20 +18,13 @@
#ifndef BRPC_SCOPED_GUARD_H
#define BRPC_SCOPED_GUARD_H
-#include <type_traits>
+#include "butil/type_traits.h"
#include "butil/macros.h"
namespace butil {
-// Whether a no-argument callable returns void.
-template<typename T>
-struct returns_void_t
- : public std::is_same<void, decltype(std::declval<T&&>()())>
-{};
-
template<typename Callback,
- typename = typename std::enable_if<
- returns_void_t<Callback>::value>::type>
+ typename = std::enable_if<is_result_void<Callback>::value>>
class ScopeGuard;
template<typename Callback>
@@ -43,7 +36,7 @@ template<typename Callback>
class ScopeGuard<Callback> {
public:
ScopeGuard(ScopeGuard&& other) noexcept
- :_callback(std::move(other._callback))
+ : _callback(std::move(other._callback))
, _dismiss(other._dismiss) {
other.dismiss();
}
diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h
index 5f342db3..3cf8473f 100644
--- a/src/butil/type_traits.h
+++ b/src/butil/type_traits.h
@@ -341,7 +341,7 @@ template <typename T> struct is_enum_impl<true, T> :
false_type { };
template <typename T> struct is_enum
: internal::is_enum_impl<
- is_same<T, void>::value ||
+ is_same<T, void>::value ||
is_integral<T>::value ||
is_floating_point<T>::value ||
is_reference<T>::value ||
@@ -351,6 +351,40 @@ template <typename T> struct is_enum<const T> : is_enum<T>
{ };
template <typename T> struct is_enum<volatile T> : is_enum<T> { };
template <typename T> struct is_enum<const volatile T> : is_enum<T> { };
+// Deduces the return type of an INVOKE expression
+// at compile time.
+// If the callable is non-static member function,
+// the first argument should be the class type.
+#if (__cplusplus >= 201703L)
+// std::result_of is deprecated in C++17 and removed in C++20,
+// use std::invoke_result instead.
+template <typename>
+struct result_of;
+template <typename F, typename... Args>
+struct result_of<F(Args...)> : std::invoke_result<F, Args...> {};
+#elif (__cplusplus >= 201103L)
+template <typename F>
+using result_of = std::result_of<F>;
+#else
+#error Only C++11 or later is supported.
+#endif
+
+template <typename F>
+using result_of_t = typename result_of<F>::type;
+
+// Whether a callable returns type which is same as ReturnType.
+template<typename ReturnType, typename F, typename... Args>
+struct is_result_same
+ : public butil::is_same<ReturnType, result_of_t<F(Args...)>> {};
+
+// Whether a callable returns void.
+template<typename F, typename... Args>
+struct is_result_void : public is_result_same<void, F, Args...> {};
+
+// Whether a callable returns int.
+template<typename F, typename... Args>
+struct is_result_int : public is_result_same<int, F, Args...> {};
+
} // namespace butil
#endif // BUTIL_TYPE_TRAITS_H
diff --git a/test/brpc_event_dispatcher_unittest.cpp
b/test/brpc_event_dispatcher_unittest.cpp
index d8f69842..185e9f2d 100644
--- a/test/brpc_event_dispatcher_unittest.cpp
+++ b/test/brpc_event_dispatcher_unittest.cpp
@@ -27,18 +27,19 @@
#include "butil/time.h"
#include "butil/macros.h"
#include "butil/fd_utility.h"
+#include "butil/memory/scope_guard.h"
+#include "bthread/bthread.h"
#include "brpc/event_dispatcher.h"
+#include "brpc/socket.h"
#include "brpc/details/has_epollrdhup.h"
+#include "brpc/versioned_ref_with_id.h"
class EventDispatcherTest : public ::testing::Test{
protected:
- EventDispatcherTest(){
- };
- virtual ~EventDispatcherTest(){};
- virtual void SetUp() {
- };
- virtual void TearDown() {
- };
+ EventDispatcherTest() = default;
+ ~EventDispatcherTest() override = default;
+ void SetUp() override {}
+ void TearDown() override {}
};
TEST_F(EventDispatcherTest, has_epollrdhup) {
@@ -52,6 +53,128 @@ TEST_F(EventDispatcherTest, versioned_ref) {
ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref);
}
+struct UserData;
+
+UserData* g_user_data = NULL;
+
+struct UserData : public brpc::VersionedRefWithId<UserData> {
+ explicit UserData(Forbidden f)
+ : brpc::VersionedRefWithId<UserData>(f)
+ , count(0)
+ , _additional_ref_released(false) {}
+
+ int OnCreated() {
+ count.store(1, butil::memory_order_relaxed);
+ _additional_ref_released = false;
+ g_user_data = this;
+ return 0;
+ }
+
+ void BeforeRecycled() {
+ count.store(0, butil::memory_order_relaxed);
+ g_user_data = NULL;
+ }
+
+ void BeforeAdditionalRefReleased() {
+ _additional_ref_released = true;
+ }
+
+ void OnFailed() {
+ count.fetch_sub(1, butil::memory_order_relaxed);
+ }
+
+ void AfterRevived() {
+ count.fetch_add(1, butil::memory_order_relaxed);
+ _additional_ref_released = false;
+ }
+
+ butil::atomic<int> count;
+ bool _additional_ref_released;
+};
+
+// Unique identifier of a UserData.
+// Users shall store UserDataId instead of UserData and call
UserData::Address()
+// to convert the identifier to a unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed UserData will not be recycled.
+typedef brpc::VRefId UserDataId;
+
+const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID;
+
+typedef brpc::VersionedRefWithIdUniquePtr<UserData> UserDataUniquePtr;
+
+volatile bool vref_thread_stop = false;
+butil::atomic<int> g_count(1);
+
+void TestVRef(UserDataId id) {
+ UserDataUniquePtr ptr;
+ ASSERT_EQ(0, UserData::Address(id, &ptr));
+ ptr->count.fetch_add(1, butil::memory_order_relaxed);
+ g_count.fetch_add(1, butil::memory_order_relaxed);
+}
+
+void* VRefThread(void* arg) {
+ auto id = (UserDataId)arg;
+ while (!vref_thread_stop) {
+ TestVRef(id);
+ }
+ return NULL;
+}
+
+TEST_F(EventDispatcherTest, versioned_ref_with_id) {
+ UserDataId id = INVALID_EVENT_DATA_ID;
+ ASSERT_EQ(0, UserData::Create(&id));
+ ASSERT_NE(INVALID_EVENT_DATA_ID, id);
+ UserDataUniquePtr ptr;
+ ASSERT_EQ(0, UserData::Address(id, &ptr));
+ ASSERT_EQ(2, ptr->nref());
+ ASSERT_FALSE(ptr->Failed());
+ ASSERT_EQ(1, ptr->count);
+ ASSERT_EQ(g_user_data, ptr.get());
+ {
+ UserDataUniquePtr temp_ptr;
+ ASSERT_EQ(0, UserData::Address(id, &temp_ptr));
+ ASSERT_EQ(ptr, temp_ptr);
+ ASSERT_EQ(3, ptr->nref());
+ }
+
+ const size_t thread_num = 8;
+ pthread_t tid[thread_num];
+ for (auto& i : tid) {
+ ASSERT_EQ(0, pthread_create(&i, NULL, VRefThread, (void*)id));
+ }
+
+ sleep(2);
+
+ vref_thread_stop = true;
+ for (const auto i : tid) {
+ pthread_join(i, NULL);
+ }
+
+ ASSERT_EQ(2, ptr->nref());
+ ASSERT_EQ(g_count, ptr->count);
+ ASSERT_EQ(0, ptr->SetFailed());
+ ASSERT_TRUE(ptr->Failed());
+ ASSERT_EQ(g_count - 1, ptr->count);
+ // Additional reference has been released.
+ ASSERT_TRUE(ptr->_additional_ref_released);
+ ASSERT_EQ(1, ptr->nref());
+ {
+ UserDataUniquePtr temp_ptr;
+ ASSERT_EQ(1, UserData::AddressFailedAsWell(id, &temp_ptr));
+ ASSERT_EQ(ptr, temp_ptr);
+ ASSERT_EQ(2, ptr->nref());
+ }
+ ptr->Revive(1);
+ ASSERT_EQ(2, ptr->nref());
+ ASSERT_EQ(g_count, ptr->count);
+ ASSERT_FALSE(ptr->_additional_ref_released);
+ ASSERT_EQ(0, UserData::SetFailedById(id));
+ ASSERT_EQ(g_count - 1, ptr->count);
+ ptr.reset();
+ ASSERT_EQ(nullptr, ptr);
+ ASSERT_EQ(nullptr, g_user_data);
+}
+
std::vector<int> err_fd;
pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
@@ -79,7 +202,7 @@ struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public
brpc::SocketUser {
times = 0;
}
- virtual void BeforeRecycle(brpc::Socket* m) {
+ void BeforeRecycle(brpc::Socket* m) override {
pthread_mutex_lock(&rel_fd_mutex);
rel_fd.push_back(m->fd());
pthread_mutex_unlock(&rel_fd_mutex);
@@ -267,3 +390,130 @@ TEST_F(EventDispatcherTest, dispatch_tasks) {
ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num);
#endif
}
+
+// Unique identifier of an EventPipe.
+// Users shall store EventPipeId instead of EventPipe and call
EventPipe::Address()
+// to convert the identifier to a unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed EventPipe will not be recycled.
+typedef brpc::VRefId EventPipeId;
+
+const brpc::VRefId INVALID_EVENT_PIPE_ID = brpc::INVALID_VREF_ID;
+
+class EventPipe;
+typedef brpc::VersionedRefWithIdUniquePtr<EventPipe> EventPipeUniquePtr;
+
+
+class EventPipe : public brpc::VersionedRefWithId<EventPipe> {
+public:
+ explicit EventPipe(Forbidden f)
+ : brpc::VersionedRefWithId<EventPipe>(f)
+ , _pipe_fds{-1, -1}
+ , _input_event_count(0)
+ {}
+
+ int Notify() {
+ char c = 0;
+ if (write(_pipe_fds[1], &c, 1) != 1) {
+ PLOG(ERROR) << "Fail to write to _pipe_fds[1]";
+ return -1;
+ }
+ return 0;
+ }
+
+private:
+friend class VersionedRefWithId<EventPipe>;
+friend class brpc::IOEvent<EventPipe>;
+
+ int OnCreated() {
+ if (pipe(_pipe_fds)) {
+ PLOG(FATAL) << "Fail to create _pipe_fds";
+ return -1;
+ }
+ if (_io_event.Init((void*)id()) != 0) {
+ LOG(ERROR) << "Fail to init IOEvent";
+ return -1;
+ }
+ _io_event.set_bthread_tag(bthread_self_tag());
+ if (_io_event.AddConsumer(_pipe_fds[0]) != 0) {
+ PLOG(ERROR) << "Fail to add SocketId=" << id()
+ << " into EventDispatcher";
+ return -1;
+ }
+
+
+ _input_event_count = 0;
+ return 0;
+ }
+
+ void BeforeRecycled() {
+ brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag())
+ .RemoveConsumer(_pipe_fds[0]);
+ _io_event.Reset();
+ if (_pipe_fds[0] >= 0) {
+ close(_pipe_fds[0]);
+ }
+ if (_pipe_fds[1] >= 0) {
+ close(_pipe_fds[1]);
+ }
+ }
+
+ static int OnInputEvent(void* user_data, uint32_t,
+ const bthread_attr_t&) {
+ auto id = reinterpret_cast<EventPipeId>(user_data);
+ EventPipeUniquePtr ptr;
+ if (EventPipe::Address(id, &ptr) != 0) {
+ LOG(WARNING) << "Fail to address EventPipe";
+ return -1;
+ }
+
+ char buf[1024];
+ ssize_t nr = read(ptr->_pipe_fds[0], &buf, arraysize(buf));
+ if (nr <= 0) {
+ if (errno == EAGAIN) {
+ return 0;
+ } else {
+ PLOG(WARNING) << "Fail to read from _pipe_fds[0]";
+ ptr->SetFailed();
+ return -1;
+ }
+ }
+
+ ptr->_input_event_count += nr;
+ return 0;
+ }
+
+ static int OnOutputEvent(void*, uint32_t,
+ const bthread_attr_t&) {
+ EXPECT_TRUE(false) << "Should not be called";
+ return 0;
+ }
+
+ brpc::IOEvent<EventPipe> _io_event;
+ int _pipe_fds[2];
+
+ size_t _input_event_count;
+};
+
+TEST_F(EventDispatcherTest, customize_dispatch_task) {
+ EventPipeId id = INVALID_EVENT_PIPE_ID;
+ ASSERT_EQ(0, EventPipe::Create(&id));
+ ASSERT_NE(INVALID_EVENT_PIPE_ID, id);
+ EventPipeUniquePtr ptr;
+ ASSERT_EQ(0, EventPipe::Address(id, &ptr));
+ ASSERT_EQ(2, ptr->nref());
+ ASSERT_FALSE(ptr->Failed());
+
+ ASSERT_EQ((size_t)0, ptr->_input_event_count);
+ const size_t N = 10000;
+ for (size_t i = 0; i < N; ++i) {
+ ASSERT_EQ(0, ptr->Notify());
+ }
+ usleep(1000 * 50);
+ ASSERT_EQ(N, ptr->_input_event_count);
+
+ ASSERT_EQ(0, ptr->SetFailed());
+ ASSERT_TRUE(ptr->Failed());
+ ptr.reset();
+ ASSERT_EQ(nullptr, ptr);
+ ASSERT_NE(0, EventPipe::Address(id, &ptr));
+}
diff --git a/test/brpc_load_balancer_unittest.cpp
b/test/brpc_load_balancer_unittest.cpp
index d98c11b5..14b0131c 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -1107,7 +1107,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
{
brpc::SocketUniquePtr dummy_ptr;
ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(),
&dummy_ptr));
- dummy_ptr->Revive();
+ dummy_ptr->Revive(2);
}
bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
// After one server is revived, the reject rate should be 50%
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 1176676c..573ab2ed 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -1080,7 +1080,7 @@ public:
brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>&
args,
brpc::RedisReply* output,
- bool flush_batched) {
+ bool flush_batched) override {
output->SetStatus("OK");
return brpc::REDIS_CMD_CONTINUE;
}
@@ -1093,7 +1093,7 @@ public:
public:
brpc::RedisCommandHandlerResult Run(const
std::vector<butil::StringPiece>& args,
brpc::RedisReply* output,
- bool flush_batched) {
+ bool flush_batched) override {
if (args[0] == "multi") {
output->SetError("ERR duplicate multi");
return brpc::REDIS_CMD_CONTINUE;
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]