All 7 patches looks good.  I am still get up to speed on OVSDB, but
was able to follow almost all the changes. Did not find any obvious
issues.

Acked-by: Andy Zhou <[email protected]>



On Wed, Apr 2, 2014 at 2:27 PM, Ben Pfaff <[email protected]> wrote:
> Connections that queue up too much data, because they are monitoring a
> table that is changing quickly and failing to keep up with the updates,
> cause problems with buffer management.  Since commit 60533a405b2e
> (jsonrpc-server: Disconnect connections that queue too much data.),
> ovsdb-server has dealt with them by disconnecting the connection and
> letting them start up again with a fresh copy of the database.  However,
> this is not ideal because of situations where disconnection happens
> repeatedly.  For example:
>
>      - A manager toggles a column back and forth between two or more values
>        quickly (in which case the data transmitted over the monitoring
>        connections always increases quickly, without bound).
>
>      - A manager repeatedly extends the contents of some column in some row
>        (in which case the data transmitted over the monitoring connection
>        grows with O(n**2) in the length of the string).
>
> A better way to deal with this problem is to combine updates when they are
> sent to the monitoring connection, if that connection is not keeping up.
> In both the above cases, this reduces the data that must be sent to a
> manageable amount.  This commit implements this new way.
>
> Bug #1211786.
> Bug #1221378.
> Signed-off-by: Ben Pfaff <[email protected]>
> ---
>  ovsdb/jsonrpc-server.c |   84 ++++++++++++++++++++++++++++--------
>  tests/ovsdb-server.at  |  112 
> ++++++++++++++++++++++++++++++++++++++++++++++++
>  2 files changed, 179 insertions(+), 17 deletions(-)
>
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index 3e4e71e..692830c 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -27,6 +27,7 @@
>  #include "ovsdb-error.h"
>  #include "ovsdb-parser.h"
>  #include "ovsdb.h"
> +#include "poll-loop.h"
>  #include "reconnect.h"
>  #include "row.h"
>  #include "server.h"
> @@ -62,6 +63,8 @@ static bool ovsdb_jsonrpc_session_get_status(
>      struct ovsdb_jsonrpc_remote_status *);
>  static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *);
>  static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *);
> +static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *,
> +                                       struct jsonrpc_msg *);
>
>  /* Triggers. */
>  static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *,
> @@ -82,6 +85,8 @@ static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
>      struct json_array *params,
>      const struct json *request_id);
>  static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *);
> +static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *);
> +static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session 
> *);
>
>  /* JSON-RPC database server. */
>
> @@ -437,7 +442,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session 
> *s)
>      ovsdb_jsonrpc_trigger_complete_done(s);
>
>      if (!jsonrpc_session_get_backlog(s->js)) {
> -        struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js);
> +        struct jsonrpc_msg *msg;
> +
> +        ovsdb_jsonrpc_monitor_flush_all(s);
> +
> +        msg = jsonrpc_session_recv(s->js);
>          if (msg) {
>              if (msg->type == JSONRPC_REQUEST) {
>                  ovsdb_jsonrpc_session_got_request(s, msg);
> @@ -482,7 +491,11 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session 
> *s)
>  {
>      jsonrpc_session_wait(s->js);
>      if (!jsonrpc_session_get_backlog(s->js)) {
> -        jsonrpc_session_recv_wait(s->js);
> +        if (ovsdb_jsonrpc_monitor_needs_flush(s)) {
> +            poll_immediate_wake();
> +        } else {
> +            jsonrpc_session_recv_wait(s->js);
> +        }
>      }
>  }
>
> @@ -698,7 +711,7 @@ ovsdb_jsonrpc_session_notify(struct ovsdb_session 
> *session,
>
>      s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up);
>      params = json_array_create_1(json_string_create(lock_name));
> -    jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params));
> +    ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params));
>  }
>
>  static struct jsonrpc_msg *
> @@ -873,7 +886,7 @@ ovsdb_jsonrpc_session_got_request(struct 
> ovsdb_jsonrpc_session *s,
>
>      if (reply) {
>          jsonrpc_msg_destroy(request);
> -        jsonrpc_session_send(s->js, reply);
> +        ovsdb_jsonrpc_session_send(s, reply);
>      }
>  }
>
> @@ -901,6 +914,14 @@ ovsdb_jsonrpc_session_got_notify(struct 
> ovsdb_jsonrpc_session *s,
>      }
>      jsonrpc_msg_destroy(request);
>  }
> +
> +static void
> +ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s,
> +                           struct jsonrpc_msg *msg)
> +{
> +    ovsdb_jsonrpc_monitor_flush_all(s);
> +    jsonrpc_session_send(s->js, msg);
> +}
>
>  /* JSON-RPC database server triggers.
>   *
> @@ -928,7 +949,7 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session 
> *s, struct ovsdb *db,
>
>          msg = jsonrpc_create_error(json_string_create("duplicate request 
> ID"),
>                                     id);
> -        jsonrpc_session_send(s->js, msg);
> +        ovsdb_jsonrpc_session_send(s, msg);
>          json_destroy(id);
>          json_destroy(params);
>          return;
> @@ -979,7 +1000,7 @@ ovsdb_jsonrpc_trigger_complete(struct 
> ovsdb_jsonrpc_trigger *t)
>              reply = jsonrpc_create_error(json_string_create("canceled"),
>                                           t->id);
>          }
> -        jsonrpc_session_send(s->js, reply);
> +        ovsdb_jsonrpc_session_send(s, reply);
>      }
>
>      json_destroy(t->id);
> @@ -1639,6 +1660,46 @@ ovsdb_jsonrpc_monitor_compose_table_update(
>      return json;
>  }
>
> +static bool
> +ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s)
> +{
> +    struct ovsdb_jsonrpc_monitor *m;
> +
> +    HMAP_FOR_EACH (m, node, &s->monitors) {
> +        struct shash_node *node;
> +
> +        SHASH_FOR_EACH (node, &m->tables) {
> +            struct ovsdb_jsonrpc_monitor_table *mt = node->data;
> +
> +            if (!hmap_is_empty(&mt->changes)) {
> +                return true;
> +            }
> +        }
> +    }
> +
> +    return false;
> +}
> +
> +static void
> +ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s)
> +{
> +    struct ovsdb_jsonrpc_monitor *m;
> +
> +    HMAP_FOR_EACH (m, node, &s->monitors) {
> +        struct json *json;
> +
> +        json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
> +        if (json) {
> +            struct jsonrpc_msg *msg;
> +            struct json *params;
> +
> +            params = json_array_create_2(json_clone(m->monitor_id), json);
> +            msg = jsonrpc_create_notify("update", params);
> +            jsonrpc_session_send(s->js, msg);
> +        }
> +    }
> +}
> +
>  static void
>  ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux,
>                                 const struct ovsdb_jsonrpc_monitor *m)
> @@ -1654,20 +1715,9 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica 
> *replica,
>  {
>      struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica);
>      struct ovsdb_jsonrpc_monitor_aux aux;
> -    struct json *json;
>
>      ovsdb_jsonrpc_monitor_init_aux(&aux, m);
>      ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux);
> -    json = ovsdb_jsonrpc_monitor_compose_table_update(m, false);
> -    if (json) {
> -        struct jsonrpc_msg *msg;
> -        struct json *params;
> -
> -        params = json_array_create_2(json_clone(aux.monitor->monitor_id),
> -                                     json);
> -        msg = jsonrpc_create_notify("update", params);
> -        jsonrpc_session_send(aux.monitor->session->js, msg);
> -    }
>
>      return NULL;
>  }
> diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at
> index aee6f77..3393b94 100644
> --- a/tests/ovsdb-server.at
> +++ b/tests/ovsdb-server.at
> @@ -38,6 +38,8 @@ cat stdout >> output
>
>  EXECUTION_EXAMPLES
>
> +AT_BANNER([ovsdb-server miscellaneous features])
> +
>  AT_SETUP([truncating corrupted database log])
>  AT_KEYWORDS([ovsdb server positive unix])
>  OVS_RUNDIR=`pwd`; export OVS_RUNDIR
> @@ -662,6 +664,116 @@ _uuid                                name  number
>  ], [], [test ! -e pid || kill `cat pid`])
>  OVSDB_SERVER_SHUTDOWN
>  AT_CLEANUP
> +
> +AT_SETUP([ovsdb-server combines updates on backlogged connections])
> +OVS_LOGDIR=`pwd`; export OVS_LOGDIR
> +OVS_RUNDIR=`pwd`; export OVS_RUNDIR
> +ON_EXIT([kill `cat *.pid`])
> +
> +# The maximum socket receive buffer size is important for this test, which
> +# tests behavior when the receive buffer overflows.
> +if test -e /proc/sys/net/core/rmem_max; then
> +    # Linux
> +    rmem_max=`cat /proc/sys/net/core/rmem_max`
> +elif rmem_max=`sysctl -n net.inet.tcp.recvbuf_max 2>/dev/null`; then
> +    : # FreeBSD
> +else
> +    # Don't know how to get maximum socket receive buffer on this OS
> +    AT_SKIP_IF([:])
> +fi
> +
> +# Calculate the number of iterations we need to queue.  Each of the
> +# iterations we execute, by itself, yields a monitor update of about
> +# 25 kB, so fill up that much space plus a few for luck.
> +n_iterations=`expr $rmem_max / 2500 + 5`
> +echo rmem_max=$rmem_max n_iterations=$n_iterations
> +
> +# Calculate the exact number of monitor updates expected for $n_iterations,
> +# assuming no updates are combined.  The "extra" update is for the initial
> +# contents of the database.
> +n_updates=`expr $n_iterations \* 3 + 1`
> +
> +# Start an ovsdb-server with the vswitchd schema.
> +OVSDB_INIT([db])
> +AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --log-file 
> --remote=punix:db.sock db],
> +  [0], [ignore], [ignore])
> +
> +# Executes a set of transactions that add a bridge with 100 ports, and
> +# then deletes that bridge.  This yields three monitor updates that
> +# add up to about 25 kB in size.
> +#
> +# The update also increments a counter held in the database so that we can
> +# verify that the overall effect of the transactions took effect (e.g.
> +# monitor updates at the end weren't just dropped).  We add an arbitrary
> +# string to the counter to make grepping for it more reliable.
> +counter=0
> +trigger_big_update () {
> +    counter=`expr $counter + 1`
> +    ovs-vsctl --no-wait -- set open_vswitch . system_version=xyzzy$counter
> +    ovs-vsctl --no-wait -- add-br br0 $add
> +    ovs-vsctl --no-wait -- del-br br0
> +}
> +add_ports () {
> +    for j in `seq 1 100`; do
> +        printf " -- add-port br0 p%d" $j
> +    done
> +}
> +add=`add_ports`
> +
> +AT_CAPTURE_FILE([ovsdb-client.err])
> +
> +# Start an ovsdb-client monitoring all changes to the database,
> +# make it block to force the buffers to fill up, and then execute
> +# enough iterations that ovsdb-server starts combining updates.
> +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL 
> >ovsdb-client.out 2>ovsdb-client.err])
> +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/block])
> +for i in `seq 1 $n_iterations`; do
> +    echo "blocked update ($i of $n_iterations)"
> +    trigger_big_update $i
> +done
> +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/unblock])
> +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
> +AT_CHECK([ovs-appctl -t ovsdb-client exit])
> +OVS_WAIT_WHILE([test -e ovsdb-client.pid])
> +
> +# Count the number of updates in the ovsdb-client output, by counting
> +# the number of changes to the Open_vSwitch table.  (All of our
> +# transactions modify the Open_vSwitch table.)  It should be less than
> +# $n_updates updates.
> +#
> +# Check that the counter is what we expect.
> +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
> +echo "logged_updates=$logged_updates (expected less than $n_updates)"
> +AT_CHECK([test $logged_updates -lt $n_updates])
> +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
> +  ["xyzzy$counter"
> +])
> +
> +# Start an ovsdb-client monitoring all changes to the database,
> +# without making it block, and then execute the same transactions that
> +# we did before.
> +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL 
> >ovsdb-client.out 2>ovsdb-client.err])
> +for i in `seq 1 $n_iterations`; do
> +    echo "unblocked update ($i of $n_iterations)"
> +    trigger_big_update
> +
> +    # Make sure that ovsdb-client gets enough CPU time to process the 
> updates.
> +    ovs-appctl -t ovsdb-client version > /dev/null
> +done
> +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out])
> +AT_CHECK([ovs-appctl -t ovsdb-client exit])
> +OVS_WAIT_WHILE([test -e ovsdb-client.pid])
> +
> +# The ovsdb-client output should have exactly $n_updates updates.
> +#
> +# Also check that the counter is what we expect.
> +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out`
> +echo "logged_updates=$logged_updates (expected $n_updates)"
> +AT_CHECK([test $logged_updates -eq $n_updates])
> +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0],
> +  ["xyzzy$counter"
> +])
> +AT_CLEANUP
>
>  AT_BANNER([OVSDB -- ovsdb-server transactions (SSL IPv4 sockets)])
>
> --
> 1.7.10.4
>
> _______________________________________________
> dev mailing list
> [email protected]
> http://openvswitch.org/mailman/listinfo/dev
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev

Reply via email to