Updated Branches: refs/heads/master 71d8eab9b -> 3505de43f
TS-1240: fix race in log buffer queuing code. Project: http://git-wip-us.apache.org/repos/asf/trafficserver/repo Commit: http://git-wip-us.apache.org/repos/asf/trafficserver/commit/3505de43 Tree: http://git-wip-us.apache.org/repos/asf/trafficserver/tree/3505de43 Diff: http://git-wip-us.apache.org/repos/asf/trafficserver/diff/3505de43 Branch: refs/heads/master Commit: 3505de43f5615c0f5a7ae6b62d0b0673257261c5 Parents: 71d8eab Author: [email protected] <[email protected]> Authored: Thu May 24 10:32:44 2012 -0700 Committer: John Plevyak <[email protected]> Committed: Thu May 24 13:45:26 2012 -0700 ---------------------------------------------------------------------- lib/ts/ink_queue.cc | 15 ----- lib/ts/ink_queue.h | 6 ++ proxy/logging/LogBuffer.cc | 10 ++-- proxy/logging/LogBuffer.h | 44 +++++++-------- proxy/logging/LogObject.cc | 113 ++++++++++++++++++++++++--------------- proxy/logging/LogObject.h | 39 ++++---------- 6 files changed, 111 insertions(+), 116 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.cc ---------------------------------------------------------------------- diff --git a/lib/ts/ink_queue.cc b/lib/ts/ink_queue.cc index 4c8697b..e9cc992 100644 --- a/lib/ts/ink_queue.cc +++ b/lib/ts/ink_queue.cc @@ -50,12 +50,6 @@ #include "ink_resource.h" -#ifdef __x86_64__ -#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src)) -#else -#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src))) -#endif - typedef struct _ink_freelist_list { InkFreeList *fl; @@ -92,15 +86,6 @@ inkcoreapi volatile int64_t freelist_allocated_mem = 0; #define fl_memadd(_x_) \ ink_atomic_increment64(&freelist_allocated_mem, (int64_t) (_x_)); -//static void ink_queue_load_64(void *dst, void *src) -//{ -// int32_t src_version = (*(head_p *) src).s.version; -// void *src_pointer = (*(head_p *) src).s.pointer; -// -// (*(head_p 
*) dst).s.version = src_version; -// (*(head_p *) dst).s.pointer = src_pointer; -//} - void ink_freelist_init(InkFreeList * f, http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/lib/ts/ink_queue.h ---------------------------------------------------------------------- diff --git a/lib/ts/ink_queue.h b/lib/ts/ink_queue.h index e1aebbc..1176b95 100644 --- a/lib/ts/ink_queue.h +++ b/lib/ts/ink_queue.h @@ -65,6 +65,12 @@ extern "C" void ink_queue_load_64(void *dst, void *src); +#ifdef __x86_64__ +#define INK_QUEUE_LD64(dst,src) *((uint64_t*)&(dst)) = *((uint64_t*)&(src)) +#else +#define INK_QUEUE_LD64(dst,src) (ink_queue_load_64((void *)&(dst), (void *)&(src))) +#endif + /* * Generic Free List Manager */ http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogBuffer.cc b/proxy/logging/LogBuffer.cc index 363476d..937247b 100644 --- a/proxy/logging/LogBuffer.cc +++ b/proxy/logging/LogBuffer.cc @@ -129,11 +129,10 @@ LogBufferHeader::log_filename() -------------------------------------------------------------------------*/ LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t write_align): - next_flush(NULL), - next_list(NULL), m_size(size), m_buf_align(buf_align), - m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner) + m_write_align(write_align), m_max_entries(Log::config->max_entries_per_buffer), m_owner(owner), + m_references(0) { size_t hdr_size; @@ -159,13 +158,12 @@ LogBuffer::LogBuffer(LogObject * owner, size_t size, size_t buf_align, size_t wr } LogBuffer::LogBuffer(LogObject * owner, LogBufferHeader * header): - next_flush(NULL), - next_list(NULL), m_unaligned_buffer(NULL), m_buffer((char *) header), m_size(0), m_buf_align(LB_DEFAULT_ALIGN), - m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header) + 
m_write_align(INK_MIN_ALIGN), m_max_entries(0), m_expiration_time(0), m_owner(owner), m_header(header), + m_references(0) { // This constructor does not allocate a buffer because it gets it as // an argument. We set m_unaligned_buffer to NULL, which means that http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogBuffer.h ---------------------------------------------------------------------- diff --git a/proxy/logging/LogBuffer.h b/proxy/logging/LogBuffer.h index 8662975..279e69c 100644 --- a/proxy/logging/LogBuffer.h +++ b/proxy/logging/LogBuffer.h @@ -130,17 +130,11 @@ union LB_State /*------------------------------------------------------------------------- LogBuffer -------------------------------------------------------------------------*/ -#define CLASS_SIGN_LOGBUFFER 0xFACE5370 /* LogBuffer class signature */ - class LogBuffer { public: - unsigned long sign; /* class signature (must be CLASS_SIGN_LOGBUFFER) */ - LogBuffer *next_flush; /* next in flush list */ - LogBuffer *next_list; /* next in list */ - - enum LB_ResultCode - { + SLINK(LogBuffer, write_link); + enum LB_ResultCode { LB_OK = 0, LB_FULL_NO_WRITERS, LB_FULL_ACTIVE_WRITERS, @@ -199,14 +193,16 @@ public: // static functions static size_t max_entry_bytes(); - static int to_ascii(LogEntryHeader * entry, LogFormatType type, - char *buf, int max_len, char *symbol_str, char *printf_str, - unsigned buffer_version, char *alt_format = NULL); - static int resolve_custom_entry(LogFieldList * fieldlist, - char *printf_str, char *read_from, char *write_to, - int write_to_len, long timestamp, long timestamp_us, - unsigned buffer_version, LogFieldList * alt_fieldlist = NULL, - char *alt_printf_str = NULL); + static int to_ascii( + LogEntryHeader * entry, LogFormatType type, + char *buf, int max_len, char *symbol_str, char *printf_str, + unsigned buffer_version, char *alt_format = NULL); + static int resolve_custom_entry( + LogFieldList * fieldlist, + char *printf_str, char 
*read_from, char *write_to, + int write_to_len, long timestamp, long timestamp_us, + unsigned buffer_version, LogFieldList * alt_fieldlist = NULL, + char *alt_printf_str = NULL); private: char *m_unaligned_buffer; // the unaligned buffer @@ -223,7 +219,10 @@ private: LogObject *m_owner; // the LogObject that owns this buf. LogBufferHeader *m_header; - uint32_t m_id; // unique buffer id (for debugging) + uint32_t m_id; // unique buffer id (for debugging) +public: + volatile int m_references; // outstanding checkout_write references. +private: // private functions size_t _add_buffer_header(); @@ -270,8 +269,7 @@ public: This class will iterate over the entries in a LogBuffer. -------------------------------------------------------------------------*/ -class LogBufferIterator -{ +class LogBufferIterator { public: LogBufferIterator(LogBufferHeader * header, bool in_network_order = false); ~LogBufferIterator(); @@ -300,10 +298,10 @@ private: inline LogBufferIterator::LogBufferIterator(LogBufferHeader * header, bool in_network_order) - : m_in_network_order(in_network_order), - m_next(0), - m_iter_entry_count(0), - m_buffer_entry_count(0) +: m_in_network_order(in_network_order), + m_next(0), + m_iter_entry_count(0), + m_buffer_entry_count(0) { ink_debug_assert(header); http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.cc ---------------------------------------------------------------------- diff --git a/proxy/logging/LogObject.cc b/proxy/logging/LogObject.cc index 24af0af..2e21f7b 100644 --- a/proxy/logging/LogObject.cc +++ b/proxy/logging/LogObject.cc @@ -40,34 +40,32 @@ #include "LogObject.h" size_t -LogBufferManager::flush_buffers(LogBufferSink *sink) -{ - while (!ink_atomic_cas(&_flush_array_lock, 0, 1)); - - int ofa = _open_flush_array; - int nfb = _num_flush_buffers[ofa]; - if (nfb) { - _open_flush_array = !_open_flush_array; // switch to other array - _num_flush_buffers[_open_flush_array] = 0; // clear count - } - - 
_flush_array_lock = 0; - - if (nfb) { - int i; - - for (i=0 ;i < nfb; ++i) { - LogBuffer *flush_buffer = _flush_array[ofa][i]; - - flush_buffer->update_header_data(); - sink->write(flush_buffer); - delete flush_buffer; +LogBufferManager::flush_buffers(LogBufferSink *sink) { + SList(LogBuffer, write_link) q(write_list.popall()), new_q; + LogBuffer *b = NULL; + while ((b = q.pop())) { + if (b->m_references) { // Still has outstanding references. + write_list.push(b); + } else if (_num_flush_buffers > FLUSH_ARRAY_SIZE) { + delete b; + ink_atomic_increment(&_num_flush_buffers, -1); + Warning("Dropping log buffer, can't keep up."); + } else { + new_q.push(b); } + } - Debug("log-logbuffer", "flushed %d buffers from array %d", nfb, ofa); + int flushed = 0; + while ((b = new_q.pop())) { + b->update_header_data(); + sink->write(b); + delete b; + ink_atomic_increment(&_num_flush_buffers, -1); + flushed++; } - return nfb; + Debug("log-logbuffer", "flushed %d buffers", flushed); + return flushed; } /*------------------------------------------------------------------------- @@ -85,8 +83,7 @@ LogObject::LogObject(LogFormat *format, const char *log_dir, m_rolling_offset_hr (rolling_offset_hr), m_rolling_size_mb (rolling_size_mb), m_last_roll_time(0), - m_ref_count (0), - m_log_buffer (NULL) + m_ref_count (0) { ink_debug_assert (format != NULL); m_format = new LogFormat(*format); @@ -120,8 +117,9 @@ LogObject::LogObject(LogFormat *format, const char *log_dir, Log::config->overspill_report_count)); #endif // TS_MICRO - m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size)); - ink_debug_assert (m_log_buffer != NULL); + LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size)); + ink_debug_assert(b); + SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0); _setup_rolling(rolling_enabled, rolling_interval_sec, rolling_offset_hr, rolling_size_mb); @@ -137,8 +135,7 @@ LogObject::LogObject(LogObject& rhs) m_signature(rhs.m_signature), 
m_rolling_interval_sec(rhs.m_rolling_interval_sec), m_last_roll_time(rhs.m_last_roll_time), - m_ref_count(0), - m_log_buffer(NULL) + m_ref_count(0) { m_format = new LogFormat(*(rhs.m_format)); @@ -166,8 +163,9 @@ LogObject::LogObject(LogObject& rhs) // copy gets a fresh log buffer // - m_log_buffer = NEW (new LogBuffer (this, Log::config->log_buffer_size)); - ink_debug_assert (m_log_buffer != NULL); + LogBuffer *b = NEW (new LogBuffer (this, Log::config->log_buffer_size)); + ink_debug_assert(b); + SET_FREELIST_POINTER_VERSION(m_log_buffer, b, 0); Debug("log-config", "exiting LogObject copy constructor, " "filename=%s this=%p", m_filename, this); @@ -194,7 +192,7 @@ LogObject::~LogObject() ats_free(m_filename); ats_free(m_alt_filename); delete m_format; - delete m_log_buffer; + delete (LogBuffer*)FREELIST_POINTER(m_log_buffer); } //----------------------------------------------------------------------------- @@ -405,16 +403,26 @@ LogObject::displayAsXML(FILE * fd, bool extended) LogBuffer * -LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) -{ +LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) { LogBuffer::LB_ResultCode result_code; LogBuffer *buffer; LogBuffer *new_buffer; bool retry = true; do { - buffer = m_log_buffer; + // To avoid a race condition, we keep a count of held references in + // the pointer itself and add this to m_outstanding_references. 
+ head_p h; + int result = 0; + do { + INK_QUEUE_LD64(h, m_log_buffer); + head_p new_h; + SET_FREELIST_POINTER_VERSION(new_h, FREELIST_POINTER(h), FREELIST_VERSION(h) + 1); + result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, h.data, new_h.data); + } while (!result); + buffer = (LogBuffer*)FREELIST_POINTER(h); result_code = buffer->checkout_write(write_offset, bytes_needed); + bool decremented = false; switch (result_code) { case LogBuffer::LB_OK: @@ -428,11 +436,17 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) // no more room in current buffer, create a new one new_buffer = NEW (new LogBuffer(this, Log::config->log_buffer_size)); - // swap the new buffer for the old one (only this thread - // should be doing this, so there should be no problem) - // + // swap the new buffer for the old one INK_WRITE_MEMORY_BARRIER; - ink_atomic_swap_ptr((void *)&m_log_buffer, new_buffer); + head_p old_h; + do { + INK_QUEUE_LD64(old_h, m_log_buffer); + head_p tmp_h; + SET_FREELIST_POINTER_VERSION(tmp_h, new_buffer, 0); + result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data); + } while (!result); + if (FREELIST_POINTER(old_h) == FREELIST_POINTER(h)) + ink_atomic_increment(&buffer->m_references, FREELIST_VERSION(old_h) - 1); if (result_code == LogBuffer::LB_FULL_NO_WRITERS) { // there are no writers, move the old buffer to the flush list @@ -440,7 +454,8 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) m_buffer_manager.add_to_flush_queue(buffer); ink_cond_signal(&Log::flush_cond); } - // fallover to retry + decremented = true; + break; case LogBuffer::LB_RETRY: // no more room, but another thread should be taking care of @@ -460,7 +475,19 @@ LogObject::_checkout_write(size_t * write_offset, size_t bytes_needed) default: ink_debug_assert(false); } - + if (!decremented) { + head_p old_h; + do { + INK_QUEUE_LD64(old_h, m_log_buffer); + if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h)) + break; + head_p 
tmp_h; + SET_FREELIST_POINTER_VERSION(tmp_h, FREELIST_POINTER(h), FREELIST_VERSION(old_h) - 1); + result = ink_atomic_cas64((int64_t*)&m_log_buffer.data, old_h.data, tmp_h.data); + } while (!result); + if (FREELIST_POINTER(old_h) != FREELIST_POINTER(h)) + ink_atomic_increment(&buffer->m_references, -1); + } } while (retry && write_offset); // if write_offset is null, we do // not retry because we really do // not want to write to the buffer @@ -750,7 +777,7 @@ LogObject::_roll_files(long last_roll_time, long time_now) void LogObject::check_buffer_expiration(long time_now) { - LogBuffer *b = m_log_buffer; + LogBuffer *b = (LogBuffer*)FREELIST_POINTER(m_log_buffer); if (b && time_now > b->expiration_time()) { force_new_buffer(); } http://git-wip-us.apache.org/repos/asf/trafficserver/blob/3505de43/proxy/logging/LogObject.h ---------------------------------------------------------------------- diff --git a/proxy/logging/LogObject.h b/proxy/logging/LogObject.h index 1b3232a..b163696 100644 --- a/proxy/logging/LogObject.h +++ b/proxy/logging/LogObject.h @@ -67,40 +67,21 @@ Debug("log-api-mutex", _f) class LogBufferManager { -private: - int _flush_array_lock; - - LogBuffer *_flush_array[2][FLUSH_ARRAY_SIZE]; - - int _num_flush_buffers[2]; // number of buffers in queue - int _open_flush_array; // index of queue accepting buffers - -public: - LogBufferManager() - : _flush_array_lock(0), _open_flush_array(0) - { - ink_zero(_num_flush_buffers); - } - - void add_to_flush_queue(LogBuffer * buffer) - { - while (!ink_atomic_cas(&_flush_array_lock, 0, 1)); + private: + ASLL(LogBuffer, write_link) write_list; + int _num_flush_buffers; - if (_num_flush_buffers[_open_flush_array] < FLUSH_ARRAY_SIZE) { - int idx = _num_flush_buffers[_open_flush_array]++; + public: + LogBufferManager() : _num_flush_buffers(0) { } - _flush_array[_open_flush_array][idx] = buffer; - } else { - Warning("Dropping log buffer, can't keep up"); - delete buffer; + void add_to_flush_queue(LogBuffer *buffer) { + 
write_list.push(buffer); + ink_atomic_increment(&_num_flush_buffers, 1); } - _flush_array_lock = 0; - } - size_t flush_buffers(LogBufferSink *sink); + size_t flush_buffers(LogBufferSink *sink); }; - class LogObject { public: @@ -235,7 +216,7 @@ private: int m_ref_count; - LogBuffer *volatile m_log_buffer; // current work buffer + volatile head_p m_log_buffer; // current work buffer LogBufferManager m_buffer_manager; void generate_filenames(const char *log_dir, const char *basename, LogFileFormat file_format);
