Thanks. I'm doing a final run of tests and then I'll apply these to master.
On Thu, Apr 03, 2014 at 12:06:46AM -0700, Andy Zhou wrote: > All 7 patches looks good. I am still get up to speed on OVSDB, but > was able to follow almost all the changes. Did not find any obvious > issues. > > Acked-by: Andy Zhou <[email protected]> > > > > On Wed, Apr 2, 2014 at 2:27 PM, Ben Pfaff <[email protected]> wrote: > > Connections that queue up too much data, because they are monitoring a > > table that is changing quickly and failing to keep up with the updates, > > cause problems with buffer management. Since commit 60533a405b2e > > (jsonrpc-server: Disconnect connections that queue too much data.), > > ovsdb-server has dealt with them by disconnecting the connection and > > letting them start up again with a fresh copy of the database. However, > > this is not ideal because of situations where disconnection happens > > repeatedly. For example: > > > > - A manager toggles a column back and forth between two or more values > > quickly (in which case the data transmitted over the monitoring > > connections always increases quickly, without bound). > > > > - A manager repeatedly extends the contents of some column in some row > > (in which case the data transmitted over the monitoring connection > > grows with O(n**2) in the length of the string). > > > > A better way to deal with this problem is to combine updates when they are > > sent to the monitoring connection, if that connection is not keeping up. > > In both the above cases, this reduces the data that must be sent to a > > manageable amount. This commit implements this new way. > > > > Bug #1211786. > > Bug #1221378. > > Signed-off-by: Ben Pfaff <[email protected]> > > --- > > ovsdb/jsonrpc-server.c | 84 ++++++++++++++++++++++++++++-------- > > tests/ovsdb-server.at | 112 > > ++++++++++++++++++++++++++++++++++++++++++++++++ > > 2 files changed, 179 insertions(+), 17 deletions(-) > > > > diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c > > index 3e4e71e..692830c 100644 > > --- a/ovsdb/jsonrpc-server.c > > +++ b/ovsdb/jsonrpc-server.c > > @@ -27,6 +27,7 @@ > > #include "ovsdb-error.h" > > #include "ovsdb-parser.h" > > #include "ovsdb.h" > > +#include "poll-loop.h" > > #include "reconnect.h" > > #include "row.h" > > #include "server.h" > > @@ -62,6 +63,8 @@ static bool ovsdb_jsonrpc_session_get_status( > > struct ovsdb_jsonrpc_remote_status *); > > static void ovsdb_jsonrpc_session_unlock_all(struct ovsdb_jsonrpc_session > > *); > > static void ovsdb_jsonrpc_session_unlock__(struct ovsdb_lock_waiter *); > > +static void ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *, > > + struct jsonrpc_msg *); > > > > /* Triggers. */ > > static void ovsdb_jsonrpc_trigger_create(struct ovsdb_jsonrpc_session *, > > @@ -82,6 +85,8 @@ static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel( > > struct json_array *params, > > const struct json *request_id); > > static void ovsdb_jsonrpc_monitor_remove_all(struct ovsdb_jsonrpc_session > > *); > > +static void ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session > > *); > > +static bool ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session > > *); > > > > /* JSON-RPC database server. */ > > > > @@ -437,7 +442,11 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session > > *s) > > ovsdb_jsonrpc_trigger_complete_done(s); > > > > if (!jsonrpc_session_get_backlog(s->js)) { > > - struct jsonrpc_msg *msg = jsonrpc_session_recv(s->js); > > + struct jsonrpc_msg *msg; > > + > > + ovsdb_jsonrpc_monitor_flush_all(s); > > + > > + msg = jsonrpc_session_recv(s->js); > > if (msg) { > > if (msg->type == JSONRPC_REQUEST) { > > ovsdb_jsonrpc_session_got_request(s, msg); > > @@ -482,7 +491,11 @@ ovsdb_jsonrpc_session_wait(struct > > ovsdb_jsonrpc_session *s) > > { > > jsonrpc_session_wait(s->js); > > if (!jsonrpc_session_get_backlog(s->js)) { > > - jsonrpc_session_recv_wait(s->js); > > + if (ovsdb_jsonrpc_monitor_needs_flush(s)) { > > + poll_immediate_wake(); > > + } else { > > + jsonrpc_session_recv_wait(s->js); > > + } > > } > > } > > > > @@ -698,7 +711,7 @@ ovsdb_jsonrpc_session_notify(struct ovsdb_session > > *session, > > > > s = CONTAINER_OF(session, struct ovsdb_jsonrpc_session, up); > > params = json_array_create_1(json_string_create(lock_name)); > > - jsonrpc_session_send(s->js, jsonrpc_create_notify(method, params)); > > + ovsdb_jsonrpc_session_send(s, jsonrpc_create_notify(method, params)); > > } > > > > static struct jsonrpc_msg * > > @@ -873,7 +886,7 @@ ovsdb_jsonrpc_session_got_request(struct > > ovsdb_jsonrpc_session *s, > > > > if (reply) { > > jsonrpc_msg_destroy(request); > > - jsonrpc_session_send(s->js, reply); > > + ovsdb_jsonrpc_session_send(s, reply); > > } > > } > > > > @@ -901,6 +914,14 @@ ovsdb_jsonrpc_session_got_notify(struct > > ovsdb_jsonrpc_session *s, > > } > > jsonrpc_msg_destroy(request); > > } > > + > > +static void > > +ovsdb_jsonrpc_session_send(struct ovsdb_jsonrpc_session *s, > > + struct jsonrpc_msg *msg) > > +{ > > + ovsdb_jsonrpc_monitor_flush_all(s); > > + jsonrpc_session_send(s->js, msg); > > +} > > > > /* JSON-RPC database server triggers. > > * > > @@ -928,7 +949,7 @@ ovsdb_jsonrpc_trigger_create(struct > > ovsdb_jsonrpc_session *s, struct ovsdb *db, > > > > msg = jsonrpc_create_error(json_string_create("duplicate request > > ID"), > > id); > > - jsonrpc_session_send(s->js, msg); > > + ovsdb_jsonrpc_session_send(s, msg); > > json_destroy(id); > > json_destroy(params); > > return; > > @@ -979,7 +1000,7 @@ ovsdb_jsonrpc_trigger_complete(struct > > ovsdb_jsonrpc_trigger *t) > > reply = jsonrpc_create_error(json_string_create("canceled"), > > t->id); > > } > > - jsonrpc_session_send(s->js, reply); > > + ovsdb_jsonrpc_session_send(s, reply); > > } > > > > json_destroy(t->id); > > @@ -1639,6 +1660,46 @@ ovsdb_jsonrpc_monitor_compose_table_update( > > return json; > > } > > > > +static bool > > +ovsdb_jsonrpc_monitor_needs_flush(struct ovsdb_jsonrpc_session *s) > > +{ > > + struct ovsdb_jsonrpc_monitor *m; > > + > > + HMAP_FOR_EACH (m, node, &s->monitors) { > > + struct shash_node *node; > > + > > + SHASH_FOR_EACH (node, &m->tables) { > > + struct ovsdb_jsonrpc_monitor_table *mt = node->data; > > + > > + if (!hmap_is_empty(&mt->changes)) { > > + return true; > > + } > > + } > > + } > > + > > + return false; > > +} > > + > > +static void > > +ovsdb_jsonrpc_monitor_flush_all(struct ovsdb_jsonrpc_session *s) > > +{ > > + struct ovsdb_jsonrpc_monitor *m; > > + > > + HMAP_FOR_EACH (m, node, &s->monitors) { > > + struct json *json; > > + > > + json = ovsdb_jsonrpc_monitor_compose_table_update(m, false); > > + if (json) { > > + struct jsonrpc_msg *msg; > > + struct json *params; > > + > > + params = json_array_create_2(json_clone(m->monitor_id), json); > > + msg = jsonrpc_create_notify("update", params); > > + jsonrpc_session_send(s->js, msg); > > + } > > + } > > +} > > + > > static void > > ovsdb_jsonrpc_monitor_init_aux(struct ovsdb_jsonrpc_monitor_aux *aux, > > const struct ovsdb_jsonrpc_monitor *m) > > @@ -1654,20 +1715,9 @@ ovsdb_jsonrpc_monitor_commit(struct ovsdb_replica > > *replica, > > { > > struct ovsdb_jsonrpc_monitor *m = ovsdb_jsonrpc_monitor_cast(replica); > > struct ovsdb_jsonrpc_monitor_aux aux; > > - struct json *json; > > > > ovsdb_jsonrpc_monitor_init_aux(&aux, m); > > ovsdb_txn_for_each_change(txn, ovsdb_jsonrpc_monitor_change_cb, &aux); > > - json = ovsdb_jsonrpc_monitor_compose_table_update(m, false); > > - if (json) { > > - struct jsonrpc_msg *msg; > > - struct json *params; > > - > > - params = json_array_create_2(json_clone(aux.monitor->monitor_id), > > - json); > > - msg = jsonrpc_create_notify("update", params); > > - jsonrpc_session_send(aux.monitor->session->js, msg); > > - } > > > > return NULL; > > } > > diff --git a/tests/ovsdb-server.at b/tests/ovsdb-server.at > > index aee6f77..3393b94 100644 > > --- a/tests/ovsdb-server.at > > +++ b/tests/ovsdb-server.at > > @@ -38,6 +38,8 @@ cat stdout >> output > > > > EXECUTION_EXAMPLES > > > > +AT_BANNER([ovsdb-server miscellaneous features]) > > + > > AT_SETUP([truncating corrupted database log]) > > AT_KEYWORDS([ovsdb server positive unix]) > > OVS_RUNDIR=`pwd`; export OVS_RUNDIR > > @@ -662,6 +664,116 @@ _uuid name number > > ], [], [test ! -e pid || kill `cat pid`]) > > OVSDB_SERVER_SHUTDOWN > > AT_CLEANUP > > + > > +AT_SETUP([ovsdb-server combines updates on backlogged connections]) > > +OVS_LOGDIR=`pwd`; export OVS_LOGDIR > > +OVS_RUNDIR=`pwd`; export OVS_RUNDIR > > +ON_EXIT([kill `cat *.pid`]) > > + > > +# The maximum socket receive buffer size is important for this test, which > > +# tests behavior when the receive buffer overflows. > > +if test -e /proc/sys/net/core/rmem_max; then > > + # Linux > > + rmem_max=`cat /proc/sys/net/core/rmem_max` > > +elif rmem_max=`sysctl -n net.inet.tcp.recvbuf_max 2>/dev/null`; then > > + : # FreeBSD > > +else > > + # Don't know how to get maximum socket receive buffer on this OS > > + AT_SKIP_IF([:]) > > +fi > > + > > +# Calculate the number of iterations we need to queue. Each of the > > +# iterations we execute, by itself, yields a monitor update of about > > +# 25 kB, so fill up that much space plus a few for luck. > > +n_iterations=`expr $rmem_max / 2500 + 5` > > +echo rmem_max=$rmem_max n_iterations=$n_iterations > > + > > +# Calculate the exact number of monitor updates expected for $n_iterations, > > +# assuming no updates are combined. The "extra" update is for the initial > > +# contents of the database. > > +n_updates=`expr $n_iterations \* 3 + 1` > > + > > +# Start an ovsdb-server with the vswitchd schema. > > +OVSDB_INIT([db]) > > +AT_CHECK([ovsdb-server --detach --no-chdir --pidfile --log-file > > --remote=punix:db.sock db], > > + [0], [ignore], [ignore]) > > + > > +# Executes a set of transactions that add a bridge with 100 ports, and > > +# then deletes that bridge. This yields three monitor updates that > > +# add up to about 25 kB in size. > > +# > > +# The update also increments a counter held in the database so that we can > > +# verify that the overall effect of the transactions took effect (e.g. > > +# monitor updates at the end weren't just dropped). We add an arbitrary > > +# string to the counter to make grepping for it more reliable. > > +counter=0 > > +trigger_big_update () { > > + counter=`expr $counter + 1` > > + ovs-vsctl --no-wait -- set open_vswitch . system_version=xyzzy$counter > > + ovs-vsctl --no-wait -- add-br br0 $add > > + ovs-vsctl --no-wait -- del-br br0 > > +} > > +add_ports () { > > + for j in `seq 1 100`; do > > + printf " -- add-port br0 p%d" $j > > + done > > +} > > +add=`add_ports` > > + > > +AT_CAPTURE_FILE([ovsdb-client.err]) > > + > > +# Start an ovsdb-client monitoring all changes to the database, > > +# make it block to force the buffers to fill up, and then execute > > +# enough iterations that ovsdb-server starts combining updates. > > +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL > > >ovsdb-client.out 2>ovsdb-client.err]) > > +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/block]) > > +for i in `seq 1 $n_iterations`; do > > + echo "blocked update ($i of $n_iterations)" > > + trigger_big_update $i > > +done > > +AT_CHECK([ovs-appctl -t ovsdb-client ovsdb-client/unblock]) > > +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out]) > > +AT_CHECK([ovs-appctl -t ovsdb-client exit]) > > +OVS_WAIT_WHILE([test -e ovsdb-client.pid]) > > + > > +# Count the number of updates in the ovsdb-client output, by counting > > +# the number of changes to the Open_vSwitch table. (All of our > > +# transactions modify the Open_vSwitch table.) It should be less than > > +# $n_updates updates. > > +# > > +# Check that the counter is what we expect. > > +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out` > > +echo "logged_updates=$logged_updates (expected less than $n_updates)" > > +AT_CHECK([test $logged_updates -lt $n_updates]) > > +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0], > > + ["xyzzy$counter" > > +]) > > + > > +# Start an ovsdb-client monitoring all changes to the database, > > +# without making it block, and then execute the same transactions that > > +# we did before. > > +AT_CHECK([ovsdb-client --detach --no-chdir --pidfile monitor ALL > > >ovsdb-client.out 2>ovsdb-client.err]) > > +for i in `seq 1 $n_iterations`; do > > + echo "unblocked update ($i of $n_iterations)" > > + trigger_big_update > > + > > + # Make sure that ovsdb-client gets enough CPU time to process the > > updates. > > + ovs-appctl -t ovsdb-client version > /dev/null > > +done > > +OVS_WAIT_UNTIL([grep "\"xyzzy$counter\"" ovsdb-client.out]) > > +AT_CHECK([ovs-appctl -t ovsdb-client exit]) > > +OVS_WAIT_WHILE([test -e ovsdb-client.pid]) > > + > > +# The ovsdb-client output should have exactly $n_updates updates. > > +# > > +# Also check that the counter is what we expect. > > +logged_updates=`grep -c '^Open_vSwitch' ovsdb-client.out` > > +echo "logged_updates=$logged_updates (expected $n_updates)" > > +AT_CHECK([test $logged_updates -eq $n_updates]) > > +AT_CHECK_UNQUOTED([ovs-vsctl get open_vswitch . system_version], [0], > > + ["xyzzy$counter" > > +]) > > +AT_CLEANUP > > > > AT_BANNER([OVSDB -- ovsdb-server transactions (SSL IPv4 sockets)]) > > > > -- > > 1.7.10.4 > > > > _______________________________________________ > > dev mailing list > > [email protected] > > http://openvswitch.org/mailman/listinfo/dev _______________________________________________ dev mailing list [email protected] http://openvswitch.org/mailman/listinfo/dev
