This is an automated email from the ASF dual-hosted git repository.

wwbmmm pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/brpc.git


The following commit(s) were added to refs/heads/master by this push:
     new 122355a2 EventDispatcher supports various IO types (#2560)
122355a2 is described below

commit 122355a29e2dd3341ad6f47e17682b482c45b7e4
Author: Bright Chen <chenguangmin...@foxmail.com>
AuthorDate: Mon Jun 3 16:29:47 2024 +0800

    EventDispatcher supports various IO types (#2560)
---
 src/brpc/acceptor.cpp                   |   2 +-
 src/brpc/details/health_check.cpp       |   3 +-
 src/brpc/event_dispatcher.cpp           |  18 +
 src/brpc/event_dispatcher.h             | 211 ++++++++++-
 src/brpc/event_dispatcher_epoll.cpp     |  85 +++--
 src/brpc/event_dispatcher_kqueue.cpp    |  82 ++---
 src/brpc/socket.cpp                     | 592 ++++++++++++++----------------
 src/brpc/socket.h                       | 107 ++----
 src/brpc/socket_id.h                    |  16 +-
 src/brpc/socket_inl.h                   | 193 +---------
 src/brpc/versioned_ref_with_id.h        | 624 ++++++++++++++++++++++++++++++++
 src/butil/memory/scope_guard.h          |  13 +-
 src/butil/type_traits.h                 |  36 +-
 test/brpc_event_dispatcher_unittest.cpp | 266 +++++++++++++-
 test/brpc_load_balancer_unittest.cpp    |   2 +-
 test/brpc_redis_unittest.cpp            |   4 +-
 16 files changed, 1531 insertions(+), 723 deletions(-)

diff --git a/src/brpc/acceptor.cpp b/src/brpc/acceptor.cpp
index d2657258..e8e6dcbe 100644
--- a/src/brpc/acceptor.cpp
+++ b/src/brpc/acceptor.cpp
@@ -321,7 +321,7 @@ void Acceptor::OnNewConnectionsUntilEAGAIN(Socket* 
acception) {
                 // Always add this socket into `_socket_map' whether it
                 // has been `SetFailed' or not, whether `Acceptor' is
                 // running or not. Otherwise, `Acceptor::BeforeRecycle'
-                // may be called (inside Socket::OnRecycle) after `Acceptor'
+                // may be called (inside Socket::BeforeRecycled) after 
`Acceptor'
                 // has been destroyed
                 am->_socket_map.insert(socket_id, ConnectStatistics());
             }
diff --git a/src/brpc/details/health_check.cpp 
b/src/brpc/details/health_check.cpp
index ea6e7874..81008255 100644
--- a/src/brpc/details/health_check.cpp
+++ b/src/brpc/details/health_check.cpp
@@ -210,7 +210,8 @@ bool HealthCheckTask::OnTriggeringTask(timespec* 
next_abstime) {
             ptr->_ninflight_app_health_check.fetch_add(
                     1, butil::memory_order_relaxed);
         }
-        ptr->Revive();
+        // See comments above.
+        ptr->Revive(2/*note*/);
         ptr->_hc_count = 0;
         if (!FLAGS_health_check_path.empty()) {
             HealthCheckManager::StartCheck(_id, ptr->_health_check_interval_s);
diff --git a/src/brpc/event_dispatcher.cpp b/src/brpc/event_dispatcher.cpp
index 689e80da..53495ea6 100644
--- a/src/brpc/event_dispatcher.cpp
+++ b/src/brpc/event_dispatcher.cpp
@@ -71,6 +71,24 @@ EventDispatcher& GetGlobalEventDispatcher(int fd, 
bthread_tag_t tag) {
     return g_edisp[tag * FLAGS_event_dispatcher_num + index];
 }
 
+int IOEventData::OnCreated(const IOEventDataOptions& options) {
+    if (!options.input_cb) {  // Both callbacks are mandatory.
+        LOG(ERROR) << "Invalid input_cb=NULL";
+        return -1;
+    }
+    if (!options.output_cb) {
+        LOG(ERROR) << "Invalid output_cb=NULL";
+        return -1;
+    }
+    // Options are valid: keep a copy for the Call*EventCallback methods.
+    _options = options;
+    return 0;  // 0 on success; -1 (above) when a callback is missing.
+}
+
+void IOEventData::BeforeRecycled() {
+    _options = { NULL, NULL, NULL };  // Drop both callbacks and user_data.
+}
+
 } // namespace brpc
 
 #if defined(OS_LINUX)
diff --git a/src/brpc/event_dispatcher.h b/src/brpc/event_dispatcher.h
index 0de74df6..51c404e2 100644
--- a/src/brpc/event_dispatcher.h
+++ b/src/brpc/event_dispatcher.h
@@ -21,16 +21,78 @@
 
 #include "butil/macros.h"                     // DISALLOW_COPY_AND_ASSIGN
 #include "bthread/types.h"                   // bthread_t, bthread_attr_t
-#include "brpc/socket.h"                     // Socket, SocketId
+#include "brpc/versioned_ref_with_id.h"
 
 
 namespace brpc {
 
+// Unique identifier of an IOEventData.
+// Users shall store IOEventDataId instead of IOEventData and call IOEventData::Address()
+// to convert the identifier to a unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed IOEventData will not be recycled.
+typedef VRefId IOEventDataId;
+
+const VRefId INVALID_IO_EVENT_DATA_ID = INVALID_VREF_ID;
+
+class IOEventData;
+
+typedef VersionedRefWithIdUniquePtr<IOEventData> EventDataUniquePtr;
+
+// User callback type of input event and output event.
+typedef int (*InputEventCallback) (void* id, uint32_t events,
+                                   const bthread_attr_t& thread_attr);
+typedef InputEventCallback OutputEventCallback;
+
+struct IOEventDataOptions {
+    // Callback for input event, required (OnCreated fails on NULL).
+    InputEventCallback input_cb;
+    // Callback for output event, required (OnCreated fails on NULL).
+    OutputEventCallback output_cb;
+    // User data, passed as the first argument to both callbacks.
+    void* user_data;
+};
+
+// EventDispatcher finds IOEventData by IOEventDataId which is
+// stored in epoll/kqueue data, and calls input/output event callback,
+// so EventDispatcher supports various IO types, such as socket,
+// pipe, eventfd, timerfd, etc.
+class IOEventData : public VersionedRefWithId<IOEventData> {
+public:
+    explicit IOEventData(Forbidden f)
+        : VersionedRefWithId<IOEventData>(f)
+        , _options{ NULL, NULL, NULL } {}
+
+    DISALLOW_COPY_AND_ASSIGN(IOEventData);
+    // Invoke the user's input callback with the stored user_data.
+    int CallInputEventCallback(uint32_t events,
+                               const bthread_attr_t& thread_attr) {
+        return _options.input_cb(_options.user_data, events, thread_attr);
+    }
+    // Invoke the user's output callback with the stored user_data.
+    int CallOutputEventCallback(uint32_t events,
+                                const bthread_attr_t& thread_attr) {
+        return _options.output_cb(_options.user_data, events, thread_attr);
+    }
+
+private:
+friend class VersionedRefWithId<IOEventData>;
+    // NOTE(review): presumably invoked by VersionedRefWithId - confirm there.
+    int OnCreated(const IOEventDataOptions& options);
+    void BeforeRecycled();
+    // Copied in OnCreated() and reset to NULLs in BeforeRecycled().
+    IOEventDataOptions _options;
+};
+
+namespace rdma {
+class RdmaEndpoint;
+}
+
 // Dispatch edge-triggered events of file descriptors to consumers
 // running in separate bthreads.
 class EventDispatcher {
 friend class Socket;
 friend class rdma::RdmaEndpoint;
+template <typename T> friend class IOEvent;
 public:
     EventDispatcher();
     
@@ -40,7 +102,7 @@ public:
     // Use |*consumer_thread_attr| (if it's not NULL) as the attribute to
     // create bthreads running user callbacks.
     // Returns 0 on success, -1 otherwise.
-    virtual int Start(const bthread_attr_t* consumer_thread_attr);
+    virtual int Start(const bthread_attr_t* thread_attr);
 
     // True iff this dispatcher is running in a bthread
     bool Running() const;
@@ -57,19 +119,19 @@ public:
     // When the file descriptor is removed from internal epoll, the Socket
     // will be dereferenced once additionally.
     // Returns 0 on success, -1 otherwise.
-    int AddConsumer(SocketId socket_id, int fd);
+    int AddConsumer(IOEventDataId event_data_id, int fd);
 
     // Watch EPOLLOUT event on `fd' into epoll device. If `pollin' is
     // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
     // be used instead of EPOLL_CTL_ADD. When event arrives,
     // `Socket::HandleEpollOut' will be called with `socket_id'
     // Returns 0 on success, -1 otherwise and errno is set
-    int RegisterEvent(SocketId socket_id, int fd, bool pollin);
+    int RegisterEvent(IOEventDataId event_data_id, int fd, bool pollin);
     
     // Remove EPOLLOUT event on `fd'. If `pollin' is true, EPOLLIN event
     // will be kept and EPOLL_CTL_MOD will be used instead of EPOLL_CTL_DEL
     // Returns 0 on success, -1 otherwise and errno is set
-    int UnregisterEvent(SocketId socket_id, int fd, bool pollin);
+    int UnregisterEvent(IOEventDataId event_data_id, int fd, bool pollin);
 
 private:
     DISALLOW_COPY_AND_ASSIGN(EventDispatcher);
@@ -83,8 +145,33 @@ private:
     // Remove the file descriptor `fd' from epoll.
     int RemoveConsumer(int fd);
 
-    // The epoll to watch events.
-    int _epfd;
+    // Call the input or output user callback of `event_data_id'.
+    template<bool IsInputEvent>
+    static int OnEvent(IOEventDataId event_data_id, uint32_t events,
+                       const bthread_attr_t& thread_attr) {
+        EventDataUniquePtr data;  // Held while the callback runs.
+        if (IOEventData::Address(event_data_id, &data) != 0) {
+            return -1;  // Address() failed; no callback is called.
+        }
+        return IsInputEvent ?
+               data->CallInputEventCallback(events, thread_attr) :
+               data->CallOutputEventCallback(events, thread_attr);
+    }
+
+    static int CallInputEventCallback(IOEventDataId event_data_id,
+                                      uint32_t events,
+                                      const bthread_attr_t& thread_attr) {
+        return OnEvent<true>(event_data_id, events, thread_attr);  // Input path.
+    }
+
+    static int CallOutputEventCallback(IOEventDataId event_data_id,
+                                       uint32_t events,
+                                       const bthread_attr_t& thread_attr) {
+        return OnEvent<false>(event_data_id, events, thread_attr);  // Output path.
+    }
+
+    // The epoll/kqueue fd to watch events.
+    int _event_dispatcher_fd;
 
     // false unless Stop() is called.
     volatile bool _stop;
@@ -93,7 +180,7 @@ private:
     bthread_t _tid;
 
     // The attribute of bthreads calling user callbacks.
-    bthread_attr_t _consumer_thread_attr;
+    bthread_attr_t _thread_attr;
 
     // Pipe fds to wakeup EventDispatcher from `epoll_wait' in order to quit
     int _wakeup_fds[2];
@@ -101,6 +188,114 @@ private:
 
 EventDispatcher& GetGlobalEventDispatcher(int fd, bthread_tag_t tag);
 
+// IOEvent manages the IO events of a fd and forwards them to T's callbacks.
+template <typename T>
+class IOEvent {
+public:
+    IOEvent()
+        : _init(false)
+        , _event_data_id(INVALID_IO_EVENT_DATA_ID)
+        , _bthread_tag(bthread_self_tag()) {}
+
+    ~IOEvent() { Reset(); }
+
+    DISALLOW_COPY_AND_ASSIGN(IOEvent);
+    // Creates the underlying IOEventData; 0 on success, -1 on failure.
+    int Init(void* user_data) {
+        if (_init) {
+            LOG(WARNING) << "IOEvent has been initialized";
+            return 0;  // Repeated Init() is a no-op.
+        }
+        IOEventDataOptions options{ OnInputEvent, OnOutputEvent, user_data };
+        if (IOEventData::Create(&_event_data_id, options) != 0) {
+            LOG(ERROR) << "Fail to create EventData";
+            return -1;
+        }
+        _init = true;
+        return 0;
+    }
+    // Calls SetFailedById on the IOEventData created by Init(), if any.
+    void Reset() {
+        if (_init) {
+            IOEventData::SetFailedById(_event_data_id);
+            _init = false;
+        }
+    }
+
+    // See comments of `EventDispatcher::AddConsumer'.
+    int AddConsumer(int fd) {
+        if (!_init) {
+            LOG(ERROR) << "IOEvent has not been initialized";
+            return -1;
+        }
+        return GetGlobalEventDispatcher(fd, _bthread_tag)
+            .AddConsumer(_event_data_id, fd);
+    }
+
+    // See comments of `EventDispatcher::RemoveConsumer'.
+    int RemoveConsumer(int fd) {
+        if (!_init) {
+            LOG(ERROR) << "IOEvent has not been initialized";
+            return -1;
+        }
+        return GetGlobalEventDispatcher(fd, _bthread_tag).RemoveConsumer(fd);
+    }
+
+    // See comments of `EventDispatcher::RegisterEvent'.
+    int RegisterEvent(int fd, bool pollin) {
+        if (!_init) {
+            LOG(ERROR) << "IOEvent has not been initialized";
+            return -1;
+        }
+        return GetGlobalEventDispatcher(fd, _bthread_tag)
+            .RegisterEvent(_event_data_id, fd, pollin);
+    }
+
+    // See comments of `EventDispatcher::UnregisterEvent'.
+    int UnregisterEvent(int fd, bool pollin) {
+        if (!_init) {
+            LOG(ERROR) << "IOEvent has not been initialized";
+            return -1;
+        }
+        return GetGlobalEventDispatcher(fd, _bthread_tag)
+            .UnregisterEvent(_event_data_id, fd, pollin);
+    }
+    // Tag selecting the EventDispatcher; defaults to bthread_self_tag().
+    void set_bthread_tag(bthread_tag_t bthread_tag) {
+        _bthread_tag = bthread_tag;
+    }
+    bthread_tag_t bthread_tag() const {
+        return _bthread_tag;
+    }
+
+private:
+    // Callback stored in IOEventDataOptions; forwards to T::OnInputEvent.
+    static int OnInputEvent(void* user_data, uint32_t events,
+                            const bthread_attr_t& thread_attr) {
+        static_assert(
+            butil::is_result_int<decltype(&T::OnInputEvent),
+                                 void*, uint32_t,
+                                 bthread_attr_t>::value,
+            "T::OnInputEvent signature mismatch");
+        return T::OnInputEvent(user_data, events, thread_attr);
+    }
+
+    // Callback stored in IOEventDataOptions; forwards to T::OnOutputEvent.
+    static int OnOutputEvent(void* user_data, uint32_t events,
+                             const bthread_attr_t& thread_attr) {
+        static_assert(
+            butil::is_result_int<decltype(&T::OnOutputEvent),
+                                 void*, uint32_t,
+                                 bthread_attr_t>::value,
+            "T::OnOutputEvent signature mismatch");
+        return T::OnOutputEvent(user_data, events, thread_attr);
+    }
+
+    bool _init;  // True between a successful Init() and Reset().
+    IOEventDataId _event_data_id;  // Filled by IOEventData::Create in Init().
+    bthread_tag_t _bthread_tag;  // Passed to GetGlobalEventDispatcher().
+};
+
 } // namespace brpc
 
 
diff --git a/src/brpc/event_dispatcher_epoll.cpp 
b/src/brpc/event_dispatcher_epoll.cpp
index 1ac7647d..64717b16 100644
--- a/src/brpc/event_dispatcher_epoll.cpp
+++ b/src/brpc/event_dispatcher_epoll.cpp
@@ -23,17 +23,16 @@
 namespace brpc {
 
 EventDispatcher::EventDispatcher()
-    : _epfd(-1)
+    : _event_dispatcher_fd(-1)
     , _stop(false)
     , _tid(0)
-    , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
-    _epfd = epoll_create(1024 * 1024);
-    if (_epfd < 0) {
+    , _thread_attr(BTHREAD_ATTR_NORMAL) {
+    _event_dispatcher_fd = epoll_create(1024 * 1024);
+    if (_event_dispatcher_fd < 0) {
         PLOG(FATAL) << "Fail to create epoll";
         return;
     }
-    CHECK_EQ(0, butil::make_close_on_exec(_epfd));
+    CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd));
 
     _wakeup_fds[0] = -1;
     _wakeup_fds[1] = -1;
@@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher()
 EventDispatcher::~EventDispatcher() {
     Stop();
     Join();
-    if (_epfd >= 0) {
-        close(_epfd);
-        _epfd = -1;
+    if (_event_dispatcher_fd >= 0) {
+        close(_event_dispatcher_fd);
+        _event_dispatcher_fd = -1;
     }
     if (_wakeup_fds[0] > 0) {
         close(_wakeup_fds[0]);
@@ -57,7 +56,7 @@ EventDispatcher::~EventDispatcher() {
 }
 
 int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
-    if (_epfd < 0) {
+    if (_event_dispatcher_fd < 0) {
         LOG(FATAL) << "epoll was not created";
         return -1;
     }
@@ -68,22 +67,21 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
         return -1;
     }
 
-    // Set _consumer_thread_attr before creating epoll thread to make sure
+    // Set _thread_attr before creating epoll thread to make sure
     // everyting seems sane to the thread.
-    _consumer_thread_attr = (consumer_thread_attr  ?
-                             *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
+    _thread_attr = consumer_thread_attr  ?
+        *consumer_thread_attr : BTHREAD_ATTR_NORMAL;
 
-    //_consumer_thread_attr is used in StartInputEvent(), assign flag 
NEVER_QUIT to it will cause new bthread
+    //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it 
will cause new bthread
     // that created by epoll_wait() never to quit.
-    bthread_attr_t epoll_thread_attr = _consumer_thread_attr | 
BTHREAD_NEVER_QUIT;
+    bthread_attr_t epoll_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
     // when the older comlog (e.g. 3.1.85) calls com_openlog_r(). Since this
     // is also a potential issue for consumer threads, using the same attr
     // should be a reasonable solution.
-    int rc = bthread_start_background(
-        &_tid, &epoll_thread_attr, RunThis, this);
+    int rc = bthread_start_background(&_tid, &epoll_thread_attr, RunThis, 
this);
     if (rc) {
         LOG(FATAL) << "Fail to create epoll thread: " << berror(rc);
         return -1;
@@ -92,15 +90,15 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
 }
 
 bool EventDispatcher::Running() const {
-    return !_stop  && _epfd >= 0 && _tid != 0;
+    return !_stop  && _event_dispatcher_fd >= 0 && _tid != 0;
 }
 
 void EventDispatcher::Stop() {
     _stop = true;
 
-    if (_epfd >= 0) {
+    if (_event_dispatcher_fd >= 0) {
         epoll_event evt = { EPOLLOUT,  { NULL } };
-        epoll_ctl(_epfd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
+        epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, _wakeup_fds[1], &evt);
     }
 }
 
@@ -111,62 +109,62 @@ void EventDispatcher::Join() {
     }
 }
 
-int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) {
-    if (_epfd < 0) {
+int EventDispatcher::RegisterEvent(IOEventDataId event_data_id,
+                                   int fd, bool pollin) {
+    if (_event_dispatcher_fd < 0) {
         errno = EINVAL;
         return -1;
     }
 
     epoll_event evt;
-    evt.data.u64 = socket_id;
+    evt.data.u64 = event_data_id;
     evt.events = EPOLLOUT | EPOLLET;
 #ifdef BRPC_SOCKET_HAS_EOF
     evt.events |= has_epollrdhup;
 #endif
     if (pollin) {
         evt.events |= EPOLLIN;
-        if (epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt) < 0) {
+        if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt) < 0) {
             // This fd has been removed from epoll via `RemoveConsumer',
             // in which case errno will be ENOENT
             return -1;
         }
     } else {
-        if (epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt) < 0) {
+        if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt) < 0) {
             return -1;
         }
     }
     return 0;
 }
 
-int EventDispatcher::UnregisterEvent(SocketId socket_id, 
-                                    int fd, bool pollin) {
+int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id,
+                                     int fd, bool pollin) {
     if (pollin) {
         epoll_event evt;
-        evt.data.u64 = socket_id;
+        evt.data.u64 = event_data_id;
         evt.events = EPOLLIN | EPOLLET;
 #ifdef BRPC_SOCKET_HAS_EOF
         evt.events |= has_epollrdhup;
 #endif
-        return epoll_ctl(_epfd, EPOLL_CTL_MOD, fd, &evt);
+        return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_MOD, fd, &evt);
     } else {
-        return epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL);
+        return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL);
     }
     return -1;
 }
 
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
-    if (_epfd < 0) {
+int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
+    if (_event_dispatcher_fd < 0) {
         errno = EINVAL;
         return -1;
     }
     epoll_event evt;
+    evt.data.u64 = event_data_id;
     evt.events = EPOLLIN | EPOLLET;
-    evt.data.u64 = socket_id;
 #ifdef BRPC_SOCKET_HAS_EOF
     evt.events |= has_epollrdhup;
 #endif
-    return epoll_ctl(_epfd, EPOLL_CTL_ADD, fd, &evt);
-    return -1;
+    return epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_ADD, fd, &evt);
 }
 
 int EventDispatcher::RemoveConsumer(int fd) {
@@ -180,8 +178,8 @@ int EventDispatcher::RemoveConsumer(int fd) {
     // from epoll again! If the fd was level-triggered and there's data left,
     // epoll_wait will keep returning events of the fd continuously, making
     // program abnormal.
-    if (epoll_ctl(_epfd, EPOLL_CTL_DEL, fd, NULL) < 0) {
-        PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << _epfd;
+    if (epoll_ctl(_event_dispatcher_fd, EPOLL_CTL_DEL, fd, NULL) < 0) {
+        PLOG(WARNING) << "Fail to remove fd=" << fd << " from epfd=" << 
_event_dispatcher_fd;
         return -1;
     }
     return 0;
@@ -197,12 +195,12 @@ void EventDispatcher::Run() {
         epoll_event e[32];
 #ifdef BRPC_ADDITIONAL_EPOLL
         // Performance downgrades in examples.
-        int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), 0);
+        int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), 0);
         if (n == 0) {
-            n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
+            n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
         }
 #else
