Thanks for the patch

I think the caller of dp_netdev_execute_actions() should always pass a valid 
timestamp.  We can pass it from aux->now to dp_execute_userspace_actions(), we 
can add it to fast_path_processing() so that it can be passed down to 
handle_packet_upcall().  In the other cases it's fine to call time_msec(), 
we're in the slow path anyway.

One more thing: I think we should avoid XPS entirely if there are enough txqs, 
to avoid any possible locks and even writing tx->last_used.


Thanks,

Daniele

On 13/07/2016 05:34, "Ilya Maximets" <i.maxim...@samsung.com> wrote:

>If CPU number in pmd-cpu-mask is not divisible by the number of queues and
>in a few more complex situations there may be unfair distribution of TX
>queue-ids between PMD threads.
>
>For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask
>such distribution is possible:
><------------------------------------------------------------------------>
>pmd thread numa_id 0 core_id 13:
>        port: vhost-user1       queue-id: 1
>        port: dpdk0     queue-id: 3
>pmd thread numa_id 0 core_id 14:
>        port: vhost-user1       queue-id: 2
>pmd thread numa_id 0 core_id 16:
>        port: dpdk0     queue-id: 0
>pmd thread numa_id 0 core_id 17:
>        port: dpdk0     queue-id: 1
>pmd thread numa_id 0 core_id 12:
>        port: vhost-user1       queue-id: 0
>        port: dpdk0     queue-id: 2
>pmd thread numa_id 0 core_id 15:
>        port: vhost-user1       queue-id: 3
><------------------------------------------------------------------------>
>
>As we can see above dpdk0 port polled by threads on cores:
>       12, 13, 16 and 17.
>
>By design of dpif-netdev, there is only one TX queue-id assigned to each
>pmd thread. This queue-id's are sequential similar to core-id's. And
>thread will send packets to queue with exact this queue-id regardless
>of port.
>
>In previous example:
>
>       pmd thread on core 12 will send packets to tx queue 0
>       pmd thread on core 13 will send packets to tx queue 1
>       ...
>       pmd thread on core 17 will send packets to tx queue 5
>
>So, for dpdk0 port after truncating in netdev-dpdk:
>
>       core 12 --> TX queue-id 0 % 4 == 0
>       core 13 --> TX queue-id 1 % 4 == 1
>       core 16 --> TX queue-id 4 % 4 == 0
>       core 17 --> TX queue-id 5 % 4 == 1
>
>As a result only 2 of 4 queues used.
>
>To fix this issue some kind of XPS implemented in following way:
>
>       * TX queue-ids are allocated dynamically.
>       * When PMD thread first time tries to send packets to new port
>         it allocates less used TX queue for this port.
>       * PMD threads periodically performes revalidation of
>         allocated TX queue-ids. If queue wasn't used in last
>         XPS_TIMEOUT_MS milliseconds it will be freed while revalidation.
>
>Reported-by: Zhihong Wang <zhihong.w...@intel.com>
>Signed-off-by: Ilya Maximets <i.maxim...@samsung.com>
>---
> lib/dpif-netdev.c | 170 +++++++++++++++++++++++++++++++++++++-----------------
> 1 file changed, 117 insertions(+), 53 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>index e0107b7..6345944 100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type {
>     PMD_N_CYCLES
> };
> 
>+#define XPS_TIMEOUT_MS 500LL
>+
> /* A port in a netdev-based datapath. */
> struct dp_netdev_port {
>     odp_port_t port_no;
>@@ -256,6 +258,8 @@ struct dp_netdev_port {
>     struct netdev_saved_flags *sf;
>     unsigned n_rxq;             /* Number of elements in 'rxq' */
>     struct netdev_rxq **rxq;
>+    unsigned *txq_used;         /* Number of threads that uses each tx queue. 
>*/
>+    struct ovs_mutex txq_used_mutex;
>     char *type;                 /* Port type as requested by user. */
> };
> 
>@@ -384,8 +388,9 @@ struct rxq_poll {
> 
> /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
> struct tx_port {
>-    odp_port_t port_no;
>-    struct netdev *netdev;
>+    struct dp_netdev_port *port;
>+    int qid;
>+    long long last_used;
>     struct hmap_node node;
> };
> 
>@@ -498,7 +503,8 @@ static void dp_netdev_execute_actions(struct 
>dp_netdev_pmd_thread *pmd,
>                                       struct dp_packet_batch *,
>                                       bool may_steal,
>                                       const struct nlattr *actions,
>-                                      size_t actions_len);
>+                                      size_t actions_len,
>+                                      long long now);
> static void dp_netdev_input(struct dp_netdev_pmd_thread *,
>                             struct dp_packet_batch *, odp_port_t port_no);
> static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
>@@ -541,6 +547,12 @@ static void dp_netdev_pmd_flow_flush(struct 
>dp_netdev_pmd_thread *pmd);
> static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
>     OVS_REQUIRES(pmd->port_mutex);
> 
>+static void
>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
>+                               long long now, bool purge);
>+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>+                                      struct tx_port *tx, long long now);
>+
> static inline bool emc_entry_alive(struct emc_entry *ce);
> static void emc_clear_entry(struct emc_entry *ce);
> 
>@@ -1185,7 +1197,9 @@ port_create(const char *devname, const char *open_type, 
>const char *type,
>     port->netdev = netdev;
>     port->n_rxq = netdev_n_rxq(netdev);
>     port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
>+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
>     port->type = xstrdup(type);
>+    ovs_mutex_init(&port->txq_used_mutex);
> 
>     for (i = 0; i < port->n_rxq; i++) {
>         error = netdev_rxq_open(netdev, &port->rxq[i], i);
>@@ -1211,7 +1225,9 @@ out_rxq_close:
>     for (i = 0; i < n_open_rxqs; i++) {
>         netdev_rxq_close(port->rxq[i]);
>     }
>+    ovs_mutex_destroy(&port->txq_used_mutex);
>     free(port->type);
>+    free(port->txq_used);
>     free(port->rxq);
>     free(port);
> 
>@@ -1351,7 +1367,8 @@ port_destroy(struct dp_netdev_port *port)
>     for (unsigned i = 0; i < port->n_rxq; i++) {
>         netdev_rxq_close(port->rxq[i]);
>     }
>-
>+    ovs_mutex_destroy(&port->txq_used_mutex);
>+    free(port->txq_used);
>     free(port->rxq);
>     free(port->type);
>     free(port);
>@@ -1374,13 +1391,6 @@ get_port_by_name(struct dp_netdev *dp,
> }
> 
> static int
>-get_n_pmd_threads(struct dp_netdev *dp)
>-{
>-    /* There is one non pmd thread in dp->poll_threads */
>-    return cmap_count(&dp->poll_threads) - 1;
>-}
>-
>-static int
> get_n_pmd_threads_on_numa(struct dp_netdev *dp, int numa_id)
> {
>     struct dp_netdev_pmd_thread *pmd;
>@@ -2476,7 +2486,7 @@ dpif_netdev_execute(struct dpif *dpif, struct 
>dpif_execute *execute)
> 
>     packet_batch_init_packet(&pp, execute->packet);
>     dp_netdev_execute_actions(pmd, &pp, false, execute->actions,
>-                              execute->actions_len);
>+                              execute->actions_len, 0);
> 
>     if (pmd->core_id == NON_PMD_CORE_ID) {
>         ovs_mutex_unlock(&dp->non_pmd_mutex);
>@@ -2660,6 +2670,10 @@ port_reconfigure(struct dp_netdev_port *port)
>     }
>     /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
>     port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
>+    /* Realloc 'used' counters for tx queues. */
>+    free(port->txq_used);
>+    port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
>+
>     for (i = 0; i < netdev_n_rxq(netdev); i++) {
>         err = netdev_rxq_open(netdev, &port->rxq[i], i);
>         if (err) {
>@@ -2737,6 +2751,7 @@ dpif_netdev_run(struct dpif *dpif)
>             }
>         }
>     }
>+    dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
>     ovs_mutex_unlock(&dp->non_pmd_mutex);
> 
>     dp_netdev_pmd_unref(non_pmd);
>@@ -2786,6 +2801,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
> {
>     struct tx_port *tx_port_cached;
> 
>+    /* Free all used tx queue ids. */
>+    dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
>+
>     HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
>         free(tx_port_cached);
>     }
>@@ -2805,7 +2823,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
>     HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
>         tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
>         hmap_insert(&pmd->port_cache, &tx_port_cached->node,
>-                    hash_port_no(tx_port_cached->port_no));
>+                    hash_port_no(tx_port_cached->port->port_no));
>     }
> }
> 
>@@ -3021,11 +3039,6 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread 
>*pmd, struct dp_netdev *dp,
>     pmd->numa_id = numa_id;
>     pmd->poll_cnt = 0;
> 
>-    atomic_init(&pmd->tx_qid,
>-                (core_id == NON_PMD_CORE_ID)
>-                ? ovs_numa_get_n_cores()
>-                : get_n_pmd_threads(dp));
>-
>     ovs_refcount_init(&pmd->ref_cnt);
>     latch_init(&pmd->exit_latch);
>     atomic_init(&pmd->change_seq, PMD_INITIAL_SEQ);
>@@ -3116,18 +3129,16 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
>     free(pmd_list);
> }
> 
>-/* Deletes all pmd threads on numa node 'numa_id' and
>- * fixes tx_qids of other threads to keep them sequential. */
>+/* Deletes all pmd threads on numa node 'numa_id'. */
> static void
> dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> {
>     struct dp_netdev_pmd_thread *pmd;
>-    int n_pmds_on_numa, n_pmds;
>-    int *free_idx, k = 0;
>+    int n_pmds_on_numa;
>+    int k = 0;
>     struct dp_netdev_pmd_thread **pmd_list;
> 
>     n_pmds_on_numa = get_n_pmd_threads_on_numa(dp, numa_id);
>-    free_idx = xcalloc(n_pmds_on_numa, sizeof *free_idx);
>     pmd_list = xcalloc(n_pmds_on_numa, sizeof *pmd_list);
> 
>     CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>@@ -3135,7 +3146,6 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int 
>numa_id)
>          * 'dp->poll_threads' (while we're iterating it) and it
>          * might quiesce. */
>         if (pmd->numa_id == numa_id) {
>-            atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
>             pmd_list[k] = pmd;
>             ovs_assert(k < n_pmds_on_numa);
>             k++;
>@@ -3146,21 +3156,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int 
>numa_id)
>         dp_netdev_del_pmd(dp, pmd_list[i]);
>     }
> 
>-    n_pmds = get_n_pmd_threads(dp);
>-    CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
>-        int old_tx_qid;
>-
>-        atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
>-
>-        if (old_tx_qid >= n_pmds) {
>-            int new_tx_qid = free_idx[--k];
>-
>-            atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
>-        }
>-    }
>-
>     free(pmd_list);
>-    free(free_idx);
> }
> 
> /* Deletes all rx queues from pmd->poll_list and all the ports from
>@@ -3188,7 +3184,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t 
>port_no)
>     struct tx_port *tx;
> 
>     HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>-        if (tx->port_no == port_no) {
>+        if (tx->port->port_no == port_no) {
>             return tx;
>         }
>     }
>@@ -3313,11 +3309,11 @@ dp_netdev_add_port_tx_to_pmd(struct 
>dp_netdev_pmd_thread *pmd,
> {
>     struct tx_port *tx = xzalloc(sizeof *tx);
> 
>-    tx->netdev = port->netdev;
>-    tx->port_no = port->port_no;
>+    tx->port = port;
>+    tx->qid = -1;
> 
>     ovs_mutex_lock(&pmd->port_mutex);
>-    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
>+    hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
>     ovs_mutex_unlock(&pmd->port_mutex);
> }
> 
>@@ -3658,7 +3654,7 @@ packet_batch_per_flow_execute(struct 
>packet_batch_per_flow *batch,
>     actions = dp_netdev_flow_get_actions(flow);
> 
>     dp_netdev_execute_actions(pmd, &batch->array, true,
>-                              actions->actions, actions->size);
>+                              actions->actions, actions->size, now);
> }
> 
> static inline void
>@@ -3785,7 +3781,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, 
>struct dp_packet *packet,
>      * we'll send the packet up twice. */
>     packet_batch_init_packet(&b, packet);
>     dp_netdev_execute_actions(pmd, &b, true,
>-                              actions->data, actions->size);
>+                              actions->data, actions->size, 0);
> 
>     add_actions = put_actions->size ? put_actions : actions;
>     if (OVS_LIKELY(error != ENOSPC)) {
>@@ -3954,6 +3950,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
> 
> struct dp_netdev_execute_aux {
>     struct dp_netdev_pmd_thread *pmd;
>+    long long now;
> };
> 
> static void
>@@ -3974,6 +3971,74 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, 
>upcall_callback *cb,
>     dp->upcall_cb = cb;
> }
> 
>+static void
>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
>+                               long long now, bool purge)
>+{
>+    struct tx_port *tx;
>+    struct dp_netdev_port *port;
>+    long long interval;
>+
>+    HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
>+        interval = now - tx->last_used;
>+        if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
>+            port = tx->port;
>+            ovs_mutex_lock(&port->txq_used_mutex);
>+            port->txq_used[tx->qid]--;
>+            ovs_mutex_unlock(&port->txq_used_mutex);
>+            tx->qid = -1;
>+        }
>+    }
>+}
>+
>+static int
>+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>+                           struct tx_port *tx, long long now)
>+{
>+    struct dp_netdev_port *port;
>+    long long interval;
>+    int i, min_cnt, min_qid;
>+
>+    if (OVS_UNLIKELY(!now)) {
>+        now = time_msec();
>+    }
>+
>+    interval = now - tx->last_used;
>+    tx->last_used = now;
>+
>+    if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
>+        return tx->qid;
>+    }
>+
>+    port = tx->port;
>+
>+    ovs_mutex_lock(&port->txq_used_mutex);
>+    if (tx->qid >= 0) {
>+        port->txq_used[tx->qid]--;
>+        tx->qid = -1;
>+    }
>+
>+    min_cnt = -1;
>+    min_qid = 0;
>+    for (i = 0; i < netdev_n_txq(port->netdev); i++) {
>+        if (port->txq_used[i] < min_cnt || min_cnt == -1) {
>+            min_cnt = port->txq_used[i];
>+            min_qid = i;
>+        }
>+    }
>+
>+    port->txq_used[min_qid]++;
>+    tx->qid = min_qid;
>+
>+    ovs_mutex_unlock(&port->txq_used_mutex);
>+
>+    dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>+
>+    VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>+             pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>+    return min_qid;
>+}
>+
> static struct tx_port *
> pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
>                          odp_port_t port_no)
>@@ -3997,7 +4062,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
>         err = -EINVAL;
>         goto error;
>     }
>-    err = netdev_push_header(tun_port->netdev, batch, data);
>+    err = netdev_push_header(tun_port->port->netdev, batch, data);
>     if (!err) {
>         return 0;
>     }
>@@ -4024,7 +4089,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread 
>*pmd,
>     if (!error || error == ENOSPC) {
>         packet_batch_init_packet(&b, packet);
>         dp_netdev_execute_actions(pmd, &b, may_steal,
>-                                  actions->data, actions->size);
>+                                  actions->data, actions->size, 0);
>     } else if (may_steal) {
>         dp_packet_delete(packet);
>     }
>@@ -4045,11 +4110,9 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
>*packets_,
>     case OVS_ACTION_ATTR_OUTPUT:
>         p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
>         if (OVS_LIKELY(p)) {
>-            int tx_qid;
>-
>-            atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
>+            int tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, aux->now);
> 
>-            netdev_send(p->netdev, tx_qid, packets_, may_steal);
>+            netdev_send(p->port->netdev, tx_qid, packets_, may_steal);
>             return;
>         }
>         break;
>@@ -4096,7 +4159,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch 
>*packets_,
> 
>                 dp_packet_batch_apply_cutlen(packets_);
> 
>-                netdev_pop_header(p->netdev, packets_);
>+                netdev_pop_header(p->port->netdev, packets_);
>                 if (!packets_->count) {
>                     return;
>                 }
>@@ -4210,9 +4273,10 @@ static void
> dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
>                           struct dp_packet_batch *packets,
>                           bool may_steal,
>-                          const struct nlattr *actions, size_t actions_len)
>+                          const struct nlattr *actions, size_t actions_len,
>+                          long long now)
> {
>-    struct dp_netdev_execute_aux aux = { pmd };
>+    struct dp_netdev_execute_aux aux = { pmd, now };
> 
>     odp_execute_actions(&aux, packets, may_steal, actions,
>                         actions_len, dp_execute_cb);
>-- 
>2.7.4
>
_______________________________________________
dev mailing list
dev@openvswitch.org
http://openvswitch.org/mailman/listinfo/dev

Reply via email to