Three things:

1. Hi, I'm Rick Richardson.  I just got hold of cpp-netlib because I am
looking to build a highly scalable, runtime-reconfigurable reverse proxy.
I am fairly new to boost::asio but have been doing event-driven network
coding for 10 years.  I still don't fully understand the netlib source, so
if my approach is way off, please forgive me.


2. I added keepalive capabilities to the async server.  I altered the
existing async server because I don't see how any self-respecting server
wouldn't offer keepalive :)

My approach:  I wrapped the chain of async functions back around to call
read_more.  Specifically, in server/async_connection.hpp, write(Range) now
registers read_more_wrapper() as its completion callback; it does the work
of default_error() and then calls read_more.  This is to handle requests
that may be pipelined over the same connection.  Since this callback is
only executed after all writes have completed, it should be safe to handle
pipelined requests in a serial fashion.

If there are no additional requests pending, then the async_read_some will
stay pending indefinitely.

The second phase of the plan, then, is to register a deadline timer at the
start of each new connection.  Every new request that gets read kicks the
deadline timer back up to its maximum timeout.  When the timer does go off,
it cancels any operations that are pending on the Connection's socket.
This does two things.  First and foremost, it keeps hold of the
Connection's shared_ptr, so that the Connection is guaranteed not to be
destroyed while the keepalive is in effect.  Secondly, it quite obviously
frees the async_read_some from its wait.
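
To make the timer logic concrete, here is a minimal stand-alone model of it.
This is only a sketch: it uses std::chrono instead of
boost::asio::deadline_timer so that it compiles without Boost, and
IdleDeadline, touch() and expired() are hypothetical names that do not
appear in the patch.

```cpp
#include <cassert>
#include <chrono>

// Hypothetical stand-in for the per-connection keepalive timer.
// It models only the deadline arithmetic, not the async machinery.
class IdleDeadline {
public:
    using clock = std::chrono::steady_clock;

    IdleDeadline(clock::time_point now, std::chrono::seconds timeout)
        : timeout_(timeout), expires_at_(now + timeout) {
        assert(timeout.count() > 0);  // a zero timeout would expire at once
    }

    // Called whenever a request is read: push the deadline back out
    // to the full timeout, measured from `now`.
    void touch(clock::time_point now) { expires_at_ = now + timeout_; }

    // True once the connection has been idle for the full timeout.
    // In the patch, this is the point where the socket operations
    // would be cancelled.
    bool expired(clock::time_point now) const { return now >= expires_at_; }

private:
    std::chrono::seconds timeout_;
    clock::time_point expires_at_;
};
```

With a 30 second timeout, a connection that receives a request at second 29
stays alive until second 59 rather than second 30, which is the behaviour
the timer reset is meant to give.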

I haven't bothered to fork cpp-netlib on GitHub, so I don't have a commit to
point to, but I did create a 'git diff' from boost/ and have attached it
here for your perusal.
Please let me know your preferred approach for reviewing and accepting
patches (I am assuming a pull request, which I can get around to).


3.  In attempting to benchmark my changes, I did note that yes, keepalive
works, and also that it breaks httperf.  My problem is that httperf doesn't
seem to recognize a completed response unless the server closes the
connection.  It claims to be an HTTP 1.1 compliant tester, and when it sends
requests it doesn't include a Connection: close header, so I'm assuming it
expects the connection to remain open.

Does anyone know the correct procedure here, or why it might be thinking
that the responses have not completed?  My concern here is that there is
some nuance of the HTTP 1.1 spec that I may have missed which allows servers
to delimit responses, since, with pipelining, multiple responses may arrive
back to back over the same connection.
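
One possibility worth checking (a guess, not a confirmed diagnosis): on a
persistent connection, HTTP 1.1 expects each response body to be delimited
either by a Content-Length header or by chunked transfer encoding, and
otherwise the end of the body is signalled by the server closing the
connection.  The test handler's headers in the attached diff are Connection,
Content-Type and Server only.  The sketch below shows the Content-Length
variant in plain C++; make_response() and first_response_size() are
hypothetical helpers, not cpp-netlib API, and the parsing only handles the
exact header spelling used here.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Build a keep-alive response whose body length is explicit, so a client
// can tell where it ends without the connection being closed.
std::string make_response(const std::string& body) {
    std::string r = "HTTP/1.1 200 OK\r\n";
    r += "Connection: keep-alive\r\n";
    r += "Content-Type: text/plain\r\n";
    r += "Content-Length: " + std::to_string(body.size()) + "\r\n";
    r += "\r\n";
    r += body;
    return r;
}

// Client-side view: size in bytes of the first complete response in
// `stream`, or 0 if no complete response is available yet.
std::size_t first_response_size(const std::string& stream) {
    const std::string marker = "Content-Length: ";
    const std::size_t head_end = stream.find("\r\n\r\n");
    if (head_end == std::string::npos) return 0;      // headers incomplete
    const std::size_t pos = stream.find(marker);
    if (pos == std::string::npos || pos > head_end) return 0;  // no length
    const std::size_t value_begin = pos + marker.size();
    const std::size_t value_end = stream.find("\r\n", value_begin);
    assert(value_end != std::string::npos);  // guaranteed by head_end check
    const std::size_t body_len =
        std::stoul(stream.substr(value_begin, value_end - value_begin));
    const std::size_t total = head_end + 4 + body_len;  // 4 = "\r\n\r\n"
    return total <= stream.size() ? total : 0;          // body incomplete
}
```

Two responses built this way can be concatenated on the wire and still be
split apart by the client, which is what pipelining needs.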


Thanks,
Rick Richardson
diff --git a/boost/network/protocol/http/server/async_connection.hpp b/boost/network/protocol/http/server/async_connection.hpp
index c400541..ba75c92 100644
--- a/boost/network/protocol/http/server/async_connection.hpp
+++ b/boost/network/protocol/http/server/async_connection.hpp
@@ -26,9 +26,12 @@
 #include <boost/thread/locks.hpp>
 #include <boost/thread/recursive_mutex.hpp>
 #include <boost/utility/enable_if.hpp>
+#include <boost/asio/deadline_timer.hpp>
 #include <list>
 #include <vector>
 #include <iterator>
+#include <iostream>
+
 
 #ifndef BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE
 /** Here we define a page's worth of header connection buffer data.
@@ -45,6 +48,8 @@
 #define BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE 4096
 #endif /* BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE */
 
+#define BOOST_NETWORK_KEEPALIVE_SECONDS 30  // TODO make this a config setting
+
 namespace boost { namespace network { namespace http {
 
     template <class Tag, class Handler>
@@ -137,6 +142,7 @@ namespace boost { namespace network { namespace http {
         , headers_in_progress(false)
         , first_line_in_progress(false)
         , headers_buffer(BOOST_NETWORK_HTTP_SERVER_CONNECTION_HEADER_BUFFER_MAX_SIZE)
+        , keepalive_timer_(io_service)
         {
             new_start = read_buffer_.begin();
         }
@@ -201,15 +207,22 @@ namespace boost { namespace network { namespace http {
             lock_guard lock(headers_mutex);
             if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
 
+            /* rrichardson 2011/3/20
             boost::function<void(boost::system::error_code)> f = 
                 boost::bind(
                     &async_connection<Tag,Handler>::default_error
                     , async_connection<Tag,Handler>::shared_from_this()
                     , _1);
+            */ 
+
 
             write_impl(
                 boost::make_iterator_range(range)
-                , f
+                , boost::bind(
+                    &async_connection<Tag,Handler>::read_more_wrapper
+                    , async_connection<Tag,Handler>::shared_from_this()
+                    , method
+                    , _1)
                 );
         }
 
@@ -237,6 +250,7 @@ namespace boost { namespace network { namespace http {
 
         void read(read_callback_function callback) {
             if (error_encountered) boost::throw_exception(boost::system::system_error(*error_encountered));
+            
             if (new_start != read_buffer_.begin())
             {
                 input_range input = boost::make_iterator_range(new_start, read_buffer_.end());
@@ -282,6 +296,7 @@ namespace boost { namespace network { namespace http {
                     , ec
                     , bytes_transferred
                     , async_connection<Tag,Handler>::shared_from_this()));
+            
         }
 
         void default_error(boost::system::error_code const & ec) {
@@ -303,6 +318,7 @@ namespace boost { namespace network { namespace http {
         volatile bool headers_already_sent, first_line_already_sent, headers_in_progress, first_line_in_progress;
         asio::streambuf headers_buffer, first_line_buffer;
 
+        boost::asio::deadline_timer keepalive_timer_;
         boost::recursive_mutex headers_mutex;
         buffer_type read_buffer_;
         status_t status;
@@ -324,6 +340,18 @@ namespace boost { namespace network { namespace http {
             ip_stream << socket_.remote_endpoint().address().to_v4().to_string() << ':'
                 << socket_.remote_endpoint().port();
             request_.source = ip_stream.str();
+            
+            keepalive_timer_.expires_from_now( 
+                boost::posix_time::seconds(BOOST_NETWORK_KEEPALIVE_SECONDS)
+                );
+
+            keepalive_timer_.async_wait(
+                boost::bind(
+                    &async_connection<Tag,Handler>::handle_timer
+                    , async_connection<Tag,Handler>::shared_from_this()
+                    , asio::placeholders::error
+                ));
+             
             read_more(method);
         }
 
@@ -342,8 +370,25 @@ namespace boost { namespace network { namespace http {
                 );
         }
 
+        void read_more_wrapper(state_t state, boost::system::error_code const & ec)
+        {
+          error_encountered = in_place<boost::system::system_error>(ec);
+          read_more(state);
+        }
+
         void handle_read_data(state_t state, boost::system::error_code const & ec, std::size_t bytes_transferred) {
             if (!ec) {
+                
+                if (keepalive_timer_.expires_from_now(
+                      boost::posix_time::seconds(BOOST_NETWORK_KEEPALIVE_SECONDS))) {
+
+                    keepalive_timer_.async_wait(
+                        boost::bind(
+                            &async_connection<Tag,Handler>::handle_timer
+                            , async_connection<Tag,Handler>::shared_from_this()
+                            , asio::placeholders::error
+                        ));
+                } 
                 logic::tribool parsed_ok;
                 iterator_range<buffer_type::iterator> result_range, input_range;
                 data_end = read_buffer_.begin();
@@ -498,12 +543,16 @@ namespace boost { namespace network { namespace http {
         void client_error_sent(boost::system::error_code const & ec, std::size_t bytes_transferred) {
             if (!ec) {
                 boost::system::error_code ignored;
-                socket().shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
-                socket().close(ignored);
+                socket_.shutdown(asio::ip::tcp::socket::shutdown_both, ignored);
+                socket_.close(ignored);
             } else {
                 error_encountered = in_place<boost::system::system_error>(ec);
             }
         }
+        
+        void handle_timer(boost::system::error_code const & error) {
+          socket_.cancel();
+        }
 
         void parse_headers(string_type & input, typename request::headers_container_type & container) {
             using namespace boost::spirit::qi;
diff --git a/boost/network/protocol/http/server/async_server.hpp b/boost/network/protocol/http/server/async_server.hpp
index abc05d6..29f904a 100644
--- a/boost/network/protocol/http/server/async_server.hpp
+++ b/boost/network/protocol/http/server/async_server.hpp
@@ -38,8 +38,8 @@ namespace boost { namespace network { namespace http {
             acceptor.open(endpoint.protocol());
             acceptor.bind(endpoint);
             acceptor.listen();
-            new_connection.reset(new connection(io_service, handler, thread_pool));
-            acceptor.async_accept(new_connection->socket(),
+            next_connection.reset(new connection(io_service, handler, thread_pool));
+            acceptor.async_accept(next_connection->socket(),
                 boost::bind(
                     &async_server_base<Tag,Handler>::handle_accept
                     , this
@@ -63,20 +63,20 @@ namespace boost { namespace network { namespace http {
         asio::io_service io_service;
         asio::ip::tcp::acceptor acceptor;
         bool stopping;
-        connection_ptr new_connection;
+        connection_ptr next_connection;
 
         void handle_accept(boost::system::error_code const & ec) {
             if (!ec) {
-                new_connection->start();
+                next_connection->start();
                 if (!stopping) {
-                    new_connection.reset(
+                    next_connection.reset(
                         new connection(
                             io_service
                             , handler
                             , thread_pool
                             )
                         );
-                    acceptor.async_accept(new_connection->socket(),
+                    acceptor.async_accept(next_connection->socket(),
                         boost::bind(
                             &async_server_base<Tag,Handler>::handle_accept
                             , this
diff --git a/boost/network/protocol/http/tags.hpp b/boost/network/protocol/http/tags.hpp
index 76974eb..bad04b2 100644
--- a/boost/network/protocol/http/tags.hpp
+++ b/boost/network/protocol/http/tags.hpp
@@ -26,7 +26,7 @@ namespace boost { namespace network { namespace http { namespace tags {
     typedef mpl::vector<http, simple, async, udp, default_string> http_async_8bit_udp_resolve_tags;
     typedef mpl::vector<http, simple, async, tcp, default_string> http_async_8bit_tcp_resolve_tags;
     typedef mpl::vector<http, simple, sync, pod, default_string> http_server_tags;
-    typedef mpl::vector<http, simple, async, pod, default_string> http_async_server_tags;
+    typedef mpl::vector<http, keepalive, async, pod, default_string> http_async_server_tags;
 
     BOOST_NETWORK_DEFINE_TAG(http_default_8bit_tcp_resolve);
     BOOST_NETWORK_DEFINE_TAG(http_default_8bit_udp_resolve);
diff --git a/libs/network/test/http_async_server.cpp b/libs/network/test/http_async_server.cpp
index 3dfcad6..07af19c 100644
--- a/libs/network/test/http_async_server.cpp
+++ b/libs/network/test/http_async_server.cpp
@@ -19,7 +19,7 @@ typedef http::async_server<async_hello_world> server;
 struct async_hello_world {
     void operator()(server::request const & request, server::connection_ptr connection) {
         static server::response_header headers[] = {
-            {"Connection", "close"}
+            {"Connection", "Keep-Alive"}
             , {"Content-Type", "text/plain"}
             , {"Server", "cpp-netlib/0.8-devel"}
         };
@@ -40,7 +40,7 @@ struct async_hello_world {
 };
 
 int main(int argc, char * argv[]) {
-    utils::thread_pool thread_pool(2); 
+    utils::thread_pool thread_pool(20); 
     async_hello_world handler;
     server instance("127.0.0.1", "8000", handler, thread_pool);
     instance.run();
diff --git a/libs/network/test/server/http_test_server.hpp b/libs/network/test/server/http_test_server.hpp
index 7540dc1..c576e50 100644
--- a/libs/network/test/server/http_test_server.hpp
+++ b/libs/network/test/server/http_test_server.hpp
@@ -8,6 +8,8 @@
 #ifndef __NETWORK_TEST_HTTP_TEST_SERVER_HPP__
 #define __NETWORK_TEST_HTTP_TEST_SERVER_HPP__
 
+#define BOOST_FILESYSTEM_VERSION 2
+
 #include <boost/filesystem.hpp>
 
 #if defined(_WIN32) || defined(__WIN32__) || defined(WIN32) || defined(MIN)
_______________________________________________
Cpp-netlib-devel mailing list
[email protected]
https://lists.sourceforge.net/lists/listinfo/cpp-netlib-devel