-        const int n = epoll_wait(_epfd, e, ARRAY_SIZE(e), -1);
+        const int n = epoll_wait(_event_dispatcher_fd, e, ARRAY_SIZE(e), -1);
 #endif
         if (_stop) {
             // epoll_ctl/epoll_wait should have some sort of memory fencing
@@ -215,7 +213,7 @@ void EventDispatcher::Run() {
                 // We've checked _stop, no wake-up will be missed.
                 continue;
             }
-            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _epfd;
+            PLOG(FATAL) << "Fail to epoll_wait epfd=" << _event_dispatcher_fd;
             break;
         }
         for (int i = 0; i < n; ++i) {
@@ -225,14 +223,13 @@ void EventDispatcher::Run() {
 #endif
                 ) {
                 // We don't care about the return value.
-                Socket::StartInputEvent(e[i].data.u64, e[i].events,
-                                        _consumer_thread_attr);
+                CallInputEventCallback(e[i].data.u64, e[i].events, 
_thread_attr);
             }
         }
         for (int i = 0; i < n; ++i) {
             if (e[i].events & (EPOLLOUT | EPOLLERR | EPOLLHUP)) {
                 // We don't care about the return value.
-                Socket::HandleEpollOut(e[i].data.u64);
+                CallOutputEventCallback(e[i].data.u64, e[i].events, 
_thread_attr);
             }
         }
     }
