This is an automated email from the ASF dual-hosted git repository.
swebb2066 pushed a commit to branch improve_async_appender
in repository https://gitbox.apache.org/repos/asf/logging-log4cxx.git
The following commit(s) were added to refs/heads/improve_async_appender by this
push:
new 7fd034a7 Use a ring buffer to pass events to the dispatch thread
7fd034a7 is described below
commit 7fd034a763d7b86399fe33cfd29dd99872f3af52
Author: Stephen Webb <[email protected]>
AuthorDate: Sat Mar 9 14:19:34 2024 +1100
Use a ring buffer to pass events to the dispatch thread
---
src/main/cpp/asyncappender.cpp | 189 ++++++++---------------------------------
1 file changed, 34 insertions(+), 155 deletions(-)
diff --git a/src/main/cpp/asyncappender.cpp b/src/main/cpp/asyncappender.cpp
index a7265a51..7355f054 100644
--- a/src/main/cpp/asyncappender.cpp
+++ b/src/main/cpp/asyncappender.cpp
@@ -96,78 +96,13 @@ typedef std::map<LogString, DiscardSummary> DiscardMap;
}
#endif
-#define USE_ATOMIC_QUEUE 1
-#if USE_ATOMIC_QUEUE
-#include <atomic>
-#include <bit>
-namespace
-{
static const int CACHE_LINE_SIZE = 128;
-class AtomicQueue
-{
-public:
- struct alignas(CACHE_LINE_SIZE) Node
- {
- LoggingEventPtr data;
- Node* next;
- Node() : next(0) {}
- Node(const LoggingEventPtr& event, Node* n)
- : data(event)
- , next(n)
- { }
- };
-
- AtomicQueue(size_t bufferSize)
- : m_head(0)
- , m_nextNode(0)
- , m_nodeStore(std::bit_ceil(bufferSize + 2))
- {}
-
- void push(const LoggingEventPtr& event)
- {
- auto index = m_nextNode++;
- auto n = &m_nodeStore[index % m_nodeStore.size()];
- *n = Node(event, m_head.load(std::memory_order_relaxed));
- while (!m_head.compare_exchange_weak(n->next, n,
std::memory_order_release))
- {
- }
- }
-
- Node* pop_all(void)
- {
- return m_head.exchange(0, std::memory_order_consume);
- }
-
- Node* pop_all_reverse(void)
- {
- Node* first = 0;
- auto last = pop_all();
- while (last)
- {
- auto tmp = last;
- last = last->next;
- tmp->next = first;
- first = tmp;
- }
- return first;
- }
-
- void setBufferSize(size_t bufferSize)
- {
- m_nodeStore.resize(std::bit_ceil(bufferSize + 2));
- }
-private:
- alignas(CACHE_LINE_SIZE) std::atomic<Node*> m_head;
- alignas(CACHE_LINE_SIZE) std::atomic<size_t> m_nextNode;
- alignas(CACHE_LINE_SIZE) std::vector<Node> m_nodeStore;
-};
-} // namespace
-#endif
struct AsyncAppender::AsyncAppenderPriv : public
AppenderSkeleton::AppenderSkeletonPrivate
{
AsyncAppenderPriv() :
AppenderSkeletonPrivate(),
+ buffer(DEFAULT_BUFFER_SIZE),
bufferSize(DEFAULT_BUFFER_SIZE),
appenders(pool),
dispatcher(),
@@ -176,9 +111,9 @@ struct AsyncAppender::AsyncAppenderPriv : public
AppenderSkeleton::AppenderSkele
#if LOG4CXX_EVENTS_AT_EXIT
, atExitRegistryRaii([this]{atExitActivated();})
#endif
-#if USE_ATOMIC_QUEUE
- , eventList(DEFAULT_BUFFER_SIZE)
-#endif
+ , eventCount(0)
+ , dispatchedCount(0)
+ , commitCount(0)
{
}
@@ -192,12 +127,10 @@ struct AsyncAppender::AsyncAppenderPriv : public
AppenderSkeleton::AppenderSkele
}
#endif
-#if LOG4CXX_ABI_VERSION <= 15 || !(USE_ATOMIC_QUEUE)
/**
* Event buffer.
*/
LoggingEventList buffer;
-#endif
/**
* Mutex used to guard access to buffer and discardMap.
@@ -213,7 +146,7 @@ struct AsyncAppender::AsyncAppenderPriv : public
AppenderSkeleton::AppenderSkele
DiscardMap discardMap;
/**
- * Buffer size.
+ * The maximum number of undispatched events.
*/
int bufferSize;
@@ -241,17 +174,20 @@ struct AsyncAppender::AsyncAppenderPriv : public
AppenderSkeleton::AppenderSkele
helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif
-#if USE_ATOMIC_QUEUE
/**
- * Pending events
+ * Used to calculate the buffer position at which to store the next
event.
*/
- alignas(CACHE_LINE_SIZE) AtomicQueue eventList;
+ alignas(CACHE_LINE_SIZE) std::atomic<size_t> eventCount;
/**
- * The number of pending events.
+ * Used to calculate the buffer position from which to extract the next
event.
*/
- alignas(CACHE_LINE_SIZE) std::atomic<int> approxListSize;
-#endif
+ alignas(CACHE_LINE_SIZE) std::atomic<size_t> dispatchedCount;
+
+ /**
+ * Used to communicate to the dispatch thread when an event is
committed in buffer.
+ */
+ alignas(CACHE_LINE_SIZE) std::atomic<size_t> commitCount;
};
@@ -323,46 +259,28 @@ void AsyncAppender::append(const spi::LoggingEventPtr&
event, Pool& p)
std::unique_lock<std::mutex> lock(priv->bufferMutex);
if (!priv->dispatcher.joinable())
priv->dispatcher =
ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"),
&AsyncAppender::dispatch, this );
-#if !USE_ATOMIC_QUEUE
- priv->buffer.reserve(priv->bufferSize);
-#endif
}
while (true)
{
-#if USE_ATOMIC_QUEUE
- auto newSize = ++priv->approxListSize;
- if (newSize <= priv->bufferSize)
+ // Claim a slot in the ring buffer
+ auto oldEventCount = priv->eventCount++;
+ auto pendingCount = oldEventCount - priv->dispatchedCount;
+ if (0 <= pendingCount && pendingCount < priv->bufferSize)
{
- priv->eventList.push(event);
+ auto index = oldEventCount % priv->buffer.size();
+ // Write to the ring buffer
+ priv->buffer[index] = event;
+ // Notify the dispatch thread that an event has been
added
+
priv->commitCount.compare_exchange_strong(oldEventCount, oldEventCount + 1);
priv->bufferNotEmpty.notify_all();
break;
}
else
- --priv->approxListSize;
- break;
+ --priv->eventCount;
//
- // Following code is only reachable if buffer is full
+ // Following code is only reachable if buffer is full or
eventCount has overflowed
//
std::unique_lock<std::mutex> lock(priv->bufferMutex);
-#else
- std::unique_lock<std::mutex> lock(priv->bufferMutex);
- size_t previousSize = priv->buffer.size();
-
- if (previousSize < (size_t)priv->bufferSize)
- {
- priv->buffer.push_back(event);
-
- if (previousSize == 0)
- {
- priv->bufferNotEmpty.notify_all();
- }
-
- break;
- }
- //
- // Following code is only reachable if buffer is full
- //
-#endif
//
// if blocking and thread is not already interrupted
// and not the dispatcher then
@@ -375,11 +293,7 @@ void AsyncAppender::append(const spi::LoggingEventPtr&
event, Pool& p)
{
priv->bufferNotFull.wait(lock, [this]()
{
-#if USE_ATOMIC_QUEUE
- return priv->approxListSize < priv->bufferSize;
-#else
- return priv->buffer.size() < priv->bufferSize;
-#endif
+ return priv->eventCount - priv->dispatchedCount
< priv->bufferSize;
});
discard = false;
}
@@ -419,10 +333,6 @@ void AsyncAppender::close()
if ( priv->dispatcher.joinable() )
{
-#if USE_ATOMIC_QUEUE
- // Queue a special event that will terminate the dispatch thread
- priv->eventList.push(LoggingEventPtr());
-#endif
priv->dispatcher.join();
}
@@ -487,9 +397,7 @@ void AsyncAppender::setBufferSize(int size)
std::lock_guard<std::mutex> lock(priv->bufferMutex);
priv->bufferSize = (size < 1) ? 1 : size;
-#if USE_ATOMIC_QUEUE
- priv->eventList.setBufferSize(priv->bufferSize);
-#endif
+ priv->buffer.resize(priv->bufferSize);
priv->bufferNotFull.notify_all();
}
@@ -573,59 +481,30 @@ void AsyncAppender::dispatch()
while (isActive)
{
LoggingEventList events;
-#if USE_ATOMIC_QUEUE
- auto eventList = priv->eventList.pop_all_reverse();
- priv->approxListSize = 0;
- if (!eventList)
- {
- std::unique_lock<std::mutex> lock(priv->bufferMutex);
- priv->bufferNotEmpty.wait(lock, [this, &eventList]() ->
bool
- {
- eventList =
priv->eventList.pop_all_reverse();
- priv->approxListSize = 0;
- return eventList || priv->closed;
- }
- );
- }
- priv->bufferNotFull.notify_all();
- while (eventList)
- {
- if (eventList->data)
- events.push_back(eventList->data);
- else
- isActive = false;
- auto next = eventList->next;
- eventList = next;
- }
- {
- std::unique_lock<std::mutex> lock(priv->bufferMutex);
- for (auto item : priv->discardMap)
- events.push_back(item.second.createEvent(p));
- priv->discardMap.clear();
- }
-#else
//
// process events after lock on buffer is released.
//
{
std::unique_lock<std::mutex> lock(priv->bufferMutex);
priv->bufferNotEmpty.wait(lock, [this]() -> bool
- { return 0 < priv->buffer.size() ||
priv->closed; }
+ { return priv->dispatchedCount !=
priv->commitCount || priv->closed; }
);
isActive = !priv->closed;
- events = std::move(priv->buffer);
+ while (priv->dispatchedCount != priv->commitCount)
+ {
+ auto index = priv->dispatchedCount %
priv->buffer.size();
+ events.push_back(priv->buffer[index]);
+ ++priv->dispatchedCount;
+ }
for (auto discardItem : priv->discardMap)
{
events.push_back(discardItem.second.createEvent(p));
}
- priv->buffer.clear();
- priv->buffer.reserve(priv->bufferSize);
priv->discardMap.clear();
priv->bufferNotFull.notify_all();
}
-#endif
for (auto item : events)
{