On Sun, Mar 13, 2011 at 7:57 PM, Ivan Zhakov <[email protected]> wrote:

> On Sun, Mar 13, 2011 at 18:38,  <[email protected]> wrote:
> > Author: lgo
> > Date: Sun Mar 13 15:38:01 2011
> > New Revision: 1081141
> >
> > URL: http://svn.apache.org/viewvc?rev=1081141&view=rev
> > Log:
> > ra_serf: Drastically limit memory usage on large checkouts. Stop reading the
> > update report response when too many fetches/propfinds are already active.
> >
> > * subversion/libsvn_ra_serf/update.c
> >  (MAX_OUTSTANDING_REQS): New define.
> >  (throttled_handle_xml_parser): Wraps a normal xml parser, stops reading when
> >   too many active requests.
> >  (finish_report): use the new throttled_handle_xml_parser.
> >
> >
> Hi Lieven,
>
> I understand the problem, but on my Windows box your change has three
> problems:
> 1. svn export uses 100% CPU, because serf calls
> throttled_handle_xml_parser all the time, since APR_POLLIN is still
> enabled in the pollset.
>
I haven't seen this behavior on my Mac and Linux boxes, but I see where it's
coming from. I had assumed that returning EAGAIN from handle_response would
make serf 'wait a bit' before it invokes handle_response again, but because
APR_POLLIN is still set on the socket, that doesn't happen.

> 2. Memory usage didn't change on my tests, svn export of svn-trunk
> still uses 50MB of memory
>

It's only visible when running an export/checkout of a large folder, like
subversion/branches. From our discussion on IRC I see that you've also
noticed the improvements in terms of memory usage this patch brings for
large exports.

> 3. Time of svn export increased from 50s to 120s.
>

I guess that's because of problem 1, but that still needs to be verified.

Because of problem 1 I have reverted the change in r1081201.

The cause of the memory usage problem is that ra_serf reads the whole update
report response, and then starts preparing requests (update.c:push_state)
for each file to fetch, even if these requests are only to be sent much
later. With this approach I made ra_serf stop reading the response when a
certain limit of outstanding fetches/propfinds was reached. This has the
problem of 100% CPU usage you mentioned, and the risk of timing out the
report response connection.
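
To illustrate why the limit bounds memory, here is a minimal, self-contained
simulation. It is not serf or ra_serf code: the names sim_t, read_report,
complete_one and run_checkout are made up for this sketch, and it assumes one
request per report entry and one completed request per loop iteration. A
limit of 0 stands for the old behaviour of reading the whole report up front.

```c
#include <assert.h>
#include <stddef.h>

/* Hypothetical model of a checkout: every report entry becomes one
   outstanding fetch/propfind request until that request completes. */
typedef struct {
    size_t total_entries;    /* entries listed in the update report */
    size_t next_entry;       /* first entry not yet turned into a request */
    size_t outstanding;      /* requests created but not yet completed */
    size_t peak_outstanding; /* high-water mark, a proxy for memory use */
} sim_t;

/* Read report entries and create a request for each one.  With LIMIT == 0
   the whole report is read at once; otherwise reading stops as soon as
   LIMIT requests are outstanding. */
static void read_report(sim_t *s, size_t limit)
{
    while (s->next_entry < s->total_entries
           && (limit == 0 || s->outstanding < limit)) {
        s->next_entry++;
        s->outstanding++;
        if (s->outstanding > s->peak_outstanding)
            s->peak_outstanding = s->outstanding;
    }
}

/* Pretend the server answered one request. */
static void complete_one(sim_t *s)
{
    assert(s->outstanding > 0);
    s->outstanding--;
}

/* Run the whole checkout and return the peak number of outstanding
   requests that had to be kept in memory at the same time. */
static size_t run_checkout(size_t total_entries, size_t limit)
{
    sim_t s = { total_entries, 0, 0, 0 };

    while (s.next_entry < s.total_entries || s.outstanding > 0) {
        read_report(&s, limit);
        complete_one(&s);
    }
    return s.peak_outstanding;
}
```

In this model run_checkout(1000, 0) peaks at 1000 prepared requests, while
run_checkout(1000, 4) never holds more than 4. The real code overshoots the
limit somewhat, because the parser consumes the report in 8000-byte blocks
rather than entry by entry.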

An alternative solution is to keep reading the full report but store it in
memory or on disc, so that it can serve as a queue once the maximum number of
fetches is reached. I have attached a work-in-progress patch with an
in-memory cache. Note that it's WIP: it is not clean, aborts on cleanup and
still leaks memory, so it only demonstrates the idea. A disadvantage of the
memory cache is that the part of the report that has been read but not yet
parsed is temporarily stored in memory twice.
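
The queueing idea can be sketched independently of serf as follows. This is
only an illustration under my own assumptions: resp_queue_t, queue_store,
queue_drain and the toy count_lines parser are hypothetical names, and a plain
malloc'ed linked list stands in for the aggregate bucket used in the attached
patch. The network side always copies what arrives, and the parser side only
consumes queued data while the number of outstanding requests is below the
limit.

```c
#include <assert.h>
#include <stdlib.h>
#include <string.h>

/* One stored copy of a block of response data. */
typedef struct chunk {
    struct chunk *next;
    size_t len;
    char data[];            /* C99 flexible array member */
} chunk_t;

/* FIFO of response data that was read but not yet parsed. */
typedef struct {
    chunk_t *head, *tail;
    size_t bytes_queued;
} resp_queue_t;

/* Network side: always store a copy of what arrived, so the connection is
   drained regardless of throttling.  Returns 0 on success, -1 on OOM. */
static int queue_store(resp_queue_t *q, const char *data, size_t len)
{
    chunk_t *c = malloc(sizeof(*c) + len);

    if (!c)
        return -1;
    c->next = NULL;
    c->len = len;
    memcpy(c->data, data, len);
    if (q->tail)
        q->tail->next = c;
    else
        q->head = c;
    q->tail = c;
    q->bytes_queued += len;
    return 0;
}

/* Parses one block and returns the number of new requests it created. */
typedef size_t (*parse_fn)(const char *data, size_t len, void *baton);

/* Parser side: feed queued chunks to PARSE only while fewer than LIMIT
   requests are outstanding.  Returns the number of bytes parsed. */
static size_t queue_drain(resp_queue_t *q, size_t *outstanding, size_t limit,
                          parse_fn parse, void *baton)
{
    size_t parsed = 0;

    while (q->head && *outstanding < limit) {
        chunk_t *c = q->head;

        q->head = c->next;
        if (!q->head)
            q->tail = NULL;
        *outstanding += parse(c->data, c->len, baton);
        assert(q->bytes_queued >= c->len);
        q->bytes_queued -= c->len;
        parsed += c->len;
        free(c);
    }
    return parsed;
}

/* Toy parser: treats every newline as one file to fetch. */
static size_t count_lines(const char *data, size_t len, void *baton)
{
    size_t i, n = 0;

    (void)baton;
    for (i = 0; i < len; i++)
        if (data[i] == '\n')
            n++;
    return n;
}
```

Because throttling happens between the queue and the parser rather than on
the socket, nothing has to return EAGAIN to serf while data is available. The
price is the extra copy held in the queue until the parser catches up.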

Thanks for the review.

Lieven
Index: subversion/libsvn_ra_serf/util.c
===================================================================
--- subversion/libsvn_ra_serf/util.c    (revision 1081269)
+++ subversion/libsvn_ra_serf/util.c    (working copy)
@@ -1151,58 +1151,20 @@ cdata_xml(void *userData, const char *data, int le
   parser->error = parser->cdata(parser, parser->user_data, data, len);
 }
 
-/* Implements svn_ra_serf__response_handler_t */
+typedef svn_error_t *
+(*parser_impl_t)(svn_ra_serf__xml_parser_t *ctx,
+                 serf_bucket_t *response);
+
+/* Implements parser_impl_t. */
 svn_error_t *
-svn_ra_serf__handle_xml_parser(serf_request_t *request,
-                               serf_bucket_t *response,
-                               void *baton,
-                               apr_pool_t *pool)
+svn_ra_serf__parser_impl(svn_ra_serf__xml_parser_t *ctx,
+                         serf_bucket_t *response)
 {
+  apr_status_t status;
+  int xml_status;
   const char *data;
   apr_size_t len;
-  serf_status_line sl;
-  apr_status_t status;
-  int xml_status;
-  svn_ra_serf__xml_parser_t *ctx = baton;
-  svn_error_t *err;
 
-  serf_bucket_response_status(response, &sl);
-
-  if (ctx->status_code)
-    {
-      *ctx->status_code = sl.code;
-    }
-
-  if (sl.code == 301 || sl.code == 302 || sl.code == 307)
-    {
-      ctx->location = svn_ra_serf__response_get_location(response, pool);
-    }
-
-  /* Woo-hoo.  Nothing here to see.  */
-  if (sl.code == 404 && ctx->ignore_errors == FALSE)
-    {
-      /* If our caller won't know about the 404, abort() for now. */
-      SVN_ERR_ASSERT(ctx->status_code);
-
-      if (*ctx->done == FALSE)
-        {
-          *ctx->done = TRUE;
-          if (ctx->done_list)
-            {
-              ctx->done_item->data = ctx->user_data;
-              ctx->done_item->next = *ctx->done_list;
-              *ctx->done_list = ctx->done_item;
-            }
-        }
-
-      err = svn_ra_serf__handle_server_error(request, response, pool);
-
-      SVN_ERR(svn_error_compose_create(
-        svn_ra_serf__handle_discard_body(request, response, NULL, pool),
-        err));
-      return SVN_NO_ERROR;
-    }
-
   if (!ctx->xmlp)
     {
       ctx->xmlp = XML_ParserCreate(NULL);
@@ -1214,7 +1176,6 @@ svn_error_t *
         }
     }
 
-  while (1)
     {
       status = serf_bucket_read(response, 8000, &data, &len);
 
@@ -1240,9 +1201,10 @@ svn_error_t *
                   *ctx->done_list = ctx->done_item;
                 }
             }
+          /* TODO */
           SVN_ERR(svn_error_createf(SVN_ERR_RA_DAV_MALFORMED_DATA, NULL,
                                     _("XML parsing failed: (%d %s)"),
-                                    sl.code, sl.reason));
+                                    999, "TODO"));
         }
 
       if (ctx->error && ctx->ignore_errors == FALSE)
@@ -1270,14 +1232,157 @@ svn_error_t *
             }
           return svn_error_wrap_apr(status, NULL);
         }
+    }
+    return SVN_NO_ERROR;
+}
 
-      /* feed me! */
+static apr_status_t
+resp_cache_eof(void *baton,
+               serf_bucket_t *aggregate_bucket)
+{
+  svn_ra_serf__xml_parser_t *ctx = baton;
+
+  if (ctx->done_reading)
+    return APR_EOF;
+  else
+    return APR_EAGAIN;
+}
+
+/* Implements parser_impl_t. */
+static svn_error_t *
+svn_ra_serf__store_data_impl(svn_ra_serf__xml_parser_t *ctx,
+                             serf_bucket_t *response)
+{
+  if (!ctx->response_stream)
+    {
+      ctx->bkt_alloc = serf_bucket_allocator_create(ctx->pool, NULL, NULL);
+      ctx->response_stream = serf_bucket_aggregate_create(ctx->bkt_alloc);
+      serf_bucket_aggregate_hold_open(ctx->response_stream,
+                                      resp_cache_eof,
+                                      ctx);
     }
-  /* not reached */
+
+  while (1)
+    {
+      static int debug_ctr = 0;
+      const char *data;
+      apr_size_t len;
+      serf_bucket_t *tmp;
+      apr_status_t status;
+
+      status = serf_bucket_read(response, 8000, &data, &len);
+
+      if (SERF_BUCKET_READ_ERROR(status))
+        {
+          return svn_error_wrap_apr(status, NULL);
+        }
+
+      debug_ctr += len;
+      printf("Storing %d bytes.\n", debug_ctr);
+
+      /* Store a copy of this data. */
+      if (len)
+        {
+          tmp = serf_bucket_simple_copy_create(data, len,
+                                               ctx->bkt_alloc);
+
+          serf_bucket_aggregate_append(ctx->response_stream, tmp);
+        }
+
+      if (APR_STATUS_IS_EAGAIN(status))
+        {
+          return svn_error_wrap_apr(status, NULL);
+        }
+
+      /* Are we done reading? */
+      if (APR_STATUS_IS_EOF(status))
+        {
+          ctx->done_reading = TRUE;
+          return svn_error_wrap_apr(status, NULL);
+        }
+    }
+
+  /* Can't reach. */
 }
 
 /* Implements svn_ra_serf__response_handler_t */
+static svn_error_t *
+parser_impl_wrapper(serf_request_t *request,
+                    serf_bucket_t *response,
+                    void *baton,
+                    parser_impl_t parser_impl,
+                    apr_pool_t *pool)
+{
+  svn_ra_serf__xml_parser_t *ctx = baton;
+  svn_error_t *err;
+  serf_status_line sl;
+
+  serf_bucket_response_status(response, &sl);
+
+  if (ctx->status_code)
+    {
+      *ctx->status_code = sl.code;
+    }
+
+  if (sl.code == 301 || sl.code == 302 || sl.code == 307)
+    {
+      ctx->location = svn_ra_serf__response_get_location(response, pool);
+    }
+
+  /* Woo-hoo.  Nothing here to see.  */
+  if (sl.code == 404 && ctx->ignore_errors == FALSE)
+    {
+      /* If our caller won't know about the 404, abort() for now. */
+      SVN_ERR_ASSERT(ctx->status_code);
+
+      if (*ctx->done == FALSE)
+        {
+          *ctx->done = TRUE;
+          if (ctx->done_list)
+            {
+              ctx->done_item->data = ctx->user_data;
+              ctx->done_item->next = *ctx->done_list;
+              *ctx->done_list = ctx->done_item;
+            }
+        }
+
+      err = svn_ra_serf__handle_server_error(request, response, pool);
+
+      SVN_ERR(svn_error_compose_create(
+        svn_ra_serf__handle_discard_body(request, response, NULL, pool),
+        err));
+      return SVN_NO_ERROR;
+    }
+
+  return parser_impl(ctx, response);
+}
+
+/* Implements svn_ra_serf__response_handler_t */
 svn_error_t *
+svn_ra_serf__handle_xml_parser(serf_request_t *request,
+                               serf_bucket_t *response,
+                               void *baton,
+                               apr_pool_t *pool)
+{
+  return parser_impl_wrapper(request, response, baton,
+                             svn_ra_serf__parser_impl,
+                             pool);
+}
+
+/* Implements svn_ra_serf__response_handler_t */
+svn_error_t *
+svn_ra_serf__store_data(serf_request_t *request,
+                        serf_bucket_t *response,
+                        void *baton,
+                        apr_pool_t *pool)
+{
+  return parser_impl_wrapper(request, response, baton,
+                             svn_ra_serf__store_data_impl,
+                             pool);
+}
+
+/* Implements svn_ra_serf__response_handler_t */
+svn_error_t *
 svn_ra_serf__handle_server_error(serf_request_t *request,
                                  serf_bucket_t *response,
                                  apr_pool_t *pool)
Index: subversion/libsvn_ra_serf/update.c
===================================================================
--- subversion/libsvn_ra_serf/update.c  (revision 1081269)
+++ subversion/libsvn_ra_serf/update.c  (working copy)
@@ -2175,7 +2175,17 @@ link_path(void *report_baton,
 /** Minimum nr. of outstanding requests needed before a new connection is
  *  opened. */
 #define REQS_PER_CONN 8
+/** Maximum nr. of outstanding requests. This limits the memory usage
+    for requests that are waiting to be sent on the connections.
 
+    Since this number limits the reading of more data of the REPORT response,
+    and xml_parser reads this data in blocks of 8000 bytes, the number of
+    actual outstanding requests will be higher.
+
+    Setting this to 500 with 4 connections means 125 outstanding requests per
+    connection. This seems reasonable, but requires more real-world testing. */
+#define MAX_OUTSTANDING_REQS 500
+
 /** This function creates a new connection for this serf session, but only
  * if the number of ACTIVE_REQS > REQS_PER_CONN or if there currently is
  * only one main connection open.
@@ -2297,7 +2307,7 @@ finish_report(void *report_baton,
      do anything with it. The error in parser_ctx->error is sufficient. */
   parser_ctx->status_code = &status_code;
 
-  handler->response_handler = svn_ra_serf__handle_xml_parser;
+  handler->response_handler = svn_ra_serf__store_data;
   handler->response_baton = parser_ctx;
 
   svn_ra_serf__request_create(handler);
@@ -2440,6 +2450,20 @@ finish_report(void *report_baton,
         }
       report->done_fetches = NULL;
 
+      /* Parse the REPORT response. */
+      if (parser_ctx->response_stream &&
+          report->active_fetches + report->active_propfinds < MAX_OUTSTANDING_REQS)
+        {
+          err = svn_ra_serf__parser_impl(parser_ctx,
+                                         parser_ctx->response_stream);
+          if (err && SERF_BUCKET_READ_ERROR(err->apr_err))
+            {
+              return err;
+            }
+          serf_bucket_aggregate_cleanup(parser_ctx->response_stream,
+                                        parser_ctx->bkt_alloc);
+        }
+
       /* Debugging purposes only! */
       serf_debug__closed_conn(sess->bkt_alloc);
       for (i = 0; i < sess->num_conns; i++)
@@ -2459,6 +2483,7 @@ finish_report(void *report_baton,
   /* FIXME subpool */
   return report->update_editor->close_edit(report->update_baton, sess->pool);
 }
+#undef MAX_OUTSTANDING_REQS
 #undef MAX_NR_OF_CONNS
 #undef EXP_REQS_PER_CONN
 
Index: subversion/libsvn_ra_serf/ra_serf.h
===================================================================
--- subversion/libsvn_ra_serf/ra_serf.h (revision 1081269)
+++ subversion/libsvn_ra_serf/ra_serf.h (working copy)
@@ -505,6 +505,8 @@ typedef struct svn_ra_serf__xml_state_t {
 /* Forward declaration of the XML parser structure. */
 typedef struct svn_ra_serf__xml_parser_t svn_ra_serf__xml_parser_t;
 
+typedef struct svn_ra_serf__store_response_t svn_ra_serf__store_response_t;
+
 /* Callback invoked with @a baton by our XML @a parser when an element with
  * the @a name containing @a attrs is opened.
  */
@@ -543,6 +545,9 @@ struct svn_ra_serf__xml_parser_t {
   /* Temporary allocations should be made in this pool. */
   apr_pool_t *pool;
 
+  /* Allocator created from the pool. */
+  serf_bucket_alloc_t *bkt_alloc;
+
   /* Caller-specific data passed to the start, end, cdata callbacks.  */
   void *user_data;
 
@@ -574,6 +579,12 @@ struct svn_ra_serf__xml_parser_t {
    */
   const char *location;
 
+  /* Used by the store_handler, to temporarily store the response data. */
+  serf_bucket_t *response_stream;
+
+  /* Is the REPORT response data read completely? */
+  svn_boolean_t done_reading;
+
   /* If non-NULL, this value will be set to TRUE when the response is
    * completed.
    */
@@ -722,6 +733,17 @@ svn_ra_serf__handle_xml_parser(serf_request_t *req
                                void *handler_baton,
                                apr_pool_t *pool);
 
+/* TODO COMMENT */
+svn_error_t *
+svn_ra_serf__parser_impl(svn_ra_serf__xml_parser_t *ctx,
+                         serf_bucket_t *response);
+
+svn_error_t *
+svn_ra_serf__store_data(serf_request_t *request,
+                        serf_bucket_t *response,
+                        void *baton,
+                        apr_pool_t *pool);
+
 /* serf_response_handler_t implementation that completely discards
  * the response.
  *
