Updated Branches: refs/heads/master b30d69825 -> 5f4b470ce
TS-2137: Use eventfd instread of pthread signal/wait in ATS pthread_cond_signal/wait is used in several places in ATS, including but not limited: 1) Logging system. 2) ProtectedQueue in event system. 3) RecProcess in stats system. As we known, pthread_cond_signal() need to take lock, it'll cause more context switch than eventfd. In my testing: 1) Using _signal()/_wait() pair, eventfd is about 5 times faster than pthread cond. 2) Using _signal()/_timedwait() pair, eventfd is about 2 times faster than pthread cond. == NOTE == pthread_cond_signal/wait is also used by AIO module, but we can't simply replace it with eventfd, as AIO code use the mutex to protect other stuff. And I found we can't replace it in ProtectedQueue directly, there are some prepare work to do. Signed-off-by: Yunkai Zhang <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/f4f8d99f Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/f4f8d99f Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/f4f8d99f Branch: refs/heads/master Commit: f4f8d99f65b7ffbc37e7f72adc0694adb26041c4 Parents: b30d698 Author: Yunkai Zhang <[email protected]> Authored: Sat Aug 17 16:17:41 2013 +0800 Committer: Yunkai Zhang <[email protected]> Committed: Wed Aug 21 17:39:53 2013 +0800 ---------------------------------------------------------------------- lib/records/RecProcess.cc | 17 ++- lib/ts/EventNotify.cc | 172 +++++++++++++++++++++++++++++++ lib/ts/EventNotify.h | 54 ++++++++++ lib/ts/Makefile.am | 2 + lib/ts/libts.h | 1 + proxy/logging/Log.cc | 47 +++------ proxy/logging/Log.h | 9 +- proxy/logging/LogCollationHostSM.cc | 2 +- proxy/logging/LogConfig.cc | 2 +- proxy/logging/LogFile.cc | 4 +- proxy/logging/LogObject.cc | 2 +- 11 files changed, 261 insertions(+), 51 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/records/RecProcess.cc ---------------------------------------------------------------------- diff --git a/lib/records/RecProcess.cc b/lib/records/RecProcess.cc index b9c69ba..3f1f34e 100644 --- a/lib/records/RecProcess.cc +++ b/lib/records/RecProcess.cc @@ -35,8 +35,7 @@ static bool g_initialized = false; static bool g_message_initialized = false; static bool g_started = false; -static ink_cond g_force_req_cond; -static ink_mutex g_force_req_mutex; +static EventNotify g_force_req_notify; static int g_rec_raw_stat_sync_interval_ms = REC_RAW_STAT_SYNC_INTERVAL_MS; static int g_rec_config_update_interval_ms = REC_CONFIG_UPDATE_INTERVAL_MS; static int g_rec_remote_sync_interval_ms = REC_REMOTE_SYNC_INTERVAL_MS; @@ -263,9 +262,9 @@ recv_message_cb__process(RecMessage *msg, RecMessageT msg_type, void *cookie) if ((err = recv_message_cb(msg, msg_type, cookie)) == REC_ERR_OKAY) { if (msg_type == RECG_PULL_ACK) { - ink_mutex_acquire(&g_force_req_mutex); - ink_cond_signal(&g_force_req_cond); - ink_mutex_release(&g_force_req_mutex); + g_force_req_notify.lock(); + g_force_req_notify.signal(); + g_force_req_notify.unlock(); } } return err; @@ -419,13 +418,11 @@ RecProcessInitMessage(RecModeT mode_type) return REC_ERR_FAIL; } - ink_cond_init(&g_force_req_cond); - ink_mutex_init(&g_force_req_mutex, NULL); if (mode_type == RECM_CLIENT) { send_pull_message(RECG_PULL_REQ); - ink_mutex_acquire(&g_force_req_mutex); - ink_cond_wait(&g_force_req_cond, &g_force_req_mutex); - ink_mutex_release(&g_force_req_mutex); + g_force_req_notify.lock(); + g_force_req_notify.wait(); + g_force_req_notify.unlock(); } g_message_initialized = true; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.cc ---------------------------------------------------------------------- diff --git a/lib/ts/EventNotify.cc b/lib/ts/EventNotify.cc new file mode 100644 index 0000000..5d0cc53 --- /dev/null +++ b/lib/ts/EventNotify.cc @@ -0,0 +1,172 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +/************************************************************************** + EventNotify.cc + + Generic event notify mechanism among threads. +**************************************************************************/ + +#include "EventNotify.h" +#include "ink_hrtime.h" + +#ifdef TS_HAS_EVENTFD +#include <sys/eventfd.h> +#include <sys/epoll.h> +#endif + +EventNotify::EventNotify(const char *name): m_name(name) +{ +#ifdef TS_HAS_EVENTFD + int ret; + struct epoll_event ev; + + // Don't use noblock here! + m_event_fd = eventfd(0, EFD_CLOEXEC); + if (m_event_fd < 0) { + // EFD_CLOEXEC invalid in <= Linux 2.6.27 + m_event_fd = eventfd(0, 0); + } + ink_release_assert(m_event_fd != -1); + + ev.events = EPOLLIN; + ev.data.fd = m_event_fd; + + m_epoll_fd = epoll_create(1); + ink_release_assert(m_epoll_fd != -1); + + ret = epoll_ctl(m_epoll_fd, EPOLL_CTL_ADD, m_event_fd, &ev); + ink_release_assert(ret != -1); +#else + ink_cond_init(&m_cond); + ink_mutex_init(&m_mutex, m_name); +#endif +} + +void +EventNotify::signal(void) +{ +#ifdef TS_HAS_EVENTFD + ssize_t nr; + uint64_t value = 1; + nr = write(m_event_fd, &value, sizeof(uint64_t)); + ink_release_assert(nr == sizeof(uint64_t)); +#else + ink_cond_signal(&m_cond); +#endif +} + +void +EventNotify::wait(void) +{ +#ifdef TS_HAS_EVENTFD + ssize_t nr; + uint64_t value = 0; + nr = read(m_event_fd, &value, sizeof(uint64_t)); + ink_release_assert(nr == sizeof(uint64_t)); +#else + ink_cond_wait(&m_cond, &m_mutex); +#endif +} + +int +EventNotify::timedwait(ink_timestruc *abstime) +{ +#ifdef TS_HAS_EVENTFD + int timeout; + ssize_t nr, nr_fd = 0; + uint64_t value = 0; + struct timeval curtime; + struct epoll_event ev; + + // Convert absolute time to relative time + gettimeofday(&curtime, NULL); + timeout = (abstime->tv_sec - curtime.tv_sec) * 1000 + + (abstime->tv_nsec / 1000 - curtime.tv_usec) / 1000; + + // + // When timeout < 0, epoll_wait() will wait indefinitely, but + // pthread_cond_timedwait() will return ETIMEDOUT immediately. + // We should keep compatible with pthread_cond_timedwait() here. + // + if (timeout < 0) + return ETIMEDOUT; + + do { + nr_fd = epoll_wait(m_epoll_fd, &ev, 1, timeout); + } while (nr_fd == -1 && errno == EINTR); + + if (nr_fd == 0) + return ETIMEDOUT; + else if (nr_fd == -1) + return errno; + + nr = read(m_event_fd, &value, sizeof(uint64_t)); + ink_release_assert(nr == sizeof(uint64_t)); + + return 0; +#else + return ink_cond_timedwait(&m_cond, &m_mutex, abstime); +#endif +} + +void +EventNotify::lock(void) +{ +#ifdef TS_HAS_EVENTFD + // do nothing +#else + ink_mutex_acquire(&m_mutex); +#endif +} + +bool +EventNotify::trylock(void) +{ +#ifdef TS_HAS_EVENTFD + return true; +#else + return ink_mutex_try_acquire(&m_mutex); +#endif +} + +void +EventNotify::unlock(void) +{ +#ifdef TS_HAS_EVENTFD + // do nothing +#else + ink_mutex_release(&m_mutex); +#endif +} + +EventNotify::~EventNotify() +{ +#ifdef TS_HAS_EVENTFD + close(m_event_fd); + close(m_epoll_fd); +#else + ink_cond_destroy(&m_cond); + ink_mutex_destroy(&m_mutex); +#endif +} http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/EventNotify.h ---------------------------------------------------------------------- diff --git a/lib/ts/EventNotify.h b/lib/ts/EventNotify.h new file mode 100644 index 0000000..16e4809 --- /dev/null +++ b/lib/ts/EventNotify.h @@ -0,0 +1,54 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +/************************************************************************** + EventNotify.h + + Generic event notify mechanism among threads. + +**************************************************************************/ + +#include "ink_thread.h" + +class EventNotify +{ +public: + EventNotify(const char *name = NULL); + void signal(void); + void wait(void); + int timedwait(ink_timestruc *abstime); + void lock(void); + bool trylock(void); + void unlock(void); + ~EventNotify(); + +private: + const char *m_name; +#ifdef TS_HAS_EVENTFD + int m_event_fd; + int m_epoll_fd; +#else + ink_cond m_cond; + ink_mutex m_mutex; +#endif +}; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/Makefile.am ---------------------------------------------------------------------- diff --git a/lib/ts/Makefile.am b/lib/ts/Makefile.am index c360a4b..3fca2f8 100644 --- a/lib/ts/Makefile.am +++ b/lib/ts/Makefile.am @@ -117,6 +117,8 @@ libtsutil_la_SOURCES = \ ink_syslog.h \ ink_thread.cc \ ink_thread.h \ + EventNotify.h \ + EventNotify.cc \ ink_time.cc \ ink_time.h \ inktomi++.h \ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/lib/ts/libts.h ---------------------------------------------------------------------- diff --git a/lib/ts/libts.h b/lib/ts/libts.h index 046afaa..27c9e92 100644 --- a/lib/ts/libts.h +++ b/lib/ts/libts.h @@ -81,6 +81,7 @@ #include "Bitops.h" #include "Compatability.h" #include "DynArray.h" +#include "EventNotify.h" #include "I_Version.h" #include "InkPool.h" #include "List.h" http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/Log.cc b/proxy/logging/Log.cc index 502c424..790bc04 100644 --- a/proxy/logging/Log.cc +++ b/proxy/logging/Log.cc @@ -77,15 +77,12 @@ size_t Log::numInactiveObjects; size_t Log::maxInactiveObjects; // Flush thread stuff -ink_mutex *Log::preproc_mutex; -ink_cond *Log::preproc_cond; -ink_mutex *Log::flush_mutex; -ink_cond *Log::flush_cond; +EventNotify *Log::preproc_notify; +EventNotify *Log::flush_notify; InkAtomicList *Log::flush_data_list; // Collate thread stuff -ink_mutex Log::collate_mutex; -ink_cond Log::collate_cond; +EventNotify Log::collate_notify; ink_thread Log::collate_thread; int Log::collation_accept_file_descriptor; int Log::collation_preproc_threads; @@ -189,10 +186,10 @@ struct PeriodicWakeup : Continuation int wakeup (int /* event ATS_UNUSED */, Event * /* e ATS_UNUSED */) { for (int i = 0; i < m_preproc_threads; i++) { - ink_cond_signal (&Log::preproc_cond[i]); + Log::preproc_notify[i].signal(); } for (int i = 0; i < m_flush_threads; i++) { - ink_cond_signal (&Log::flush_cond[i]); + Log::flush_notify[i].signal(); } return EVENT_CONT; } @@ -1070,8 +1067,7 @@ Log::create_threads() if (!(init_status & THREADS_CREATED)) { char desc[64]; - preproc_mutex = new ink_mutex[collation_preproc_threads]; - preproc_cond = new ink_cond[collation_preproc_threads]; + preproc_notify = new EventNotify[collation_preproc_threads]; size_t stacksize; REC_ReadConfigInteger(stacksize, "proxy.config.thread.default.stacksize"); @@ -1081,9 +1077,6 @@ Log::create_threads() // no need for the conditional var since it will be relying on // on the event system. for (int i = 0; i < collation_preproc_threads; i++) { - sprintf(desc, "Logging preproc thread mutex[%d]", i); - ink_mutex_init(&preproc_mutex[i], desc); - ink_cond_init(&preproc_cond[i]); Continuation *preproc_cont = NEW(new LoggingPreprocContinuation(i)); sprintf(desc, "[LOG_PREPROC %d]", i); eventProcessor.spawn_thread(preproc_cont, desc, stacksize); @@ -1093,13 +1086,9 @@ Log::create_threads() // TODO: Enable multiple flush threads, such as // one flush thread per file. // - flush_mutex = new ink_mutex; - flush_cond = new ink_cond; + flush_notify = new EventNotify; flush_data_list = new InkAtomicList; - sprintf(desc, "Logging flush thread mutex"); - ink_mutex_init(flush_mutex, desc); - ink_cond_init(flush_cond); sprintf(desc, "Logging flush buffer list"); ink_atomiclist_init(flush_data_list, desc, 0); Continuation *flush_cont = NEW(new LoggingFlushContinuation(0)); @@ -1118,8 +1107,6 @@ Log::create_threads() // much overhead associated with keeping an ink_thread blocked on a // condition variable. // - ink_mutex_init(&collate_mutex, "Collate thread mutex"); - ink_cond_init(&collate_cond); Continuation *collate_continuation = NEW(new LoggingCollateContinuation); Event *collate_event = eventProcessor.spawn_thread(collate_continuation); collate_thread = collate_event->ethread->tid; @@ -1246,7 +1233,7 @@ Log::preproc_thread_main(void *args) Debug("log-preproc", "log preproc thread is alive ..."); - ink_mutex_acquire(&preproc_mutex[idx]); + Log::preproc_notify[idx].lock(); while (true) { buffers_preproced = config->log_object_manager.preproc_buffers(idx); @@ -1264,11 +1251,11 @@ Log::preproc_thread_main(void *args) // check the queue and find there is nothing to do, then wait // again. // - ink_cond_wait (&preproc_cond[idx], &preproc_mutex[idx]); + Log::preproc_notify[idx].wait(); } /* NOTREACHED */ - ink_mutex_release(&preproc_mutex[idx]); + Log::preproc_notify[idx].unlock(); return NULL; } @@ -1283,7 +1270,7 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */) int len, bytes_written, total_bytes; SLL<LogFlushData, LogFlushData::Link_link> link, invert_link; - ink_mutex_acquire(flush_mutex); + Log::flush_notify->lock(); while (true) { fdata = (LogFlushData *) ink_atomiclist_popall(flush_data_list); @@ -1369,11 +1356,11 @@ Log::flush_thread_main(void * /* args ATS_UNUSED */) // check the queue and find there is nothing to do, then wait // again. // - ink_cond_wait(flush_cond, flush_mutex); + Log::flush_notify->wait(); } /* NOTREACHED */ - ink_mutex_release(flush_mutex); + Log::flush_notify->unlock(); return NULL; } @@ -1398,7 +1385,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */) Debug("log-thread", "Log collation thread is alive ..."); - ink_mutex_acquire(&collate_mutex); + Log::collate_notify.lock(); while (true) { ink_assert(Log::config != NULL); @@ -1408,7 +1395,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */) // wake-ups. // while (!Log::config->am_collation_host()) { - ink_cond_wait(&collate_cond, &collate_mutex); + Log::collate_notify.wait(); } // Ok, at this point we know we're a log collation host, so get to @@ -1427,7 +1414,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */) // // go to sleep ... // - ink_cond_wait(&collate_cond, &collate_mutex); + Log::collate_notify.wait(); continue; } @@ -1489,7 +1476,7 @@ Log::collate_thread_main(void * /* args ATS_UNUSED */) } /* NOTREACHED */ - ink_mutex_release(&collate_mutex); + Log::collate_notify.unlock(); return NULL; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/Log.h ---------------------------------------------------------------------- diff --git a/proxy/logging/Log.h b/proxy/logging/Log.h index 2f395ca..2233220 100644 --- a/proxy/logging/Log.h +++ b/proxy/logging/Log.h @@ -420,17 +420,14 @@ public: static void add_to_inactive(LogObject * obj); // logging thread stuff - static ink_mutex *preproc_mutex; - static ink_cond *preproc_cond; + static EventNotify *preproc_notify; static void *preproc_thread_main(void *args); - static ink_mutex *flush_mutex; - static ink_cond *flush_cond; + static EventNotify *flush_notify; static InkAtomicList *flush_data_list; static void *flush_thread_main(void *args); // collation thread stuff - static ink_mutex collate_mutex; - static ink_cond collate_cond; + static EventNotify collate_notify; static ink_thread collate_thread; static int collation_preproc_threads; static int collation_accept_file_descriptor; http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogCollationHostSM.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogCollationHostSM.cc b/proxy/logging/LogCollationHostSM.cc index 9add290..36452f2 100644 --- a/proxy/logging/LogCollationHostSM.cc +++ b/proxy/logging/LogCollationHostSM.cc @@ -321,7 +321,7 @@ LogCollationHostSM::host_recv(int event, void * /* data ATS_UNUSED */) // log_buffer = NEW(new LogBuffer(log_object, log_buffer_header)); int idx = log_object->add_to_flush_queue(log_buffer); - ink_cond_signal(&Log::preproc_cond[idx]); + Log::preproc_notify[idx].signal(); } #if defined(LOG_BUFFER_TRACKING) http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogConfig.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogConfig.cc b/proxy/logging/LogConfig.cc index 6cf55ba..8e3b585 100644 --- a/proxy/logging/LogConfig.cc +++ b/proxy/logging/LogConfig.cc @@ -661,7 +661,7 @@ LogConfig::setup_collation(LogConfig * prev_config) // since we are the collation host, we need to signal the // collate_cond variable so that our collation thread wakes up. // - ink_cond_signal(&Log::collate_cond); + Log::collate_notify.signal(); #endif Debug("log", "I am a collation host listening on port %d.", collation_port); } else { http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogFile.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogFile.cc b/proxy/logging/LogFile.cc index 5bf6ba4..6b93a26 100644 --- a/proxy/logging/LogFile.cc +++ b/proxy/logging/LogFile.cc @@ -523,7 +523,7 @@ LogFile::preproc_and_try_delete(LogBuffer * lb) ink_atomiclist_push(Log::flush_data_list, flush_data); - ink_cond_signal(Log::flush_cond); + Log::flush_notify->signal(); // // LogBuffer will be deleted in flush thread @@ -693,7 +693,7 @@ LogFile::write_ascii_logbuffer3(LogBufferHeader * buffer_header, char *alt_forma LogFlushData *flush_data = new LogFlushData(this, ascii_buffer, fmt_buf_bytes); ink_atomiclist_push(Log::flush_data_list, flush_data); - ink_cond_signal(Log::flush_cond); + Log::flush_notify->signal(); total_bytes += fmt_buf_bytes; } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/f4f8d99f/proxy/logging/LogObject.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc index ea059e2..29f1a39 100644 --- a/proxy/logging/LogObject.cc +++ b/proxy/logging/LogObject.cc @@ -461,7 +461,7 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) { int idx = m_buffer_manager_idx++ % m_flush_threads; Debug("log-logbuffer", "adding buffer %d to flush list after checkout", buffer->get_id()); m_buffer_manager[idx].add_to_flush_queue(buffer); - ink_cond_signal(&Log::preproc_cond[idx]); + Log::preproc_notify[idx].signal(); } decremented = true;
