Thank you.
Comments inline.

On 23.01.2016 04:21, Daniele Di Proietto wrote:
> Hi Ilya,
> 
> Thank you very much for the patch.
> 
> I definitely like that the queue assignment is performed by the
> main thread: not only is it less bug-prone, but the logic will be
> more easily customizable.
> 
> I absolutely welcome the changes to do_add_port and do_del_port to
> keep the queues to the currently assigned threads.
> 
> I think we can avoid pausing and resuming the threads each time and,
> instead, leave the current reloading logic unaltered. Here's a way:
> 
> * pmd_thread_main() would be identical to master.  pmd_load_queues(),
>   instead, would return a poll_list by copying the struct rxq_poll
>   from 'pmd->poll_list'.
> * do_add_port() and do_del_port() would still write on the pmd
>   specific lists while the threads are running.  After updating
>   a list for a pmd thread, they would call dp_netdev_reload_pmd__().
>
> This behaviour should still fix the bugs, but it requires less
> synchronization.  What do you think?

Originally there were 3 or 4 ideas of how to implement synchronization
between the main and pmd threads. This was one of them. Yes, I agree
that this requires less synchronization. It seemed to me that it would
be more difficult to implement. But, apparently, I was wrong.

> I don't think this should
> create any problems to the following patch, right?

Yes. Just a little fix.

> 
> I've prepared an incremental on top of this patch to illustrate the
> idea, but other ideas/implementations/fixes are welcome.

Thanks for this.

1.
There is a race in your implementation. While a newly started
pmd thread is loading its queues, the main thread can modify
poll_list. So, pmd_load_queues() must be called with
poll_mutex locked.

2.
pmd_load_queues() copies references to rxq_poll.
I think ref/unref of the corresponding ports is needed.

3.
Having both poll_mutex and the condition variable in pmd_thread_main
is over-synchronization. Since poll_mutex is mandatory here and we
don't actually need to wait for a pmd thread at any particular
point, the condition variable may be removed along with
dp_netdev_pmd_reload_done() and cond_mutex.
There should be no problems here if port_ref/unref is done.

I'll prepare a new version soon based on your suggestion and
these thoughts.

Best regards, Ilya Maximets.

