This is an automated email from the ASF dual-hosted git repository.

swebb2066 pushed a commit to branch improve_async_appender
in repository https://gitbox.apache.org/repos/asf/logging-log4cxx.git


The following commit(s) were added to refs/heads/improve_async_appender by this push:
     new 7fd034a7 Use a ring buffer to pass events to the dispatch thread
7fd034a7 is described below

commit 7fd034a763d7b86399fe33cfd29dd99872f3af52
Author: Stephen Webb <[email protected]>
AuthorDate: Sat Mar 9 14:19:34 2024 +1100

    Use a ring buffer to pass events to the dispatch thread
---
 src/main/cpp/asyncappender.cpp | 189 ++++++++---------------------------------
 1 file changed, 34 insertions(+), 155 deletions(-)
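
For readers skimming the diff below: the patch replaces the lock-free
linked-list AtomicQueue with a fixed-size ring buffer coordinated by three
atomic counters (eventCount, commitCount, dispatchedCount). What follows is a
minimal sketch of that claim/commit scheme, not the committed code: RingBuffer
and Event are illustrative names rather than log4cxx types, the slot is
claimed here with a CAS loop whereas the patch increments eventCount and
undoes the increment when full, and the real appender falls back to a
mutex/condition-variable wait instead of returning false.

#include <atomic>
#include <cstddef>
#include <memory>
#include <vector>

struct Event { };                                // stand-in for log4cxx's LoggingEvent
using EventPtr = std::shared_ptr<Event>;

class RingBuffer
{
public:
    explicit RingBuffer(std::size_t capacity) : m_slots(capacity) {}

    // Producer: claim a slot with a CAS loop (the patch instead increments
    // eventCount and undoes the increment when the buffer is full), write
    // the slot, then publish it in claim order via commitCount.
    bool tryPush(const EventPtr& event)
    {
        auto claimed = m_eventCount.load();
        do
        {
            if (claimed - m_dispatchedCount.load() >= m_slots.size())
                return false;        // full: the real code blocks or discards here
        } while (!m_eventCount.compare_exchange_weak(claimed, claimed + 1));

        m_slots[claimed % m_slots.size()] = event;
        // Publish: spin until every earlier claim has been committed, so the
        // dispatch thread never reads a slot that is still being written.
        auto expected = claimed;
        while (!m_commitCount.compare_exchange_weak(expected, claimed + 1))
            expected = claimed;
        return true;
    }

    // Consumer (the single dispatch thread): drain every committed event.
    template <typename Fn>
    void drain(Fn&& consume)
    {
        while (m_dispatchedCount.load() != m_commitCount.load())
        {
            consume(m_slots[m_dispatchedCount % m_slots.size()]);
            ++m_dispatchedCount;
        }
    }

private:
    std::vector<EventPtr> m_slots;
    std::atomic<std::size_t> m_eventCount{0};      // next slot index to claim
    std::atomic<std::size_t> m_commitCount{0};     // count of fully written slots
    std::atomic<std::size_t> m_dispatchedCount{0}; // count of consumed slots
};

The invariant is dispatchedCount <= commitCount <= eventCount: a producer
reserves an index, writes the slot, and only then advances commitCount in
claim order, so the dispatch thread, which reads only up to commitCount, can
never observe a claimed-but-unwritten slot.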

diff --git a/src/main/cpp/asyncappender.cpp b/src/main/cpp/asyncappender.cpp
index a7265a51..7355f054 100644
--- a/src/main/cpp/asyncappender.cpp
+++ b/src/main/cpp/asyncappender.cpp
@@ -96,78 +96,13 @@ typedef std::map<LogString, DiscardSummary> DiscardMap;
 }
 #endif
 
-#define USE_ATOMIC_QUEUE 1
-#if USE_ATOMIC_QUEUE
-#include <atomic>
-#include <bit>
-namespace
-{
 static const int CACHE_LINE_SIZE = 128;
-class AtomicQueue
-{
-public:
-       struct alignas(CACHE_LINE_SIZE) Node
-       {
-               LoggingEventPtr data;
-               Node* next;
-               Node() : next(0) {}
-               Node(const LoggingEventPtr& event, Node* n)
-                       : data(event)
-                       , next(n)
-               { }
-       };
-
-       AtomicQueue(size_t bufferSize)
-               : m_head(0)
-               , m_nextNode(0)
-               , m_nodeStore(std::bit_ceil(bufferSize + 2))
-       {}
-
-       void push(const LoggingEventPtr& event)
-       {
-               auto index = m_nextNode++;
-               auto n = &m_nodeStore[index % m_nodeStore.size()];
-               *n = Node(event, m_head.load(std::memory_order_relaxed));
-               while (!m_head.compare_exchange_weak(n->next, n, std::memory_order_release))
-               {
-               }
-       }
-
-       Node* pop_all(void)
-       {
-               return m_head.exchange(0, std::memory_order_consume);
-       }
-
-       Node* pop_all_reverse(void)
-       {
-               Node* first = 0;
-               auto last = pop_all();
-               while (last)
-               {
-                       auto tmp = last;
-                       last = last->next;
-                       tmp->next = first;
-                       first = tmp;
-               }
-               return first;
-       }
-
-       void setBufferSize(size_t bufferSize)
-       {
-               m_nodeStore.resize(std::bit_ceil(bufferSize + 2));
-       }
-private:
-       alignas(CACHE_LINE_SIZE) std::atomic<Node*> m_head;
-       alignas(CACHE_LINE_SIZE) std::atomic<size_t> m_nextNode;
-       alignas(CACHE_LINE_SIZE) std::vector<Node> m_nodeStore;
-};
-} // namespace
-#endif
 
 struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkeletonPrivate
 {
        AsyncAppenderPriv() :
                AppenderSkeletonPrivate(),
+               buffer(DEFAULT_BUFFER_SIZE),
                bufferSize(DEFAULT_BUFFER_SIZE),
                appenders(pool),
                dispatcher(),
@@ -176,9 +111,9 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
 #if LOG4CXX_EVENTS_AT_EXIT
                , atExitRegistryRaii([this]{atExitActivated();})
 #endif
-#if USE_ATOMIC_QUEUE
-               , eventList(DEFAULT_BUFFER_SIZE)
-#endif
+               , eventCount(0)
+               , dispatchedCount(0)
+               , commitCount(0)
        {
        }
 
@@ -192,12 +127,10 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
        }
 #endif
 
-#if LOG4CXX_ABI_VERSION <= 15 || !(USE_ATOMIC_QUEUE)
        /**
         * Event buffer.
        */
        LoggingEventList buffer;
-#endif
 
        /**
         *  Mutex used to guard access to buffer and discardMap.
@@ -213,7 +146,7 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
        DiscardMap discardMap;
 
        /**
-        * Buffer size.
+        * The maximum number of undispatched events.
        */
        int bufferSize;
 
@@ -241,17 +174,20 @@ struct AsyncAppender::AsyncAppenderPriv : public AppenderSkeleton::AppenderSkele
        helpers::AtExitRegistry::Raii atExitRegistryRaii;
 #endif
 
-#if USE_ATOMIC_QUEUE
        /**
-        * Pending events
+        * Used to calculate the buffer position at which to store the next event.
        */
-       alignas(CACHE_LINE_SIZE) AtomicQueue eventList;
+       alignas(CACHE_LINE_SIZE) std::atomic<size_t> eventCount;
 
        /**
-        * The number of pending events.
+        * Used to calculate the buffer position from which to extract the next event.
        */
-       alignas(CACHE_LINE_SIZE) std::atomic<int> approxListSize;
-#endif
+       alignas(CACHE_LINE_SIZE) std::atomic<size_t> dispatchedCount;
+
+       /**
+        * Used to communicate to the dispatch thread when an event is committed in the buffer.
+       */
+       alignas(CACHE_LINE_SIZE) std::atomic<size_t> commitCount;
 };
 
 
@@ -323,46 +259,28 @@ void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
                std::unique_lock<std::mutex> lock(priv->bufferMutex);
                if (!priv->dispatcher.joinable())
                        priv->dispatcher = ThreadUtility::instance()->createThread( LOG4CXX_STR("AsyncAppender"), &AsyncAppender::dispatch, this );
-#if !USE_ATOMIC_QUEUE
-               priv->buffer.reserve(priv->bufferSize);
-#endif
        }
        while (true)
        {
-#if USE_ATOMIC_QUEUE
-               auto newSize = ++priv->approxListSize;
-               if (newSize <= priv->bufferSize)
+               // Claim a slot in the ring buffer
+               auto oldEventCount = priv->eventCount++;
+               auto pendingCount = oldEventCount - priv->dispatchedCount;
+               if (0 <= pendingCount && pendingCount < priv->bufferSize)
                {
-                       priv->eventList.push(event);
+                       auto index = oldEventCount % priv->buffer.size();
+                       // Write to the ring buffer
+                       priv->buffer[index] = event;
+                       // Notify the dispatch thread that an event has been added
+                       priv->commitCount.compare_exchange_strong(oldEventCount, oldEventCount + 1);
                        priv->bufferNotEmpty.notify_all();
                        break;
                }
                else
-                       --priv->approxListSize;
-               break;
+                       --priv->eventCount;
                //
-               //   Following code is only reachable if buffer is full
+               //   Following code is only reachable if buffer is full or eventCount has overflowed
                //
                std::unique_lock<std::mutex> lock(priv->bufferMutex);
-#else
-               std::unique_lock<std::mutex> lock(priv->bufferMutex);
-               size_t previousSize = priv->buffer.size();
-
-               if (previousSize < (size_t)priv->bufferSize)
-               {
-                       priv->buffer.push_back(event);
-
-                       if (previousSize == 0)
-                       {
-                               priv->bufferNotEmpty.notify_all();
-                       }
-
-                       break;
-               }
-               //
-               //   Following code is only reachable if buffer is full
-               //
-#endif
                //
                //   if blocking and thread is not already interrupted
                //      and not the dispatcher then
@@ -375,11 +293,7 @@ void AsyncAppender::append(const spi::LoggingEventPtr& event, Pool& p)
                {
                        priv->bufferNotFull.wait(lock, [this]()
                        {
-#if USE_ATOMIC_QUEUE
-                               return priv->approxListSize < priv->bufferSize;
-#else
-                               return priv->buffer.size() < priv->bufferSize;
-#endif
+                               return priv->eventCount - priv->dispatchedCount < priv->bufferSize;
                        });
                        discard = false;
                }
@@ -419,10 +333,6 @@ void AsyncAppender::close()
 
        if ( priv->dispatcher.joinable() )
        {
-#if USE_ATOMIC_QUEUE
-               // Queue a special event that will terminate the dispatch thread
-               priv->eventList.push(LoggingEventPtr());
-#endif
                priv->dispatcher.join();
        }
 
@@ -487,9 +397,7 @@ void AsyncAppender::setBufferSize(int size)
 
        std::lock_guard<std::mutex> lock(priv->bufferMutex);
        priv->bufferSize = (size < 1) ? 1 : size;
-#if USE_ATOMIC_QUEUE
-       priv->eventList.setBufferSize(priv->bufferSize);
-#endif
+       priv->buffer.resize(priv->bufferSize);
        priv->bufferNotFull.notify_all();
 }
 
@@ -573,59 +481,30 @@ void AsyncAppender::dispatch()
        while (isActive)
        {
                LoggingEventList events;
-#if USE_ATOMIC_QUEUE
-               auto eventList = priv->eventList.pop_all_reverse();
-               priv->approxListSize = 0;
-               if (!eventList)
-               {
-                       std::unique_lock<std::mutex> lock(priv->bufferMutex);
-                       priv->bufferNotEmpty.wait(lock, [this, &eventList]() -> bool
-                               {
-                                       eventList = priv->eventList.pop_all_reverse();
-                                       priv->approxListSize = 0;
-                                       return eventList || priv->closed;
-                               }
-                       );
-               }
-               priv->bufferNotFull.notify_all();
-               while (eventList)
-               {
-                       if (eventList->data)
-                               events.push_back(eventList->data);
-                       else
-                               isActive = false;
-                       auto next = eventList->next;
-                       eventList = next;
-               }
-               {
-                       std::unique_lock<std::mutex> lock(priv->bufferMutex);
-                       for (auto item : priv->discardMap)
-                               events.push_back(item.second.createEvent(p));
-                       priv->discardMap.clear();
-               }
-#else
                //
                //   process events after lock on buffer is released.
                //
                {
                        std::unique_lock<std::mutex> lock(priv->bufferMutex);
                        priv->bufferNotEmpty.wait(lock, [this]() -> bool
-                               { return 0 < priv->buffer.size() || priv->closed; }
+                               { return priv->dispatchedCount != priv->commitCount || priv->closed; }
                        );
                        isActive = !priv->closed;
 
-                       events = std::move(priv->buffer);
+                       while (priv->dispatchedCount != priv->commitCount)
+                       {
+                               auto index = priv->dispatchedCount % priv->buffer.size();
+                               events.push_back(priv->buffer[index]);
+                               ++priv->dispatchedCount;
+                       }
                        for (auto discardItem : priv->discardMap)
                        {
                                events.push_back(discardItem.second.createEvent(p));
                        }
 
-                       priv->buffer.clear();
-                       priv->buffer.reserve(priv->bufferSize);
                        priv->discardMap.clear();
                        priv->bufferNotFull.notify_all();
                }
-#endif
 
                for (auto item : events)
                {

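A note on the layout visible above: each counter is alignas(CACHE_LINE_SIZE)
with CACHE_LINE_SIZE = 128, plausibly sized at twice a typical 64-byte cache
line to also defeat adjacent-line prefetching, so that logging threads
updating eventCount do not false-share with the dispatch thread's
dispatchedCount. The separate commitCount is what makes the claim/write/publish
sequence safe: dispatch() drains only up to commitCount, never up to
eventCount, so a slot whose LoggingEventPtr is still being copied is never
handed to the appenders.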