diff --git a/src/brpc/event_dispatcher_kqueue.cpp 
b/src/brpc/event_dispatcher_kqueue.cpp
index fa52a204..97ad29bb 100644
--- a/src/brpc/event_dispatcher_kqueue.cpp
+++ b/src/brpc/event_dispatcher_kqueue.cpp
@@ -23,17 +23,16 @@
 namespace brpc {
 
 EventDispatcher::EventDispatcher()
-    : _epfd(-1)
+    : _event_dispatcher_fd(-1)
     , _stop(false)
     , _tid(0)
-    , _consumer_thread_attr(BTHREAD_ATTR_NORMAL)
-{
-    _epfd = kqueue();
-    if (_epfd < 0) {
+    , _thread_attr(BTHREAD_ATTR_NORMAL) {
+    _event_dispatcher_fd = kqueue();
+    if (_event_dispatcher_fd < 0) {
         PLOG(FATAL) << "Fail to create kqueue";
         return;
     }
-    CHECK_EQ(0, butil::make_close_on_exec(_epfd));
+    CHECK_EQ(0, butil::make_close_on_exec(_event_dispatcher_fd));
 
     _wakeup_fds[0] = -1;
     _wakeup_fds[1] = -1;
@@ -46,9 +45,9 @@ EventDispatcher::EventDispatcher()
 EventDispatcher::~EventDispatcher() {
     Stop();
     Join();
-    if (_epfd >= 0) {
-        close(_epfd);
-        _epfd = -1;
+    if (_event_dispatcher_fd >= 0) {
+        close(_event_dispatcher_fd);
+        _event_dispatcher_fd = -1;
     }
     if (_wakeup_fds[0] > 0) {
         close(_wakeup_fds[0]);
@@ -56,8 +55,8 @@ EventDispatcher::~EventDispatcher() {
     }
 }
 
-int EventDispatcher::Start(const bthread_attr_t* consumer_thread_attr) {
-    if (_epfd < 0) {
+int EventDispatcher::Start(const bthread_attr_t* thread_attr) {
+    if (_event_dispatcher_fd < 0) {
         LOG(FATAL) << "kqueue was not created";
         return -1;
     }
@@ -68,14 +67,13 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
         return -1;
     }
 
-    // Set _consumer_thread_attr before creating kqueue thread to make sure
+    // Set _thread_attr before creating kqueue thread to make sure
     // everyting seems sane to the thread.
-    _consumer_thread_attr = (consumer_thread_attr  ?
-                             *consumer_thread_attr : BTHREAD_ATTR_NORMAL);
+    _thread_attr = (thread_attr ? *thread_attr : BTHREAD_ATTR_NORMAL);
 
-    //_consumer_thread_attr is used in StartInputEvent(), assign flag 
NEVER_QUIT to it will cause new bthread
+    //_thread_attr is used in StartInputEvent(), assign flag NEVER_QUIT to it 
will cause new bthread
     // that created by kevent() never to quit.
-    bthread_attr_t kqueue_thread_attr = _consumer_thread_attr | 
BTHREAD_NEVER_QUIT;
+    bthread_attr_t kqueue_thread_attr = _thread_attr | BTHREAD_NEVER_QUIT;
 
     // Polling thread uses the same attr for consumer threads (NORMAL right
     // now). Previously, we used small stack (32KB) which may be overflowed
@@ -92,17 +90,17 @@ int EventDispatcher::Start(const bthread_attr_t* 
consumer_thread_attr) {
 }
 
 bool EventDispatcher::Running() const {
-    return !_stop  && _epfd >= 0 && _tid != 0;
+    return !_stop  && _event_dispatcher_fd >= 0 && _tid != 0;
 }
 
 void EventDispatcher::Stop() {
     _stop = true;
 
-    if (_epfd >= 0) {
+    if (_event_dispatcher_fd >= 0) {
         struct kevent kqueue_event;
         EV_SET(&kqueue_event, _wakeup_fds[1], EVFILT_WRITE, EV_ADD | EV_ENABLE,
                     0, 0, NULL);
-        kevent(_epfd, &kqueue_event, 1, NULL, 0, NULL);
+        kevent(_event_dispatcher_fd, &kqueue_event, 1, NULL, 0, NULL);
     }
 }
 
@@ -113,8 +111,9 @@ void EventDispatcher::Join() {
     }
 }
 
-int EventDispatcher::RegisterEvent(SocketId socket_id, int fd, bool pollin) {
-    if (_epfd < 0) {
+int EventDispatcher::RegisterEvent(IOEventDataId event_data_id,
+                                   int fd, bool pollin) {
+    if (_event_dispatcher_fd < 0) {
         errno = EINVAL;
         return -1;
     }
@@ -122,44 +121,44 @@ int EventDispatcher::RegisterEvent(SocketId socket_id, 
int fd, bool pollin) {
     struct kevent evt;
     //TODO(zhujiashun): add EV_EOF
     EV_SET(&evt, fd, EVFILT_WRITE, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+           0, 0, (void*)event_data_id);
+    if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
         return -1;
     }
     if (pollin) {
         EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+               0, 0, (void*)event_data_id);
+        if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
             return -1;
         }
     }
     return 0;
 }
 
-int EventDispatcher::UnregisterEvent(SocketId socket_id, 
-                                    int fd, bool pollin) {
+int EventDispatcher::UnregisterEvent(IOEventDataId event_data_id,
+                                     int fd, bool pollin) {
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    if (kevent(_epfd, &evt, 1, NULL, 0, NULL) < 0) {
+    if (kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL) < 0) {
         return -1;
     }
     if (pollin) {
         EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                    0, 0, (void*)socket_id);
-        return kevent(_epfd, &evt, 1, NULL, 0, NULL);
+               0, 0, (void*)event_data_id);
+        return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
     }
     return 0;
 }
 
-int EventDispatcher::AddConsumer(SocketId socket_id, int fd) {
-    if (_epfd < 0) {
+int EventDispatcher::AddConsumer(IOEventDataId event_data_id, int fd) {
+    if (_event_dispatcher_fd < 0) {
         errno = EINVAL;
         return -1;
     }
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_READ, EV_ADD | EV_ENABLE | EV_CLEAR,
-                0, 0, (void*)socket_id);
-    return kevent(_epfd, &evt, 1, NULL, 0, NULL);
+           0, 0, (void*)event_data_id);
+    return kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
 }
 
 int EventDispatcher::RemoveConsumer(int fd) {
@@ -175,9 +174,9 @@ int EventDispatcher::RemoveConsumer(int fd) {
     // program abnormal.
     struct kevent evt;
     EV_SET(&evt, fd, EVFILT_READ, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
+    kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
     EV_SET(&evt, fd, EVFILT_WRITE, EV_DELETE, 0, 0, NULL);
-    kevent(_epfd, &evt, 1, NULL, 0, NULL);
+    kevent(_event_dispatcher_fd, &evt, 1, NULL, 0, NULL);
     return 0;
 }
 
@@ -189,7 +188,7 @@ void* EventDispatcher::RunThis(void* arg) {
 void EventDispatcher::Run() {
     while (!_stop) {
         struct kevent e[32];
-        int n = kevent(_epfd, NULL, 0, e, ARRAY_SIZE(e), NULL);
+        int n = kevent(_event_dispatcher_fd, NULL, 0, e, ARRAY_SIZE(e), NULL);
         if (_stop) {
             // EV_SET/kevent should have some sort of memory fencing
             // guaranteeing that we(after kevent) see _stop set before
@@ -201,20 +200,21 @@ void EventDispatcher::Run() {
                 // We've checked _stop, no wake-up will be missed.
                 continue;
             }
-            PLOG(FATAL) << "Fail to kqueue epfd=" << _epfd;
+            PLOG(FATAL) << "Fail to kqueue epfd=" << _event_dispatcher_fd;
             break;
         }
         for (int i = 0; i < n; ++i) {
             if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_READ) {
                 // We don't care about the return value.
-                Socket::StartInputEvent((SocketId)e[i].udata, e[i].filter,
-                                        _consumer_thread_attr);
+                CallInputEventCallback((IOEventDataId)e[i].udata,
+                                       e[i].filter, _thread_attr);
             }
         }
         for (int i = 0; i < n; ++i) {
             if ((e[i].flags & EV_ERROR) || e[i].filter == EVFILT_WRITE) {
                 // We don't care about the return value.
-                Socket::HandleEpollOut((SocketId)e[i].udata);
+                CallOutputEventCallback((IOEventDataId)e[i].udata,
+                                        e[i].filter, _thread_attr);
             }
         }
     }
diff --git a/src/brpc/socket.cpp b/src/brpc/socket.cpp
index 447bac5f..85aa1507 100644
--- a/src/brpc/socket.cpp
+++ b/src/brpc/socket.cpp
@@ -35,6 +35,7 @@
 #include "butil/logging.h"                        // CHECK
 #include "butil/macros.h"
 #include "butil/class_name.h"                     // butil::class_name
+#include "butil/memory/scope_guard.h"
 #include "brpc/log.h"
 #include "brpc/reloadable_flags.h"          // BRPC_VALIDATE_GFLAG
 #include "brpc/errno.pb.h"
@@ -447,9 +448,9 @@ public:
 
 static const uint64_t AUTH_FLAG = (1ul << 32);
 
-Socket::Socket(Forbidden)
+Socket::Socket(Forbidden f)
     // must be even because Address() relies on evenness of version
-    : _versioned_ref(0)
+    : VersionedRefWithId<Socket>(f)
     , _shared_part(NULL)
     , _nevent(0)
     , _keytable_pool(NULL)
@@ -459,7 +460,6 @@ Socket::Socket(Forbidden)
     , _on_edge_triggered_events(NULL)
     , _user(NULL)
     , _conn(NULL)
-    , _this_id(0)
     , _preferred_index(-1)
     , _hc_count(0)
     , _last_msg_size(0)
@@ -483,7 +483,6 @@ Socket::Socket(Forbidden)
     , _overcrowded(false)
     , _fail_me_at_server_stop(false)
     , _logoff_flag(false)
-    , _additional_ref_status(REF_USING)
     , _error_code(0)
     , _pipeline_q(NULL)
     , _last_writetime_us(0)
@@ -494,8 +493,7 @@ Socket::Socket(Forbidden)
     , _stream_set(NULL)
     , _total_streams_unconsumed_size(0)
     , _ninflight_app_health_check(0)
-    , _http_request_method(HTTP_METHOD_GET)
-{
+    , _http_request_method(HTTP_METHOD_GET) {
     CreateVarsOnce();
     pthread_mutex_init(&_id_wait_list_mutex, NULL);
     _epollout_butex = bthread::butex_create_checked<butil::atomic<int> >();
@@ -621,7 +619,7 @@ int Socket::ResetFileDescriptor(int fd) {
     EnableKeepaliveIfNeeded(fd);
 
     if (_on_edge_triggered_events) {
-        if (GetGlobalEventDispatcher(fd, _bthread_tag).AddConsumer(id(), fd) 
!= 0) {
+        if (_io_event.AddConsumer(fd) != 0) {
             PLOG(ERROR) << "Fail to add SocketId=" << id() 
                         << " into EventDispatcher";
             _fd.store(-1, butil::memory_order_release);
@@ -698,116 +696,260 @@ void Socket::EnableKeepaliveIfNeeded(int fd) {
 //   version: from version part of _versioned_nref, must be an EVEN number.
 //   slot: designated by ResourcePool.
 int Socket::Create(const SocketOptions& options, SocketId* id) {
-    butil::ResourceId<Socket> slot;
-    Socket* const m = butil::get_resource(&slot, Forbidden());
-    if (m == NULL) {
-        LOG(FATAL) << "Fail to get_resource<Socket>";
+    return VersionedRefWithId<Socket>::Create(id, options);
+}
+
+int Socket::OnCreated(const SocketOptions& options) {
+    if (_io_event.Init((void*)id()) != 0) {
+        LOG(ERROR) << "Fail to init IOEvent";
+        SetFailed(ENOMEM, "%s", "Fail to init IOEvent");
         return -1;
     }
+    _io_event.set_bthread_tag(options.bthread_tag);
+    auto guard = butil::MakeScopeGuard([this] {
+        _io_event.Reset();
+    });
+
     g_vars->nsocket << 1;
-    CHECK(NULL == m->_shared_part.load(butil::memory_order_relaxed));
-    m->_nevent.store(0, butil::memory_order_relaxed);
-    m->_keytable_pool = options.keytable_pool;
-    m->_tos = 0;
-    m->_remote_side = options.remote_side;
-    m->_on_edge_triggered_events = options.on_edge_triggered_events;
-    m->_user = options.user;
-    m->_conn = options.conn;
-    m->_app_connect = options.app_connect;
-    // nref can be non-zero due to concurrent AddressSocket().
-    // _this_id will only be used in destructor/Destroy of referenced
-    // slots, which is safe and properly fenced. Although it's better
-    // to put the id into SocketUniquePtr.
-    m->_this_id = MakeSocketId(
-            VersionOfVRef(m->_versioned_ref.fetch_add(
-                    1, butil::memory_order_release)), slot);
-    m->_preferred_index = -1;
-    m->_hc_count = 0;
-    CHECK(m->_read_buf.empty());
+    CHECK(NULL == _shared_part.load(butil::memory_order_relaxed));
+    _nevent.store(0, butil::memory_order_relaxed);
+    _keytable_pool = options.keytable_pool;
+    _tos = 0;
+    _remote_side = options.remote_side;
+    _on_edge_triggered_events = options.on_edge_triggered_events;
+    _user = options.user;
+    _conn = options.conn;
+    _app_connect = options.app_connect;
+    _preferred_index = -1;
+    _hc_count = 0;
+    CHECK(_read_buf.empty());
     const int64_t cpuwide_now = butil::cpuwide_time_us();
-    m->_last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
-    m->reset_parsing_context(options.initial_parsing_context);
-    m->_correlation_id = 0;
-    m->_health_check_interval_s = options.health_check_interval_s;
-    m->_is_hc_related_ref_held = false;
-    m->_hc_started.store(false, butil::memory_order_relaxed);
-    m->_ninprocess.store(1, butil::memory_order_relaxed);
-    m->_auth_flag_error.store(0, butil::memory_order_relaxed);
-    const int rc2 = bthread_id_create(&m->_auth_id, NULL, NULL);
+    _last_readtime_us.store(cpuwide_now, butil::memory_order_relaxed);
+    reset_parsing_context(options.initial_parsing_context);
+    _correlation_id = 0;
+    _health_check_interval_s = options.health_check_interval_s;
+    _is_hc_related_ref_held = false;
+    _hc_started.store(false, butil::memory_order_relaxed);
+    _ninprocess.store(1, butil::memory_order_relaxed);
+    _auth_flag_error.store(0, butil::memory_order_relaxed);
+    const int rc2 = bthread_id_create(&_auth_id, NULL, NULL);
     if (rc2) {
         LOG(ERROR) << "Fail to create auth_id: " << berror(rc2);
-        m->SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
+        SetFailed(rc2, "Fail to create auth_id: %s", berror(rc2));
         return -1;
     }
-    m->_force_ssl = options.force_ssl;
+    _force_ssl = options.force_ssl;
     // Disable SSL check if there is no SSL context
-    m->_ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
-    m->_ssl_session = NULL;
-    m->_ssl_ctx = options.initial_ssl_ctx;
+    _ssl_state = (options.initial_ssl_ctx == NULL ? SSL_OFF : SSL_UNKNOWN);
+    _ssl_session = NULL;
+    _ssl_ctx = options.initial_ssl_ctx;
 #if BRPC_WITH_RDMA
-    CHECK(m->_rdma_ep == NULL);
+    CHECK(_rdma_ep == NULL);
     if (options.use_rdma) {
-        m->_rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
-        if (!m->_rdma_ep) {
+        _rdma_ep = new (std::nothrow)rdma::RdmaEndpoint(m);
+        if (!_rdma_ep) {
             const int saved_errno = errno;
             PLOG(ERROR) << "Fail to create RdmaEndpoint";
-            m->SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
+            SetFailed(saved_errno, "Fail to create RdmaEndpoint: %s",
                          berror(saved_errno));
             return -1;
         }
-        m->_rdma_state = RDMA_UNKNOWN;
+        _rdma_state = RDMA_UNKNOWN;
     } else {
-        m->_rdma_state = RDMA_OFF;
+        _rdma_state = RDMA_OFF;
     }
 #endif
-    m->_connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
-    m->_controller_released_socket.store(false, butil::memory_order_relaxed);
-    m->_overcrowded = false;
-    // May be non-zero for RTMP connections.
-    m->_fail_me_at_server_stop = false;
-    m->_logoff_flag.store(false, butil::memory_order_relaxed);
-    m->_additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
-    m->_error_code = 0;
-    m->_error_text.clear();
-    m->_agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
-    m->_total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
-    m->_ninflight_app_health_check.store(0, butil::memory_order_relaxed);
+    _connection_type_for_progressive_read = CONNECTION_TYPE_UNKNOWN;
+    _controller_released_socket.store(false, butil::memory_order_relaxed);
+    _overcrowded = false;
+    // Maybe non-zero for RTMP connections.
+    _fail_me_at_server_stop = false;
+    _logoff_flag.store(false, butil::memory_order_relaxed);
+    _error_code = 0;
+    _agent_socket_id.store(INVALID_SOCKET_ID, butil::memory_order_relaxed);
+    _total_streams_unconsumed_size.store(0, butil::memory_order_relaxed);
+    _ninflight_app_health_check.store(0, butil::memory_order_relaxed);
     // NOTE: last two params are useless in bthread > r32787
-    const int rc = bthread_id_list_init(&m->_id_wait_list, 512, 512);
+    const int rc = bthread_id_list_init(&_id_wait_list, 512, 512);
     if (rc) {
         LOG(ERROR) << "Fail to init _id_wait_list: " << berror(rc);
-        m->SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
+        SetFailed(rc, "Fail to init _id_wait_list: %s", berror(rc));
         return -1;
     }
-    m->_last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
-    m->_unwritten_bytes.store(0, butil::memory_order_relaxed);
-    m->_keepalive_options = options.keepalive_options;
-    m->_bthread_tag = options.bthread_tag;
-    CHECK(NULL == m->_write_head.load(butil::memory_order_relaxed));
-    m->_is_write_shutdown = false;
-    // Must be last one! Internal fields of this Socket may be access
+    _last_writetime_us.store(cpuwide_now, butil::memory_order_relaxed);
+    _unwritten_bytes.store(0, butil::memory_order_relaxed);
+    _keepalive_options = options.keepalive_options;
+    CHECK(NULL == _write_head.load(butil::memory_order_relaxed));
+    _is_write_shutdown = false;
+    // Must be the last one! Internal fields of this Socket may be accessed
     // just after calling ResetFileDescriptor.
-    if (m->ResetFileDescriptor(options.fd) != 0) {
+    if (ResetFileDescriptor(options.fd) != 0) {
         const int saved_errno = errno;
         PLOG(ERROR) << "Fail to ResetFileDescriptor";
-        m->SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s", 
+        SetFailed(saved_errno, "Fail to ResetFileDescriptor: %s",
                      berror(saved_errno));
         return -1;
     }
-    *id = m->_this_id;
+    guard.dismiss();
+
     return 0;
 }
 
+void Socket::BeforeRecycled() {
+    const bool create_by_connect = CreatedByConnect();
+    if (_app_connect) {
+        std::shared_ptr<AppConnect> tmp;
+        _app_connect.swap(tmp);
+        tmp->StopConnect(this);
+    }
+    if (_conn) {
+        SocketConnection* const saved_conn = _conn;
+        _conn = NULL;
+        saved_conn->BeforeRecycle(this);
+    }
+    if (_user) {
+        SocketUser* const saved_user = _user;
+        _user = NULL;
+        saved_user->BeforeRecycle(this);
+    }
+    SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
+    if (sp) {
+        sp->RemoveRefManually();
+    }
+
+    const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
+    if (ValidFileDescriptor(prev_fd)) {
+        if (_on_edge_triggered_events != NULL) {
+            _io_event.RemoveConsumer(prev_fd);
+        }
+        close(prev_fd);
+        if (create_by_connect) {
+            g_vars->channel_conn << -1;
+        }
+    }
+    _io_event.Reset();
+
+#if BRPC_WITH_RDMA
+    if (_rdma_ep) {
+        delete _rdma_ep;
+        _rdma_ep = NULL;
+        _rdma_state = RDMA_UNKNOWN;
+    }
+#endif
+
+    reset_parsing_context(NULL);
+    _read_buf.clear();
+
+    _auth_flag_error.store(0, butil::memory_order_relaxed);
+    bthread_id_error(_auth_id, 0);
+
+    bthread_id_list_destroy(&_id_wait_list);
+
+    if (_ssl_session) {
+        SSL_free(_ssl_session);
+        _ssl_session = NULL;
+    }
+
+    _ssl_ctx = NULL;
+
+    delete _pipeline_q;
+    _pipeline_q = NULL;
+
+    delete _auth_context;
+    _auth_context = NULL;
+
+    delete _stream_set;
+    _stream_set = NULL;
+
+    const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
+    if (asid != INVALID_SOCKET_ID) {
+        SocketUniquePtr ptr;
+        if (Socket::Address(asid, &ptr) == 0) {
+            ptr->ReleaseAdditionalReference();
+        }
+    }
+    g_vars->nsocket << -1;
+}
+
+void Socket::OnFailed(int error_code, const std::string& error_text) {
+    // Update _error_text
+    pthread_mutex_lock(&_id_wait_list_mutex);
+    _error_code = error_code;
+    _error_text = error_text;
+    pthread_mutex_unlock(&_id_wait_list_mutex);
+
+    // Do health-checking even if we're not connected before, needed
+    // by Channel to revive never-connected socket when server side
+    // comes online.
+    if (HCEnabled()) {
+        bool expect = false;
+        if (_hc_started.compare_exchange_strong(expect,
+            true,
+            butil::memory_order_relaxed,
+            butil::memory_order_relaxed)) {
+            GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
+            StartHealthCheck(id(),
+                GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
+        } else {
+            // No need to run 2 health checking at the same time.
+            RPC_VLOG << "There is already a health checking running "
+                        "for SocketId=" << id();
+        }
+    }
+    // Wake up all threads waiting on EPOLLOUT when closing fd
+    _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
+    bthread::butex_wake_all(_epollout_butex);
+
+    // Wake up all unresponded RPC.
+    CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
+        &_id_wait_list, error_code, error_text,
+        &_id_wait_list_mutex));
+    ResetAllStreams(error_code, error_text);
+    // _app_connect shouldn't be set to NULL in SetFailed otherwise
+    // HC is always not supported.
+    // FIXME: Design a better interface for AppConnect
+    // if (_app_connect) {
+    //     AppConnect* const saved_app_connect = _app_connect;
+    //     _app_connect = NULL;
+    //     saved_app_connect->StopConnect(this);
+    // }
+}
+
+void Socket::AfterRevived() {
+    if (_user) {
+        _user->AfterRevived(this);
+    } else {
+        LOG(INFO) << "Revived " << description() << " (Connectable)";
+    }
+}
+
+std::string Socket::OnDescription() const {
+    // NOTE: The output of `description()' should be consistent with operator<<()
+    std::string result;
+    result.reserve(64);
+    const int saved_fd = fd();
+    if (saved_fd >= 0) {
+        butil::string_appendf(&result, "fd=%d", saved_fd);
+    }
+    butil::string_appendf(&result, " addr=%s",
+        butil::endpoint2str(remote_side()).c_str());
+    const int local_port = local_side().port;
+    if (local_port > 0) {
+        butil::string_appendf(&result, ":%d", local_port);
+    }
+    return result;
+}
+
 int Socket::WaitAndReset(int32_t expected_nref) {
-    const uint32_t id_ver = VersionOfSocketId(_this_id);
+    const uint32_t id_ver = VersionOfVRefId(id());
     uint64_t vref;
     // Wait until nref == expected_nref.
-    while (1) {
+    while (true) {
         // The acquire fence pairs with release fence in Dereference to avoid
         // inconsistent states to be seen by others.
-        vref = _versioned_ref.load(butil::memory_order_acquire);
+        vref = versioned_ref();
         if (VersionOfVRef(vref) != id_ver + 1) {
-            LOG(WARNING) << "SocketId=" << _this_id << " is already alive or recycled";
+            LOG(WARNING) << "SocketId=" << id() << " is already alive or recycled";
             return -1;
         }
         if (NRefOfVRef(vref) > expected_nref) {
@@ -816,7 +958,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
                 return -1;
             }
         } else if (NRefOfVRef(vref) < expected_nref) {
-            RPC_VLOG << "SocketId=" << _this_id 
+            RPC_VLOG << "SocketId=" << id()
                      << " was abandoned during health checking";
             return -1;
         } else {
@@ -824,7 +966,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
             // so no need to do health checking.
             if (!_is_hc_related_ref_held) {
                 RPC_VLOG << "Nobody holds a health-checking-related reference"
-                         << " for SocketId=" << _this_id;
+                         << " for SocketId=" << id();
                 return -1;
             }
 
@@ -836,7 +978,7 @@ int Socket::WaitAndReset(int32_t expected_nref) {
     const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
     if (ValidFileDescriptor(prev_fd)) {
         if (_on_edge_triggered_events != NULL) {
-            GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd);
+            _io_event.RemoveConsumer(prev_fd);
         }
         close(prev_fd);
         if (CreatedByConnect()) {
@@ -892,59 +1034,6 @@ int Socket::WaitAndReset(int32_t expected_nref) {
     return 0;
 }
 
-// We don't care about the return value of Revive.
-void Socket::Revive() {
-    const uint32_t id_ver = VersionOfSocketId(_this_id);
-    uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
-    _additional_ref_status.store(REF_REVIVING, butil::memory_order_relaxed);
-    while (1) {
-        CHECK_EQ(id_ver + 1, VersionOfVRef(vref));
-        
-        int32_t nref = NRefOfVRef(vref);
-        if (nref <= 1) {
-            // Set status to REF_RECYLED since no one uses this socket
-            _additional_ref_status.store(REF_RECYCLED, butil::memory_order_relaxed);
-            CHECK_EQ(1, nref);
-            LOG(WARNING) << *this << " was abandoned during revival";
-            return;
-        }
-        // +1 is the additional ref added in Create(). TODO(gejun): we should
-        // remove this additional nref someday.
-        if (_versioned_ref.compare_exchange_weak(
-                vref, MakeVRef(id_ver, nref + 1/*note*/),
-                butil::memory_order_release,
-                butil::memory_order_relaxed)) {
-            // Set status to REF_USING since we add additional ref again
-            _additional_ref_status.store(REF_USING, butil::memory_order_relaxed);
-            if (_user) {
-                _user->AfterRevived(this);
-            } else {
-                LOG(INFO) << "Revived " << *this << " (Connectable)";
-            }
-            return;
-        }
-    }
-}
-
-int Socket::ReleaseAdditionalReference() {
-    do {
-        AdditionalRefStatus expect = REF_USING;
-        if (_additional_ref_status.compare_exchange_strong(
-            expect,
-            REF_RECYCLED,
-            butil::memory_order_relaxed,
-            butil::memory_order_relaxed)) {
-            return Dereference();
-        }
-
-        if (expect == REF_REVIVING) { // sched_yield to wait until status is not REF_REVIVING
-            sched_yield();
-        } else {
-            return -1; // REF_RECYCLED
-        }
-    } while (1);
-}
-
 void Socket::AddRecentError() {
     SharedPart* sp = GetSharedPart();
     if (sp) {
@@ -968,87 +1057,6 @@ int Socket::isolated_times() const {
     return 0;
 }
 
-int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
-    if (error_code == 0) {
-        CHECK(false) << "error_code is 0";
-        error_code = EFAILEDSOCKET;
-    }
-    const uint32_t id_ver = VersionOfSocketId(_this_id);
-    uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
-    for (;;) {  // need iteration to retry compare_exchange_strong
-        if (VersionOfVRef(vref) != id_ver) {
-            return -1;
-        }
-        // Try to set version=id_ver+1 (to make later Address() return NULL),
-        // retry on fail.
-        if (_versioned_ref.compare_exchange_strong(
-                vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
-                butil::memory_order_release,
-                butil::memory_order_relaxed)) {
-            // Update _error_text
-            std::string error_text;
-            if (error_fmt != NULL) {
-                va_list ap;
-                va_start(ap, error_fmt);
-                butil::string_vprintf(&error_text, error_fmt, ap);
-                va_end(ap);
-            }
-            pthread_mutex_lock(&_id_wait_list_mutex);
-            _error_code = error_code;
-            _error_text = error_text;
-            pthread_mutex_unlock(&_id_wait_list_mutex);
-            
-            // Do health-checking even if we're not connected before, needed
-            // by Channel to revive never-connected socket when server side
-            // comes online.
-            if (HCEnabled()) {
-                bool expect = false;
-                if (_hc_started.compare_exchange_strong(expect,
-                                                        true,
-                                                        butil::memory_order_relaxed,
-                                                        butil::memory_order_relaxed)) {
-                    GetOrNewSharedPart()->circuit_breaker.MarkAsBroken();
-                    StartHealthCheck(id(),
-                        GetOrNewSharedPart()->circuit_breaker.isolation_duration_ms());
-                } else {
-                    // No need to run 2 health checking at the same time.
-                    RPC_VLOG << "There is already a health checking running "
-                                "for SocketId=" << _this_id;
-                }
-            }
-            // Wake up all threads waiting on EPOLLOUT when closing fd
-            _epollout_butex->fetch_add(1, butil::memory_order_relaxed);
-            bthread::butex_wake_all(_epollout_butex);
-
-            // Wake up all unresponded RPC.
-            CHECK_EQ(0, bthread_id_list_reset2_pthreadsafe(
-                         &_id_wait_list, error_code, error_text,
-                         &_id_wait_list_mutex));
-
-            ResetAllStreams(error_code, error_text);
-            // _app_connect shouldn't be set to NULL in SetFailed otherwise
-            // HC is always not supported.
-            // FIXME: Design a better interface for AppConnect
-            // if (_app_connect) {
-            //     AppConnect* const saved_app_connect = _app_connect;
-            //     _app_connect = NULL;
-            //     saved_app_connect->StopConnect(this);
-            // }
-
-            // Deref additionally which is added at creation so that this
-            // Socket's reference will hit 0(recycle) when no one addresses it.
-            ReleaseAdditionalReference();
-            // NOTE: This Socket may be recycled at this point, don't
-            // touch anything.
-            return 0;
-        }
-    }
-}
-
-int Socket::SetFailed() {
-    return SetFailed(EFAILEDSOCKET, NULL);
-}
-
 void Socket::FeedbackCircuitBreaker(int error_code, int64_t latency_us) {
     if (!GetOrNewSharedPart()->circuit_breaker.OnCallEnd(error_code, latency_us)) {
         if (SetFailed(main_socket_id()) == 0) {
@@ -1075,12 +1083,29 @@ int Socket::ReleaseReferenceIfIdle(int idle_seconds) {
     return ReleaseAdditionalReference();
 }
 
+
+int Socket::SetFailed() {
+    return SetFailed(EFAILEDSOCKET, NULL);
+}
+
+int Socket::SetFailed(int error_code, const char* error_fmt, ...) {
+    std::string error_text;
+    if (error_fmt != NULL) {
+        va_list ap;
+        va_start(ap, error_fmt);
+        butil::string_vprintf(&error_text, error_fmt, ap);
+        va_end(ap);
+    }
+    return VersionedRefWithId<Socket>::SetFailed(error_code, error_text);
+}
+
 int Socket::SetFailed(SocketId id) {
     SocketUniquePtr ptr;
     if (Address(id, &ptr) != 0) {
         return -1;
     }
-    return ptr->SetFailed();
+
+    return ptr->SetFailed(EFAILEDSOCKET, NULL);
 }
 
 void Socket::NotifyOnFailed(bthread_id_t id) {
@@ -1101,16 +1126,16 @@ void Socket::NotifyOnFailed(bthread_id_t id) {
 
 // For unit-test.
 int Socket::Status(SocketId id, int32_t* nref) {
-    const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
+    const butil::ResourceId<Socket> slot = SlotOfVRefId<Socket>(id);
     Socket* const m = address_resource(slot);
     if (m != NULL) {
-        const uint64_t vref = m->_versioned_ref.load(butil::memory_order_relaxed);
-        if (VersionOfVRef(vref) == VersionOfSocketId(id)) {
+        const uint64_t vref = m->versioned_ref();
+        if (VersionOfVRef(vref) == VersionOfVRefId(id)) {
             if (nref) {
                 *nref = NRefOfVRef(vref);
             }
             return 0;
-        } else if (VersionOfVRef(vref) == VersionOfSocketId(id) + 1) {
+        } else if (VersionOfVRef(vref) == VersionOfVRefId(id) + 1) {
             if (nref) {
                 *nref = NRefOfVRef(vref);
             }
@@ -1120,81 +1145,6 @@ int Socket::Status(SocketId id, int32_t* nref) {
     return -1;
 }
 
-void Socket::OnRecycle() {
-    const bool create_by_connect = CreatedByConnect();
-    if (_app_connect) {
-        std::shared_ptr<AppConnect> tmp;
-        _app_connect.swap(tmp);
-        tmp->StopConnect(this);
-    }
-    if (_conn) {
-        SocketConnection* const saved_conn = _conn;
-        _conn = NULL;
-        saved_conn->BeforeRecycle(this);
-    }
-    if (_user) {
-        SocketUser* const saved_user = _user;
-        _user = NULL;
-        saved_user->BeforeRecycle(this);
-    }
-    SharedPart* sp = _shared_part.exchange(NULL, butil::memory_order_acquire);
-    if (sp) {
-        sp->RemoveRefManually();
-    }
-    const int prev_fd = _fd.exchange(-1, butil::memory_order_relaxed);
-    if (ValidFileDescriptor(prev_fd)) {
-        if (_on_edge_triggered_events != NULL) {
-            GetGlobalEventDispatcher(prev_fd, _bthread_tag).RemoveConsumer(prev_fd);
-        }
-        close(prev_fd);
-        if (create_by_connect) {
-            g_vars->channel_conn << -1;
-        }
-    }
-
-#if BRPC_WITH_RDMA
-    if (_rdma_ep) {
-        delete _rdma_ep;
-        _rdma_ep = NULL;
-        _rdma_state = RDMA_UNKNOWN;
-    }
-#endif
-
-    reset_parsing_context(NULL);
-    _read_buf.clear();
-
-    _auth_flag_error.store(0, butil::memory_order_relaxed);
-    bthread_id_error(_auth_id, 0);
-    
-    bthread_id_list_destroy(&_id_wait_list);
-
-    if (_ssl_session) {
-        SSL_free(_ssl_session);
-        _ssl_session = NULL;
-    }
-
-    _ssl_ctx = NULL;
-    
-    delete _pipeline_q;
-    _pipeline_q = NULL;
-
-    delete _auth_context;
-    _auth_context = NULL;
-
-    delete _stream_set;
-    _stream_set = NULL;
-
-    const SocketId asid = _agent_socket_id.load(butil::memory_order_relaxed);
-    if (asid != INVALID_SOCKET_ID) {
-        SocketUniquePtr ptr;
-        if (Socket::Address(asid, &ptr) == 0) {
-            ptr->ReleaseAdditionalReference();
-        }
-    }
-    
-    g_vars->nsocket << -1;
-}
-
 void* Socket::ProcessEvent(void* arg) {
     // the enclosed Socket is valid and free to access inside this function.
     SocketUniquePtr s(static_cast<Socket*>(arg));
@@ -1272,8 +1222,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
     // Do not need to check addressable since it will be called by
     // health checker which called `SetFailed' before
     const int expected_val = _epollout_butex->load(butil::memory_order_relaxed);
-    EventDispatcher& edisp = GetGlobalEventDispatcher(fd, _bthread_tag);
-    if (edisp.RegisterEvent(id(), fd, pollin) != 0) {
+    if (_io_event.RegisterEvent(fd, pollin) != 0) {
         return -1;
     }
 
@@ -1285,7 +1234,7 @@ int Socket::WaitEpollOut(int fd, bool pollin, const timespec* abstime) {
     }
     // Ignore return value since `fd' might have been removed
     // by `RemoveConsumer' in `SetFailed'
-    butil::ignore_result(edisp.UnregisterEvent(id(), fd, pollin));
+    butil::ignore_result(_io_event.UnregisterEvent(fd, pollin));
     errno = saved_errno;
     // Could be writable or spurious wakeup (by former epollout)
     return rc;
@@ -1333,7 +1282,7 @@ int Socket::Connect(const timespec* abstime,
         // be added into epoll device soon
         SocketId connect_id;
         SocketOptions options;
-        options.bthread_tag = _bthread_tag;
+        options.bthread_tag = _io_event.bthread_tag();
         options.user = req;
         if (Socket::Create(options, &connect_id) != 0) {
             LOG(FATAL) << "Fail to create Socket";
@@ -1348,8 +1297,7 @@ int Socket::Connect(const timespec* abstime,
 
         // Add `sockfd' into epoll so that `HandleEpollOutRequest' will
         // be called with `req' when epoll event reaches
-        if (GetGlobalEventDispatcher(sockfd, _bthread_tag).RegisterEvent(connect_id, sockfd, false) !=
-            0) {
+        if (s->_io_event.RegisterEvent(sockfd, false) != 0) {
             const int saved_errno = errno;
             PLOG(WARNING) << "Fail to add fd=" << sockfd << " into epoll";
             s->SetFailed(saved_errno, "Fail to add fd=%d into epoll: %s",
@@ -1417,7 +1365,7 @@ int Socket::ConnectIfNot(const timespec* abstime, WriteRequest* req) {
        return 0;
     }
     // Set tag for client side socket
-    _bthread_tag = bthread_self_tag();
+    _io_event.set_bthread_tag(bthread_self_tag());
     // Have to hold a reference for `req'
     SocketUniquePtr s;
     ReAddress(&s);
@@ -1440,7 +1388,9 @@ void Socket::WakeAsEpollOut() {
     bthread::butex_wake_except(_epollout_butex, 0);
 }
 
-int Socket::HandleEpollOut(SocketId id) {
+int Socket::OnOutputEvent(void* user_data, uint32_t,
+                          const bthread_attr_t&) {
+    auto id = reinterpret_cast<SocketId>(user_data);
     SocketUniquePtr s;
     // Since Sockets might have been `SetFailed' before they were
     // added into epoll, these sockets miss the signal inside
@@ -1486,7 +1436,7 @@ int Socket::HandleEpollOutRequest(int error_code, EpollOutRequest* req) {
     }
     // We've got the right to call user callback
     // The timer will be removed inside destructor of EpollOutRequest
-    GetGlobalEventDispatcher(req->fd, _bthread_tag).UnregisterEvent(id(), req->fd, false);
+    butil::ignore_result(_io_event.UnregisterEvent(req->fd, false));
     return req->on_epollout_event(req->fd, error_code, req->data);
 }
 
@@ -2229,8 +2179,9 @@ AuthContext* Socket::mutable_auth_context() {
     return _auth_context;
 }
 
-int Socket::StartInputEvent(SocketId id, uint32_t events,
-                            const bthread_attr_t& thread_attr) {
+int Socket::OnInputEvent(void* user_data, uint32_t events,
+                         const bthread_attr_t& thread_attr) {
+    auto id = reinterpret_cast<SocketId>(user_data);
     SocketUniquePtr s;
     if (Address(id, &s) < 0) {
         return -1;
@@ -2324,7 +2275,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
         // }
         os << "# This is a broken Socket\n";
     }
-    const uint64_t vref = ptr->_versioned_ref.load(butil::memory_order_relaxed);
+    const uint64_t vref = ptr->versioned_ref();
     size_t npipelined = 0;
     size_t idsizes[4];
     size_t nidsize = 0;
@@ -2384,7 +2335,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
        << "\nlocal_side=" << ptr->_local_side
        << "\non_et_events=" << (void*)ptr->_on_edge_triggered_events
        << "\nuser=" << ShowObject(ptr->_user)
-       << "\nthis_id=" << ptr->_this_id
+       << "\nthis_id=" << ptr->id()
        << "\npreferred_index=" << preferred_index;
     InputMessenger* messenger = dynamic_cast<InputMessenger*>(ptr->user());
     if (messenger != NULL) {
@@ -2429,7 +2380,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
        << "\nauth_context=" << ptr->_auth_context
        << "\nlogoff_flag=" << ptr->_logoff_flag.load(butil::memory_order_relaxed)
        << "\n_additional_ref_status="
-       << ptr->_additional_ref_status.load(butil::memory_order_relaxed)
+       << ptr->additional_ref_status()
        << "\ntotal_streams_buffer_size="
        << ptr->_total_streams_unconsumed_size.load(butil::memory_order_relaxed)
        << "\nninflight_app_health_check="
@@ -2570,7 +2521,7 @@ void Socket::DebugSocket(std::ostream& os, SocketId id) {
         ptr->_rdma_ep->DebugInfo(os);
     }
 #endif
-    { os << "\nbthread_tag=" << ptr->_bthread_tag; }
+    { os << "\nbthread_tag=" << ptr->_io_event.bthread_tag(); }
 }
 
 int Socket::CheckHealth() {
@@ -2620,7 +2571,7 @@ void Socket::ResetAllStreams(int error_code, const std::string& error_text) {
     if (_stream_set != NULL) {
         // Not delete _stream_set because there are likely more streams added
         // after reviving if the Socket is still in use, or it is to be deleted in 
-        // OnRecycle()
+        // BeforeRecycled()
         saved_stream_set.swap(*_stream_set);
     }
     _stream_mutex.unlock();
@@ -3000,25 +2951,6 @@ void Socket::OnProgressiveReadCompleted() {
     }
 }
 
-std::string Socket::description() const {
-    // NOTE: The output should be consistent with operator<<()
-    std::string result;
-    result.reserve(64);
-    butil::string_appendf(&result, "Socket{id=%" PRIu64, id());
-    const int saved_fd = fd();
-    if (saved_fd >= 0) {
-        butil::string_appendf(&result, " fd=%d", saved_fd);
-    }
-    butil::string_appendf(&result, " addr=%s",
-                          butil::endpoint2str(remote_side()).c_str());
-    const int local_port = local_side().port;
-    if (local_port > 0) {
-        butil::string_appendf(&result, ":%d", local_port);
-    }
-    butil::string_appendf(&result, "} (0x%p)", this);
-    return result;
-}
-
 SocketSSLContext::SocketSSLContext()
     : raw_ctx(NULL)
 {}
diff --git a/src/brpc/socket.h b/src/brpc/socket.h
index 97ce5685..6d9c8f11 100644
--- a/src/brpc/socket.h
+++ b/src/brpc/socket.h
@@ -38,7 +38,9 @@
 #include "brpc/socket_id.h"               // SocketId
 #include "brpc/socket_message.h"          // SocketMessagePtr
 #include "bvar/bvar.h"
-#include "http_method.h"
+#include "brpc/http_method.h"
+#include "brpc/event_dispatcher.h"
+#include "brpc/versioned_ref_with_id.h"
 
 namespace brpc {
 namespace policy {
@@ -282,7 +284,7 @@ struct SocketOptions {
 
 // Abstractions on reading from and writing into file descriptors.
 // NOTE: accessed by multiple threads(frequently), align it by cacheline.
-class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket {
+class BAIDU_CACHELINE_ALIGNMENT/*note*/ Socket : public VersionedRefWithId<Socket> {
 friend class EventDispatcher;
 friend class InputMessenger;
 friend class Acceptor;
@@ -299,16 +301,18 @@ friend class HealthCheckTask;
 friend class OnAppHealthCheckDone;
 friend class HealthCheckManager;
 friend class policy::H2GlobalStreamCreator;
+friend class VersionedRefWithId<Socket>;
+friend class IOEvent<Socket>;
+friend void DereferenceSocket(Socket*);
     class SharedPart;
-    struct Forbidden {};
     struct WriteRequest;
 
 public:
     const static int STREAM_FAKE_FD = INT_MAX;
     // NOTE: User cannot create Socket from constructor. Use Create()
     // instead. It's public just because of requirement of ResourcePool.
-    Socket(Forbidden);
-    ~Socket();
+    explicit Socket(Forbidden);
+    ~Socket() override;
 
     // Write `msg' into this Socket and clear it. The `msg' should be an
     // intact request or response. To prevent messages from interleaving
@@ -419,9 +423,6 @@ public:
     // After health checking is complete, set _hc_started to false.
     void AfterHCCompleted() { _hc_started.store(false, butil::memory_order_relaxed); }
 
-    // The unique identifier.
-    SocketId id() const { return _this_id; }
-
     // `user' parameter passed to Create().
     SocketUser* user() const { return _user; }
 
@@ -446,24 +447,9 @@ public:
     AuthContext* mutable_auth_context();
 
     // Create a Socket according to `options', put the identifier into `id'.
-    // Returns 0 on sucess, -1 otherwise.
+    // Returns 0 on success, -1 otherwise.
     static int Create(const SocketOptions& options, SocketId* id);
 
-    // Place the Socket associated with identifier `id' into unique_ptr `ptr',
-    // which will be released automatically when out of scope (w/o explicit
-    // std::move). User can still access `ptr' after calling ptr->SetFailed()
-    // before release of `ptr'.
-    // This function is wait-free.
-    // Returns 0 on success, -1 when the Socket was SetFailed().
-    static int Address(SocketId id, SocketUniquePtr* ptr);
-
-    // Re-address current socket into `ptr'.
-    // Always succeed even if this socket is failed.
-    void ReAddress(SocketUniquePtr* ptr);
-
-    // Returns 0 on success, 1 on failed socket, -1 on recycled.
-    static int AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr);
-
     // Mark this Socket or the Socket associated with `id' as failed.
     // Any later Address() of the identifier shall return NULL unless the
     // Socket was revivied by StartHealthCheck. The Socket is NOT recycled
@@ -486,20 +472,10 @@ public:
 
     void FeedbackCircuitBreaker(int error_code, int64_t latency_us);
 
-    bool Failed() const;
-
-    bool DidReleaseAdditionalRereference() const {
-        return _additional_ref_status.load(butil::memory_order_relaxed) == REF_RECYCLED;
-    }
-
     // Notify `id' object (by calling bthread_id_error) when this Socket
     // has been `SetFailed'. If it already has, notify `id' immediately
     void NotifyOnFailed(bthread_id_t id);
 
-    // Release the additional reference which added inside `Create'
-    // before so that `Socket' will be recycled automatically once
-    // on one is addressing it.
-    int ReleaseAdditionalReference();
     // `ReleaseAdditionalReference' this Socket iff it has no data
     // transmission during the last `idle_seconds'
     int ReleaseReferenceIfIdle(int idle_seconds);
@@ -515,8 +491,8 @@ public:
 
     // Start to process edge-triggered events from the fd.
     // This function does not block caller.
-    static int StartInputEvent(SocketId id, uint32_t events,
-                               const bthread_attr_t& thread_attr);
+    static int OnInputEvent(void* user_data, uint32_t events,
+                            const bthread_attr_t& thread_attr);
 
     static const int PROGRESS_INIT = 1;
     bool MoreReadEvents(int* progress);
@@ -654,9 +630,6 @@ public:
             _last_writetime_us.load(butil::memory_order_relaxed));
     }
 
-    // A brief description of this socket, consistent with os << *this
-    std::string description() const;
-
     // Returns true if the remote side is overcrowded.
     bool is_overcrowded() const { return _overcrowded; }
 
@@ -678,8 +651,20 @@ private:
     int ConductError(bthread_id_t);
     int StartWrite(WriteRequest*, const WriteOptions&);
 
-    int Dereference();
-friend void DereferenceSocket(Socket*);
+    // Initialize this newly created Socket according to `options'.
+    // Returns 0 on success, -1 otherwise.
+    int OnCreated(const SocketOptions& options);
+
+    // Called before returning to pool.
+    void BeforeRecycled();
+
+    void OnFailed(int error_code, const std::string& error_text);
+
+    // Make this socket addressable again.
+    void AfterRevived();
+
+    std::string OnDescription() const;
+
 
     static int Status(SocketId, int32_t* nref = NULL);  // for unit-test.
 
@@ -701,9 +686,6 @@ friend void DereferenceSocket(Socket*);
     // success, -1 otherwise and errno is set
     ssize_t DoWrite(WriteRequest* req);
 
-    // Called before returning to pool.
-    void OnRecycle();
-
     // [Not thread-safe] Wait for EPOLLOUT event on `fd'. If `pollin' is
     // true, EPOLLIN event will also be included and EPOLL_CTL_MOD will
     // be used instead of EPOLL_CTL_ADD. Note that spurious wakeups may
@@ -735,9 +717,6 @@ friend void DereferenceSocket(Socket*);
     // Wait until nref hits `expected_nref' and reset some internal resources.
     int WaitAndReset(int32_t expected_nref);
 
-    // Make this socket addressable again.
-    void Revive();
-
     static void* ProcessEvent(void*);
 
     static void* KeepWrite(void*);
@@ -755,8 +734,9 @@ friend void DereferenceSocket(Socket*);
     // Try to wake socket just like epollout has arrived
     void WakeAsEpollOut();
 
-    // Generic callback for Socket to handle epollout event
-    static int HandleEpollOut(SocketId socket_id);
+    // Generic callback for Socket to handle output event.
+    static int OnOutputEvent(void* user_data, uint32_t,
+                             const bthread_attr_t&);
 
     class EpollOutRequest;
     // Callback to handle epollout event whose request data
@@ -811,16 +791,6 @@ friend void DereferenceSocket(Socket*);
     void CancelUnwrittenBytes(size_t bytes);
 
 private:
-    // unsigned 32-bit version + signed 32-bit referenced-count.
-    // Meaning of version:
-    // * Created version: no SetFailed() is called on the Socket yet. Must be
-    //   same evenness with initial _versioned_ref because during lifetime of
-    //   a Socket on the slot, the version is added with 1 twice. This is
-    //   also the version encoded in SocketId.
-    // * Failed version: = created version + 1, SetFailed()-ed but returned.
-    // * Other versions: the socket is already recycled.
-    butil::atomic<uint64_t> _versioned_ref;
-
     // In/Out bytes/messages, SocketPool etc
     // _shared_part is shared by a main socket and all its pooled sockets.
     // Can't use intrusive_ptr because the creation is based on optimistic
@@ -838,7 +808,6 @@ private:
 
     // [ Set in ResetFileDescriptor ]
     butil::atomic<int> _fd;  // -1 when not connected.
-    bthread_tag_t _bthread_tag;  // bthread tag of this socket
     int _tos;                // Type of service which is actually only 8bits.
     int64_t _reset_fd_real_us; // When _fd was reset, in microseconds.
 
@@ -864,8 +833,7 @@ private:
     // Initialized by SocketOptions.app_connect.
     std::shared_ptr<AppConnect> _app_connect;
 
-    // Identifier of this Socket in ResourcePool
-    SocketId _this_id;
+    IOEvent<Socket> _io_event;
 
     // last chosen index of the protocol as a heuristic value to avoid
     // iterating all protocol handlers each time.
@@ -953,21 +921,6 @@ private:
     // Set by SetLogOff
     butil::atomic<bool> _logoff_flag;
 
-    // Status flag used to mark that
-    enum AdditionalRefStatus {
-        REF_USING,        // additional reference has been increased
-        REF_REVIVING,     // additional reference is increasing
-        REF_RECYCLED      // additional reference has been decreased
-    };
-
-    // Indicates whether additional reference has increased,
-    // decreased, or is increasing.
-    // additional ref status:
-    // `Socket'、`Create': REF_USING
-    // `SetFailed': REF_USING -> REF_RECYCLED
-    // `Revive' REF_RECYCLED -> REF_REVIVING -> REF_USING
-    butil::atomic<AdditionalRefStatus> _additional_ref_status;
-
     // Concrete error information from SetFailed()
     // Accesses to these 2 fields(especially _error_text) must be protected
     // by _id_wait_list_mutex
diff --git a/src/brpc/socket_id.h b/src/brpc/socket_id.h
index 72043247..e000c98b 100644
--- a/src/brpc/socket_id.h
+++ b/src/brpc/socket_id.h
@@ -22,9 +22,7 @@
 // To brpc developers: This is a header included by user, don't depend
 // on internal structures, use opaque pointers instead.
 
-#include <stdint.h>               // uint64_t
-#include "butil/unique_ptr.h"      // std::unique_ptr
-
+#include "brpc/versioned_ref_with_id.h"
 
 namespace brpc {
 
@@ -32,21 +30,25 @@ namespace brpc {
 // Users shall store SocketId instead of Sockets and call Socket::Address()
 // to convert the identifier to an unique_ptr at each access. Whenever a
 // unique_ptr is not destructed, the enclosed Socket will not be recycled.
-typedef uint64_t SocketId;
+typedef VRefId SocketId;
 
-const SocketId INVALID_SOCKET_ID = (SocketId)-1;
+const SocketId INVALID_SOCKET_ID = INVALID_VREF_ID;
 
 class Socket;
 
 extern void DereferenceSocket(Socket*);
 
-struct SocketDeleter {
+// Explicit (full) template specialization to ignore compiler error,
+// because Socket is an incomplete type where only this header is included.
+template<>
+struct VersionedRefWithIdDeleter<Socket> {
     void operator()(Socket* m) const {
         DereferenceSocket(m);
     }
 };
 
-typedef std::unique_ptr<Socket, SocketDeleter> SocketUniquePtr;
+typedef VersionedRefWithIdUniquePtr<Socket> SocketUniquePtr;
+
 
 } // namespace brpc
 
diff --git a/src/brpc/socket_inl.h b/src/brpc/socket_inl.h
index 6af9e8f1..a8ff3ce8 100644
--- a/src/brpc/socket_inl.h
+++ b/src/brpc/socket_inl.h
@@ -23,35 +23,6 @@
 
 namespace brpc {
 
-// Utility functions to combine and extract SocketId.
-BUTIL_FORCE_INLINE SocketId
-MakeSocketId(uint32_t version, butil::ResourceId<Socket> slot) {
-    return SocketId((((uint64_t)version) << 32) | slot.value);
-}
-
-BUTIL_FORCE_INLINE butil::ResourceId<Socket> SlotOfSocketId(SocketId sid) {
-    butil::ResourceId<Socket> id = { (sid & 0xFFFFFFFFul) };
-    return id;
-}
-
-BUTIL_FORCE_INLINE uint32_t VersionOfSocketId(SocketId sid) {
-    return (uint32_t)(sid >> 32);
-}
-
-// Utility functions to combine and extract Socket::_versioned_ref
-BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
-    return (uint32_t)(vref >> 32);
-}
-
-BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
-    return (int32_t)(vref & 0xFFFFFFFFul);
-}
-
-BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
-    // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
-    return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
-}
-
 inline SocketOptions::SocketOptions()
     : fd(-1)
     , user(NULL)
@@ -63,169 +34,7 @@ inline SocketOptions::SocketOptions()
     , conn(NULL)
     , app_connect(NULL)
     , initial_parsing_context(NULL)
-    , bthread_tag(BTHREAD_TAG_DEFAULT)
-{}
-
-inline int Socket::Dereference() {
-    const SocketId id = _this_id;
-    const uint64_t vref = _versioned_ref.fetch_sub(
-        1, butil::memory_order_release);
-    const int32_t nref = NRefOfVRef(vref);
-    if (nref > 1) {
-        return 0;
-    }
-    if (__builtin_expect(nref == 1, 1)) {
-        const uint32_t ver = VersionOfVRef(vref);
-        const uint32_t id_ver = VersionOfSocketId(id);
-        // Besides first successful SetFailed() adds 1 to version, one of
-        // those dereferencing nref from 1->0 adds another 1 to version.
-        // Notice "one of those": The wait-free Address() may make ref of a
-        // version-unmatched slot change from 1 to 0 for mutiple times, we
-        // have to use version as a guard variable to prevent returning the
-        // Socket to pool more than once.
-        //
-        // Note: `ver == id_ver' means this socket has been `SetRecycle'
-        // before rather than `SetFailed'; `ver == ide_ver+1' means we
-        // had `SetFailed' this socket before. We should destroy the
-        // socket under both situation
-        if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
-            // sees nref:1->0, try to set version=id_ver+2,--nref.
-            // No retry: if version changes, the slot is already returned by
-            // another one who sees nref:1->0 concurrently; if nref changes,
-            // which must be non-zero, the slot will be returned when
-            // nref changes from 1->0 again.
-            // Example:
-            //   SetFailed(): --nref, sees nref:1->0           (1)
-            //                try to set version=id_ver+2      (2)
-            //    Address():  ++nref, unmatched version        (3)
-            //                --nref, sees nref:1->0           (4)
-            //                try to set version=id_ver+2      (5)
-            // 1,2,3,4,5 or 1,3,4,2,5:
-            //            SetFailed() succeeds, Address() fails at (5).
-            // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
-            //            returned by (5) of Address()
-            // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
-            //            returned by (5) of Address().
-            uint64_t expected_vref = vref - 1;
-            if (_versioned_ref.compare_exchange_strong(
-                    expected_vref, MakeVRef(id_ver + 2, 0),
-                    butil::memory_order_acquire,
-                    butil::memory_order_relaxed)) {
-                OnRecycle();
-                return_resource(SlotOfSocketId(id));
-                return 1;
-            }
-            return 0;
-        }
-        LOG(FATAL) << "Invalid SocketId=" << id;
-        return -1;
-    }
-    LOG(FATAL) << "Over dereferenced SocketId=" << id;
-    return -1;
-}
-
-inline int Socket::Address(SocketId id, SocketUniquePtr* ptr) {
-    const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
-    Socket* const m = address_resource(slot);
-    if (__builtin_expect(m != NULL, 1)) {
-        // acquire fence makes sure this thread sees latest changes before
-        // Dereference() or Revive().
-        const uint64_t vref1 = m->_versioned_ref.fetch_add(
-            1, butil::memory_order_acquire);
-        const uint32_t ver1 = VersionOfVRef(vref1);
-        if (ver1 == VersionOfSocketId(id)) {
-            ptr->reset(m);
-            return 0;
-        }
-
-        const uint64_t vref2 = m->_versioned_ref.fetch_sub(
-            1, butil::memory_order_release);
-        const int32_t nref = NRefOfVRef(vref2);
-        if (nref > 1) {
-            return -1;
-        } else if (__builtin_expect(nref == 1, 1)) {
-            const uint32_t ver2 = VersionOfVRef(vref2);
-            if ((ver2 & 1)) {
-                if (ver1 == ver2 || ver1 + 1 == ver2) {
-                    uint64_t expected_vref = vref2 - 1;
-                    if (m->_versioned_ref.compare_exchange_strong(
-                            expected_vref, MakeVRef(ver2 + 1, 0),
-                            butil::memory_order_acquire,
-                            butil::memory_order_relaxed)) {
-                        m->OnRecycle();
-                        return_resource(SlotOfSocketId(id));
-                    }
-                } else {
-                    CHECK(false) << "ref-version=" << ver1
-                                 << " unref-version=" << ver2;
-                }
-            } else {
-                CHECK_EQ(ver1, ver2);
-                // Addressed a free slot.
-            }
-        } else {
-            CHECK(false) << "Over dereferenced SocketId=" << id;
-        }
-    }
-    return -1;
-}
-
-inline void Socket::ReAddress(SocketUniquePtr* ptr) {
-    _versioned_ref.fetch_add(1, butil::memory_order_acquire);
-    ptr->reset(this);
-}
-
-inline int Socket::AddressFailedAsWell(SocketId id, SocketUniquePtr* ptr) {
-    const butil::ResourceId<Socket> slot = SlotOfSocketId(id);
-    Socket* const m = address_resource(slot);
-    if (__builtin_expect(m != NULL, 1)) {
-        const uint64_t vref1 = m->_versioned_ref.fetch_add(
-            1, butil::memory_order_acquire);
-        const uint32_t ver1 = VersionOfVRef(vref1);
-        if (ver1 == VersionOfSocketId(id)) {
-            ptr->reset(m);
-            return 0;
-        }
-        if (ver1 == VersionOfSocketId(id) + 1) {
-            ptr->reset(m);
-            return 1;
-        }
-
-        const uint64_t vref2 = m->_versioned_ref.fetch_sub(
-            1, butil::memory_order_release);
-        const int32_t nref = NRefOfVRef(vref2);
-        if (nref > 1) {
-            return -1;
-        } else if (__builtin_expect(nref == 1, 1)) {
-            const uint32_t ver2 = VersionOfVRef(vref2);
-            if ((ver2 & 1)) {
-                if (ver1 == ver2 || ver1 + 1 == ver2) {
-                    uint64_t expected_vref = vref2 - 1;
-                    if (m->_versioned_ref.compare_exchange_strong(
-                            expected_vref, MakeVRef(ver2 + 1, 0),
-                            butil::memory_order_acquire,
-                            butil::memory_order_relaxed)) {
-                        m->OnRecycle();
-                        return_resource(slot);
-                    }
-                } else {
-                    CHECK(false) << "ref-version=" << ver1
-                                 << " unref-version=" << ver2;
-                }
-            } else {
-                // Addressed a free slot.
-            }
-        } else {
-            CHECK(false) << "Over dereferenced SocketId=" << id;
-        }
-    }
-    return -1;    
-}
-
-inline bool Socket::Failed() const {
-    return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed))
-        != VersionOfSocketId(_this_id);
-}
+    , bthread_tag(BTHREAD_TAG_DEFAULT) {}
 
 inline bool Socket::MoreReadEvents(int* progress) {
     // Fail to CAS means that new events arrived.
diff --git a/src/brpc/versioned_ref_with_id.h b/src/brpc/versioned_ref_with_id.h
new file mode 100644
index 00000000..38141f3d
--- /dev/null
+++ b/src/brpc/versioned_ref_with_id.h
@@ -0,0 +1,624 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+
+#ifndef BRPC_VERSIONED_REF_WITH_ID_H
+#define BRPC_VERSIONED_REF_WITH_ID_H
+
+#include <memory>
+#include "butil/resource_pool.h"
+#include "butil/class_name.h"
+#include "butil/logging.h"
+#include "bthread/bthread.h"
+#include "brpc/errno.pb.h"
+
+namespace brpc {
+
+// Unique identifier of a T object.
+typedef uint64_t VRefId;
+
+const VRefId INVALID_VREF_ID = (VRefId)-1;
+
+template<typename T>
+class VersionedRefWithId;
+
+template<typename T>
+void DereferenceVersionedRefWithId(T* r);
+
+template<typename T>
+struct VersionedRefWithIdDeleter {
+    void operator()(T* r) const {
+        DereferenceVersionedRefWithId(r);
+    }
+};
+
+template<typename T>
+using VersionedRefWithIdUniquePtr =
+    std::unique_ptr<T, VersionedRefWithIdDeleter<T>>;
+
+// Utility functions to combine and extract VRefId.
+template <typename T>
+BUTIL_FORCE_INLINE VRefId MakeVRefId(uint32_t version,
+                                     butil::ResourceId<T> slot) {
+    return VRefId((((uint64_t)version) << 32) | slot.value);
+}
+
+template<typename T>
+BUTIL_FORCE_INLINE butil::ResourceId<T> SlotOfVRefId(VRefId vref_id) {
+    return { (vref_id & 0xFFFFFFFFul) };
+}
+
+BUTIL_FORCE_INLINE uint32_t VersionOfVRefId(VRefId vref_id) {
+    return (uint32_t)(vref_id >> 32);
+}
+
+// Utility functions to combine and extract _versioned_ref
+BUTIL_FORCE_INLINE uint32_t VersionOfVRef(uint64_t vref) {
+    return (uint32_t)(vref >> 32);
+}
+
+BUTIL_FORCE_INLINE int32_t NRefOfVRef(uint64_t vref) {
+    return (int32_t)(vref & 0xFFFFFFFFul);
+}
+
+BUTIL_FORCE_INLINE uint64_t MakeVRef(uint32_t version, int32_t nref) {
+    // 1: Intended conversion to uint32_t, nref=-1 is 00000000FFFFFFFF
+    return (((uint64_t)version) << 32) | (uint32_t/*1*/)nref;
+}
+
+
+template <typename Ret>
+typename std::enable_if<!butil::is_void<Ret>::value, Ret>::type ReturnEmpty() {
+    return Ret{};
+}
+
+template <typename Ret>
+typename std::enable_if<butil::is_void<Ret>::value, Ret>::type ReturnEmpty() {}
+
+// Call func_name of class_type if class_type implements func_name,
+// otherwise call default function.
+#define WRAPPER_OF(class_type, func_name, return_type)                                      \
+    struct func_name ## Wrapper {                                                           \
+        template<typename V, typename... Args>                                              \
+        static auto Test(int) -> decltype(                                                  \
+            std::declval<V>().func_name(std::declval<Args>()...), std::true_type());        \
+        template<typename>                                                                  \
+        static auto Test(...) -> std::false_type;                                           \
+                                                                                            \
+        template<typename... Args>                                                          \
+        typename std::enable_if<decltype(                                                   \
+            Test<class_type, Args...>(0))::value, return_type>::type                        \
+        Call(class_type* obj, Args... args) {                                               \
+            BAIDU_CASSERT((butil::is_result_same<                                           \
+                              return_type, decltype(&T::func_name), T, Args...>::value),    \
+                          "Params or return type mismatch");                                \
+                return obj->func_name(std::forward<Args>(args)...);                         \
+        }                                                                                   \
+                                                                                            \
+        template<typename... Args>                                                          \
+        typename std::enable_if<!decltype(                                                  \
+            Test<class_type, Args...>(0))::value, return_type>::type                        \
+        Call(class_type* obj, Args... args) {                                               \
+            return ReturnEmpty<return_type>();                                              \
+        }                                                                                   \
+    }
+
+#define WRAPPER_CALL(func_name, obj, ...) func_name ## Wrapper().Call(obj, ## __VA_ARGS__)
+
+// VersionedRefWithId is an efficient data structure, which can be found
+// in O(1)-time by VRefId.
+// Users shall call VersionedRefWithId::Create() to create T,
+// store VRefId instead of T and call VersionedRefWithId::Address()
+// to convert the identifier to an unique_ptr at each access. Whenever
+// a unique_ptr is not destructed, the enclosed T will not be recycled.
+//
+// CRTP
+// Derived classes implement 6 functions :
+// 1. (required) int OnCreated(Args... args) :
+//   Will be called in Create() to initialize T when T is created successfully.
+//   If initialization fails, return non-zero. VersionedRefWithId will be `SetFailed'
+//   and Create() returns non-zero.
+// 2. (required) void BeforeRecycled() :
+//   Will be called in Dereference() before T is recycled.
+// 3. (optional) void OnFailed(Args... args) :
+//   Will be called in SetFailed() when VersionedRefWithId is set failed successfully.
+// 4. (optional) void BeforeAdditionalRefReleased() :
+//   Will be called in ReleaseAdditionalReference() before additional ref is released.
+// 5. (optional) void AfterRevived() :
+//   Will be called in Revive() when VersionedRefWithId is revived.
+// 6. (optional) std::string OnDescription() const :
+//   Will be called in description().
+//
+// Example usage:
+//
+// class UserData : public brpc::VersionedRefWithId<UserData> {
+// public:
+//     explicit UserData(Forbidden f)
+//         : brpc::VersionedRefWithId<UserData>(f)
+//         , _count(0) {}
+
+//     void Add(int c) {
+//         _count.fetch_add(c, butil::memory_order_relaxed);
+//     }
+//     void Sub(int c) {
+//         _count.fetch_sub(c, butil::memory_order_relaxed);
+//     }
+// private:
+// friend class brpc::VersionedRefWithId<UserData>;
+//
+//     int OnCreated() {
+//         _count.store(1, butil::memory_order_relaxed);
+//         return 0;
+//     }
+//     void OnFailed(int error_code, const std::string& error_text) {
+//         _count.fetch_sub(1, butil::memory_order_relaxed);
+//     }
+//     void BeforeRecycled() {
+//         _count.store(0, butil::memory_order_relaxed);
+//     }
+//
+//     butil::atomic<int> _count;
+// };
+//
+// typedef brpc::VRefId UserDataId;
+// const brpc::VRefId INVALID_USER_DATA_ID = brpc::INVALID_VREF_ID;
+// typedef brpc::VersionedRefWithIdUniquePtr<UserData> UserDataUniquePtr;
+//
+// And to call methods on UserData:
+// UserDataId id;
+// if (UserData::Create(&id) != 0) {
+//     LOG(ERROR) << "Fail to create UserData";
+//     return;
+// }
+// UserDataUniquePtr user_data;
+// if (UserData::Address(id, &user_data) != 0) {
+//     LOG(ERROR) << "Fail to address UserDataId=" << id;
+//     return;
+// }
+// user_data->Add(10);
+// user_data->SetFailed();
+// UserData::SetFailedById(id);
+//
+template<typename T>
+class VersionedRefWithId {
+protected:
+    struct Forbidden {};
+
+public:
+    explicit VersionedRefWithId(Forbidden)
+        // Must be even because Address() relies on evenness of version.
+        : _versioned_ref(0)
+        , _this_id(0)
+        , _additional_ref_status(ADDITIONAL_REF_USING) {}
+
+    virtual ~VersionedRefWithId() = default;
+    DISALLOW_COPY_AND_ASSIGN(VersionedRefWithId);
+
+    // Create a VersionedRefWithId, put the identifier into `id'.
+    // `args' will be passed to OnCreated() directly.
+    // Returns 0 on success, -1 otherwise.
+    template<typename ... Args>
+    static int Create(VRefId* id, Args... args);
+
+    // Place the VersionedRefWithId associated with identifier `id' into
+    // unique_ptr `ptr', which will be released automatically when out
+    // of scope (w/o explicit std::move). User can still access `ptr'
+    // after calling ptr->SetFailed() before release of `ptr'.
+    // This function is wait-free.
+    // Returns 0 on success, -1 when the Socket was SetFailed().
+    static int Address(VRefId id, VersionedRefWithIdUniquePtr<T>* ptr);
+
+    // Returns 0 on success, 1 on failed socket, -1 on recycled.
+    static int AddressFailedAsWell(VRefId id, VersionedRefWithIdUniquePtr<T>* ptr);
+
+    // Re-address current VersionedRefWithId into `ptr'.
+    // Always succeed even if this socket is failed.
+    void ReAddress(VersionedRefWithIdUniquePtr<T>* ptr);
+
+    // Returns signed 32-bit referenced-count.
+    int32_t nref() const {
+        return NRefOfVRef(_versioned_ref.load(butil::memory_order_relaxed));
+    }
+
+    // Mark this VersionedRefWithId or the VersionedRefWithId associated
+    // with `id' as failed.
+    // Any later Address() of the identifier shall return NULL. The
+    // VersionedRefWithId is NOT recycled after calling this function,
+    // instead it will be recycled when no one references it. Internal
+    // fields of the Socket are still accessible after calling this
+    // function. Calling SetFailed() of a VersionedRefWithId more than
+    // once is OK.
+    // T::OnFailed() will be called when SetFailed() successfully.
+    // This function is lock-free.
+    // Returns -1 when the Socket was already SetFailed(), 0 otherwise.
+    template<typename... Args>
+    static int SetFailedById(VRefId id, Args... args);
+
+    template<typename... Args>
+    int SetFailed(Args... args);
+
+    bool Failed() const {
+        return VersionOfVRef(_versioned_ref.load(butil::memory_order_relaxed))
+            != VersionOfVRefId(_this_id);
+    }
+
+    // Release the additional reference which added inside `Create'
+    // before so that `VersionedRefWithId' will be recycled automatically
+    // once no one is addressing it.
+    int ReleaseAdditionalReference();
+
+    VRefId id() const { return _this_id; }
+
+    // A brief description.
+    std::string description() const;
+
+protected:
+friend void DereferenceVersionedRefWithId<>(T* r);
+
+    // Status flag used to mark that
+    enum AdditionalRefStatus {
+        // 1. Additional reference has been increased;
+        ADDITIONAL_REF_USING,
+        // 2. Additional reference is increasing;
+        ADDITIONAL_REF_REVIVING,
+        // 3. Additional reference has been decreased.
+        ADDITIONAL_REF_RECYCLED
+    };
+
+    AdditionalRefStatus additional_ref_status() const {
+        return _additional_ref_status.load(butil::memory_order_relaxed);
+    }
+
+    uint64_t versioned_ref() const {
+        // The acquire fence pairs with release fence in Dereference to avoid
+        // inconsistent states to be seen by others.
+        return _versioned_ref.load(butil::memory_order_acquire);
+    }
+
+    template<typename... Args>
+    int SetFailedImpl(Args... args);
+
+    // Release the reference. If no one is addressing this VersionedRefWithId,
+    // it will be recycled automatically and T::BeforeRecycled() will be called.
+    int Dereference();
+
+    // Make this socket addressable again.
+    // If nref is less than `at_least_nref', VersionedRefWithId was
+    // abandoned during revival and cannot be revived.
+    void Revive(int32_t at_least_nref);
+
+private:
+    typedef butil::ResourceId<T> resource_id_t;
+
+    // 1. When `failed_as_well=true', returns 0 on success,
+    //    1 on failed socket, -1 on recycled.
+    // 2. When `failed_as_well=false', returns 0 on success,
+    //    -1 when the Socket was SetFailed().
+    static int AddressImpl(VRefId id, bool failed_as_well,
+                           VersionedRefWithIdUniquePtr<T>* ptr);
+
+    // Callback wrapper of Derived classes.
+    WRAPPER_OF(T, OnFailed, void);
+    WRAPPER_OF(T, BeforeAdditionalRefReleased, void);
+    WRAPPER_OF(T, AfterRevived, void);
+    WRAPPER_OF(T, OnDescription, std::string);
+
+    // unsigned 32-bit version + signed 32-bit referenced-count.
+    // Meaning of version:
+    // * Created version: no SetFailed() is called on the VersionedRefWithId yet.
+    //   Must have the same evenness as the initial _versioned_ref because during the
+    //   lifetime of a VersionedRefWithId on the slot, the version is incremented by 1 twice.
+    //   This is also the version encoded in VRefId.
+    // * Failed version: = created version + 1, SetFailed()-ed but returned.
+    // * Other versions: the socket is already recycled.
+    butil::atomic<uint64_t> BAIDU_CACHELINE_ALIGNMENT _versioned_ref;
+    // The unique identifier.
+    VRefId _this_id;
+    // Indicates whether additional reference has increased,
+    // decreased, or is increasing.
+    // additional ref status:
+    // `Socket', `Create': REF_USING
+    // `SetFailed': REF_USING -> REF_RECYCLED
+    // `Revive': REF_RECYCLED -> REF_REVIVING -> REF_USING
+    butil::atomic<AdditionalRefStatus> _additional_ref_status;
+};
+
+template<typename T>
+void DereferenceVersionedRefWithId(T* r) {
+    if (r) {
+        static_cast<VersionedRefWithId<T>*>(r)->Dereference();
+    }
+}
+
+template <typename T>
+template<typename ... Args>
+int VersionedRefWithId<T>::Create(VRefId* id, Args... args) {
+    resource_id_t slot;
+    T* const t = butil::get_resource(&slot, Forbidden());
+    if (t == NULL) {
+        LOG(FATAL) << "Fail to get_resource<"
+                   << butil::class_name_str<T>() << ">";
+        return -1;
+    }
+    // nref can be non-zero due to concurrent Address().
+    // _this_id will only be used in destructor/Destroy of referenced
+    // slots, which is safe and properly fenced. Although it's better
+    // to put the id into VersionedRefWithIdUniquePtr.
+    VersionedRefWithId<T>* const vref_with_id = t;
+    vref_with_id->_this_id = MakeVRefId<T>(
+        VersionOfVRef(vref_with_id->_versioned_ref.fetch_add(
+            1, butil::memory_order_release)), slot);
+    vref_with_id->_additional_ref_status.store(
+        ADDITIONAL_REF_USING, butil::memory_order_relaxed);
+    BAIDU_CASSERT((butil::is_result_int<decltype(&T::OnCreated), T, Args...>::value),
+                  "T::OnCreated must accept Args params and return int");
+    // At last, call T::OnCreated() to initialize the T object.
+    if (t->OnCreated(std::forward<Args>(args)...) != 0) {
+        vref_with_id->SetFailed();
+        // NOTE: This object may be recycled at this point,
+        // don't touch anything.
+        return -1;
+    }
+    *id = vref_with_id->_this_id;
+    return 0;
+}
+
+template<typename T>
+int VersionedRefWithId<T>::Address(
+    VRefId id, VersionedRefWithIdUniquePtr<T>* ptr) {
+    return AddressImpl(id, false, ptr);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::AddressFailedAsWell(
+    VRefId id, VersionedRefWithIdUniquePtr<T>* ptr) {
+    return AddressImpl(id, true, ptr);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::AddressImpl(
+    VRefId id, bool failed_as_well, VersionedRefWithIdUniquePtr<T>* ptr) {
+    const resource_id_t slot = SlotOfVRefId<T>(id);
+    T* const t = address_resource(slot);
+    if (__builtin_expect(t != NULL, 1)) {
+        // acquire fence makes sure this thread sees latest changes before
+        // Dereference() or Revive().
+        VersionedRefWithId<T>* const vref_with_id = t;
+        const uint64_t vref1 = vref_with_id->_versioned_ref.fetch_add(
+            1, butil::memory_order_acquire);
+        const uint32_t ver1 = VersionOfVRef(vref1);
+        if (ver1 == VersionOfVRefId(id)) {
+            ptr->reset(t);
+            return 0;
+        }
+        if (failed_as_well && ver1 == VersionOfVRefId(id) + 1) {
+            ptr->reset(t);
+            return 1;
+        }
+
+        const uint64_t vref2 = vref_with_id->_versioned_ref.fetch_sub(
+            1, butil::memory_order_release);
+        const int32_t nref = NRefOfVRef(vref2);
+        if (nref > 1) {
+            return -1;
+        } else if (__builtin_expect(nref == 1, 1)) {
+            const uint32_t ver2 = VersionOfVRef(vref2);
+            if ((ver2 & 1)) {
+                if (ver1 == ver2 || ver1 + 1 == ver2) {
+                    uint64_t expected_vref = vref2 - 1;
+                    if (vref_with_id->_versioned_ref.compare_exchange_strong(
+                            expected_vref, MakeVRef(ver2 + 1, 0),
+                            butil::memory_order_acquire,
+                            butil::memory_order_relaxed)) {
+                        BAIDU_CASSERT((butil::is_result_void<
+                                          decltype(&T::BeforeRecycled), T>::value),
+                                      "T::BeforeRecycled must accept Args params"
+                                      " and return void");
+                        t->BeforeRecycled();
+                        return_resource(slot);
+                    }
+                } else {
+                    CHECK(false) << "ref-version=" << ver1
+                                 << " unref-version=" << ver2;
+                }
+            } else {
+                // Addressed a free slot.
+            }
+        } else {
+            CHECK(false) << "Over dereferenced SocketId=" << id;
+        }
+    }
+    return -1;
+}
+
+template<typename T>
+void VersionedRefWithId<T>::ReAddress(VersionedRefWithIdUniquePtr<T>* ptr) {
+    _versioned_ref.fetch_add(1, butil::memory_order_acquire);
+    ptr->reset(static_cast<T*>(this));
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailedById(VRefId id, Args... args) {
+    VersionedRefWithIdUniquePtr<T> ptr;
+    if (Address(id, &ptr) != 0) {
+        return -1;
+    }
+    return ptr->SetFailed(std::forward<Args>(args)...);
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailed(Args... args) {
+    return SetFailedImpl(std::forward<Args>(args)...);
+}
+
+template<typename T>
+template<typename... Args>
+int VersionedRefWithId<T>::SetFailedImpl(Args... args) {
+    const uint32_t id_ver = VersionOfVRefId(_this_id);
+    uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+    for (;;) {
+        if (VersionOfVRef(vref) != id_ver) {
+            return -1;
+        }
+        // Try to set version=id_ver+1 (to make later address() return NULL),
+        // retry on fail.
+        if (_versioned_ref.compare_exchange_strong(
+                vref, MakeVRef(id_ver + 1, NRefOfVRef(vref)),
+                butil::memory_order_release,
+                butil::memory_order_relaxed)) {
+            // Call T::OnFailed() to notify the failure of T.
+            WRAPPER_CALL(OnFailed, static_cast<T*>(this), std::forward<Args>(args)...);
+            // Release the additional reference added at creation so that this
+            // object's reference count will hit 0 (recycle) when no one addresses it.
+            ReleaseAdditionalReference();
+            // NOTE: This object may be recycled at this point, don't
+            // touch anything.
+            return 0;
+        }
+    }
+}
+
+template<typename T>
+int VersionedRefWithId<T>::ReleaseAdditionalReference() {
+    do {
+        AdditionalRefStatus expect = ADDITIONAL_REF_USING;
+        if (_additional_ref_status.compare_exchange_strong(
+            expect, ADDITIONAL_REF_RECYCLED,
+            butil::memory_order_relaxed,
+            butil::memory_order_relaxed)) {
+            BeforeAdditionalRefReleasedWrapper();
+            WRAPPER_CALL(BeforeAdditionalRefReleased, static_cast<T*>(this));
+            return Dereference();
+        }
+
+        if (expect == ADDITIONAL_REF_REVIVING) {
+            // sched_yield to wait until status is not REF_REVIVING.
+            sched_yield();
+        } else {
+            return -1; // REF_RECYCLED
+        }
+    } while (true);
+}
+
+template<typename T>
+int VersionedRefWithId<T>::Dereference() {
+    const VRefId id = _this_id;
+    const uint64_t vref = _versioned_ref.fetch_sub(
+        1, butil::memory_order_release);
+    const int32_t nref = NRefOfVRef(vref);
+    if (nref > 1) {
+        return 0;
+    }
+    if (__builtin_expect(nref == 1, 1)) {
+        const uint32_t ver = VersionOfVRef(vref);
+        const uint32_t id_ver = VersionOfVRefId(id);
+        // Besides first successful SetFailed() adds 1 to version, one of
+        // those dereferencing nref from 1->0 adds another 1 to version.
+        // Notice "one of those": The wait-free Address() may make ref of a
+        // version-unmatched slot change from 1 to 0 multiple times, we
+        // have to use version as a guard variable to prevent returning the
+        // VersionedRefWithId to pool more than once.
+        //
+        // Note: `ver == id_ver' means this VersionedRefWithId has been `SetRecycle'
+        // before rather than `SetFailed'; `ver == id_ver+1' means we
+        // had `SetFailed' this socket before. We should destroy the
+        // socket in both situations.
+        if (__builtin_expect(ver == id_ver || ver == id_ver + 1, 1)) {
+            // sees nref:1->0, try to set version=id_ver+2,--nref.
+            // No retry: if version changes, the slot is already returned by
+            // another one who sees nref:1->0 concurrently; if nref changes,
+            // which must be non-zero, the slot will be returned when
+            // nref changes from 1->0 again.
+            // Example:
+            //   SetFailed(): --nref, sees nref:1->0           (1)
+            //                try to set version=id_ver+2      (2)
+            //    Address():  ++nref, unmatched version        (3)
+            //                --nref, sees nref:1->0           (4)
+            //                try to set version=id_ver+2      (5)
+            // 1,2,3,4,5 or 1,3,4,2,5:
+            //            SetFailed() succeeds, Address() fails at (5).
+            // 1,3,2,4,5: SetFailed() fails with (2), the slot will be
+            //            returned by (5) of Address()
+            // 1,3,4,5,2: SetFailed() fails with (2), the slot is already
+            //            returned by (5) of Address().
+            uint64_t expected_vref = vref - 1;
+            if (_versioned_ref.compare_exchange_strong(
+                    expected_vref, MakeVRef(id_ver + 2, 0),
+                    butil::memory_order_acquire,
+                    butil::memory_order_relaxed)) {
+                static_cast<T*>(this)->BeforeRecycled();
+                return_resource(SlotOfVRefId<T>(id));
+                return 1;
+            }
+            return 0;
+        }
+        LOG(FATAL) << "Invalid VRefId=" << id;
+        return -1;
+    }
+    LOG(FATAL) << "Over dereferenced VRefId=" << id;
+    return -1;
+}
+
+template<typename T>
+void VersionedRefWithId<T>::Revive(int32_t at_least_nref) {
+    const uint32_t id_ver = VersionOfVRefId(_this_id);
+    uint64_t vref = _versioned_ref.load(butil::memory_order_relaxed);
+    _additional_ref_status.store(
+        ADDITIONAL_REF_REVIVING, butil::memory_order_relaxed);
+    while (true) {
+        CHECK_EQ(id_ver + 1, VersionOfVRef(vref)) << "id=" << id();
+
+        int32_t nref = NRefOfVRef(vref);
+        if (nref < at_least_nref) {
+            // Set the status to REF_RECYCLED since no one uses this socket
+            _additional_ref_status.store(
+                ADDITIONAL_REF_RECYCLED, butil::memory_order_relaxed);
+            CHECK_EQ(1, nref);
+            LOG(WARNING) << description() << " was abandoned during revival";
+            return;
+        }
+        // +1 is the additional ref added in Create(). TODO(gejun): we should
+        // remove this additional nref someday.
+        if (_versioned_ref.compare_exchange_weak(
+                vref, MakeVRef(id_ver, nref + 1/*note*/),
+                butil::memory_order_release,
+                butil::memory_order_relaxed)) {
+            // Set the status to REF_USING since we add additional ref again
+            _additional_ref_status.store(
+                ADDITIONAL_REF_USING, butil::memory_order_relaxed);
+            WRAPPER_CALL(AfterRevived, static_cast<T*>(this));
+            return;
+        }
+    }
+}
+
+template<typename T>
+std::string VersionedRefWithId<T>::description() const {
+    std::string result;
+    result.reserve(128);
+    butil::string_appendf(&result, "Socket{id=%" PRIu64, id());
+    result.append(WRAPPER_CALL(
+        OnDescription, const_cast<T*>(static_cast<const T*>(this))));
+    butil::string_appendf(&result, "} (0x%p)", this);
+    return result;
+}
+
+} // namespace brpc
+
+#endif // BRPC_VERSIONED_REF_WITH_ID_H
diff --git a/src/butil/memory/scope_guard.h b/src/butil/memory/scope_guard.h
index 5ed65a4c..ec662b46 100644
--- a/src/butil/memory/scope_guard.h
+++ b/src/butil/memory/scope_guard.h
@@ -18,20 +18,13 @@
 #ifndef BRPC_SCOPED_GUARD_H
 #define BRPC_SCOPED_GUARD_H
 
-#include <type_traits>
+#include "butil/type_traits.h"
 #include "butil/macros.h"
 
 namespace butil {
 
-// Whether a no-argument callable returns void.
-template<typename T>
-struct returns_void_t
-    : public std::is_same<void, decltype(std::declval<T&&>()())>
-{};
-
 template<typename Callback,
-    typename = typename std::enable_if<
-        returns_void_t<Callback>::value>::type>
+         typename = std::enable_if<is_result_void<Callback>::value>>
 class ScopeGuard;
 
 template<typename Callback>
@@ -43,7 +36,7 @@ template<typename Callback>
 class ScopeGuard<Callback> {
 public:
     ScopeGuard(ScopeGuard&& other) noexcept
-        :_callback(std::move(other._callback))
+        : _callback(std::move(other._callback))
         , _dismiss(other._dismiss) {
         other.dismiss();
     }
diff --git a/src/butil/type_traits.h b/src/butil/type_traits.h
index 5f342db3..3cf8473f 100644
--- a/src/butil/type_traits.h
+++ b/src/butil/type_traits.h
@@ -341,7 +341,7 @@ template <typename T> struct is_enum_impl<true, T> : false_type { };
 
 template <typename T> struct is_enum
     : internal::is_enum_impl<
-                is_same<T, void>::value ||
+                    is_same<T, void>::value ||
                     is_integral<T>::value ||
                     is_floating_point<T>::value ||
                     is_reference<T>::value ||
@@ -351,6 +351,40 @@ template <typename T> struct is_enum<const T> : is_enum<T> { };
 template <typename T> struct is_enum<volatile T> : is_enum<T> { };
 template <typename T> struct is_enum<const volatile T> : is_enum<T> { };
 
+// Deduces the return type of an INVOKE expression
+// at compile time.
+// If the callable is non-static member function,
+// the first argument should be the class type.
+#if (__cplusplus >= 201703L)
+// std::result_of is deprecated in C++17 and removed in C++20,
+// use std::invoke_result instead.
+template <typename>
+struct result_of;
+template <typename F, typename... Args>
+struct result_of<F(Args...)> : std::invoke_result<F, Args...> {};
+#elif (__cplusplus >= 201103L)
+template <typename F>
+using result_of = std::result_of<F>;
+#else
+#error Only C++11 or later is supported.
+#endif
+
+template <typename F>
+using result_of_t = typename result_of<F>::type;
+
+// Whether a callable returns type which is same as ReturnType.
+template<typename ReturnType, typename F, typename... Args>
+struct is_result_same
+    : public butil::is_same<ReturnType, result_of_t<F(Args...)>> {};
+
+// Whether a callable returns void.
+template<typename F, typename... Args>
+struct is_result_void : public is_result_same<void, F, Args...> {};
+
+// Whether a callable returns int.
+template<typename F, typename... Args>
+struct is_result_int : public is_result_same<int, F, Args...> {};
+
 }  // namespace butil
 
 #endif  // BUTIL_TYPE_TRAITS_H
diff --git a/test/brpc_event_dispatcher_unittest.cpp b/test/brpc_event_dispatcher_unittest.cpp
index d8f69842..185e9f2d 100644
--- a/test/brpc_event_dispatcher_unittest.cpp
+++ b/test/brpc_event_dispatcher_unittest.cpp
@@ -27,18 +27,19 @@
 #include "butil/time.h"
 #include "butil/macros.h"
 #include "butil/fd_utility.h"
+#include "butil/memory/scope_guard.h"
+#include "bthread/bthread.h"
 #include "brpc/event_dispatcher.h"
+#include "brpc/socket.h"
 #include "brpc/details/has_epollrdhup.h"
+#include "brpc/versioned_ref_with_id.h"
 
 class EventDispatcherTest : public ::testing::Test{
 protected:
-    EventDispatcherTest(){
-    };
-    virtual ~EventDispatcherTest(){};
-    virtual void SetUp() {
-    };
-    virtual void TearDown() {
-    };
+    EventDispatcherTest() = default;
+    ~EventDispatcherTest() override = default;
+    void SetUp() override {}
+    void TearDown() override {}
 };
 
 TEST_F(EventDispatcherTest, has_epollrdhup) {
@@ -52,6 +53,128 @@ TEST_F(EventDispatcherTest, versioned_ref) {
     ASSERT_EQ(brpc::MakeVRef(1, 1), versioned_ref);
 }
 
+struct UserData;
+
+UserData* g_user_data = NULL;
+
+struct UserData : public brpc::VersionedRefWithId<UserData> {
+    explicit UserData(Forbidden f)
+        : brpc::VersionedRefWithId<UserData>(f)
+        , count(0)
+        , _additional_ref_released(false) {}
+
+    int OnCreated() {
+        count.store(1, butil::memory_order_relaxed);
+        _additional_ref_released = false;
+        g_user_data = this;
+        return 0;
+    }
+
+    void BeforeRecycled() {
+        count.store(0, butil::memory_order_relaxed);
+        g_user_data = NULL;
+    }
+
+    void BeforeAdditionalRefReleased() {
+        _additional_ref_released = true;
+    }
+
+    void OnFailed() {
+        count.fetch_sub(1, butil::memory_order_relaxed);
+    }
+
+    void AfterRevived() {
+        count.fetch_add(1, butil::memory_order_relaxed);
+        _additional_ref_released = false;
+    }
+
+    butil::atomic<int> count;
+    bool _additional_ref_released;
+};
+
+// Unique identifier of a UserData.
+// Users shall store UserDataId instead of UserData and call UserData::Address()
+// to convert the identifier to an unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed UserData will not be recycled.
+typedef brpc::VRefId UserDataId;
+
+const brpc::VRefId INVALID_EVENT_DATA_ID = brpc::INVALID_VREF_ID;
+
+typedef brpc::VersionedRefWithIdUniquePtr<UserData> UserDataUniquePtr;
+
+volatile bool vref_thread_stop = false;
+butil::atomic<int> g_count(1);
+
+void TestVRef(UserDataId id) {
+    UserDataUniquePtr ptr;
+    ASSERT_EQ(0, UserData::Address(id, &ptr));
+    ptr->count.fetch_add(1, butil::memory_order_relaxed);
+    g_count.fetch_add(1, butil::memory_order_relaxed);
+}
+
+void* VRefThread(void* arg) {
+    auto id = (UserDataId)arg;
+    while (!vref_thread_stop) {
+        TestVRef(id);
+    }
+    return NULL;
+}
+
+TEST_F(EventDispatcherTest, versioned_ref_with_id) {
+    UserDataId id = INVALID_EVENT_DATA_ID;
+    ASSERT_EQ(0, UserData::Create(&id));
+    ASSERT_NE(INVALID_EVENT_DATA_ID, id);
+    UserDataUniquePtr ptr;
+    ASSERT_EQ(0, UserData::Address(id, &ptr));
+    ASSERT_EQ(2, ptr->nref());
+    ASSERT_FALSE(ptr->Failed());
+    ASSERT_EQ(1, ptr->count);
+    ASSERT_EQ(g_user_data, ptr.get());
+    {
+        UserDataUniquePtr temp_ptr;
+        ASSERT_EQ(0, UserData::Address(id, &temp_ptr));
+        ASSERT_EQ(ptr, temp_ptr);
+        ASSERT_EQ(3, ptr->nref());
+    }
+
+    const size_t thread_num = 8;
+    pthread_t tid[thread_num];
+    for (auto& i : tid) {
+        ASSERT_EQ(0, pthread_create(&i, NULL, VRefThread, (void*)id));
+    }
+
+    sleep(2);
+
+    vref_thread_stop = true;
+    for (const auto i : tid) {
+        pthread_join(i, NULL);
+    }
+
+    ASSERT_EQ(2, ptr->nref());
+    ASSERT_EQ(g_count, ptr->count);
+    ASSERT_EQ(0, ptr->SetFailed());
+    ASSERT_TRUE(ptr->Failed());
+    ASSERT_EQ(g_count - 1, ptr->count);
+    // Additional reference has been released.
+    ASSERT_TRUE(ptr->_additional_ref_released);
+    ASSERT_EQ(1, ptr->nref());
+    {
+        UserDataUniquePtr temp_ptr;
+        ASSERT_EQ(1, UserData::AddressFailedAsWell(id, &temp_ptr));
+        ASSERT_EQ(ptr, temp_ptr);
+        ASSERT_EQ(2, ptr->nref());
+    }
+    ptr->Revive(1);
+    ASSERT_EQ(2, ptr->nref());
+    ASSERT_EQ(g_count, ptr->count);
+    ASSERT_FALSE(ptr->_additional_ref_released);
+    ASSERT_EQ(0, UserData::SetFailedById(id));
+    ASSERT_EQ(g_count - 1, ptr->count);
+    ptr.reset();
+    ASSERT_EQ(nullptr, ptr);
+    ASSERT_EQ(nullptr, g_user_data);
+}
+
 std::vector<int> err_fd;
 pthread_mutex_t err_fd_mutex = PTHREAD_MUTEX_INITIALIZER;
 
@@ -79,7 +202,7 @@ struct BAIDU_CACHELINE_ALIGNMENT SocketExtra : public brpc::SocketUser {
         times = 0;
     }
 
-    virtual void BeforeRecycle(brpc::Socket* m) {
+    void BeforeRecycle(brpc::Socket* m) override {
         pthread_mutex_lock(&rel_fd_mutex);
         rel_fd.push_back(m->fd());
         pthread_mutex_unlock(&rel_fd_mutex);
@@ -267,3 +390,130 @@ TEST_F(EventDispatcherTest, dispatch_tasks) {
     ASSERT_EQ(NCLIENT, info.free_item_num - old_info.free_item_num);
 #endif
 }
+
+// Unique identifier of an EventPipe.
+// Users shall store EventPipeId instead of EventPipe and call EventPipe::Address()
+// to convert the identifier to an unique_ptr at each access. Whenever a
+// unique_ptr is not destructed, the enclosed EventPipe will not be recycled.
+typedef brpc::VRefId EventPipeId;
+
+const brpc::VRefId INVALID_EVENT_PIPE_ID = brpc::INVALID_VREF_ID;
+
+class EventPipe;
+typedef brpc::VersionedRefWithIdUniquePtr<EventPipe> EventPipeUniquePtr;
+
+
+class EventPipe : public brpc::VersionedRefWithId<EventPipe> {
+public:
+    explicit EventPipe(Forbidden f)
+        : brpc::VersionedRefWithId<EventPipe>(f)
+        , _pipe_fds{-1, -1}
+        , _input_event_count(0)
+        {}
+
+    int Notify() {
+        char c = 0;
+        if (write(_pipe_fds[1], &c, 1) != 1) {
+            PLOG(ERROR) << "Fail to write to _pipe_fds[1]";
+            return -1;
+        }
+        return 0;
+    }
+
+private:
+friend class VersionedRefWithId<EventPipe>;
+friend class brpc::IOEvent<EventPipe>;
+
+    int OnCreated() {
+        if (pipe(_pipe_fds)) {
+            PLOG(FATAL) << "Fail to create _pipe_fds";
+            return -1;
+        }
+        if (_io_event.Init((void*)id()) != 0) {
+            LOG(ERROR) << "Fail to init IOEvent";
+            return -1;
+        }
+        _io_event.set_bthread_tag(bthread_self_tag());
+        if (_io_event.AddConsumer(_pipe_fds[0]) != 0) {
+            PLOG(ERROR) << "Fail to add SocketId=" << id()
+                        << " into EventDispatcher";
+            return -1;
+        }
+
+
+        _input_event_count = 0;
+        return 0;
+    }
+
+    void BeforeRecycled() {
+        brpc::GetGlobalEventDispatcher(_pipe_fds[0], bthread_self_tag())
+            .RemoveConsumer(_pipe_fds[0]);
+        _io_event.Reset();
+        if (_pipe_fds[0] >= 0) {
+            close(_pipe_fds[0]);
+        }
+        if (_pipe_fds[1] >= 0) {
+            close(_pipe_fds[1]);
+        }
+    }
+
+    static int OnInputEvent(void* user_data, uint32_t,
+                            const bthread_attr_t&) {
+        auto id = reinterpret_cast<EventPipeId>(user_data);
+        EventPipeUniquePtr ptr;
+        if (EventPipe::Address(id, &ptr) != 0) {
+            LOG(WARNING) << "Fail to address EventPipe";
+            return -1;
+        }
+
+        char buf[1024];
+        ssize_t nr = read(ptr->_pipe_fds[0], &buf, arraysize(buf));
+        if (nr <= 0) {
+            if (errno == EAGAIN) {
+                return 0;
+            } else {
+                PLOG(WARNING) << "Fail to read from _pipe_fds[0]";
+                ptr->SetFailed();
+                return -1;
+            }
+        }
+
+        ptr->_input_event_count += nr;
+        return 0;
+    }
+
+    static int OnOutputEvent(void*, uint32_t,
+                             const bthread_attr_t&) {
+        EXPECT_TRUE(false) << "Should not be called";
+        return 0;
+    }
+
+    brpc::IOEvent<EventPipe> _io_event;
+    int _pipe_fds[2];
+
+    size_t _input_event_count;
+};
+
+TEST_F(EventDispatcherTest, customize_dispatch_task) {
+    EventPipeId id = INVALID_EVENT_PIPE_ID;
+    ASSERT_EQ(0, EventPipe::Create(&id));
+    ASSERT_NE(INVALID_EVENT_PIPE_ID, id);
+    EventPipeUniquePtr ptr;
+    ASSERT_EQ(0, EventPipe::Address(id, &ptr));
+    ASSERT_EQ(2, ptr->nref());
+    ASSERT_FALSE(ptr->Failed());
+
+    ASSERT_EQ((size_t)0, ptr->_input_event_count);
+    const size_t N = 10000;
+    for (size_t i = 0; i < N; ++i) {
+        ASSERT_EQ(0, ptr->Notify());
+    }
+    usleep(1000 * 50);
+    ASSERT_EQ(N, ptr->_input_event_count);
+
+    ASSERT_EQ(0, ptr->SetFailed());
+    ASSERT_TRUE(ptr->Failed());
+    ptr.reset();
+    ASSERT_EQ(nullptr, ptr);
+    ASSERT_NE(0, EventPipe::Address(id, &ptr));
+}
diff --git a/test/brpc_load_balancer_unittest.cpp b/test/brpc_load_balancer_unittest.cpp
index d98c11b5..14b0131c 100644
--- a/test/brpc_load_balancer_unittest.cpp
+++ b/test/brpc_load_balancer_unittest.cpp
@@ -1107,7 +1107,7 @@ TEST_F(LoadBalancerTest, revived_from_all_failed_sanity) {
     {
         brpc::SocketUniquePtr dummy_ptr;
         ASSERT_EQ(1, brpc::Socket::AddressFailedAsWell(ptr[0]->id(), &dummy_ptr));
-        dummy_ptr->Revive();
+        dummy_ptr->Revive(2);
     }
     bthread_usleep(brpc::FLAGS_detect_available_server_interval_ms * 1000);
     // After one server is revived, the reject rate should be 50%
diff --git a/test/brpc_redis_unittest.cpp b/test/brpc_redis_unittest.cpp
index 1176676c..573ab2ed 100644
--- a/test/brpc_redis_unittest.cpp
+++ b/test/brpc_redis_unittest.cpp
@@ -1080,7 +1080,7 @@ public:
 
     brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
                                         brpc::RedisReply* output,
-                                        bool flush_batched) {
+                                        bool flush_batched) override {
         output->SetStatus("OK");
         return brpc::REDIS_CMD_CONTINUE;
     }
@@ -1093,7 +1093,7 @@ public:
     public:
         brpc::RedisCommandHandlerResult Run(const std::vector<butil::StringPiece>& args,
                                             brpc::RedisReply* output,
-                                            bool flush_batched) {
+                                            bool flush_batched) override {
             if (args[0] == "multi") {
                 output->SetError("ERR duplicate multi");
                 return brpc::REDIS_CMD_CONTINUE;


---------------------------------------------------------------------
To unsubscribe, e-mail: dev-unsubscr...@brpc.apache.org
For additional commands, e-mail: dev-h...@brpc.apache.org

Reply via email to