On Wed, Feb 3, 2016 at 5:53 AM, Liran Schour <lir...@il.ibm.com> wrote:

> ovsdb-server now accepts "monitor_cond_update" request. On conditions
> update
> we insert all rows of table in a new changes list - OVSDB_MONITOR_ALL that
> are
> being indexed by the transaction-id at the moment of insertion.
> JSON cache is being used only for empty condition monitor sessions.
> Sees ovsdb-server (1) man page for details of monitor_cond_update.
>
> Signed-off-by: Liran Schour <lir...@il.ibm.com>
>

Ah, I see why you want to compare only monitored columns: it makes handling
condition updates easier. :-)

There is no test for this feature. It seems too big a feature to be
missing test coverage.

>
> ---
> v2->v3:
> * ovsdb_monitor_table_condition_update() accepts only single json condition
> * Allow non-monitored columns in cond_update.
> * Flush monitor session after monitor_cond_update to guarantee empty
> changes list
> * Bug fix: use json cache when all condition are empty
> * Simplify inserting row change to changes lists
> ---
>  ovsdb/jsonrpc-server.c | 154 +++++++++++++++++++++++++--
>  ovsdb/monitor.c        | 283
> ++++++++++++++++++++++++++++++++++++++++++-------
>  ovsdb/monitor.h        |  26 +++--
>  3 files changed, 415 insertions(+), 48 deletions(-)
>
> diff --git a/ovsdb/jsonrpc-server.c b/ovsdb/jsonrpc-server.c
> index cd6a70a..15dc406 100644
> --- a/ovsdb/jsonrpc-server.c
> +++ b/ovsdb/jsonrpc-server.c
> @@ -87,6 +87,10 @@ static void ovsdb_jsonrpc_trigger_complete_done(
>  static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_create(
>      struct ovsdb_jsonrpc_session *, struct ovsdb *, struct json *params,
>      enum ovsdb_monitor_version, const struct json *request_id);
> +static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cond_update(
> +    struct ovsdb_jsonrpc_session *s,
> +    struct json *params,
> +    const struct json *request_id);
>  static struct jsonrpc_msg *ovsdb_jsonrpc_monitor_cancel(
>      struct ovsdb_jsonrpc_session *,
>      struct json_array *params,
> @@ -407,7 +411,8 @@ static void ovsdb_jsonrpc_session_wait(struct
> ovsdb_jsonrpc_session *);
>  static void ovsdb_jsonrpc_session_get_memory_usage(
>      const struct ovsdb_jsonrpc_session *, struct simap *usage);
>  static void ovsdb_jsonrpc_session_got_request(struct
> ovsdb_jsonrpc_session *,
> -                                             struct jsonrpc_msg *);
> +                                              struct jsonrpc_msg *,
> +                                              bool *);
>  static void ovsdb_jsonrpc_session_got_notify(struct ovsdb_jsonrpc_session
> *,
>                                               struct jsonrpc_msg *);
>
> @@ -463,13 +468,14 @@ ovsdb_jsonrpc_session_run(struct
> ovsdb_jsonrpc_session *s)
>
>      if (!jsonrpc_session_get_backlog(s->js)) {
>          struct jsonrpc_msg *msg;
> +        bool needs_flush = false;
>
>          ovsdb_jsonrpc_monitor_flush_all(s);
>
>          msg = jsonrpc_session_recv(s->js);
>          if (msg) {
>              if (msg->type == JSONRPC_REQUEST) {
> -                ovsdb_jsonrpc_session_got_request(s, msg);
> +                ovsdb_jsonrpc_session_got_request(s, msg, &needs_flush);
>              } else if (msg->type == JSONRPC_NOTIFY) {
>                  ovsdb_jsonrpc_session_got_notify(s, msg);
>              } else {
> @@ -480,6 +486,9 @@ ovsdb_jsonrpc_session_run(struct ovsdb_jsonrpc_session
> *s)
>                  jsonrpc_msg_destroy(msg);
>              }
>          }
> +        if (needs_flush) {
> +            ovsdb_jsonrpc_monitor_flush_all(s);
> +        }
>      }
>      return jsonrpc_session_is_alive(s->js) ? 0 : ETIMEDOUT;
>  }
> @@ -840,10 +849,12 @@ execute_transaction(struct ovsdb_jsonrpc_session *s,
> struct ovsdb *db,
>
>  static void
>  ovsdb_jsonrpc_session_got_request(struct ovsdb_jsonrpc_session *s,
> -                                  struct jsonrpc_msg *request)
> +                                  struct jsonrpc_msg *request,
> +                                  bool *needs_flush)
>  {
>      struct jsonrpc_msg *reply;
>
> +    *needs_flush = false;
>      if (!strcmp(request->method, "transact")) {
>          struct ovsdb *db = ovsdb_jsonrpc_lookup_db(s, request, &reply);
>          if (!reply) {
> @@ -861,6 +872,10 @@ ovsdb_jsonrpc_session_got_request(struct
> ovsdb_jsonrpc_session *s,
>                                                   version,
>                                                   request->id);
>          }
> +    } else if (!strcmp(request->method, "monitor_cond_update")) {
> +        reply = ovsdb_jsonrpc_monitor_cond_update(s, request->params,
> +                                                  request->id);
> +        *needs_flush = true;
>      } else if (!strcmp(request->method, "monitor_cancel")) {
>          reply = ovsdb_jsonrpc_monitor_cancel(s,
> json_array(request->params),
>                                               request->id);
> @@ -1050,6 +1065,8 @@ struct ovsdb_jsonrpc_monitor {
>      struct ovsdb_monitor *dbmon;
>      uint64_t unflushed;         /* The first transaction that has not been
>                                         flushed to the jsonrpc remote
> client. */
> +    bool all_rows;              /* Indicates if in the next flush we
> request all
> +                                   rows (due to a condition change)
>     */
>      enum ovsdb_monitor_version version;
>      struct ovsdb_monitor_session_condition *condition;/* Session's
> condition */
>  };
> @@ -1213,6 +1230,7 @@ ovsdb_jsonrpc_monitor_create(struct
> ovsdb_jsonrpc_session *s, struct ovsdb *db,
>          m->condition = ovsdb_monitor_session_condition_create();
>      }
>      m->unflushed = 0;
> +    m->all_rows = false;
>      m->version = version;
>      hmap_insert(&s->monitors, &m->node, json_hash(monitor_id, 0));
>      m->monitor_id = json_clone(monitor_id);
> @@ -1294,6 +1312,124 @@ error:
>      return jsonrpc_create_error(json, request_id);
>  }
>
> +static struct ovsdb_error *
> +ovsdb_jsonrpc_parse_monitor_cond_update_request(
> +                                struct ovsdb_jsonrpc_monitor *m,
> +                                const struct ovsdb_table *table,
> +                                const struct json *cond_update_req)
> +{
> +    const struct ovsdb_table_schema *ts = table->schema;
> +    const struct json *condition, *columns;
> +    struct ovsdb_parser parser;
> +    struct ovsdb_error *error;
> +
> +    ovsdb_parser_init(&parser, cond_update_req, "table %s", ts->name);
> +    columns = ovsdb_parser_member(&parser, "columns", OP_ARRAY |
> OP_OPTIONAL);
> +    condition = ovsdb_parser_member(&parser, "where", OP_ARRAY |
> OP_OPTIONAL);
> +
> +    error = ovsdb_parser_finish(&parser);
> +    if (error) {
> +        return error;
> +    }
> +
> +    if (columns) {
> +        error = ovsdb_syntax_error(cond_update_req, NULL, "changing
> columns "
> +                                   "is unsupported");
> +        return error;
> +    }
> +    error = ovsdb_monitor_table_condition_update(m->dbmon, m->condition,
> table,
> +                                                 condition);
> +
> +    return error;
> +}
> +
> +static struct jsonrpc_msg *
> +ovsdb_jsonrpc_monitor_cond_update(struct ovsdb_jsonrpc_session *s,
> +                                  struct json *params,
> +                                  const struct json *request_id)
> +{
> +    struct ovsdb_error *error;
> +    struct ovsdb_jsonrpc_monitor *m;
> +    struct json *monitor_cond_update_reqs;
> +    struct shash_node *node;
> +    struct json *json;
> +
> +    if (json_array(params)->n != 3) {
> +        error = ovsdb_syntax_error(params, NULL, "invalid parameters");
> +        goto error;
> +    }
> +
> +    m = ovsdb_jsonrpc_monitor_find(s, params->u.array.elems[0]);
> +    if (!m) {
> +        error = ovsdb_syntax_error(request_id, NULL,
> +                                   "unknown monitor session");
> +        goto error;
> +    }
> +
> +    monitor_cond_update_reqs = params->u.array.elems[2];
> +    if (monitor_cond_update_reqs->type != JSON_OBJECT) {
> +        error =
> +            ovsdb_syntax_error(NULL, NULL,
> +                               "monitor-cond-change-requests must be
> object");
> +        goto error;
> +    }
> +
> +    SHASH_FOR_EACH (node, json_object(monitor_cond_update_reqs)) {
> +        const struct ovsdb_table *table;
> +        const struct json *mr_value;
> +        size_t i;
> +
> +        table = ovsdb_get_table(m->db, node->name);
> +        if (!table) {
> +            error = ovsdb_syntax_error(NULL, NULL,
> +                                       "no table named %s", node->name);
> +            goto error;
> +        }
> +        if (!ovsdb_monitor_table_exists(m->dbmon, table)) {
> +            error = ovsdb_syntax_error(NULL, NULL,
> +                                       "no table named %s in monitor
> session",
> +                                       node->name);
> +            goto error;
> +        }
> +
> +        mr_value = node->data;
> +        if (mr_value->type == JSON_ARRAY) {
> +            const struct json_array *array = &mr_value->u.array;
> +
> +            for (i = 0; i < array->n; i++) {
> +                error = ovsdb_jsonrpc_parse_monitor_cond_update_request(
> +                                            m, table, array->elems[i]);
> +                if (error) {
> +                    goto error;
> +                }
> +            }
> +        } else {
> +            error = ovsdb_syntax_error(
> +                       NULL, NULL,
> +                       "table %s no monitor-cond-change JSON array",
> +                       node->name);
> +            goto error;
> +        }
> +    }
> +
> +    ovsdb_monitor_get_all_rows(m->dbmon, m->unflushed);
> +    m->all_rows = true;
> +
> +    /* Change monitor id */
> +    hmap_remove(&s->monitors, &m->node);
> +    json_destroy(m->monitor_id);
> +    m->monitor_id = json_clone(params->u.array.elems[1]);
> +    hmap_insert(&s->monitors, &m->node, json_hash(m->monitor_id, 0));
> +
> +    return jsonrpc_create_reply(json_object_create(), request_id);
> +
> +error:
> +
> +    json = ovsdb_error_to_json(error);
> +    ovsdb_error_destroy(error);
> +    return jsonrpc_create_error(json, request_id);
> +}
> +
>  static struct jsonrpc_msg *
>  ovsdb_jsonrpc_monitor_cancel(struct ovsdb_jsonrpc_session *s,
>                               struct json_array *params,
> @@ -1330,8 +1466,14 @@ static struct json *
>  ovsdb_jsonrpc_monitor_compose_update(struct ovsdb_jsonrpc_monitor *m,
>                                       bool initial)
>  {
> -    return ovsdb_monitor_get_update(m->dbmon, initial, &m->unflushed,
> -                                    m->condition, m->version);
> +    struct json * json = ovsdb_monitor_get_update(m->dbmon, initial,
> +                                                  m->all_rows,
> +                                                  &m->unflushed,
> +                                                  m->condition,
> +                                                  m->version);
> +
> +    m->all_rows = false;
> +    return json;
>  }
>
>  static bool
> @@ -1340,7 +1482,7 @@ ovsdb_jsonrpc_monitor_needs_flush(struct
> ovsdb_jsonrpc_session *s)
>      struct ovsdb_jsonrpc_monitor *m;
>
>      HMAP_FOR_EACH (m, node, &s->monitors) {
> -        if (ovsdb_monitor_needs_flush(m->dbmon, m->unflushed)) {
> +        if (m->all_rows || ovsdb_monitor_needs_flush(m->dbmon,
> m->unflushed)) {
>              return true;
>          }
>      }
> diff --git a/ovsdb/monitor.c b/ovsdb/monitor.c
> index 1614d67..d087a5a 100644
> --- a/ovsdb/monitor.c
> +++ b/ovsdb/monitor.c
> @@ -120,6 +120,11 @@ struct ovsdb_monitor_changes {
>                                      hmap.  */
>  };
>
> +enum ovsdb_monitor_changes_type {
> +    OVSDB_MONITOR_CHANGES,
> +    OVSDB_MONITOR_ALL
> +};
> +
>  /* A particular table being monitored. */
>  struct ovsdb_monitor_table {
>      const struct ovsdb_table *table;
> @@ -142,6 +147,9 @@ struct ovsdb_monitor_table {
>
>      /* Contains 'ovsdb_monitor_changes' indexed by 'transaction'. */
>      struct hmap changes;
> +    /* Contains 'ovsdb_monitor_changes' of all rows in table at
> transaction
> +       point in time. indexed by 'transaction'. */
> +    struct hmap all;
>  };
>
>  typedef struct json *
> @@ -153,12 +161,15 @@ typedef struct json *
>
>  static void ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon);
>  static struct ovsdb_monitor_changes * ovsdb_monitor_table_add_changes(
> -    struct ovsdb_monitor_table *mt, uint64_t next_txn);
> +    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
> +    uint64_t next_txn);
>  static struct ovsdb_monitor_changes *ovsdb_monitor_table_find_changes(
> -    struct ovsdb_monitor_table *mt, uint64_t unflushed);
> +    struct ovsdb_monitor_table *mt, enum ovsdb_monitor_changes_type type,
> +    uint64_t unflushed);
>  static void ovsdb_monitor_changes_destroy(
>                                    struct ovsdb_monitor_changes *changes);
>  static void ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table
> *mt,
> +                                  enum ovsdb_monitor_changes_type type,
>                                    uint64_t unflushed);
>
>  static uint32_t
> @@ -262,8 +273,8 @@ ovsdb_monitor_changes_row_find(const struct
> ovsdb_monitor_changes *changes,
>   *
>   * If 'row' is NULL, returns NULL. */
>  static struct ovsdb_datum *
> -clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> -                       const struct ovsdb_row *row)
> +clone_monitor_ovsdb_row_data(const struct ovsdb_monitor_table *mt,
> +                             const struct ovsdb_row *row)
>  {
>      struct ovsdb_datum *data;
>      size_t i;
> @@ -284,6 +295,44 @@ clone_monitor_row_data(const struct
> ovsdb_monitor_table *mt,
>      return data;
>  }
>
> +/* Allocates an array of 'mt->n_columns' ovsdb_datums and initializes
> them as
> + * copies of the data in 'fields' drawn from the columns represented by
> + * mt->columns[].  Returns the array.
> + *
> + * If 'row' is NULL, returns NULL. */
> +static struct ovsdb_datum *
> +clone_monitor_row_data(const struct ovsdb_monitor_table *mt,
> +                       const struct ovsdb_datum *fields)
> +{
> +    struct ovsdb_datum *data;
> +    size_t i;
> +
> +    if (!fields) {
> +        return NULL;
> +    }
> +
> +    data = xmalloc(mt->n_columns * sizeof *data);
> +    for (i = 0; i < mt->n_columns; i++) {
> +        const struct ovsdb_column *c = mt->columns[i].column;
> +        const struct ovsdb_datum *src = &fields[c->index];
> +        struct ovsdb_datum *dst = &data[i];
> +        const struct ovsdb_type *type = &c->type;
> +
> +        ovsdb_datum_clone(dst, src, type);
> +    }
> +    return data;
> +}
> +
> +static void
> +clone_monitor_row(const struct ovsdb_monitor_table *mt,
> +                  struct ovsdb_monitor_row *to,
> +                  const struct ovsdb_monitor_row *from)
> +{
> +    to->uuid = from->uuid;
> +    to->old = clone_monitor_row_data(mt, from->old);
> +    to->new = clone_monitor_row_data(mt,from->new);
> +}
> +
>  /* Replaces the mt->n_columns ovsdb_datums in row[] by copies of the data
> from
>   * in 'row' drawn from the columns represented by mt->columns[]. */
>  static void
> @@ -393,6 +442,7 @@ ovsdb_monitor_add_table(struct ovsdb_monitor *m,
>      mt->dbmon = m;
>      shash_add(&m->tables, table->schema->name, mt);
>      hmap_init(&mt->changes);
> +    hmap_init(&mt->all);
>      mt->columns_index_map =
>          xmalloc(sizeof(unsigned int) *
> shash_count(&table->schema->columns));
>      for (i = 0; i < shash_count(&table->schema->columns); i++) {
> @@ -472,6 +522,13 @@ ovsdb_monitor_add_all_condition_columns(
>      }
>  }
>
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> +                           const struct ovsdb_table *table)
> +{
> +    return shash_find_data(&m->tables, table->schema->name);
> +}
> +
>  /* Check for duplicated column names. Return the first
>   * duplicated column's name if found. Otherwise return
>   * NULL.  */
> @@ -499,9 +556,12 @@ ovsdb_monitor_table_check_duplicates(struct
> ovsdb_monitor *m,
>
>  static struct ovsdb_monitor_changes *
>  ovsdb_monitor_table_add_changes(struct ovsdb_monitor_table *mt,
> +                                enum ovsdb_monitor_changes_type type,
>                                  uint64_t next_txn)
>  {
>      struct ovsdb_monitor_changes *changes;
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>
>      changes = xzalloc(sizeof *changes);
>
> @@ -509,19 +569,22 @@ ovsdb_monitor_table_add_changes(struct
> ovsdb_monitor_table *mt,
>      changes->mt = mt;
>      changes->n_refs = 1;
>      hmap_init(&changes->rows);
> -    hmap_insert(&mt->changes, &changes->hmap_node, hash_uint64(next_txn));
> +    hmap_insert(changes_hmap, &changes->hmap_node, hash_uint64(next_txn));
>
>      return changes;
>  };
>
>  static struct ovsdb_monitor_changes *
>  ovsdb_monitor_table_find_changes(struct ovsdb_monitor_table *mt,
> +                                 enum ovsdb_monitor_changes_type type,
>                                   uint64_t transaction)
>  {
>      struct ovsdb_monitor_changes *changes;
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>      size_t hash = hash_uint64(transaction);
>
> -    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, &mt->changes) {
> +    HMAP_FOR_EACH_WITH_HASH(changes, hmap_node, hash, changes_hmap) {
>          if (changes->transaction == transaction) {
>              return changes;
>          }
> @@ -533,13 +596,16 @@ ovsdb_monitor_table_find_changes(struct
> ovsdb_monitor_table *mt,
>  /* Stop currently tracking changes to table 'mt' since 'transaction'. */
>  static void
>  ovsdb_monitor_table_untrack_changes(struct ovsdb_monitor_table *mt,
> +                                    enum ovsdb_monitor_changes_type type,
>                                      uint64_t transaction)
>  {
> +    struct hmap *changes_hmap =
> +        type == OVSDB_MONITOR_CHANGES ? &mt->changes : &mt->all;
>      struct ovsdb_monitor_changes *changes =
> -                ovsdb_monitor_table_find_changes(mt, transaction);
> +        ovsdb_monitor_table_find_changes(mt, type, transaction);
>      if (changes) {
>          if (--changes->n_refs == 0) {
> -            hmap_remove(&mt->changes, &changes->hmap_node);
> +            hmap_remove(changes_hmap, &changes->hmap_node);
>              ovsdb_monitor_changes_destroy(changes);
>          }
>      }
> @@ -549,15 +615,16 @@ ovsdb_monitor_table_untrack_changes(struct
> ovsdb_monitor_table *mt,
>   */
>  static void
>  ovsdb_monitor_table_track_changes(struct ovsdb_monitor_table *mt,
> +                                  enum ovsdb_monitor_changes_type type,
>                                    uint64_t transaction)
>  {
>      struct ovsdb_monitor_changes *changes;
>
> -    changes = ovsdb_monitor_table_find_changes(mt, transaction);
> +    changes = ovsdb_monitor_table_find_changes(mt, type, transaction);
>      if (changes) {
>          changes->n_refs++;
>      } else {
> -        ovsdb_monitor_table_add_changes(mt, transaction);
> +        ovsdb_monitor_table_add_changes(mt, type, transaction);
>      }
>  }
>
> @@ -674,6 +741,46 @@ ovsdb_monitor_get_table_conditions(
>      return true;
>  }
>
> +struct ovsdb_error *
> +ovsdb_monitor_table_condition_update(
> +                            struct ovsdb_monitor *dbmon,
> +                            struct ovsdb_monitor_session_condition
> *condition,
> +                            const struct ovsdb_table *table,
> +                            const struct json *cond_json)
> +{
> +    struct ovsdb_monitor_table_condition *mtc =
> +        shash_find_data(&condition->tables, table->schema->name);
> +    struct ovsdb_error *error;
> +    struct ovsdb_condition cond = OVSDB_CONDITION_INITIALIZER;
> +    bool empty = ovsdb_condition_empty(&mtc->new_condition);
> +
> +    if (!condition) {
> +        return NULL;
> +    }
> +
> +    error = ovsdb_condition_from_json(table->schema, cond_json,
> +                                      NULL, &cond);
> +    if (error) {
> +        return error;
> +    }
> +
> +    ovsdb_condition_destroy(&mtc->new_condition);
> +    ovsdb_condition_clone(&mtc->new_condition, &cond);
>
I feel like I am missing something here: should we copy the "old"
new_condition to "old_condition" here?

> +
> +    if (empty && !ovsdb_condition_empty(&mtc->new_condition)) {
> +        condition->n_empty_cnd--;
> +    }
> +    if (!empty && ovsdb_condition_empty(&mtc->new_condition)) {
> +        condition->n_empty_cnd++;
> +    }
> +
> +    ovsdb_monitor_condition_add_columns(dbmon,
> +                                        table,
> +                                        &mtc->new_condition);
> +
> +    return NULL;
> +}
> +
>  static enum ovsdb_monitor_selection
>  ovsdb_monitor_row_update_type_condition(
>                        const struct ovsdb_monitor_table *mt,
> @@ -827,7 +934,7 @@ ovsdb_monitor_compose_row_update(
>   * for 'row' within * 'mt', or NULL if no row update should be sent.
>   *
>   * The caller should specify 'initial' as true if the returned JSON is
> - * going to be used as part of the initial reply to a "monitor2" request,
> + * going to be used as part of the initial reply to a "monitor_cond"
> request,
>   * false if it is going to be used as part of an "update2" notification.
>   *
>   * 'changed' must be a scratch buffer for internal use that is at least
> @@ -916,7 +1023,7 @@ ovsdb_monitor_max_columns(struct ovsdb_monitor *dbmon)
>  static struct json*
>  ovsdb_monitor_compose_update(
>                        struct ovsdb_monitor *dbmon,
> -                      bool initial, uint64_t transaction,
> +                      bool initial,  bool all_rows, uint64_t transaction,
>                        const struct ovsdb_monitor_session_condition
> *condition,
>                        compose_row_update_cb_func row_update)
>  {
> @@ -932,7 +1039,16 @@ ovsdb_monitor_compose_update(
>          struct ovsdb_monitor_changes *changes;
>          struct json *table_json = NULL;
>
> -        changes = ovsdb_monitor_table_find_changes(mt, transaction);
> +        if (!all_rows) {
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +
>  OVSDB_MONITOR_CHANGES,
> +                                                       transaction);
> +        } else {
> +            /* Get changes that includes all rows from all_txn point in
> time */
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +                                                       OVSDB_MONITOR_ALL,
> +                                                       transaction);
> +        }
>          if (!changes) {
>              continue;
>          }
> @@ -968,7 +1084,8 @@ ovsdb_monitor_compose_update(
>
>  /* Returns JSON for a <table-updates> object (as described in RFC 7047)
>   * for all the outstanding changes within 'monitor' that starts from
> - * '*unflushed' transaction id.
> + * '*unflushed'.
> + * If all_rows is true all_rows in the db that match conditions will be
> sent.
>   *
>   * The caller should specify 'initial' as true if the returned JSON is
> going to
>   * be used as part of the initial reply to a "monitor" request, false if
> it is
> @@ -976,7 +1093,8 @@ ovsdb_monitor_compose_update(
>  struct json *
>  ovsdb_monitor_get_update(
>               struct ovsdb_monitor *dbmon,
> -             bool initial, uint64_t *unflushed,
> +             bool initial, bool all_rows,
> +             uint64_t *unflushed,
>               const struct ovsdb_monitor_session_condition *condition,
>               enum ovsdb_monitor_version version)
>  {
> @@ -988,7 +1106,7 @@ ovsdb_monitor_get_update(
>
>      /* Return a clone of cached json if one exists. Otherwise,
>       * generate a new one and add it to the cache.  */
> -    if (!condition || (condition && ovsdb_can_cache(condition))) {
> +    if (!condition || (!all_rows && condition &&
> ovsdb_can_cache(condition))) {
>          cache_node = ovsdb_monitor_json_cache_search(dbmon, version,
> prev_txn);
>      }
>      if (cache_node) {
> @@ -996,17 +1114,19 @@ ovsdb_monitor_get_update(
>      } else {
>          if (version == OVSDB_MONITOR_V1) {
>              json =
> -               ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
> -                                            condition,
> +               ovsdb_monitor_compose_update(dbmon, initial, all_rows,
> +                                            prev_txn, condition,
>
>  ovsdb_monitor_compose_row_update);
>          } else {
>              ovs_assert(version == OVSDB_MONITOR_V2);
>              json =
> -               ovsdb_monitor_compose_update(dbmon, initial, prev_txn,
> -                                            condition,
> +               ovsdb_monitor_compose_update(dbmon, initial, all_rows,
> +                                            prev_txn, condition,
>
>  ovsdb_monitor_compose_row_update2);
>          }
> -        if (!condition || (condition && ovsdb_can_cache(condition))) {
> +
> +        if (!condition ||
> +            (!all_rows && condition && ovsdb_can_cache(condition))) {
>              ovsdb_monitor_json_cache_insert(dbmon, version, prev_txn,
> json);
>          }
>      }
> @@ -1014,9 +1134,28 @@ ovsdb_monitor_get_update(
>      /* Maintain transaction id of 'changes'. */
>      SHASH_FOR_EACH (node, &dbmon->tables) {
>          struct ovsdb_monitor_table *mt = node->data;
> +        struct ovsdb_condition *old_condition, *new_condition;
>
> -        ovsdb_monitor_table_untrack_changes(mt, prev_txn);
> -        ovsdb_monitor_table_track_changes(mt, next_txn);
> +        if (!all_rows) {
> +            ovsdb_monitor_table_untrack_changes(mt,
> +                                                OVSDB_MONITOR_CHANGES,
> +                                                prev_txn);
> +        } else {
> +            ovsdb_monitor_table_untrack_changes(mt,
> +                                                OVSDB_MONITOR_ALL,
> +                                                prev_txn);
> +        }
> +        ovsdb_monitor_table_track_changes(mt, OVSDB_MONITOR_CHANGES,
> next_txn);
> +
> +        if (ovsdb_monitor_get_table_conditions(mt,
> +                                               condition,
> +                                               &old_condition,
> +                                               &new_condition)) {
> +            if (ovsdb_condition_cmp(old_condition, new_condition)) {
> +                ovsdb_condition_destroy(old_condition);
> +                ovsdb_condition_clone(old_condition, new_condition);
> +            }
> +        }
>      }
>      *unflushed = next_txn;
>
> @@ -1097,8 +1236,8 @@ ovsdb_monitor_changes_update(const struct ovsdb_row
> *old,
>          change = xzalloc(sizeof *change);
>          hmap_insert(&changes->rows, &change->hmap_node, uuid_hash(uuid));
>          change->uuid = *uuid;
> -        change->old = clone_monitor_row_data(mt, old);
> -        change->new = clone_monitor_row_data(mt, new);
> +        change->old = clone_monitor_ovsdb_row_data(mt, old);
> +        change->new = clone_monitor_ovsdb_row_data(mt, new);
>      } else {
>          if (new) {
>              update_monitor_row_data(mt, new, change->new);
> @@ -1115,6 +1254,20 @@ ovsdb_monitor_changes_update(const struct ovsdb_row
> *old,
>      }
>  }
>
> +static void
> +ovsdb_monitor_changes_clone_insert_row(const struct ovsdb_monitor_row
> *row,
> +                                       const struct ovsdb_monitor_table
> *mt,
> +                                       struct ovsdb_monitor_changes
> *changes)
> +{
> +    struct ovsdb_monitor_row *change;
> +
> +    ovs_assert(ovsdb_monitor_changes_row_find(changes, &row->uuid) ==
> NULL);
> +
> +    change = xzalloc(sizeof *change);
> +    hmap_insert(&changes->rows, &change->hmap_node,
> uuid_hash(&row->uuid));
> +    clone_monitor_row(mt, change, row);
> +}
> +
>  static bool
>  ovsdb_monitor_columns_changed(const struct ovsdb_monitor_table *mt,
>                                const unsigned long int *changed)
> @@ -1162,6 +1315,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
>      struct ovsdb_table *table = new ? new->table : old->table;
>      struct ovsdb_monitor_table *mt;
>      struct ovsdb_monitor_changes *changes;
> +    enum ovsdb_monitor_changes_efficacy efficacy;
> +    enum ovsdb_monitor_selection type;
>
>      if (!aux->mt || table != aux->mt->table) {
>          aux->mt = shash_find_data(&m->tables, table->schema->name);
> @@ -1173,16 +1328,17 @@ ovsdb_monitor_change_cb(const struct ovsdb_row
> *old,
>      }
>      mt = aux->mt;
>
> -    HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
> -        enum ovsdb_monitor_changes_efficacy efficacy;
> -        enum ovsdb_monitor_selection type;
> +    type = ovsdb_monitor_row_update_type(false, old, new);
> +    efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
>
> -        type = ovsdb_monitor_row_update_type(false, old, new);
> -        efficacy = ovsdb_monitor_changes_classify(type, mt, changed);
> -        if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> +    if (efficacy > OVSDB_CHANGES_NO_EFFECT) {
> +        /* insert row change to changes lists */
> +        HMAP_FOR_EACH(changes, hmap_node, &mt->changes) {
> +            ovsdb_monitor_changes_update(old, new, mt, changes);
> +        }
> +        HMAP_FOR_EACH(changes, hmap_node, &mt->all) {
>              ovsdb_monitor_changes_update(old, new, mt, changes);
>          }
> -
>          if (aux->efficacy < efficacy) {
>              aux->efficacy = efficacy;
>          }
> @@ -1194,10 +1350,8 @@ ovsdb_monitor_change_cb(const struct ovsdb_row *old,
>  void
>  ovsdb_monitor_get_initial(const struct ovsdb_monitor *dbmon)
>  {
> -    struct ovsdb_monitor_aux aux;
>      struct shash_node *node;
>
> -    ovsdb_monitor_init_aux(&aux, dbmon);
>      SHASH_FOR_EACH (node, &dbmon->tables) {
>          struct ovsdb_monitor_table *mt = node->data;
>
> @@ -1205,9 +1359,14 @@ ovsdb_monitor_get_initial(const struct
> ovsdb_monitor *dbmon)
>              struct ovsdb_row *row;
>              struct ovsdb_monitor_changes *changes;
>
> -            changes = ovsdb_monitor_table_find_changes(mt, 0);
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +
>  OVSDB_MONITOR_CHANGES,
> +                                                       0);
>              if (!changes) {
> -                changes = ovsdb_monitor_table_add_changes(mt, 0);
> +                changes =
> +                    ovsdb_monitor_table_add_changes(mt,
> +                                                    OVSDB_MONITOR_CHANGES,
> +                                                    0);
>                  HMAP_FOR_EACH (row, hmap_node, &mt->table->rows) {
>                      ovsdb_monitor_changes_update(NULL, row, mt, changes);
>                  }
> @@ -1218,6 +1377,53 @@ ovsdb_monitor_get_initial(const struct
> ovsdb_monitor *dbmon)
>      }
>  }
>
> +/* Record all rows in DB and mark this changes at unflushed tranaction id
> */
> +void
> +ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> +                           uint64_t unflushed)
> +{
> +    struct shash_node *node;
> +
> +    SHASH_FOR_EACH (node, &dbmon->tables) {
> +        struct ovsdb_monitor_table *mt = node->data;
> +        struct ovsdb_row *new;
> +        struct ovsdb_monitor_changes *changes, *all_changes;
> +
> +        all_changes = ovsdb_monitor_table_find_changes(mt,
> +                                                       OVSDB_MONITOR_ALL,
> +                                                       unflushed);
> +        if (!all_changes) {
> +            all_changes = ovsdb_monitor_table_add_changes(mt,
> +
> OVSDB_MONITOR_ALL,
> +                                                          unflushed);
> +            changes = ovsdb_monitor_table_find_changes(mt,
> +
>  OVSDB_MONITOR_CHANGES,
> +                                                       unflushed);
> +            HMAP_FOR_EACH (new, hmap_node, &mt->table->rows) {
> +                struct ovsdb_monitor_row *row = NULL;
> +                if (changes) {
> +                    /* Check if we have a change record for this row */
> +                    row = ovsdb_monitor_changes_row_find(
> +                                                   changes,
> +
>  ovsdb_row_get_uuid(new));
> +                }
> +                if (row) {
> +                    ovsdb_monitor_changes_clone_insert_row(row, mt,
> +                                                           all_changes);
> +                } else {
> +                    ovsdb_monitor_changes_update(new, new, mt,
> all_changes);
> +                }
> +            }
> +        } else {
> +            all_changes->n_refs++;
> +        }
> +
> +        ovsdb_monitor_table_untrack_changes(mt,
> +                                            OVSDB_MONITOR_CHANGES,
> +                                            unflushed);
> +    }
> +}
> +
>
I don't feel I fully understand the design here. It seems all_changes is
only generated in the special case of a condition update, so I am not sure
why it should be part of ovsdb_monitor_table. At any rate, this function
could use some more comments.

>  ovsdb_monitor_remove_jsonrpc_monitor(struct ovsdb_monitor *dbmon,
>                     struct ovsdb_jsonrpc_monitor *jsonrpc_monitor)
> @@ -1367,7 +1573,12 @@ ovsdb_monitor_destroy(struct ovsdb_monitor *dbmon)
>              hmap_remove(&mt->changes, &changes->hmap_node);
>              ovsdb_monitor_changes_destroy(changes);
>          }
> +        HMAP_FOR_EACH_SAFE (changes, next, hmap_node, &mt->all) {
> +            hmap_remove(&mt->changes, &changes->hmap_node);
> +            ovsdb_monitor_changes_destroy(changes);
> +        }
>          hmap_destroy(&mt->changes);
> +        hmap_destroy(&mt->all);
>          free(mt->columns);
>          free(mt->columns_index_map);
>          free(mt);
> diff --git a/ovsdb/monitor.h b/ovsdb/monitor.h
> index 0529e5a..935c65f 100644
> --- a/ovsdb/monitor.h
> +++ b/ovsdb/monitor.h
> @@ -57,6 +57,10 @@ void ovsdb_monitor_remove_jsonrpc_monitor(struct
> ovsdb_monitor *dbmon,
>  void ovsdb_monitor_add_table(struct ovsdb_monitor *m,
>                               const struct ovsdb_table *table);
>
> +bool
> +ovsdb_monitor_table_exists(struct ovsdb_monitor *m,
> +                           const struct ovsdb_table *table);
> +
>  void ovsdb_monitor_add_column(struct ovsdb_monitor *dbmon,
>                                const struct ovsdb_table *table,
>                                const struct ovsdb_column *column,
> @@ -70,12 +74,12 @@ const char * OVS_WARN_UNUSED_RESULT
>  ovsdb_monitor_table_check_duplicates(struct ovsdb_monitor *,
>                            const struct ovsdb_table *);
>
> -struct json *ovsdb_monitor_get_update(
> -               struct ovsdb_monitor *dbmon,
> -               bool initial,
> -               uint64_t *unflushed_transaction,
> -               const struct ovsdb_monitor_session_condition *condition,
> -               enum ovsdb_monitor_version version);
> +struct json *ovsdb_monitor_get_update(struct ovsdb_monitor *dbmon,
> +                                      bool initial,
> +                                      bool all_rows,
> +                                      uint64_t *unflushed_transaction,
> +                                      const struct
> ovsdb_monitor_session_condition *condition,
> +                                      enum ovsdb_monitor_version version);
>
>  void ovsdb_monitor_table_add_select(struct ovsdb_monitor *dbmon,
>                                      const struct ovsdb_table *table,
> @@ -105,8 +109,18 @@ ovsdb_monitor_table_condition_add(
>                            const struct ovsdb_table *table,
>                            const struct json *json_cnd);
>
> +void ovsdb_monitor_get_all_rows(const struct ovsdb_monitor *dbmon,
> +                                uint64_t unflushed);
> +
>  void ovsdb_monitor_session_condition_bind(
>                             const struct ovsdb_monitor_session_condition *,
>                             const struct ovsdb_monitor *);
>
> +struct ovsdb_error *
> +ovsdb_monitor_table_condition_update(
> +                           struct ovsdb_monitor *dbmon,
> +                           struct ovsdb_monitor_session_condition
> *condition,
> +                           const struct ovsdb_table *table,
> +                           const struct json *cond_json);
> +
>  #endif
> --
> 2.1.4
>
>
> _______________________________________________
> dev mailing list
> dev@openvswitch.org
> http://openvswitch.org/mailman/listinfo/dev
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to