> 
> Thanks,
> 
> Daniele
> 
> ----------------------------------------
> diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> index fd6ac48..3f5cf42 100644
> --- a/lib/dpif-netdev.c
> +++ b/lib/dpif-netdev.c
> @@ -480,7 +480,7 @@ static void dp_netdev_input(struct
> dp_netdev_pmd_thread *,
>                              struct dp_packet **, int cnt);
>  
>  static void dp_netdev_disable_upcall(struct dp_netdev *);
> -void dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd);
> +void dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd);
>  static void dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd,
>                                      struct dp_netdev *dp, int index,
>                                      unsigned core_id, int numa_id);
> @@ -1026,9 +1026,8 @@ dpif_netdev_get_stats(const struct dpif *dpif,
> struct dpif_dp_stats *stats)
>      return 0;
>  }
>  
> -/* Causes pmd thread to break from infinite polling cycle. */
>  static void
> -dp_netdev_break_pmd__(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_reload_pmd__(struct dp_netdev_pmd_thread *pmd)
>  {
>      int old_seq;
>  
> @@ -1042,39 +1041,6 @@ dp_netdev_break_pmd__(struct dp_netdev_pmd_thread
> *pmd)
>      ovs_mutex_unlock(&pmd->cond_mutex);
>  }
>  
> -/* Causes pmd thread to break from infinite polling cycle and
> - * lock on poll_mutex.  Not applicable for non-PMD threads. */
> -static void
> -dp_netdev_pause_pmd__(struct dp_netdev_pmd_thread *pmd)
> -    OVS_ACQUIRES(pmd->poll_mutex)
> -{
> -    int old_seq;
> -
> -    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
> -
> -    /* Wait until pmd thread starts polling cycle to
> -     * avoid deadlock. */
> -    while (!ovs_mutex_trylock(&pmd->poll_mutex)) {
> -        ovs_mutex_unlock(&pmd->poll_mutex);
> -    }
> -
> -    ovs_mutex_lock(&pmd->cond_mutex);
> -    atomic_add_relaxed(&pmd->change_seq, 1, &old_seq);
> -    ovs_mutex_lock(&pmd->poll_mutex);
> -    ovs_mutex_cond_wait(&pmd->cond, &pmd->cond_mutex);
> -    ovs_mutex_unlock(&pmd->cond_mutex);
> -}
> -
> -/* Unlocks pmd thread by unlocking poll_mutex.
> - * Not applicable for non-PMD threads. */
> -static void
> -dp_netdev_resume_pmd__(struct dp_netdev_pmd_thread *pmd)
> -    OVS_RELEASES(pmd->poll_mutex)
> -{
> -    ovs_assert(pmd->core_id != NON_PMD_CORE_ID);
> -    ovs_mutex_unlock(&pmd->poll_mutex);
> -}
> -
>  static uint32_t
>  hash_port_no(odp_port_t port_no)
>  {
> @@ -1181,9 +1147,10 @@ do_add_port(struct dp_netdev *dp, const char
> *devname, const char *type,
>                  break;
>              }
>  
> -            dp_netdev_pause_pmd__(pmd);
> +            ovs_mutex_lock(&pmd->poll_mutex);
>              dp_netdev_add_rxq_to_pmd(pmd, port, port->rxq[i]);
> -            dp_netdev_resume_pmd__(pmd);
> +            ovs_mutex_unlock(&pmd->poll_mutex);
> +            dp_netdev_reload_pmd__(pmd);
>          }
>      }
>      seq_change(dp->port_seq);
> @@ -1366,29 +1333,27 @@ do_del_port(struct dp_netdev *dp, struct
> dp_netdev_port *port)
>              dp_netdev_del_pmds_on_numa(dp, numa_id);
>          }
>          else {
> -            bool found;
>              struct dp_netdev_pmd_thread *pmd;
>              struct rxq_poll *poll, *next;
>  
>              CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>                  if (pmd->numa_id == numa_id) {
> -                    found = false;
> -                    dp_netdev_pause_pmd__(pmd);
> +                    bool found = false;
> +
> +                    ovs_mutex_lock(&pmd->poll_mutex);
>                      LIST_FOR_EACH_SAFE (poll, next, node,
> &pmd->poll_list) {
>                          if (poll->port == port) {
> +                            found = true;
>                              port_unref(poll->port);
>                              list_remove(&poll->node);
>                              pmd->poll_cnt--;
>                              free(poll);
> -                            found = true;
>                          }
>                      }
> +                    ovs_mutex_unlock(&pmd->poll_mutex);
>                      if (found) {
> -                        /* Clean up emc cache if poll_list modified. */
> -                        emc_cache_uninit(&pmd->flow_cache);
> -                        emc_cache_init(&pmd->flow_cache);
> +                        dp_netdev_reload_pmd__(pmd);
>                      }
> -                    dp_netdev_resume_pmd__(pmd);
>                  }
>              }
>          }
> @@ -2656,28 +2621,56 @@ dpif_netdev_wait(struct dpif *dpif)
>      seq_wait(tnl_conf_seq, dp->last_tnl_conf_seq);
>  }
>  
> +static int
> +pmd_load_queues(struct dp_netdev_pmd_thread *pmd,
> +                struct rxq_poll **ppoll_list)
> +{
> +    struct rxq_poll *poll_list = *ppoll_list;
> +    struct rxq_poll *poll;
> +    int i = 0;
> +
> +    poll_list = xrealloc(poll_list, pmd->poll_cnt * sizeof *poll_list);
> +
> +    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> +        poll_list[i++] = *poll;
> +    }
> +
> +    *ppoll_list = poll_list;
> +    return pmd->poll_cnt;
> +}
> +
>  static void *
>  pmd_thread_main(void *f_)
>  {
>      struct dp_netdev_pmd_thread *pmd = f_;
> -    struct rxq_poll *poll;
>      unsigned int lc = 0;
> +    struct rxq_poll *poll_list;
>      unsigned int port_seq = PMD_INITIAL_SEQ;
> +    int poll_cnt;
> +    int i;
> +
> +    poll_cnt = 0;
> +    poll_list = NULL;
>  
>      /* Stores the pmd thread's 'pmd' to 'per_pmd_key'. */
>      ovsthread_setspecific(pmd->dp->per_pmd_key, pmd);
>      pmd_thread_setaffinity_cpu(pmd->core_id);
>  reload:
> -    ovs_mutex_lock(&pmd->poll_mutex);
> +    emc_cache_init(&pmd->flow_cache);
> +    poll_cnt = pmd_load_queues(pmd, &poll_list);
> +
>      /* List port/core affinity */
> -    LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> -       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
> -                 netdev_get_name(poll->port->netdev));
> +    for (i = 0; i < poll_cnt; i++) {
> +       VLOG_INFO("Core %d processing port \'%s\'\n", pmd->core_id,
> netdev_get_name(poll_list[i].port->netdev));
>      }
>  
> +    /* Signal here to make sure the pmd finishes
> +     * reloading the updated configuration. */
> +    dp_netdev_pmd_reload_done(pmd);
> +
>      for (;;) {
> -        LIST_FOR_EACH (poll, node, &pmd->poll_list) {
> -            dp_netdev_process_rxq_port(pmd, poll->port, poll->rx);
> +        for (i = 0; i < poll_cnt; i++) {
> +            dp_netdev_process_rxq_port(pmd, poll_list[i].port,
> poll_list[i].rx);
>          }
>  
>          if (lc++ > 1024) {
> @@ -2696,15 +2689,16 @@ reload:
>              }
>          }
>      }
> -    ovs_mutex_unlock(&pmd->poll_mutex);
>  
> -    /* Synchronize with breaker thread. */
> -    dp_netdev_pmd_break_done(pmd);
> +    emc_cache_uninit(&pmd->flow_cache);
>  
> -    if (!latch_is_set(&pmd->exit_latch)) {
> +    if (!latch_is_set(&pmd->exit_latch)){
>          goto reload;
>      }
>  
> +    dp_netdev_pmd_reload_done(pmd);
> +
> +    free(poll_list);
>      return NULL;
>  }
>  
> @@ -2739,7 +2733,7 @@ dpif_netdev_enable_upcall(struct dpif *dpif)
>  }
>  
>  void
> -dp_netdev_pmd_break_done(struct dp_netdev_pmd_thread *pmd)
> +dp_netdev_pmd_reload_done(struct dp_netdev_pmd_thread *pmd)
>  {
>      ovs_mutex_lock(&pmd->cond_mutex);
>      xpthread_cond_signal(&pmd->cond);
> @@ -2843,8 +2837,11 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread
> *pmd, struct dp_netdev *dp,
>      dpcls_init(&pmd->cls);
>      cmap_init(&pmd->flow_table);
>      list_init(&pmd->poll_list);
> -    emc_cache_init(&pmd->flow_cache);
> -
> +    /* init the 'flow_cache' since there is no
> +     * actual thread created for NON_PMD_CORE_ID. */
> +    if (core_id == NON_PMD_CORE_ID) {
> +        emc_cache_init(&pmd->flow_cache);
> +    }
>      cmap_insert(&dp->poll_threads, CONST_CAST(struct cmap_node *,
> &pmd->node),
>                  hash_int(core_id, 0));
>  }
> @@ -2870,11 +2867,13 @@ dp_netdev_del_pmd(struct dp_netdev *dp, struct
> dp_netdev_pmd_thread *pmd)
>  {
>      struct rxq_poll *poll;
>  
> -    emc_cache_uninit(&pmd->flow_cache);
> -
> -    if (pmd->core_id != NON_PMD_CORE_ID) {
> +    /* Uninit the 'flow_cache' since there is
> +     * no actual thread uninit it for NON_PMD_CORE_ID. */
> +    if (pmd->core_id == NON_PMD_CORE_ID) {
> +        emc_cache_uninit(&pmd->flow_cache);
> +    } else {
>          latch_set(&pmd->exit_latch);
> -        dp_netdev_break_pmd__(pmd);
> +        dp_netdev_reload_pmd__(pmd);
>          ovs_numa_unpin_core(pmd->core_id);
>          xpthread_join(pmd->thread, NULL);
>      }
> 

_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to