This is an automated email from the ASF dual-hosted git repository. swebb2066 pushed a commit to branch nonblocking_telnetappender in repository https://gitbox.apache.org/repos/asf/logging-log4cxx.git
commit 486c3d901fbc8539c202976163eb3651b03e1179 Author: Stephen Webb <[email protected]> AuthorDate: Thu Jan 29 16:52:32 2026 +1100 Add TelnetAppender option to close the connection when the network buffer is full --- src/main/cpp/aprsocket.cpp | 6 ++ src/main/cpp/telnetappender.cpp | 94 +++++++++++++++++++---- src/main/include/log4cxx/helpers/socket.h | 28 +++++-- src/main/include/log4cxx/net/telnetappender.h | 10 +++ src/main/include/log4cxx/private/aprsocket.h | 6 +- src/test/cpp/net/telnetappendertestcase.cpp | 103 +++++++++++++++++++++++--- 6 files changed, 215 insertions(+), 32 deletions(-) diff --git a/src/main/cpp/aprsocket.cpp b/src/main/cpp/aprsocket.cpp index 2ed76bda..a7064738 100644 --- a/src/main/cpp/aprsocket.cpp +++ b/src/main/cpp/aprsocket.cpp @@ -143,6 +143,12 @@ size_t APRSocket::write(ByteBuffer& buf) return totalWritten; } +void APRSocket::setNonBlocking(bool newValue) +{ + auto status = apr_socket_opt_set(_priv->socket, APR_SO_NONBLOCK, newValue); + if (status != APR_SUCCESS) + throw SocketException(status); +} void APRSocket::close() { diff --git a/src/main/cpp/telnetappender.cpp b/src/main/cpp/telnetappender.cpp index cc3b1439..6cd44d5d 100644 --- a/src/main/cpp/telnetappender.cpp +++ b/src/main/cpp/telnetappender.cpp @@ -24,6 +24,9 @@ #include <log4cxx/helpers/bytebuffer.h> #include <log4cxx/helpers/threadutility.h> #include <log4cxx/private/appenderskeleton_priv.h> +#if LOG4CXX_ABI_VERSION <= 15 +#include <log4cxx/private/aprsocket.h> +#endif #include <mutex> #include <thread> #include <vector> @@ -36,7 +39,11 @@ using namespace LOG4CXX_NS; using namespace LOG4CXX_NS::helpers; using namespace LOG4CXX_NS::net; -typedef helpers::SocketPtr Connection; +struct Connection +{ + helpers::SocketPtr s; + size_t sentCount; +}; LOG4CXX_LIST_DEF(ConnectionList, Connection); IMPLEMENT_LOG4CXX_OBJECT(TelnetAppender) @@ -60,12 +67,14 @@ struct TelnetAppender::TelnetAppenderPriv : public AppenderSkeletonPrivate int port; LogString hostname; bool reuseAddress = false; + bool nonBlocking = false; ConnectionList connections; LogString encoding; LOG4CXX_NS::helpers::CharsetEncoderPtr encoder; std::unique_ptr<helpers::ServerSocket> serverSocket; std::thread sh; size_t activeConnections; + size_t eventCount{ 0 }; #if LOG4CXX_EVENTS_AT_EXIT helpers::AtExitRegistry::Raii atExitRegistryRaii; @@ -137,6 +146,10 @@ void TelnetAppender::setOption(const LogString& option, { setEncoding(value); } + else if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("NONBLOCKING"), LOG4CXX_STR("nonblocking"))) + { + setNonBlocking(OptionConverter::toBoolean(value, true)); + } else if (StringHelper::equalsIgnoreCase(option, LOG4CXX_STR("REUSEADDRESS"), LOG4CXX_STR("reuseaddress"))) { setReuseAddress(OptionConverter::toBoolean(value, true)); @@ -169,13 +182,31 @@ void TelnetAppender::close() { _priv->stopAcceptingConnections(); std::lock_guard<std::recursive_mutex> lock(_priv->mutex); + if (_priv->eventCount && helpers::LogLog::isDebugEnabled()) + { + Pool p; + LogString msg = LOG4CXX_STR("TelnetAppender eventCount "); + helpers::StringHelper::toString(_priv->eventCount, p, msg); + helpers::LogLog::debug(msg); + } SocketPtr nullSocket; + int connectionNumber{ 0 }; for (auto& item : _priv->connections) { - if (item) + ++connectionNumber; + if (item.s) { - item->close(); - item = nullSocket; + item.s->close(); + if (_priv->eventCount && helpers::LogLog::isDebugEnabled()) + { + Pool p; + LogString msg = LOG4CXX_STR("TelnetAppender connection "); + helpers::StringHelper::toString(connectionNumber, p, msg); + msg += LOG4CXX_STR(" sentCount "); + helpers::StringHelper::toString(item.sentCount, p, msg); + helpers::LogLog::debug(msg); + } + item = Connection{ nullSocket, 0 }; } } _priv->activeConnections = 0; @@ -184,19 +215,32 @@ void TelnetAppender::close() void TelnetAppender::write(ByteBuffer& buf) { + int connectionNumber{ 0 }; for (auto& item :_priv->connections) { - if (item) + ++connectionNumber; + if (item.s) { try { ByteBuffer b(buf.current(), buf.remaining()); - item->write(b); + item.s->write(b); + ++item.sentCount; } - catch (Exception&) + catch (const Exception& e) { - // The client has closed the connection, remove it from our list: - item.reset(); + if (helpers::LogLog::isDebugEnabled()) + { + Pool p; + LogString msg(LOG4CXX_STR("TelnetAppender connection ")); + helpers::StringHelper::toString(connectionNumber, p, msg); + msg += LOG4CXX_STR(" sentCount "); + helpers::StringHelper::toString(item.sentCount, p, msg); + msg += LOG4CXX_STR("/"); + helpers::StringHelper::toString(_priv->eventCount, p, msg); + helpers::LogLog::warn(msg, e); + } + item.s.reset(); _priv->activeConnections--; } } @@ -223,7 +267,7 @@ void TelnetAppender::writeStatus(const SocketPtr& socket, const LogString& msg, void TelnetAppender::append(const spi::LoggingEventPtr& event, Pool& p) { size_t count = _priv->activeConnections; - + ++_priv->eventCount; if (count > 0) { LogString msg; @@ -270,6 +314,12 @@ void TelnetAppender::acceptConnections() try { SocketPtr newClient = _priv->serverSocket->accept(); +#if 15 < LOG4CXX_ABI_VERSION + newClient->setNonBlocking(_priv->nonBlocking); +#else + if (auto p = dynamic_cast<APRSocket*>(newClient.get())) + p->setNonBlocking(_priv->nonBlocking); +#endif bool done = _priv->closed; if (done) @@ -296,12 +346,23 @@ void TelnetAppender::acceptConnections() // std::lock_guard<std::recursive_mutex> lock(_priv->mutex); + int connectionNumber{ 0 }; for (auto& item : _priv->connections) { - if (!item) + ++connectionNumber; + if (!item.s) { - item = newClient; + item = Connection{ newClient, 0 }; _priv->activeConnections++; + if (helpers::LogLog::isDebugEnabled()) + { + Pool p; + LogString msg = LOG4CXX_STR("TelnetAppender new connection "); + helpers::StringHelper::toString(connectionNumber, p, msg); + msg += LOG4CXX_STR("/"); + helpers::StringHelper::toString(_priv->activeConnections, p, msg); + helpers::LogLog::debug(msg); + } break; } @@ -370,14 +431,19 @@ void TelnetAppender::setMaxConnections(int newValue) { auto item = _priv->connections.back(); _priv->connections.pop_back(); - if (item) + if (item.s) { - item->close(); + item.s->close(); --_priv->activeConnections; } } } +void TelnetAppender::setNonBlocking(bool newValue) +{ + _priv->nonBlocking = newValue; +} + void TelnetAppender::setReuseAddress(bool reuseAddress) { _priv->reuseAddress = reuseAddress; diff --git a/src/main/include/log4cxx/helpers/socket.h b/src/main/include/log4cxx/helpers/socket.h index 3b9c87ea..36fd7afe 100644 --- a/src/main/include/log4cxx/helpers/socket.h +++ b/src/main/include/log4cxx/helpers/socket.h @@ -33,12 +33,9 @@ LOG4CXX_PTR_DEF(Socket); LOG4CXX_UNIQUE_PTR_DEF(Socket); /** -<p>This class implements client sockets (also called just "sockets"). A socket -is an endpoint for communication between two machines. -<p>The actual work of the socket is performed by an instance of the SocketImpl -class. An application, by changing the socket factory that creates the socket -implementation, can configure itself to create sockets appropriate to the -local firewall. +Abstract base class for an outgoing socket connection. +A socket is an endpoint for communication between +two network connected machines or processes. */ class LOG4CXX_EXPORT Socket : public helpers::Object { @@ -56,7 +53,16 @@ class LOG4CXX_EXPORT Socket : public helpers::Object virtual size_t write(ByteBuffer&) = 0; - /** Closes this socket. */ +#if 15 < LOG4CXX_ABI_VERSION + /** + Use \c newValue for the behaviour when the network buffer (on an accepted socket connection) is full. + + When true, an exception is thrown if the write would block. + */ + virtual void setNonBlocking(bool newValue) = 0; +#endif + + /** Close this socket. */ virtual void close() = 0; /** Returns the value of this socket's address field. */ @@ -65,6 +71,8 @@ class LOG4CXX_EXPORT Socket : public helpers::Object /** Returns the value of this socket's port field. */ int getPort() const; + /** Create a concrete instance of this class + */ static SocketUniquePtr create(InetAddressPtr& address, int port); private: @@ -76,4 +84,10 @@ class LOG4CXX_EXPORT Socket : public helpers::Object } // namespace helpers } // namespace log4cxx +#if 15 < LOG4CXX_ABI_VERSION +#define LOG4CXX_16_VIRTUAL_SPECIFIER override +#else +#define LOG4CXX_16_VIRTUAL_SPECIFIER +#endif + #endif // _LOG4CXX_HELPERS_SOCKET_H diff --git a/src/main/include/log4cxx/net/telnetappender.h b/src/main/include/log4cxx/net/telnetappender.h index ce5985af..3f6427e1 100644 --- a/src/main/include/log4cxx/net/telnetappender.h +++ b/src/main/include/log4cxx/net/telnetappender.h @@ -102,6 +102,7 @@ class LOG4CXX_EXPORT TelnetAppender : public AppenderSkeleton MaxConnections | {int} | 20 | Encoding | C,UTF-8,UTF-16,UTF-16BE,UTF-16LE,646,US-ASCII,ISO646-US,ANSI_X3.4-1968,ISO-8859-1,ISO-LATIN-1 | UTF-8 | ReuseAddress | True,False | False | + NonBlocking | True,False | False | \sa AppenderSkeleton::setOption() */ @@ -151,6 +152,15 @@ class LOG4CXX_EXPORT TelnetAppender : public AppenderSkeleton */ void setReuseAddress(bool newValue); + /** + Use \c newValue for the behaviour when the TCP send buffer (on an accepted socket connection) is full. + + When true, the socket connection is closed if the write would block. + + \sa setOption + */ + void setNonBlocking(bool newValue); + /** Shutdown this appender. */ void close() override; diff --git a/src/main/include/log4cxx/private/aprsocket.h b/src/main/include/log4cxx/private/aprsocket.h index 9c566af6..addb3898 100644 --- a/src/main/include/log4cxx/private/aprsocket.h +++ b/src/main/include/log4cxx/private/aprsocket.h @@ -36,10 +36,12 @@ class LOG4CXX_EXPORT APRSocket : public helpers::Socket APRSocket(InetAddressPtr& address, int port); APRSocket(apr_socket_t*, apr_pool_t* pool); - virtual size_t write(ByteBuffer&); + size_t write(ByteBuffer&) override; + + void setNonBlocking(bool newValue) LOG4CXX_16_VIRTUAL_SPECIFIER; /** Closes this socket. */ - virtual void close(); + void close() override; apr_socket_t* getSocketPtr() const; diff --git a/src/test/cpp/net/telnetappendertestcase.cpp b/src/test/cpp/net/telnetappendertestcase.cpp index b7ba0c4b..cfa92c07 100644 --- a/src/test/cpp/net/telnetappendertestcase.cpp +++ b/src/test/cpp/net/telnetappendertestcase.cpp @@ -16,11 +16,18 @@ */ #include <log4cxx/net/telnetappender.h> +#include <log4cxx/basicconfigurator.h> +#include <log4cxx/file.h> #include <log4cxx/patternlayout.h> #include "../appenderskeletontestcase.h" +#include <log4cxx/helpers/inetaddress.h> +#include <log4cxx/helpers/stringhelper.h> +#include <log4cxx/helpers/loglog.h> +#include <log4cxx/helpers/fileoutputstream.h> +#include <log4cxx/helpers/socket.h> +#include <log4cxx/spi/configurator.h> #include <apr_thread_proc.h> #include <apr_time.h> -#include <thread> using namespace log4cxx; using namespace log4cxx::helpers; @@ -40,6 +47,10 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase LOGUNIT_TEST(testActivateClose); LOGUNIT_TEST(testActivateSleepClose); LOGUNIT_TEST(testActivateWriteClose); +#define CONNECT_WITHOUT_READ_TEST_IS_REPEATABLE +#ifdef CONNECT_WITHOUT_READ_TEST_IS_REPEATABLE + LOGUNIT_TEST(testConnectNoRead); +#endif LOGUNIT_TEST(testActivateWriteNoClose); LOGUNIT_TEST_SUITE_END(); @@ -77,21 +88,25 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase appender->setPort(TEST_PORT); Pool p; appender->activateOptions(p); - std::this_thread::sleep_for( std::chrono::milliseconds( 100 ) ); + apr_sleep(100000); // 100 milliseconds appender->close(); } void testActivateWriteClose() { - TelnetAppenderPtr appender(new TelnetAppender()); + auto appender = std::make_shared<TelnetAppender>(); appender->setLayout(createLayout()); appender->setPort(TEST_PORT); + appender->setNonBlocking(true); Pool p; appender->activateOptions(p); - LoggerPtr root(Logger::getRootLogger()); - root->addAppender(appender); + BasicConfigurator::configure(appender); + auto root = Logger::getRootLogger(); - for (int i = 0; i < 50; i++) +#ifdef CONNECT_WITHOUT_READ_TEST_IS_REPEATABLE + apr_sleep(1000000); // 1 second +#endif + for (int i = 0; i < 5000; ++i) { LOG4CXX_INFO(root, "Hello, World " << i); } @@ -102,15 +117,15 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase void testActivateWriteNoClose() { - TelnetAppenderPtr appender(new TelnetAppender()); + auto appender = std::make_shared<TelnetAppender>(); appender->setPort(TEST_PORT); appender->setMaxConnections(1); appender->setReuseAddress(true); appender->setHostname(LOG4CXX_STR("127.0.0.1")); Pool p; appender->activateOptions(p); - LoggerPtr root(Logger::getRootLogger()); - root->addAppender(appender); + BasicConfigurator::configure(appender); + auto root = Logger::getRootLogger(); for (int i = 0; i < 50; i++) { @@ -122,6 +137,76 @@ class TelnetAppenderTestCase : public AppenderSkeletonTestCase } } +#ifdef CONNECT_WITHOUT_READ_TEST_IS_REPEATABLE + void testConnectNoRead() + { + auto thisProgram = GetExecutableFileName(); + helpers::Pool p; + bool thisProgramExists = File(thisProgram).exists(p); + LOGUNIT_ASSERT(thisProgramExists); + const char* args[] = {thisProgram.c_str(), "testActivateWriteClose", 0}; + apr_procattr_t* attr = NULL; + helpers::FileOutputStream output(LOG4CXX_STR("output/testConnectNoRead.log"), false); + setTestAttributes(&attr, output.getFilePtr(), p); + apr_proc_t pid; + startTestInstance(&pid, attr, args, p); + auto addr = helpers::InetAddress::getByName(LOG4CXX_STR("127.0.0.1")); + auto s = helpers::Socket::create(addr, TEST_PORT); // Opens a connection + int exitCode; + apr_exit_why_e reason; + apr_proc_wait(&pid, &exitCode, &reason, APR_WAIT); + if (exitCode != 0 && helpers::LogLog::isDebugEnabled()) + { + LogString msg = LOG4CXX_STR("child exit code: "); + helpers::StringHelper::toString(exitCode, p, msg); + msg += LOG4CXX_STR("; reason: "); + helpers::StringHelper::toString(reason, p, msg); + helpers::LogLog::debug(msg); + } + LOGUNIT_ASSERT_EQUAL(exitCode, 0); + } + +private: + + void setTestAttributes(apr_procattr_t** attr, apr_file_t* output, helpers::Pool& p) + { + if (apr_procattr_create(attr, p.getAPRPool()) != APR_SUCCESS) + { + LOGUNIT_FAIL("apr_procattr_create"); + } + if (apr_procattr_cmdtype_set(*attr, APR_PROGRAM) != APR_SUCCESS) + { + LOGUNIT_FAIL("apr_procattr_cmdtype_set"); + } + if (apr_procattr_child_out_set(*attr, output, NULL) != APR_SUCCESS) + { + LOGUNIT_FAIL("apr_procattr_child_out_set"); + } + if (apr_procattr_child_err_set(*attr, output, NULL) != APR_SUCCESS) + { + LOGUNIT_FAIL("apr_procattr_child_err_set"); + } + } + + void startTestInstance(apr_proc_t* pid, apr_procattr_t* attr, const char** argv, helpers::Pool& p) + { + if (apr_proc_create(pid, argv[0], argv, NULL, attr, p.getAPRPool()) == APR_SUCCESS) + { + apr_sleep(100000); // 100 milliseconds + } + else + { + LOGUNIT_FAIL("apr_proc_create"); + } + } + + std::string GetExecutableFileName() + { + auto lsProgramFilePath = spi::Configurator::properties().getProperty(LOG4CXX_STR("PROGRAM_FILE_PATH")); + LOG4CXX_ENCODE_CHAR(programFilePath, lsProgramFilePath); + return programFilePath; + } +#endif }; LOGUNIT_TEST_SUITE_REGISTRATION(TelnetAppenderTestCase);
