Looks Good.

Ethan

On Thu, Jul 14, 2011 at 14:27, Ben Pfaff <[email protected]> wrote:
> Once in a while someone reports a problem caused by running multiple
> ovs-vswitchd processes at the same time.  This fixes the problem by
> requiring ovs-vswitchd to obtain a database lock before taking any actions.
> ---
>  lib/ovsdb-idl.c       |  198 ++++++++++++++++++++++++++++++++++++++++++++++++-
>  lib/ovsdb-idl.h       |    5 +
>  utilities/ovs-vsctl.c |    4 +
>  vswitchd/bridge.c     |   41 +++++++---
>  4 files changed, 233 insertions(+), 15 deletions(-)
>
> diff --git a/lib/ovsdb-idl.c b/lib/ovsdb-idl.c
> index 51a62dd..2ac6c2f 100644
> --- a/lib/ovsdb-idl.c
> +++ b/lib/ovsdb-idl.c
> @@ -71,6 +71,12 @@ struct ovsdb_idl {
>     unsigned int last_monitor_request_seqno;
>     unsigned int change_seqno;
>
> +    /* Database locking. */
> +    char *lock_name;            /* Name of lock we need, NULL if none. */
> +    bool has_lock;              /* Has db server told us we have the lock? */
> +    bool is_lock_contended;     /* Has db server told us we can't get lock? */
> +    struct json *lock_request_id; /* JSON-RPC ID of in-flight lock request. */
> +
>     /* Transaction support. */
>     struct ovsdb_idl_txn *txn;
>     struct hmap outstanding_txns;
> @@ -136,6 +142,14 @@ static void ovsdb_idl_txn_abort_all(struct ovsdb_idl *);
>  static bool ovsdb_idl_txn_process_reply(struct ovsdb_idl *,
>                                         const struct jsonrpc_msg *msg);
>
> +static void ovsdb_idl_send_lock_request(struct ovsdb_idl *);
> +static void ovsdb_idl_send_unlock_request(struct ovsdb_idl *);
> +static void ovsdb_idl_parse_lock_reply(struct ovsdb_idl *,
> +                                       const struct json *);
> +static void ovsdb_idl_parse_lock_notify(struct ovsdb_idl *,
> +                                        const struct json *params,
> +                                        bool new_has_lock);
> +
>  /* Creates and returns a connection to database 'remote', which should be in a
>  * form acceptable to jsonrpc_session_open().  The connection will maintain an
>  * in-memory replica of the remote database whose schema is described by
> @@ -213,6 +227,8 @@ ovsdb_idl_destroy(struct ovsdb_idl *idl)
>         shash_destroy(&idl->table_by_name);
>         free(idl->tables);
>         json_destroy(idl->monitor_request_id);
> +        free(idl->lock_name);
> +        json_destroy(idl->lock_request_id);
>         free(idl);
>     }
>  }
> @@ -256,7 +272,9 @@ ovsdb_idl_clear(struct ovsdb_idl *idl)
>  /* Processes a batch of messages from the database server on 'idl'.  Returns
>  * true if the database as seen through 'idl' changed, false if it did not
>  * change.  The initial fetch of the entire contents of the remote database is
> - * considered to be one kind of change.
> + * considered to be one kind of change.  If 'idl' has been configured to
> + * acquire a database lock (with ovsdb_idl_set_lock()), then successfully
> + * acquiring the lock is also considered to be a change.
>  *
>  * When this function returns false, the client may continue to use any data
>  * structures it obtained from 'idl' in the past.  But when it returns true,
> @@ -290,6 +308,9 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
>             idl->last_monitor_request_seqno = seqno;
>             ovsdb_idl_txn_abort_all(idl);
>             ovsdb_idl_send_monitor_request(idl);
> +            if (idl->lock_name) {
> +                ovsdb_idl_send_lock_request(idl);
> +            }
>             break;
>         }
>
> @@ -303,18 +324,33 @@ ovsdb_idl_run(struct ovsdb_idl *idl)
>                    && msg->params->type == JSON_ARRAY
>                    && msg->params->u.array.n == 2
>                    && msg->params->u.array.elems[0]->type == JSON_NULL) {
> +            /* Database contents changed. */
>             ovsdb_idl_parse_update(idl, msg->params->u.array.elems[1]);
>         } else if (msg->type == JSONRPC_REPLY
>                    && idl->monitor_request_id
>                    && json_equal(idl->monitor_request_id, msg->id)) {
> +            /* Reply to our "monitor" request. */
>             idl->change_seqno++;
>             json_destroy(idl->monitor_request_id);
>             idl->monitor_request_id = NULL;
>             ovsdb_idl_clear(idl);
>             ovsdb_idl_parse_update(idl, msg->result);
> +        } else if (msg->type == JSONRPC_REPLY
> +                   && idl->lock_request_id
> +                   && json_equal(idl->lock_request_id, msg->id)) {
> +            /* Reply to our "lock" request. */
> +            ovsdb_idl_parse_lock_reply(idl, msg->result);
> +        } else if (msg->type == JSONRPC_NOTIFY
> +                   && !strcmp(msg->method, "locked")) {
> +            /* We got our lock. */
> +            ovsdb_idl_parse_lock_notify(idl, msg->params, true);
> +        } else if (msg->type == JSONRPC_NOTIFY
> +                   && !strcmp(msg->method, "stolen")) {
> +            /* Someone else stole our lock. */
> +            ovsdb_idl_parse_lock_notify(idl, msg->params, false);
>         } else if (msg->type == JSONRPC_REPLY && msg->id->type == JSON_STRING
>                    && !strcmp(msg->id->u.string, "echo")) {
> -            /* It's a reply to our echo request.  Ignore it. */
> +            /* Reply to our echo request.  Ignore it. */
>         } else if ((msg->type == JSONRPC_ERROR
>                     || msg->type == JSONRPC_REPLY)
>                    && ovsdb_idl_txn_process_reply(idl, msg)) {
> @@ -1139,6 +1175,8 @@ ovsdb_idl_txn_status_to_string(enum ovsdb_idl_txn_status status)
>         return "success";
>     case TXN_TRY_AGAIN:
>         return "try again";
> +    case TXN_NOT_LOCKED:
> +        return "not locked";
>     case TXN_ERROR:
>         return "error";
>     }
> @@ -1366,9 +1404,24 @@ ovsdb_idl_txn_commit(struct ovsdb_idl_txn *txn)
>         return txn->status;
>     }
>
> +    /* If we need a lock but don't have it, give up quickly. */
> +    if (txn->idl->lock_name && !ovsdb_idl_has_lock(txn->idl)) {
> +        txn->status = TXN_NOT_LOCKED;
> +        ovsdb_idl_txn_disassemble(txn);
> +        return txn->status;
> +    }
> +
>     operations = json_array_create_1(
>         json_string_create(txn->idl->class->database));
>
> +    /* Assert that we have the required lock (avoiding a race). */
> +    if (txn->idl->lock_name) {
> +        struct json *op = json_object_create();
> +        json_array_add(operations, op);
> +        json_object_put_string(op, "op", "assert");
> +        json_object_put_string(op, "lock", txn->idl->lock_name);
> +    }
> +
>     /* Add prerequisites and declarations of new rows. */
>     HMAP_FOR_EACH (row, txn_node, &txn->txn_rows) {
>         /* XXX check that deleted rows exist even if no prereqs? */
> @@ -1967,6 +2020,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
>         struct json_array *ops = &msg->result->u.array;
>         int hard_errors = 0;
>         int soft_errors = 0;
> +        int lock_errors = 0;
>         size_t i;
>
>         for (i = 0; i < ops->n; i++) {
> @@ -1984,6 +2038,8 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
>                     if (error->type == JSON_STRING) {
>                         if (!strcmp(error->u.string, "timed out")) {
>                             soft_errors++;
> +                        } else if (!strcmp(error->u.string, "not owner")) {
> +                            lock_errors++;
>                         } else if (strcmp(error->u.string, "aborted")) {
>                             hard_errors++;
>                             ovsdb_idl_txn_set_error_json(txn, op);
> @@ -2003,7 +2059,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
>             }
>         }
>
> -        if (!soft_errors && !hard_errors) {
> +        if (!soft_errors && !hard_errors && !lock_errors) {
>             struct ovsdb_idl_txn_insert *insert;
>
>             if (txn->inc_table && !ovsdb_idl_txn_process_inc_reply(txn, ops)) {
> @@ -2018,6 +2074,7 @@ ovsdb_idl_txn_process_reply(struct ovsdb_idl *idl,
>         }
>
>         status = (hard_errors ? TXN_ERROR
> +                  : lock_errors ? TXN_NOT_LOCKED
>                   : soft_errors ? TXN_TRY_AGAIN
>                   : TXN_SUCCESS);
>     }
> @@ -2039,4 +2096,139 @@ ovsdb_idl_txn_get_idl (struct ovsdb_idl_txn *txn)
>  {
>     return txn->idl;
>  }
> +
> +/* If 'lock_name' is nonnull, configures 'idl' to obtain the named lock from
> + * the database server and to avoid modifying the database when the lock cannot
> + * be acquired (that is, when another client has the same lock).
> + *
> + * If 'lock_name' is NULL, drops the locking requirement and releases the
> + * lock. */
> +void
> +ovsdb_idl_set_lock(struct ovsdb_idl *idl, const char *lock_name)
> +{
> +    assert(!idl->txn);
> +    assert(hmap_is_empty(&idl->outstanding_txns));
> +
> +    if (idl->lock_name && (!lock_name || strcmp(lock_name, idl->lock_name))) {
> +        /* Release previous lock. */
> +        ovsdb_idl_send_unlock_request(idl);
> +        free(idl->lock_name);
> +        idl->lock_name = NULL;
> +        idl->is_lock_contended = false;
> +    }
> +
> +    if (lock_name && !idl->lock_name) {
> +        /* Acquire new lock. */
> +        idl->lock_name = xstrdup(lock_name);
> +        ovsdb_idl_send_lock_request(idl);
> +    }
> +}
> +
> +/* Returns true if 'idl' is configured to obtain a lock and owns that lock.
> + *
> + * Locking and unlocking happens asynchronously from the database client's
> + * point of view, so the information is only useful for optimization (e.g. if
> + * the client doesn't have the lock then there's no point in trying to write to
> + * the database). */
> +bool
> +ovsdb_idl_has_lock(const struct ovsdb_idl *idl)
> +{
> +    return idl->has_lock;
> +}
> +
> +/* Returns true if 'idl' is configured to obtain a lock but the database server
> + * has indicated that some other client already owns the requested lock. */
> +bool
> +ovsdb_idl_is_lock_contended(const struct ovsdb_idl *idl)
> +{
> +    return idl->is_lock_contended;
> +}
> +
> +static void
> +ovsdb_idl_update_has_lock(struct ovsdb_idl *idl, bool new_has_lock)
> +{
> +    if (new_has_lock && !idl->has_lock) {
> +        if (!idl->monitor_request_id) {
> +            idl->change_seqno++;
> +        } else {
> +            /* We're waiting for a monitor reply, so don't signal that the
> +             * database changed.  The monitor reply will increment change_seqno
> +             * anyhow. */
> +        }
> +        idl->is_lock_contended = false;
> +    }
> +    idl->has_lock = new_has_lock;
> +}
> +
> +static void
> +ovsdb_idl_send_lock_request__(struct ovsdb_idl *idl, const char *method,
> +                              struct json **idp)
> +{
> +    ovsdb_idl_update_has_lock(idl, false);
> +
> +    json_destroy(idl->lock_request_id);
> +    idl->lock_request_id = NULL;
> +
> +    if (jsonrpc_session_is_connected(idl->session)) {
> +        struct json *params;
> +
> +        params = json_array_create_1(json_string_create(idl->lock_name));
> +        jsonrpc_session_send(idl->session,
> +                             jsonrpc_create_request(method, params, idp));
> +    }
> +}
>
> +static void
> +ovsdb_idl_send_lock_request(struct ovsdb_idl *idl)
> +{
> +    ovsdb_idl_send_lock_request__(idl, "lock", &idl->lock_request_id);
> +}
> +
> +static void
> +ovsdb_idl_send_unlock_request(struct ovsdb_idl *idl)
> +{
> +    ovsdb_idl_send_lock_request__(idl, "unlock", NULL);
> +}
> +
> +static void
> +ovsdb_idl_parse_lock_reply(struct ovsdb_idl *idl, const struct json *result)
> +{
> +    bool got_lock;
> +
> +    json_destroy(idl->lock_request_id);
> +    idl->lock_request_id = NULL;
> +
> +    if (result->type == JSON_OBJECT) {
> +        const struct json *locked;
> +
> +        locked = shash_find_data(json_object(result), "locked");
> +        got_lock = locked && locked->type == JSON_TRUE;
> +    } else {
> +        got_lock = false;
> +    }
> +
> +    ovsdb_idl_update_has_lock(idl, got_lock);
> +    if (!got_lock) {
> +        idl->is_lock_contended = true;
> +    }
> +}
> +
> +static void
> +ovsdb_idl_parse_lock_notify(struct ovsdb_idl *idl,
> +                            const struct json *params,
> +                            bool new_has_lock)
> +{
> +    if (idl->lock_name
> +        && params->type == JSON_ARRAY
> +        && json_array(params)->n > 0
> +        && json_array(params)->elems[0]->type == JSON_STRING) {
> +        const char *lock_name = json_string(json_array(params)->elems[0]);
> +
> +        if (!strcmp(idl->lock_name, lock_name)) {
> +            ovsdb_idl_update_has_lock(idl, new_has_lock);
> +            if (!new_has_lock) {
> +                idl->is_lock_contended = true;
> +            }
> +        }
> +    }
> +}
> diff --git a/lib/ovsdb-idl.h b/lib/ovsdb-idl.h
> index ac61e1a..e7ae6f1 100644
> --- a/lib/ovsdb-idl.h
> +++ b/lib/ovsdb-idl.h
> @@ -50,6 +50,10 @@ void ovsdb_idl_destroy(struct ovsdb_idl *);
>  bool ovsdb_idl_run(struct ovsdb_idl *);
>  void ovsdb_idl_wait(struct ovsdb_idl *);
>
> +void ovsdb_idl_set_lock(struct ovsdb_idl *, const char *lock_name);
> +bool ovsdb_idl_has_lock(const struct ovsdb_idl *);
> +bool ovsdb_idl_is_lock_contended(const struct ovsdb_idl *);
> +
>  unsigned int ovsdb_idl_get_seqno(const struct ovsdb_idl *);
>  bool ovsdb_idl_has_ever_connected(const struct ovsdb_idl *);
>  void ovsdb_idl_force_reconnect(struct ovsdb_idl *);
> @@ -119,6 +123,7 @@ enum ovsdb_idl_txn_status {
>     TXN_TRY_AGAIN,              /* Commit failed because a "verify" operation
>                                  * reported an inconsistency, due to a network
>                                  * problem, or other transient failure. */
> +    TXN_NOT_LOCKED,             /* Server hasn't given us the lock yet. */
>     TXN_ERROR                   /* Commit failed due to a hard error. */
>  };
>
> diff --git a/utilities/ovs-vsctl.c b/utilities/ovs-vsctl.c
> index c6fc8a4..ad0ef79 100644
> --- a/utilities/ovs-vsctl.c
> +++ b/utilities/ovs-vsctl.c
> @@ -3660,6 +3660,10 @@ do_vsctl(const char *args, struct vsctl_command *commands, size_t n_commands,
>     case TXN_ERROR:
>         vsctl_fatal("transaction error: %s", error);
>
> +    case TXN_NOT_LOCKED:
> +        /* Should not happen--we never call ovsdb_idl_set_lock(). */
> +        vsctl_fatal("database not locked");
> +
>     default:
>         NOT_REACHED();
>     }
> diff --git a/vswitchd/bridge.c b/vswitchd/bridge.c
> index 449957a..b711679 100644
> --- a/vswitchd/bridge.c
> +++ b/vswitchd/bridge.c
> @@ -216,6 +216,7 @@ bridge_init(const char *remote)
>  {
>     /* Create connection to database. */
>     idl = ovsdb_idl_create(remote, &ovsrec_idl_class, true);
> +    ovsdb_idl_set_lock(idl, "ovs_vswitchd");
>
>     ovsdb_idl_omit_alert(idl, &ovsrec_open_vswitch_col_cur_cfg);
>     ovsdb_idl_omit_alert(idl, &ovsrec_open_vswitch_col_statistics);
> @@ -1391,6 +1392,24 @@ bridge_run(void)
>     bool database_changed;
>     struct bridge *br;
>
> +    /* (Re)configure if necessary. */
> +    database_changed = ovsdb_idl_run(idl);
> +    if (ovsdb_idl_is_lock_contended(idl)) {
> +        static struct vlog_rate_limit rl = VLOG_RATE_LIMIT_INIT(1, 1);
> +        struct bridge *br, *next_br;
> +
> +        VLOG_ERR_RL(&rl, "another ovs-vswitchd process is running, "
> +                    "disabling this process until it goes away");
> +
> +        HMAP_FOR_EACH_SAFE (br, next_br, node, &all_bridges) {
> +            bridge_destroy(br);
> +        }
> +        return;
> +    } else if (!ovsdb_idl_has_lock(idl)) {
> +        return;
> +    }
> +    cfg = ovsrec_open_vswitch_first(idl);
> +
>     /* Let each bridge do the work that it needs to do. */
>     datapath_destroyed = false;
>     HMAP_FOR_EACH (br, node, &all_bridges) {
> @@ -1403,10 +1422,6 @@ bridge_run(void)
>         }
>     }
>
> -    /* (Re)configure if necessary. */
> -    database_changed = ovsdb_idl_run(idl);
> -    cfg = ovsrec_open_vswitch_first(idl);
> -
>     /* Re-configure SSL.  We do this on every trip through the main loop,
>      * instead of just when the database changes, because the contents of the
>      * key and certificate files can change without the database changing.
> @@ -1495,16 +1510,18 @@ bridge_run(void)
>  void
>  bridge_wait(void)
>  {
> -    struct bridge *br;
> -
> -    HMAP_FOR_EACH (br, node, &all_bridges) {
> -        ofproto_wait(br->ofproto);
> -    }
>     ovsdb_idl_wait(idl);
> -    poll_timer_wait_until(stats_timer);
> +    if (!hmap_is_empty(&all_bridges)) {
> +        struct bridge *br;
>
> -    if (db_limiter > time_msec()) {
> -        poll_timer_wait_until(db_limiter);
> +        HMAP_FOR_EACH (br, node, &all_bridges) {
> +            ofproto_wait(br->ofproto);
> +        }
> +        poll_timer_wait_until(stats_timer);
> +
> +        if (db_limiter > time_msec()) {
> +            poll_timer_wait_until(db_limiter);
> +        }
>     }
>  }
>
> --
> 1.7.4.4
>
> _______________________________________________
> dev mailing list
> [email protected]
> http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev

Reply via email to