This is an automated email from the ASF dual-hosted git repository.
swebb2066 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/logging-log4cxx.git
The following commit(s) were added to refs/heads/master by this push:
new 1989b756 Reduce SocketAppender overhead (#449)
1989b756 is described below
commit 1989b756d76fa635b699c0e0a6f7a388f6e01879
Author: Stephen Webb <[email protected]>
AuthorDate: Fri Jan 17 11:53:40 2025 +1100
Reduce SocketAppender overhead (#449)
* Add a reconnection test
---
src/main/cpp/aprsocket.cpp | 5 +
src/main/cpp/socketappenderskeleton.cpp | 98 ++++++----
src/main/cpp/threadutility.cpp | 45 ++---
.../include/log4cxx/net/socketappenderskeleton.h | 2 +-
src/main/include/log4cxx/private/aprsocket.h | 2 +
.../log4cxx/private/socketappenderskeleton_priv.h | 29 ++-
src/test/cpp/net/CMakeLists.txt | 1 +
src/test/cpp/net/socketappendertestcase.cpp | 204 ++++++++++++++++-----
8 files changed, 267 insertions(+), 119 deletions(-)
diff --git a/src/main/cpp/aprsocket.cpp b/src/main/cpp/aprsocket.cpp
index c879b513..2ed76bda 100644
--- a/src/main/cpp/aprsocket.cpp
+++ b/src/main/cpp/aprsocket.cpp
@@ -159,5 +159,10 @@ void APRSocket::close()
}
}
+apr_socket_t* APRSocket::getSocketPtr() const
+{
+ return _priv->socket;
+}
+
} //namespace helpers
} //namespace log4cxx
diff --git a/src/main/cpp/socketappenderskeleton.cpp
b/src/main/cpp/socketappenderskeleton.cpp
index 993294bb..fdba18a8 100644
--- a/src/main/cpp/socketappenderskeleton.cpp
+++ b/src/main/cpp/socketappenderskeleton.cpp
@@ -21,6 +21,7 @@
#include <log4cxx/helpers/optionconverter.h>
#include <log4cxx/helpers/stringhelper.h>
#include <log4cxx/spi/loggingevent.h>
+#include <log4cxx/helpers/threadutility.h>
#include <log4cxx/helpers/transcoder.h>
#include <log4cxx/helpers/bytearrayoutputstream.h>
#include <log4cxx/helpers/threadutility.h>
@@ -36,17 +37,17 @@ using namespace LOG4CXX_NS::net;
#define _priv static_cast<SocketAppenderSkeletonPriv*>(m_priv.get())
SocketAppenderSkeleton::SocketAppenderSkeleton(int defaultPort, int
reconnectionDelay)
- :
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort,
reconnectionDelay))
+ :
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(defaultPort,
reconnectionDelay))
{
}
SocketAppenderSkeleton::SocketAppenderSkeleton(helpers::InetAddressPtr
address, int port, int reconnectionDelay)
- : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address,
port, reconnectionDelay))
+ :
AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(address, port,
reconnectionDelay))
{
}
SocketAppenderSkeleton::SocketAppenderSkeleton(const LogString& host, int
port, int reconnectionDelay)
- : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host,
port, reconnectionDelay))
+ : AppenderSkeleton(std::make_unique<SocketAppenderSkeletonPriv>(host,
port, reconnectionDelay))
{
}
@@ -68,8 +69,8 @@ void SocketAppenderSkeleton::activateOptions(Pool& p)
void SocketAppenderSkeleton::close()
{
- _priv->stopMonitor();
- cleanUp(_priv->pool);
+ _priv->stopMonitor();
+ cleanUp(_priv->pool);
}
void SocketAppenderSkeleton::connect(Pool& p)
@@ -136,22 +137,47 @@ void SocketAppenderSkeleton::setOption(const LogString&
option, const LogString&
void SocketAppenderSkeleton::fireConnector()
{
std::lock_guard<std::recursive_mutex> lock(_priv->mutex);
-
- if ( !_priv->thread.joinable() )
- {
- LogLog::debug(LOG4CXX_STR("Connector thread not alive: starting
monitor."));
-
- _priv->thread = ThreadUtility::instance()->createThread(
LOG4CXX_STR("SocketAppend"), &SocketAppenderSkeleton::monitor, this );
- }
+ if (_priv->taskName.empty())
+ {
+ Pool p;
+ _priv->taskName = _priv->name + LOG4CXX_STR(":")
+ + _priv->address->toString() + LOG4CXX_STR(":");
+ StringHelper::toString(_priv->port, p, _priv->taskName);
+ }
+ auto taskManager = ThreadUtility::instancePtr();
+ if (!taskManager->value().hasPeriodicTask(_priv->taskName))
+ {
+ Pool p;
+ if (LogLog::isDebugEnabled())
+ {
+ Pool p;
+ LogString msg(LOG4CXX_STR("Waiting "));
+ StringHelper::toString(_priv->reconnectionDelay, p, msg);
+ msg += LOG4CXX_STR(" ms before retrying [")
+ + _priv->address->toString() + LOG4CXX_STR(":");
+ StringHelper::toString(_priv->port, p, msg);
+ msg += LOG4CXX_STR("].");
+ LogLog::debug(msg);
+ }
+ taskManager->value().addPeriodicTask(_priv->taskName
+ , std::bind(&SocketAppenderSkeleton::retryConnect, this)
+ , std::chrono::milliseconds(_priv->reconnectionDelay)
+ );
+ }
+ _priv->taskManager = taskManager;
}
-void SocketAppenderSkeleton::monitor()
+void SocketAppenderSkeleton::retryConnect()
{
- Pool p;
- SocketPtr socket;
-
- while (!is_closed())
+ if (is_closed())
{
+ if (auto pManager = _priv->taskManager.lock())
+ pManager->value().removePeriodicTask(_priv->taskName);
+ }
+ else
+ {
+ Pool p;
+ SocketPtr socket;
try
{
if (LogLog::isDebugEnabled())
@@ -166,8 +192,14 @@ void SocketAppenderSkeleton::monitor()
setSocket(socket, p);
if (LogLog::isDebugEnabled())
{
- LogLog::debug(LOG4CXX_STR("Connection
established. Exiting connector thread."));
+ LogString msg(LOG4CXX_STR("Connection
established to [")
+ + _priv->address->toString() +
LOG4CXX_STR(":"));
+ StringHelper::toString(_priv->port, p, msg);
+ msg += LOG4CXX_STR("].");
+ LogLog::debug(msg);
}
+ if (auto pManager = _priv->taskManager.lock())
+
pManager->value().removePeriodicTask(_priv->taskName);
return;
}
catch (ConnectException& e)
@@ -197,26 +229,17 @@ void SocketAppenderSkeleton::monitor()
msg += LOG4CXX_STR("].");
LogLog::debug(msg);
}
-
- std::unique_lock<std::mutex> lock(
_priv->interrupt_mutex );
- if (_priv->interrupt.wait_for( lock,
std::chrono::milliseconds( _priv->reconnectionDelay ),
- std::bind(&SocketAppenderSkeleton::is_closed,
this) ))
- break;
}
}
}
void SocketAppenderSkeleton::SocketAppenderSkeletonPriv::stopMonitor()
{
- {
- std::lock_guard<std::mutex> lock(this->interrupt_mutex);
- if (this->closed)
- return;
- this->closed = true;
- }
- this->interrupt.notify_all();
- if (this->thread.joinable())
- this->thread.join();
+ this->closed = true;
+ if (this->taskName.empty())
+ ;
+ else if (auto pManager = this->taskManager.lock())
+ pManager->value().removePeriodicTask(this->taskName);
}
bool SocketAppenderSkeleton::is_closed()
@@ -258,6 +281,17 @@ bool SocketAppenderSkeleton::getLocationInfo() const
void SocketAppenderSkeleton::setReconnectionDelay(int reconnectionDelay1)
{
_priv->reconnectionDelay = reconnectionDelay1;
+ if (_priv->taskName.empty())
+ return;
+ auto pManager = _priv->taskManager.lock();
+ if (pManager && pManager->value().hasPeriodicTask(_priv->taskName))
+ {
+ pManager->value().removePeriodicTask(_priv->taskName);
+ pManager->value().addPeriodicTask(_priv->taskName
+ , std::bind(&SocketAppenderSkeleton::retryConnect, this)
+ , std::chrono::milliseconds(_priv->reconnectionDelay)
+ );
+ }
}
int SocketAppenderSkeleton::getReconnectionDelay() const
diff --git a/src/main/cpp/threadutility.cpp b/src/main/cpp/threadutility.cpp
index 72e87248..b9dec099 100644
--- a/src/main/cpp/threadutility.cpp
+++ b/src/main/cpp/threadutility.cpp
@@ -69,16 +69,17 @@ struct ThreadUtility::priv_data
LogString name;
Period delay;
TimePoint nextRun;
- int errorCount;
std::function<void()> f;
+ int errorCount;
+ bool removed;
};
using JobStore = std::list<NamedPeriodicFunction>;
JobStore jobs;
- std::mutex job_mutex;
+ std::recursive_mutex job_mutex;
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;
- bool terminated{false};
+ bool terminated{ false };
int retryCount{ 2 };
Period maxDelay{ 0 };
@@ -264,11 +265,11 @@ ThreadStartPost ThreadUtility::postStartFunction()
*/
void ThreadUtility::addPeriodicTask(const LogString& name,
std::function<void()> f, const Period& delay)
{
- std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+ std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
if (m_priv->maxDelay < delay)
m_priv->maxDelay = delay;
auto currentTime = std::chrono::system_clock::now();
- m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay,
currentTime + delay, 0, f} );
+ m_priv->jobs.push_back( priv_data::NamedPeriodicFunction{name, delay,
currentTime + delay, f, 0, false} );
if (!m_priv->thread.joinable())
{
m_priv->terminated = false;
@@ -283,10 +284,10 @@ void ThreadUtility::addPeriodicTask(const LogString&
name, std::function<void()>
*/
bool ThreadUtility::hasPeriodicTask(const LogString& name)
{
- std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+ std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
, [&name](const priv_data::NamedPeriodicFunction& item)
- { return name == item.name; }
+ { return !item.removed && name == item.name; }
);
return m_priv->jobs.end() != pItem;
}
@@ -297,7 +298,7 @@ bool ThreadUtility::hasPeriodicTask(const LogString& name)
void ThreadUtility::removeAllPeriodicTasks()
{
{
- std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+ std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
while (!m_priv->jobs.empty())
m_priv->jobs.pop_back();
}
@@ -309,14 +310,14 @@ void ThreadUtility::removeAllPeriodicTasks()
*/
void ThreadUtility::removePeriodicTask(const LogString& name)
{
- std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+ std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(), m_priv->jobs.end()
, [&name](const priv_data::NamedPeriodicFunction& item)
- { return name == item.name; }
+ { return !item.removed && name == item.name; }
);
if (m_priv->jobs.end() != pItem)
{
- m_priv->jobs.erase(pItem);
+ pItem->removed = true;
m_priv->interrupt.notify_one();
}
}
@@ -328,14 +329,14 @@ void ThreadUtility::removePeriodicTasksMatching(const
LogString& namePrefix)
{
while (1)
{
- std::lock_guard<std::mutex> lock(m_priv->job_mutex);
+ std::lock_guard<std::recursive_mutex> lock(m_priv->job_mutex);
auto pItem = std::find_if(m_priv->jobs.begin(),
m_priv->jobs.end()
, [&namePrefix](const priv_data::NamedPeriodicFunction&
item)
- { return namePrefix.size() <= item.name.size() &&
item.name.substr(0, namePrefix.size()) == namePrefix; }
+ { return !item.removed && namePrefix.size() <=
item.name.size() && item.name.substr(0, namePrefix.size()) == namePrefix; }
);
if (m_priv->jobs.end() == pItem)
break;
- m_priv->jobs.erase(pItem);
+ pItem->removed = true;
}
m_priv->interrupt.notify_one();
}
@@ -348,14 +349,14 @@ void ThreadUtility::priv_data::doPeriodicTasks()
auto currentTime = std::chrono::system_clock::now();
TimePoint nextOperationTime = currentTime + this->maxDelay;
{
- std::lock_guard<std::mutex> lock(this->job_mutex);
- if (this->jobs.empty())
- break;
+ std::lock_guard<std::recursive_mutex>
lock(this->job_mutex);
for (auto& item : this->jobs)
{
if (this->terminated)
return;
- if (item.nextRun <= currentTime)
+ if (item.removed)
+ ;
+ else if (item.nextRun <= currentTime)
{
try
{
@@ -380,17 +381,19 @@ void ThreadUtility::priv_data::doPeriodicTasks()
nextOperationTime = item.nextRun;
}
}
- // Remove faulty tasks
+ // Delete removed and faulty tasks
while (1)
{
- std::lock_guard<std::mutex> lock(this->job_mutex);
+ std::lock_guard<std::recursive_mutex>
lock(this->job_mutex);
auto pItem = std::find_if(this->jobs.begin(),
this->jobs.end()
, [this](const NamedPeriodicFunction& item)
- { return this->retryCount < item.errorCount; }
+ { return item.removed || this->retryCount <
item.errorCount; }
);
if (this->jobs.end() == pItem)
break;
this->jobs.erase(pItem);
+ if (this->jobs.empty())
+ return;
}
std::unique_lock<std::mutex> lock(this->interrupt_mutex);
diff --git a/src/main/include/log4cxx/net/socketappenderskeleton.h
b/src/main/include/log4cxx/net/socketappenderskeleton.h
index c0edada5..0a166738 100644
--- a/src/main/include/log4cxx/net/socketappenderskeleton.h
+++ b/src/main/include/log4cxx/net/socketappenderskeleton.h
@@ -164,7 +164,7 @@ class LOG4CXX_EXPORT SocketAppenderSkeleton : public
AppenderSkeleton
connection is droppped.
*/
- void monitor();
+ void retryConnect();
bool is_closed();
SocketAppenderSkeleton(const SocketAppenderSkeleton&);
SocketAppenderSkeleton& operator=(const
SocketAppenderSkeleton&);
diff --git a/src/main/include/log4cxx/private/aprsocket.h
b/src/main/include/log4cxx/private/aprsocket.h
index 4b6e6dd7..9c566af6 100644
--- a/src/main/include/log4cxx/private/aprsocket.h
+++ b/src/main/include/log4cxx/private/aprsocket.h
@@ -41,6 +41,8 @@ class LOG4CXX_EXPORT APRSocket : public helpers::Socket
/** Closes this socket. */
virtual void close();
+ apr_socket_t* getSocketPtr() const;
+
private:
struct APRSocketPriv;
};
diff --git a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
index 64ef6c35..9ed67f41 100644
--- a/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
+++ b/src/main/include/log4cxx/private/socketappenderskeleton_priv.h
@@ -20,10 +20,7 @@
#include <log4cxx/net/socketappenderskeleton.h>
#include <log4cxx/private/appenderskeleton_priv.h>
#include <log4cxx/helpers/inetaddress.h>
-
-#if LOG4CXX_EVENTS_AT_EXIT
-#include <log4cxx/private/atexitregistry.h>
-#endif
+#include <log4cxx/helpers/threadutility.h>
namespace LOG4CXX_NS
{
@@ -39,9 +36,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv :
public AppenderSkele
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
- , atExitRegistryRaii([this]{stopMonitor();})
-#endif
{ }
SocketAppenderSkeletonPriv(helpers::InetAddressPtr address, int
defaultPort, int reconnectionDelay) :
@@ -51,9 +45,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv :
public AppenderSkele
port(defaultPort),
reconnectionDelay(reconnectionDelay),
locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
- , atExitRegistryRaii([this]{stopMonitor();})
-#endif
{ }
SocketAppenderSkeletonPriv(const LogString& host, int port, int delay) :
@@ -63,9 +54,6 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv :
public AppenderSkele
port(port),
reconnectionDelay(delay),
locationInfo(false)
-#if LOG4CXX_EVENTS_AT_EXIT
- , atExitRegistryRaii([this]{stopMonitor();})
-#endif
{ }
~SocketAppenderSkeletonPriv()
@@ -84,15 +72,22 @@ struct SocketAppenderSkeleton::SocketAppenderSkeletonPriv :
public AppenderSkele
int port;
int reconnectionDelay;
bool locationInfo;
+#if LOG4CXX_ABI_VERSION <= 15
std::thread thread;
std::condition_variable interrupt;
std::mutex interrupt_mutex;
-
-#if LOG4CXX_EVENTS_AT_EXIT
- helpers::AtExitRegistry::Raii atExitRegistryRaii;
#endif
-
void stopMonitor();
+
+ /**
+ Manages asynchronous reconnection attempts.
+ */
+ helpers::ThreadUtility::ManagerWeakPtr taskManager;
+
+ /**
+ The reconnection task name.
+ */
+ LogString taskName;
};
} // namespace net
diff --git a/src/test/cpp/net/CMakeLists.txt b/src/test/cpp/net/CMakeLists.txt
index 555a0cf9..7d43f790 100644
--- a/src/test/cpp/net/CMakeLists.txt
+++ b/src/test/cpp/net/CMakeLists.txt
@@ -20,6 +20,7 @@ if(LOG4CXX_NETWORKING_SUPPORT)
set(NET_TESTS
syslogappendertestcase
telnetappendertestcase
+ socketappendertestcase
xmlsocketappendertestcase
)
else()
diff --git a/src/test/cpp/net/socketappendertestcase.cpp
b/src/test/cpp/net/socketappendertestcase.cpp
index 2271930f..b64c1ee0 100644
--- a/src/test/cpp/net/socketappendertestcase.cpp
+++ b/src/test/cpp/net/socketappendertestcase.cpp
@@ -16,10 +16,21 @@
*/
#include "../appenderskeletontestcase.h"
-#include "apr.h"
-
-using namespace log4cxx;
-using namespace log4cxx::helpers;
+#include <log4cxx/patternlayout.h>
+#include <log4cxx/basicconfigurator.h>
+#include <log4cxx/net/xmlsocketappender.h>
+#include <log4cxx/helpers/serversocket.h>
+#include <log4cxx/helpers/loglog.h>
+#include <log4cxx/helpers/stringhelper.h>
+#include <log4cxx/helpers/transcoder.h>
+#include <log4cxx/private/aprsocket.h>
+#include <apr_network_io.h>
+
+namespace LOG4CXX_NS { namespace net {
+ using SocketAppender = XMLSocketAppender;
+} }
+
+using namespace LOG4CXX_NS;
/**
Unit tests of log4cxx::SocketAppender
@@ -32,61 +43,158 @@ class SocketAppenderTestCase : public
AppenderSkeletonTestCase
//
LOGUNIT_TEST(testDefaultThreshold);
LOGUNIT_TEST(testSetOptionThreshold);
- LOGUNIT_TEST(testInvalidHost);
+ LOGUNIT_TEST(testRetryConnect);
LOGUNIT_TEST_SUITE_END();
-
- public:
-
- void setUp()
- {
+#ifdef _DEBUG
+ struct Fixture
+ {
+ Fixture() {
+ helpers::LogLog::setInternalDebugging(true);
}
+ } suiteFixture;
+#endif
- void tearDown()
- {
- BasicConfigurator::resetConfiguration();
- }
+
+ public:
AppenderSkeleton* createAppenderSkeleton() const
{
return new log4cxx::net::SocketAppender();
}
- void testInvalidHost(){
-// log4cxx::net::SocketAppenderPtr appender =
std::make_shared<log4cxx::net::SocketAppender>();
-// log4cxx::PatternLayoutPtr layout =
std::make_shared<log4cxx::PatternLayout>(LOG4CXX_STR("%m%n"));
-
-// log4cxx::helpers::ServerSocket serverSocket(4445);
-
-// appender->setLayout(layout);
-// appender->setRemoteHost(LOG4CXX_STR("localhost"));
-// appender->setReconnectionDelay(1);
-// appender->setPort(4445);
-// log4cxx::helpers::Pool pool;
-// appender->activateOptions(pool);
-
-// BasicConfigurator::configure(appender);
-
-//
log4cxx::Logger::getRootLogger()->setLevel(log4cxx::Level::getAll());
-
-// std::thread th1( [](){
-// for( int x = 0; x < 3000; x++ ){
-//
LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" );
-// }
-// });
-// std::thread th2( [](){
-// for( int x = 0; x < 3000; x++ ){
-//
LOG4CXX_INFO(Logger::getLogger(LOG4CXX_STR("test")), "Some message" );
-// }
-// });
-
-// SocketPtr incomingSocket = serverSocket.accept();
-// incomingSocket->close();
-
-// // If we do not get here, we have deadlocked
-// th1.join();
-// th2.join();
+ void testRetryConnect()
+ {
+ int tcpPort = 44445;
+ auto appender = std::make_shared<net::SocketAppender>();
+
appender->setLayout(std::make_shared<log4cxx::PatternLayout>(LOG4CXX_STR("%d
[%T] %m%n")));
+ appender->setRemoteHost(LOG4CXX_STR("localhost"));
+ appender->setReconnectionDelay(50); // milliseconds
+ appender->setPort(tcpPort);
+ helpers::Pool pool;
+ appender->activateOptions(pool);
+
+ BasicConfigurator::configure(appender);
+
+ helpers::ServerSocketUniquePtr serverSocket;
+ try
+ {
+ serverSocket =
helpers::ServerSocket::create(tcpPort);
+ }
+ catch (std::exception& ex)
+ {
+
helpers::LogLog::error(LOG4CXX_STR("ServerSocket::create failed"), ex);
+ LOGUNIT_FAIL("ServerSocket::create");
+ }
+ serverSocket->setSoTimeout(1000); // milliseconds
+
+ auto logger = Logger::getLogger("test");
+ int logEventCount = 3000;
+ auto doLogging = [logger, logEventCount]()
+ {
+ for( int x = 0; x < logEventCount; x++ ){
+ LOG4CXX_INFO(logger, "Message " << x);
+ if (0 == x % 1000)
+ apr_sleep(50000); // 50
millisecond
+ }
+ };
+ std::vector<std::thread> loggingThread;
+ for (auto i : {0, 1})
+ loggingThread.emplace_back(doLogging);
+
+ helpers::SocketPtr incomingSocket;
+ try
+ {
+ incomingSocket = serverSocket->accept();
+ }
+ catch (std::exception& ex)
+ {
+
helpers::LogLog::error(LOG4CXX_STR("ServerSocket::accept failed"), ex);
+ for (auto& t : loggingThread)
+ t.join();
+ serverSocket->close();
+ LOGUNIT_FAIL("accept failed");
+ }
+ auto aprSocket =
std::dynamic_pointer_cast<helpers::APRSocket>(incomingSocket);
+ LOGUNIT_ASSERT(aprSocket);
+ auto pSocket = aprSocket->getSocketPtr();
+ LOGUNIT_ASSERT(pSocket);
+ apr_socket_timeout_set(pSocket, 200000); // 200
millisecond
+ std::vector<int> messageCount;
+ char buffer[8*1024];
+ apr_size_t len = sizeof(buffer);
+ apr_status_t status;
+ while (APR_SUCCESS == (status =
apr_socket_recv(pSocket, buffer, &len)))
+ {
+ auto pStart = &buffer[0];
+ auto pEnd = pStart + len;
+ for (auto pChar = pStart; pChar < pEnd; ++pChar)
+ {
+ if ('\n' == *pChar)
+ {
+ std::string line(pStart, pChar);
+ auto pos = line.rfind(' ');
+ if (line.npos != pos && pos + 1
< line.size())
+ {
+ try
+ {
+ auto msgNumber
= std::stoi(line.substr(pos));
+ if
(messageCount.size() <= msgNumber)
+
messageCount.resize(msgNumber + 1);
+
++messageCount[msgNumber];
+ }
+ catch (std::exception
const& ex)
+ {
+ LogString msg;
+
helpers::Transcoder::decode(ex.what(), msg);
+ msg +=
LOG4CXX_STR(" processing\n");
+
helpers::Transcoder::decode(line, msg);
+
helpers::LogLog::warn(msg);
+ }
+ }
+ pStart = pChar + 1;
+ }
+ }
+ len = sizeof(buffer);
+ }
+ if (helpers::LogLog::isDebugEnabled())
+ {
+ LogString msg = LOG4CXX_STR("apr_socket_recv
terminated");
+ char err_buff[1024] = {0};
+ apr_strerror(status, err_buff,
sizeof(err_buff));
+ if (0 == err_buff[0] || 0 == strncmp(err_buff,
"APR does not understand", 23))
+ {
+ msg.append(LOG4CXX_STR(": error code
"));
+ helpers::Pool p;
+ helpers::StringHelper::toString(status,
p, msg);
+ }
+ else
+ {
+ msg.append(LOG4CXX_STR(" - "));
+ std::string sMsg = err_buff;
+ LOG4CXX_DECODE_CHAR(lsMsg, sMsg);
+ msg.append(lsMsg);
+ }
+ helpers::LogLog::debug(msg);
+ }
+ incomingSocket->close();
+ serverSocket->close();
+ for (auto& t : loggingThread)
+ t.join();
+
+ if (helpers::LogLog::isDebugEnabled())
+ {
+ helpers::Pool p;
+ LogString msg(LOG4CXX_STR("messageCount "));
+ for (auto item : messageCount)
+ {
+ msg += logchar(' ');
+ helpers::StringHelper::toString(item,
p, msg);
+ }
+ helpers::LogLog::debug(msg);
+ }
+ LOGUNIT_ASSERT_EQUAL(logEventCount,
(int)messageCount.size());
}
};