All 7 patches looks good. I am still get up to speed on OVSDB, but was able to follow almost all the changes. Did not find any obvious issues.
Acked-by: Andy Zhou <[email protected]> On Wed, Apr 2, 2014 at 2:27 PM, Ben Pfaff <[email protected]> wrote: > Connections that queue up too much data, because they are monitoring a > table that is changing quickly and failing to keep up with the updates, > cause problems with buffer management. Since commit 60533a405b2e > (jsonrpc-server: Disconnect connections that queue too much data.), > ovsdb-server has dealt with them by disconnecting the connection and > letting them start up again with a fresh copy of the database. However, > this is not ideal because of situations where disconnection happens > repeatedly. For example: > > - A manager toggles a column back and forth between two or more values > quickly (in which case the data transmitted over the monitoring > connections always increases quickly, without bound). > > - A manager repeatedly extends the contents of some column in some row > (in which case the data transmitted over the monitoring connection > grows with O(n**2) in the length of the string). > > A better way to deal with this problem is to combine updates when they are > sent to the monitoring connection, if that connection is not keeping up. > In both the above cases, this reduces the data that must be sent to a > manageable amount. This commit implements this new way. > > Bug #1211786. > Bug #1221378. > Signed-off-by: Ben Pfaff <[email protected]> > --- > ovsdb/jsonrpc-server.c | 84 ++++++++++++++++++++++++++++-------- > tests/ovsdb-server.at | 112 > ++++++++++++++++++++++++++++++++++++++++++++++++ > 2 files changed, 179 insertions(+), 17 deletions(-) > > diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c > index 3e4e71e..692830c 100644 > --- a/ovsdb/jsonrpc-server.c > +++ b/ovsdb/jsonrpc-server.c > @@ -27,6 +27,7 @@ > #include "ovsdb-error.h" > #include "ovsdb-parser.h" > #include "ovsdb.h" > +#include "poll-loop.h" > #include "reconnect.h" > #include "row.h" > #include "server.h" > @@ -62,6 +63,8 @@ static bool ovsdb_jsonrpc_session_get_status( > struct ovsdb_jsonrpc_remote_status *); > static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session *); > static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *); > +static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *, > + struct jsonrpc_msg *); > > /* Triggers. */ > static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *, > @@ -82,6 +85,8 @@ static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel( > struct json_array *params, > const struct json *request_id); > static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session *); > +static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *); > +static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session > *); > > /* JSON-RPC database server. */ > > @@ -437,7 +442,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session > *s) > ovsdb_jsonrpc_trigger_complete_done(s); > > if (!jsonrpc_session_get_backlog(s->js)) { > - struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js); > + struct jsonrpc_msg *msg; > + > + ovsdb_jsonrpc_monitor_flush_all(s); > + > + msg = jsonrpc_session_recv(s->js); > if (msg) { > if (msg->type == JSONRPC_REQUEST) { > ovsdb_jsonrpc_session_got_request(s, msg); > @@ -482,7 +491,11 @@ ovsdb_jsonrpc_session_wait(struct ovsdb_jsonrpc_session > *s) > { > jsonrpc_session_wait(s->js); > if (!jsonrpc_session_get_backlog(s->js)) { > - jsonrpc_session_recv_wait(s->js); > + if (ovsdb_jsonrpc_monitor_needs_flush(s)) { > + poll_immediate_wake(); > + } else { > + jsonrpc_session_recv_wait(s->js); > + } > } > } > > @@ -698,7 +711,7 @@ ovsdb_jsonrpc_session_notify(struct ovsdb_session > *session, > > s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up); > params = json_array_create_1(json_string_create(lock_name)); > - jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params)); > + ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params)); > } > > static struct jsonrpc_msg * > @@ -873,7 +886,7 @@ ovsdb_jsonrpc_session_got_request(struct > ovsdb_jsonrpc_session *s, > > if (reply) { > jsonrpc_msg_destroy(request); > - jsonrpc_session_send(s->js, reply); > + ovsdb_jsonrpc_session_send(s, reply); > } > } > > @@ -901,6 +914,14 @@ ovsdb_jsonrpc_session_got_notify(struct > ovsdb_jsonrpc_session *s, > } > jsonrpc_msg_destroy(request); > } > + > +static void > +ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s, > + struct jsonrpc_msg *msg) > +{ > + ovsdb_jsonrpc_monitor_flush_all(s); > + jsonrpc_session_send(s->js, msg); > +} > > /* JSON-RPC database server triggers. > * > @@ -928,7 +949,7 @@ ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session > *s, struct ovsdb *db, > > msg = jsonrpc_create_error(json_string_create("duplicate request > ID"), > id); > - jsonrpc_session_send(s->js, msg); > + ovsdb_jsonrpc_session_send(s, msg); > json_destroy(id); > json_destroy(params); > return; > @@ -979,7 +1000,7 @@ ovsdb_jsonrpc_trigger_complete(struct > ovsdb_jsonrpc_trigger *t) > reply = jsonrpc_create_error(json_string_create("canceled"), > t->id); > } > - jsonrpc_session_send(s->js, reply); > + ovsdb_jsonrpc_session_send(s, reply); > } > > json_destroy(t->id); > @@ -1639,6 +1660,46 @@ ovsdb_jsonrpc_monitor_compose_table_update( > return json; > } > > +static bool > +ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s) > +{ > + struct ovsdb_jsonrpc_monitor *m; > + > + HMAP_FOR_EACH (m, node, &s->monitors) { > + struct shash_node *node; > + > + SHASH_FOR_EACH (node, &m->tables) { > + struct ovsdb_jsonrpc_monitor_table *mt = node->data; > + > + if (!hmap_is_empty(&mt->changes)) { > + return true; > + } > + } > + } > + > + return false; > +} > + > +static void > +ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s) > +{ > + struct ovsdb_jsonrpc_monitor *m; > + > + HMAP_FOR_EACH (m, node, &s->monitors) { > + struct json *json; > + > + json = ovsdb_jsonrpc_monitor_compose_table_update(m, false); > + if (json) { > + struct jsonrpc_msg *msg; > + struct json *params; > + > + params = json_array_create_2(json_clone(m->monitor_id), json); > + msg = jsonrpc_create_notify("update", params); > + jsonrpc_session_send(s->js, msg); > + } > + } > +} > + > static void > ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux, > const struct ovsdb_jsonrpc_monitor *m) > @@ -1654,20 +1715,9 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica > *replica, > { > struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica); > struct ovsdb_jsonrpc_monitor_aux aux; > - struct json *json; > > ovsdb_jsonrpc_monitor_init_aux(&aux, m); > ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux); > - json = ovsdb_jsonrpc_monitor_compose_table_update(m, false); > - if (json) { > - struct jsonrpc_msg *msg; > - struct json *params; > - > - params = json_array_create_2(json_clone(aux.monitor->monitor_id), > - json); > - msg = jsonrpc_create_notify("update", params); > - jsonrpc_session_send(aux.monitor->session->js, msg); > - } > > return NULL; > } > diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at > index aee6f77..3393b94 100644 > --- a/tests/ovsdb-server.at > +++ b/tests/ovsdb-server.at > @@ -38,6 +38,8 @@ cat stdout >> output > > EXECUTION_EXAMPLES > > +AT_BANNER([ovsdb-server miscellaneous features]) > + > AT_SETUP([truncating corrupted database log]) > AT_KEYWORDS([ovsdb server positive unix]) > OVS_RUNDIR=`pwd`; export OVS_RUNDIR > @@ -662,6 +664,116 @@ _uuid name number > ], [], [test ! -e pid || kill `cat pid`]) > OVSDB_SERVER_SHUTDOWN > AT_CLEANUP > + > +AT_SETUP([ovsdb-server combines updates on backlogged connections]) > +OVS_LOGDIR=`pwd`; export OVS_LOGDIR > +OVS_RUNDIR=`pwd`; export OVS_RUNDIR > +ON_EXIT([kill `cat *.pid`]) > + > +# The maximum socket receive buffer size is important for this test, which > +# tests behavior when the receive buffer overflows. > +if test -e /proc/sys/net/core/rmem_max; then > + # Linux > + rmem_max=`cat /proc/sys/net/core/rmem_max` > +elif rmem_max=`sysctl -n net.inet.tcp.recvbuf_max 2>/dev/null`; then > + : # FreeBSD > +else > + # Don't know how to get maximum socket receive buffer on this OS > + AT_SKIP_IF([:]) > +fi > + > +# Calculate the number of iterations we need to queue. Each of the > +# iterations we execute, by itself, yields a monitor update of about > +# 25 kB, so fill up that much space plus a few for luck. > +n_iterations=`expr $rmem_max / 2500 + 5` > +echo rmem_max=$rmem_max n_iterations=$n_iterations > + > +# Calculate the exact number of monitor updates expected for $n_iterations, > +# assuming no updates are combined. The "extra" update is for the initial > +# contents of the database. > +n_updates=`expr $n_iterations \* 3 + 1` > + > +# Start an ovsdb-server with the vswitchd schema. > +OVSDB_INIT([db]) > +AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --log-file > --remote=punix:db.sock db], > + [0], [ignore], [ignore]) > + > +# Executes a set of transactions that add a bridge with 100 ports, and > +# then deletes that bridge. This yields three monitor updates that > +# add up to about 25 kB in size. > +# > +# The update also increments a counter held in the database so that we can > +# verify that the overall effect of the transactions took effect (e.g. > +# monitor updates at the end weren't just dropped). We add an arbitrary > +# string to the counter to make grepping for it more reliable. > +counter=0 > +trigger_big_update () { > + counter=`expr $counter + 1` > + ovs-vsctl --no-wait -- set open_vswitch . system_version=xyzzy$counter > + ovs-vsctl --no-wait -- add-br br0 $add > + ovs-vsctl --no-wait -- del-br br0 > +} > +add_ports () { > + for j in `seq 1 100`; do > + printf " -- add-port br0 p%d" $j > + done > +} > +add=`add_ports` > + > +AT_CAPTURE_FILE([ovsdb-client.err]) > + > +# Start an ovsdb-client monitoring all changes to the database, > +# make it block to force the buffers to fill up, and then execute > +# enough iterations that ovsdb-server starts combining updates. > +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL > >ovsdb-client.out 2>ovsdb-client.err]) > +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/block]) > +for i in `seq 1 $n_iterations`; do > + echo "blocked update ($i of $n_iterations)" > + trigger_big_update $i > +done > +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/unblock]) > +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out]) > +AT_CHECK([ovs-appctl -t ovsdb-client exit]) > +OVS_WAIT_WHILE([test -e ovsdb-client.pid]) > + > +# Count the number of updates in the ovsdb-client output, by counting > +# the number of changes to the Open_vSwitch table. (All of our > +# transactions modify the Open_vSwitch table.) It should be less than > +# $n_updates updates. > +# > +# Check that the counter is what we expect. > +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out` > +echo "logged_updates=$logged_updates (expected less than $n_updates)" > +AT_CHECK([test $logged_updates -lt $n_updates]) > +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0], > + ["xyzzy$counter" > +]) > + > +# Start an ovsdb-client monitoring all changes to the database, > +# without making it block, and then execute the same transactions that > +# we did before. > +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL > >ovsdb-client.out 2>ovsdb-client.err]) > +for i in `seq 1 $n_iterations`; do > + echo "unblocked update ($i of $n_iterations)" > + trigger_big_update > + > + # Make sure that ovsdb-client gets enough CPU time to process the > updates. > + ovs-appctl -t ovsdb-client version > /dev/null > +done > +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out]) > +AT_CHECK([ovs-appctl -t ovsdb-client exit]) > +OVS_WAIT_WHILE([test -e ovsdb-client.pid]) > + > +# The ovsdb-client output should have exactly $n_updates updates. > +# > +# Also check that the counter is what we expect. > +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out` > +echo "logged_updates=$logged_updates (expected $n_updates)" > +AT_CHECK([test $logged_updates -eq $n_updates]) > +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0], > + ["xyzzy$counter" > +]) > +AT_CLEANUP > > AT_BANNER([OVSDB -- ovsdb-server transactions (SSL IPv4 sockets)]) > > -- > 1.7.10.4 > > _______________________________________________ > dev mailing list > [email protected] > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list [email protected] http://openvswitch.org/mailman/listinfo/dev
