I don't think 'dynamic_txqs' needs to be atomic, since we only change it
while the pmd threads are stopped.
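
For reference, after the incremental the only writer besides port_create() is
reconfigure_pmd_threads(), which runs while no pmd thread exists. A condensed
sketch of the resulting code (not the exact hunks):

    dp_netdev_destroy_all_pmds(dp);
    ...
    /* No pmd thread is running at this point, so a plain write is safe. */
    port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
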
Also, in port_create() we should check for 'netdev_n_txq(netdev) < n_cores + 1'
only after we reconfigure the device, since reconfiguration can change the
number of tx queues.
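
To make the ordering concrete, here is a condensed sketch of port_create()
after the incremental (the exact change is in the diff below):

    if (netdev_is_pmd(netdev)) {
        n_cores = ovs_numa_get_n_cores();
        ...
        error = netdev_set_tx_multiq(netdev, n_cores + 1);
        ...
    }

    if (netdev_is_reconf_required(netdev)) {
        error = netdev_reconfigure(netdev);
        ...
    }

    /* netdev_n_txq() is only final after a possible reconfiguration. */
    if (netdev_is_pmd(netdev) && netdev_n_txq(netdev) < n_cores + 1) {
        dynamic_txqs = true;
    }
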
Other than that this looks good to me, so I applied the following incremental
and pushed this to master.
Thanks,
Daniele
---8<---
diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
index d1ba6f3..d45aba0 100644
--- a/lib/dpif-netdev.c
+++ b/lib/dpif-netdev.c
@@ -258,7 +258,7 @@ struct dp_netdev_port {
struct netdev_saved_flags *sf;
unsigned n_rxq; /* Number of elements in 'rxq' */
struct netdev_rxq **rxq;
- atomic_bool dynamic_txqs; /* If true XPS will be used. */
+ bool dynamic_txqs; /* If true XPS will be used. */
unsigned *txq_used; /* Number of threads that uses each tx queue. */
struct ovs_mutex txq_used_mutex;
char *type; /* Port type as requested by user. */
@@ -1151,6 +1151,7 @@ port_create(const char *devname, const char *open_type, const char *type,
enum netdev_flags flags;
struct netdev *netdev;
int n_open_rxqs = 0;
+ int n_cores = 0;
int i, error;
bool dynamic_txqs = false;
@@ -1171,7 +1172,7 @@ port_create(const char *devname, const char *open_type, const char *type,
}
if (netdev_is_pmd(netdev)) {
- int n_cores = ovs_numa_get_n_cores();
+ n_cores = ovs_numa_get_n_cores();
if (n_cores == OVS_CORE_UNSPEC) {
VLOG_ERR("%s, cannot get cpu core info", devname);
@@ -1186,9 +1187,6 @@ port_create(const char *devname, const char *open_type, const char *type,
VLOG_ERR("%s, cannot set multiq", devname);
goto out;
}
- if (netdev_n_txq(netdev) < n_cores + 1) {
- dynamic_txqs = true;
- }
}
if (netdev_is_reconf_required(netdev)) {
@@ -1198,6 +1196,12 @@ port_create(const char *devname, const char *open_type, const char *type,
}
}
+ if (netdev_is_pmd(netdev)) {
+ if (netdev_n_txq(netdev) < n_cores + 1) {
+ dynamic_txqs = true;
+ }
+ }
+
port = xzalloc(sizeof *port);
port->port_no = port_no;
port->netdev = netdev;
@@ -1206,7 +1210,7 @@ port_create(const char *devname, const char *open_type, const char *type,
port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
port->type = xstrdup(type);
ovs_mutex_init(&port->txq_used_mutex);
- atomic_init(&port->dynamic_txqs, dynamic_txqs);
+ port->dynamic_txqs = dynamic_txqs;
for (i = 0; i < port->n_rxq; i++) {
error = netdev_rxq_open(netdev, &port->rxq[i], i);
@@ -2718,8 +2722,7 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
seq_change(dp->port_seq);
port_destroy(port);
} else {
- atomic_init(&port->dynamic_txqs,
- netdev_n_txq(port->netdev) < n_cores + 1);
+ port->dynamic_txqs = netdev_n_txq(port->netdev) < n_cores + 1;
}
}
/* Restores the non-pmd. */
@@ -4015,11 +4018,9 @@ dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
struct tx_port *tx;
struct dp_netdev_port *port;
long long interval;
- bool dynamic_txqs;
HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
- atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs);
- if (dynamic_txqs) {
+ if (tx->port->dynamic_txqs) {
continue;
}
interval = now - tx->last_used;
@@ -4156,7 +4157,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
int tx_qid;
bool dynamic_txqs;
- atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs);
+ dynamic_txqs = p->port->dynamic_txqs;
if (dynamic_txqs) {
tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
} else {
---8<---
On 27/07/2016 07:44, "Ilya Maximets" <[email protected]> wrote:
>If the number of CPUs in pmd-cpu-mask is not divisible by the number of
>queues, and in a few more complex situations, TX queue-ids may be
>distributed unfairly between PMD threads.
>
>For example, if we have 2 ports with 4 queues and 6 CPUs in pmd-cpu-mask,
>the following distribution is possible:
><------------------------------------------------------------------------>
>pmd thread numa_id 0 core_id 13:
> port: vhost-user1 queue-id: 1
> port: dpdk0 queue-id: 3
>pmd thread numa_id 0 core_id 14:
> port: vhost-user1 queue-id: 2
>pmd thread numa_id 0 core_id 16:
> port: dpdk0 queue-id: 0
>pmd thread numa_id 0 core_id 17:
> port: dpdk0 queue-id: 1
>pmd thread numa_id 0 core_id 12:
> port: vhost-user1 queue-id: 0
> port: dpdk0 queue-id: 2
>pmd thread numa_id 0 core_id 15:
> port: vhost-user1 queue-id: 3
><------------------------------------------------------------------------>
>
>As we can see above, the dpdk0 port is polled by threads on cores
> 12, 13, 16 and 17.
>
>By design of dpif-netdev, only one TX queue-id is assigned to each pmd
>thread. These queue-ids are sequential, similar to core-ids, and a thread
>will send packets to the queue with exactly this queue-id regardless of
>the port.
>
>In the previous example:
>
> pmd thread on core 12 will send packets to tx queue 0
> pmd thread on core 13 will send packets to tx queue 1
> ...
> pmd thread on core 17 will send packets to tx queue 5
>
>So, for the dpdk0 port, after truncation in netdev-dpdk:
>
> core 12 --> TX queue-id 0 % 4 == 0
> core 13 --> TX queue-id 1 % 4 == 1
> core 16 --> TX queue-id 4 % 4 == 0
> core 17 --> TX queue-id 5 % 4 == 1
>
>As a result, only 2 of the 4 queues are used.
>
>To fix this issue, a form of XPS is implemented in the following way:
>
> * TX queue-ids are allocated dynamically.
> * When a PMD thread first tries to send packets to a new port, it
>   allocates the least used TX queue for this port (see the sketch below).
> * PMD threads periodically revalidate their allocated TX queue-ids. A
>   queue that wasn't used in the last XPS_TIMEOUT_MS milliseconds is
>   freed during revalidation.
> * XPS is not used if there are enough TX queues.
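>
>In simplified form, the allocation in dpif_netdev_xps_get_tx_qid() (see the
>patch below) picks the least used TX queue under the port's 'txq_used_mutex'.
>A condensed excerpt of that function:
>
>    ovs_mutex_lock(&port->txq_used_mutex);
>    min_cnt = -1;
>    min_qid = 0;
>    for (i = 0; i < netdev_n_txq(port->netdev); i++) {
>        if (port->txq_used[i] < min_cnt || min_cnt == -1) {
>            min_cnt = port->txq_used[i];
>            min_qid = i;
>        }
>    }
>    port->txq_used[min_qid]++;  /* One more thread now sends on this queue. */
>    tx->qid = min_qid;
>    ovs_mutex_unlock(&port->txq_used_mutex);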
>
>Reported-by: Zhihong Wang <[email protected]>
>Signed-off-by: Ilya Maximets <[email protected]>
>---
> lib/dpif-netdev.c | 204 ++++++++++++++++++++++++++++++++++++++++----------
> lib/netdev-bsd.c | 3 +-
> lib/netdev-dpdk.c | 32 +++-----
> lib/netdev-dummy.c | 3 +-
> lib/netdev-linux.c | 3 +-
> lib/netdev-provider.h | 11 +--
> lib/netdev.c | 13 ++--
> lib/netdev.h | 2 +-
> 8 files changed, 198 insertions(+), 73 deletions(-)
>
>diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
>index f05ca4e..d1ba6f3 100644
>--- a/lib/dpif-netdev.c
>+++ b/lib/dpif-netdev.c
>@@ -248,6 +248,8 @@ enum pmd_cycles_counter_type {
> PMD_N_CYCLES
> };
>
>+#define XPS_TIMEOUT_MS 500LL
>+
> /* A port in a netdev-based datapath. */
> struct dp_netdev_port {
> odp_port_t port_no;
>@@ -256,6 +258,9 @@ struct dp_netdev_port {
> struct netdev_saved_flags *sf;
> unsigned n_rxq; /* Number of elements in 'rxq' */
> struct netdev_rxq **rxq;
>+ atomic_bool dynamic_txqs; /* If true XPS will be used. */
>+ unsigned *txq_used; /* Number of threads that uses each tx queue. */
>+ struct ovs_mutex txq_used_mutex;
> char *type; /* Port type as requested by user. */
> };
>
>@@ -384,8 +389,9 @@ struct rxq_poll {
>
> /* Contained by struct dp_netdev_pmd_thread's 'port_cache' or 'tx_ports'. */
> struct tx_port {
>- odp_port_t port_no;
>- struct netdev *netdev;
>+ struct dp_netdev_port *port;
>+ int qid;
>+ long long last_used;
> struct hmap_node node;
> };
>
>@@ -443,9 +449,10 @@ struct dp_netdev_pmd_thread {
> unsigned core_id; /* CPU core id of this pmd thread. */
> int numa_id; /* numa node id of this pmd thread. */
>
>- /* Queue id used by this pmd thread to send packets on all netdevs.
>- * All tx_qid's are unique and less than 'ovs_numa_get_n_cores() + 1'. */
>- atomic_int tx_qid;
>+ /* Queue id used by this pmd thread to send packets on all netdevs if
>+ * XPS disabled for this netdev. All static_tx_qid's are unique and less
>+ * than 'ovs_numa_get_n_cores() + 1'. */
>+ atomic_int static_tx_qid;
>
> struct ovs_mutex port_mutex; /* Mutex for 'poll_list' and 'tx_ports'. */
> /* List of rx queues to poll. */
>@@ -498,7 +505,8 @@ static void dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
> struct dp_packet_batch *,
> bool may_steal,
> const struct nlattr *actions,
>- size_t actions_len);
>+ size_t actions_len,
>+ long long now);
> static void dp_netdev_input(struct dp_netdev_pmd_thread *,
> struct dp_packet_batch *, odp_port_t port_no);
> static void dp_netdev_recirculate(struct dp_netdev_pmd_thread *,
>@@ -541,6 +549,12 @@ static void dp_netdev_pmd_flow_flush(struct dp_netdev_pmd_thread *pmd);
> static void pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
> OVS_REQUIRES(pmd->port_mutex);
>
>+static void
>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
>+ long long now, bool purge);
>+static int dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>+ struct tx_port *tx, long long now);
>+
> static inline bool emc_entry_alive(struct emc_entry *ce);
> static void emc_clear_entry(struct emc_entry *ce);
>
>@@ -1138,6 +1152,7 @@ port_create(const char *devname, const char *open_type, const char *type,
> struct netdev *netdev;
> int n_open_rxqs = 0;
> int i, error;
>+ bool dynamic_txqs = false;
>
> *portp = NULL;
>
>@@ -1171,6 +1186,9 @@ port_create(const char *devname, const char *open_type, const char *type,
> VLOG_ERR("%s, cannot set multiq", devname);
> goto out;
> }
>+ if (netdev_n_txq(netdev) < n_cores + 1) {
>+ dynamic_txqs = true;
>+ }
> }
>
> if (netdev_is_reconf_required(netdev)) {
>@@ -1185,7 +1203,10 @@ port_create(const char *devname, const char *open_type, const char *type,
> port->netdev = netdev;
> port->n_rxq = netdev_n_rxq(netdev);
> port->rxq = xcalloc(port->n_rxq, sizeof *port->rxq);
>+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
> port->type = xstrdup(type);
>+ ovs_mutex_init(&port->txq_used_mutex);
>+ atomic_init(&port->dynamic_txqs, dynamic_txqs);
>
> for (i = 0; i < port->n_rxq; i++) {
> error = netdev_rxq_open(netdev, &port->rxq[i], i);
>@@ -1211,7 +1232,9 @@ out_rxq_close:
> for (i = 0; i < n_open_rxqs; i++) {
> netdev_rxq_close(port->rxq[i]);
> }
>+ ovs_mutex_destroy(&port->txq_used_mutex);
> free(port->type);
>+ free(port->txq_used);
> free(port->rxq);
> free(port);
>
>@@ -1351,7 +1374,8 @@ port_destroy(struct dp_netdev_port *port)
> for (unsigned i = 0; i < port->n_rxq; i++) {
> netdev_rxq_close(port->rxq[i]);
> }
>-
>+ ovs_mutex_destroy(&port->txq_used_mutex);
>+ free(port->txq_used);
> free(port->rxq);
> free(port->type);
> free(port);
>@@ -2476,7 +2500,7 @@ dpif_netdev_execute(struct dpif *dpif, struct dpif_execute *execute)
>
> packet_batch_init_packet(&pp, execute->packet);
> dp_netdev_execute_actions(pmd, &pp, false, execute->actions,
>- execute->actions_len);
>+ execute->actions_len, time_msec());
>
> if (pmd->core_id == NON_PMD_CORE_ID) {
> ovs_mutex_unlock(&dp->non_pmd_mutex);
>@@ -2650,6 +2674,10 @@ port_reconfigure(struct dp_netdev_port *port)
> }
> /* If the netdev_reconfigure() above succeeds, reopens the 'rxq's. */
> port->rxq = xrealloc(port->rxq, sizeof *port->rxq * netdev_n_rxq(netdev));
>+ /* Realloc 'used' counters for tx queues. */
>+ free(port->txq_used);
>+ port->txq_used = xcalloc(netdev_n_txq(netdev), sizeof *port->txq_used);
>+
> for (i = 0; i < netdev_n_rxq(netdev); i++) {
> err = netdev_rxq_open(netdev, &port->rxq[i], i);
> if (err) {
>@@ -2666,9 +2694,21 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
> OVS_REQUIRES(dp->port_mutex)
> {
> struct dp_netdev_port *port, *next;
>+ int n_cores;
>
> dp_netdev_destroy_all_pmds(dp);
>
>+ /* Reconfigures the cpu mask. */
>+ ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);
>+ free(dp->pmd_cmask);
>+ dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);
>+
>+ n_cores = ovs_numa_get_n_cores();
>+ if (n_cores == OVS_CORE_UNSPEC) {
>+ VLOG_ERR("Cannot get cpu core info");
>+ return;
>+ }
>+
> HMAP_FOR_EACH_SAFE (port, next, node, &dp->ports) {
> int err;
>
>@@ -2677,13 +2717,11 @@ reconfigure_pmd_threads(struct dp_netdev *dp)
> hmap_remove(&dp->ports, &port->node);
> seq_change(dp->port_seq);
> port_destroy(port);
>+ } else {
>+ atomic_init(&port->dynamic_txqs,
>+ netdev_n_txq(port->netdev) < n_cores + 1);
> }
> }
>- /* Reconfigures the cpu mask. */
>- ovs_numa_set_cpu_mask(dp->requested_pmd_cmask);
>- free(dp->pmd_cmask);
>- dp->pmd_cmask = nullable_xstrdup(dp->requested_pmd_cmask);
>-
> /* Restores the non-pmd. */
> dp_netdev_set_nonpmd(dp);
> /* Restores all pmd threads. */
>@@ -2727,6 +2765,7 @@ dpif_netdev_run(struct dpif *dpif)
> }
> }
> }
>+ dpif_netdev_xps_revalidate_pmd(non_pmd, time_msec(), false);
> ovs_mutex_unlock(&dp->non_pmd_mutex);
>
> dp_netdev_pmd_unref(non_pmd);
>@@ -2776,6 +2815,9 @@ pmd_free_cached_ports(struct dp_netdev_pmd_thread *pmd)
> {
> struct tx_port *tx_port_cached;
>
>+ /* Free all used tx queue ids. */
>+ dpif_netdev_xps_revalidate_pmd(pmd, 0, true);
>+
> HMAP_FOR_EACH_POP (tx_port_cached, node, &pmd->port_cache) {
> free(tx_port_cached);
> }
>@@ -2795,7 +2837,7 @@ pmd_load_cached_ports(struct dp_netdev_pmd_thread *pmd)
> HMAP_FOR_EACH (tx_port, node, &pmd->tx_ports) {
> tx_port_cached = xmemdup(tx_port, sizeof *tx_port_cached);
> hmap_insert(&pmd->port_cache, &tx_port_cached->node,
>- hash_port_no(tx_port_cached->port_no));
>+ hash_port_no(tx_port_cached->port->port_no));
> }
> }
>
>@@ -3011,7 +3053,7 @@ dp_netdev_configure_pmd(struct dp_netdev_pmd_thread *pmd, struct dp_netdev *dp,
> pmd->numa_id = numa_id;
> pmd->poll_cnt = 0;
>
>- atomic_init(&pmd->tx_qid,
>+ atomic_init(&pmd->static_tx_qid,
> (core_id == NON_PMD_CORE_ID)
> ? ovs_numa_get_n_cores()
> : get_n_pmd_threads(dp));
>@@ -3107,7 +3149,7 @@ dp_netdev_destroy_all_pmds(struct dp_netdev *dp)
> }
>
> /* Deletes all pmd threads on numa node 'numa_id' and
>- * fixes tx_qids of other threads to keep them sequential. */
>+ * fixes static_tx_qids of other threads to keep them sequential. */
> static void
> dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> {
>@@ -3125,7 +3167,7 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> * 'dp->poll_threads' (while we're iterating it) and it
> * might quiesce. */
> if (pmd->numa_id == numa_id) {
>- atomic_read_relaxed(&pmd->tx_qid, &free_idx[k]);
>+ atomic_read_relaxed(&pmd->static_tx_qid, &free_idx[k]);
> pmd_list[k] = pmd;
> ovs_assert(k < n_pmds_on_numa);
> k++;
>@@ -3140,12 +3182,12 @@ dp_netdev_del_pmds_on_numa(struct dp_netdev *dp, int numa_id)
> CMAP_FOR_EACH (pmd, node, &dp->poll_threads) {
> int old_tx_qid;
>
>- atomic_read_relaxed(&pmd->tx_qid, &old_tx_qid);
>+ atomic_read_relaxed(&pmd->static_tx_qid, &old_tx_qid);
>
> if (old_tx_qid >= n_pmds) {
> int new_tx_qid = free_idx[--k];
>
>- atomic_store_relaxed(&pmd->tx_qid, new_tx_qid);
>+ atomic_store_relaxed(&pmd->static_tx_qid, new_tx_qid);
> }
> }
>
>@@ -3178,7 +3220,7 @@ tx_port_lookup(const struct hmap *hmap, odp_port_t port_no)
> struct tx_port *tx;
>
> HMAP_FOR_EACH_IN_BUCKET (tx, node, hash_port_no(port_no), hmap) {
>- if (tx->port_no == port_no) {
>+ if (tx->port->port_no == port_no) {
> return tx;
> }
> }
>@@ -3303,11 +3345,11 @@ dp_netdev_add_port_tx_to_pmd(struct dp_netdev_pmd_thread *pmd,
> {
> struct tx_port *tx = xzalloc(sizeof *tx);
>
>- tx->netdev = port->netdev;
>- tx->port_no = port->port_no;
>+ tx->port = port;
>+ tx->qid = -1;
>
> ovs_mutex_lock(&pmd->port_mutex);
>- hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port_no));
>+ hmap_insert(&pmd->tx_ports, &tx->node, hash_port_no(tx->port->port_no));
> ovs_mutex_unlock(&pmd->port_mutex);
> }
>
>@@ -3648,7 +3690,7 @@ packet_batch_per_flow_execute(struct packet_batch_per_flow *batch,
> actions = dp_netdev_flow_get_actions(flow);
>
> dp_netdev_execute_actions(pmd, &batch->array, true,
>- actions->actions, actions->size);
>+ actions->actions, actions->size, now);
> }
>
> static inline void
>@@ -3736,7 +3778,7 @@ static inline void
> handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
> const struct netdev_flow_key *key,
> struct ofpbuf *actions, struct ofpbuf *put_actions,
>- int *lost_cnt)
>+ int *lost_cnt, long long now)
> {
> struct ofpbuf *add_actions;
> struct dp_packet_batch b;
>@@ -3775,7 +3817,7 @@ handle_packet_upcall(struct dp_netdev_pmd_thread *pmd, struct dp_packet *packet,
> * we'll send the packet up twice. */
> packet_batch_init_packet(&b, packet);
> dp_netdev_execute_actions(pmd, &b, true,
>- actions->data, actions->size);
>+ actions->data, actions->size, now);
>
> add_actions = put_actions->size ? put_actions : actions;
> if (OVS_LIKELY(error != ENOSPC)) {
>@@ -3804,7 +3846,8 @@ static inline void
> fast_path_processing(struct dp_netdev_pmd_thread *pmd,
> struct dp_packet_batch *packets_,
> struct netdev_flow_key *keys,
>- struct packet_batch_per_flow batches[], size_t *n_batches)
>+ struct packet_batch_per_flow batches[], size_t *n_batches,
>+ long long now)
> {
> int cnt = packets_->count;
> #if !defined(__CHECKER__) && !defined(_WIN32)
>@@ -3850,8 +3893,8 @@ fast_path_processing(struct dp_netdev_pmd_thread *pmd,
> }
>
> miss_cnt++;
>- handle_packet_upcall(pmd, packets[i], &keys[i], &actions, &put_actions,
>- &lost_cnt);
>+ handle_packet_upcall(pmd, packets[i], &keys[i], &actions,
>+ &put_actions, &lost_cnt, now);
> }
>
> ofpbuf_uninit(&actions);
>@@ -3915,7 +3958,7 @@ dp_netdev_input__(struct dp_netdev_pmd_thread *pmd,
> md_is_valid, port_no);
> if (OVS_UNLIKELY(newcnt)) {
> packets->count = newcnt;
>- fast_path_processing(pmd, packets, keys, batches, &n_batches);
>+ fast_path_processing(pmd, packets, keys, batches, &n_batches, now);
> }
>
> for (i = 0; i < n_batches; i++) {
>@@ -3944,6 +3987,7 @@ dp_netdev_recirculate(struct dp_netdev_pmd_thread *pmd,
>
> struct dp_netdev_execute_aux {
> struct dp_netdev_pmd_thread *pmd;
>+ long long now;
> };
>
> static void
>@@ -3964,6 +4008,79 @@ dpif_netdev_register_upcall_cb(struct dpif *dpif, upcall_callback *cb,
> dp->upcall_cb = cb;
> }
>
>+static void
>+dpif_netdev_xps_revalidate_pmd(const struct dp_netdev_pmd_thread *pmd,
>+ long long now, bool purge)
>+{
>+ struct tx_port *tx;
>+ struct dp_netdev_port *port;
>+ long long interval;
>+ bool dynamic_txqs;
>+
>+ HMAP_FOR_EACH (tx, node, &pmd->port_cache) {
>+ atomic_read_relaxed(&tx->port->dynamic_txqs, &dynamic_txqs);
>+ if (dynamic_txqs) {
>+ continue;
>+ }
>+ interval = now - tx->last_used;
>+ if (tx->qid >= 0 && (purge || interval >= XPS_TIMEOUT_MS)) {
>+ port = tx->port;
>+ ovs_mutex_lock(&port->txq_used_mutex);
>+ port->txq_used[tx->qid]--;
>+ ovs_mutex_unlock(&port->txq_used_mutex);
>+ tx->qid = -1;
>+ }
>+ }
>+}
>+
>+static int
>+dpif_netdev_xps_get_tx_qid(const struct dp_netdev_pmd_thread *pmd,
>+ struct tx_port *tx, long long now)
>+{
>+ struct dp_netdev_port *port;
>+ long long interval;
>+ int i, min_cnt, min_qid;
>+
>+ if (OVS_UNLIKELY(!now)) {
>+ now = time_msec();
>+ }
>+
>+ interval = now - tx->last_used;
>+ tx->last_used = now;
>+
>+ if (OVS_LIKELY(tx->qid >= 0 && interval < XPS_TIMEOUT_MS)) {
>+ return tx->qid;
>+ }
>+
>+ port = tx->port;
>+
>+ ovs_mutex_lock(&port->txq_used_mutex);
>+ if (tx->qid >= 0) {
>+ port->txq_used[tx->qid]--;
>+ tx->qid = -1;
>+ }
>+
>+ min_cnt = -1;
>+ min_qid = 0;
>+ for (i = 0; i < netdev_n_txq(port->netdev); i++) {
>+ if (port->txq_used[i] < min_cnt || min_cnt == -1) {
>+ min_cnt = port->txq_used[i];
>+ min_qid = i;
>+ }
>+ }
>+
>+ port->txq_used[min_qid]++;
>+ tx->qid = min_qid;
>+
>+ ovs_mutex_unlock(&port->txq_used_mutex);
>+
>+ dpif_netdev_xps_revalidate_pmd(pmd, now, false);
>+
>+ VLOG_DBG("Core %d: New TX queue ID %d for port \'%s\'.",
>+ pmd->core_id, tx->qid, netdev_get_name(tx->port->netdev));
>+ return min_qid;
>+}
>+
> static struct tx_port *
> pmd_tx_port_cache_lookup(const struct dp_netdev_pmd_thread *pmd,
> odp_port_t port_no)
>@@ -3987,7 +4104,7 @@ push_tnl_action(const struct dp_netdev_pmd_thread *pmd,
> err = -EINVAL;
> goto error;
> }
>- err = netdev_push_header(tun_port->netdev, batch, data);
>+ err = netdev_push_header(tun_port->port->netdev, batch, data);
> if (!err) {
> return 0;
> }
>@@ -4001,7 +4118,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
> struct dp_packet *packet, bool may_steal,
> struct flow *flow, ovs_u128 *ufid,
> struct ofpbuf *actions,
>- const struct nlattr *userdata)
>+ const struct nlattr *userdata, long long now)
> {
> struct dp_packet_batch b;
> int error;
>@@ -4014,7 +4131,7 @@ dp_execute_userspace_action(struct dp_netdev_pmd_thread *pmd,
> if (!error || error == ENOSPC) {
> packet_batch_init_packet(&b, packet);
> dp_netdev_execute_actions(pmd, &b, may_steal,
>- actions->data, actions->size);
>+ actions->data, actions->size, now);
> } else if (may_steal) {
> dp_packet_delete(packet);
> }
>@@ -4029,6 +4146,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> struct dp_netdev_pmd_thread *pmd = aux->pmd;
> struct dp_netdev *dp = pmd->dp;
> int type = nl_attr_type(a);
>+ long long now = aux->now;
> struct tx_port *p;
>
> switch ((enum ovs_action_attr)type) {
>@@ -4036,10 +4154,17 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> p = pmd_tx_port_cache_lookup(pmd, u32_to_odp(nl_attr_get_u32(a)));
> if (OVS_LIKELY(p)) {
> int tx_qid;
>+ bool dynamic_txqs;
>
>- atomic_read_relaxed(&pmd->tx_qid, &tx_qid);
>+ atomic_read_relaxed(&p->port->dynamic_txqs, &dynamic_txqs);
>+ if (dynamic_txqs) {
>+ tx_qid = dpif_netdev_xps_get_tx_qid(pmd, p, now);
>+ } else {
>+ atomic_read_relaxed(&pmd->static_tx_qid, &tx_qid);
>+ }
>
>- netdev_send(p->netdev, tx_qid, packets_, may_steal);
>+ netdev_send(p->port->netdev, tx_qid, packets_, may_steal,
>+ dynamic_txqs);
> return;
> }
> break;
>@@ -4086,7 +4211,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
>
> dp_packet_batch_apply_cutlen(packets_);
>
>- netdev_pop_header(p->netdev, packets_);
>+ netdev_pop_header(p->port->netdev, packets_);
> if (!packets_->count) {
> return;
> }
>@@ -4134,7 +4259,7 @@ dp_execute_cb(void *aux_, struct dp_packet_batch *packets_,
> flow_extract(packets[i], &flow);
> dpif_flow_hash(dp->dpif, &flow, sizeof flow, &ufid);
> dp_execute_userspace_action(pmd, packets[i], may_steal, &flow,
>- &ufid, &actions, userdata);
>+ &ufid, &actions, userdata, now);
> }
>
> if (clone) {
>@@ -4200,9 +4325,10 @@ static void
> dp_netdev_execute_actions(struct dp_netdev_pmd_thread *pmd,
> struct dp_packet_batch *packets,
> bool may_steal,
>- const struct nlattr *actions, size_t actions_len)
>+ const struct nlattr *actions, size_t actions_len,
>+ long long now)
> {
>- struct dp_netdev_execute_aux aux = { pmd };
>+ struct dp_netdev_execute_aux aux = { pmd, now };
>
> odp_execute_actions(&aux, packets, may_steal, actions,
> actions_len, dp_execute_cb);
>diff --git a/lib/netdev-bsd.c b/lib/netdev-bsd.c
>index f963c6e..2f57c1a 100644
>--- a/lib/netdev-bsd.c
>+++ b/lib/netdev-bsd.c
>@@ -680,7 +680,8 @@ netdev_bsd_rxq_drain(struct netdev_rxq *rxq_)
> */
> static int
> netdev_bsd_send(struct netdev *netdev_, int qid OVS_UNUSED,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq OVS_UNUSED)
> {
> struct netdev_bsd *dev = netdev_bsd_cast(netdev_);
> const char *name = netdev_get_name(netdev_);
>diff --git a/lib/netdev-dpdk.c b/lib/netdev-dpdk.c
>index af87f18..c208f32 100644
>--- a/lib/netdev-dpdk.c
>+++ b/lib/netdev-dpdk.c
>@@ -298,7 +298,7 @@ struct dpdk_tx_queue {
> rte_spinlock_t tx_lock; /* Protects the members and the NIC queue
> * from concurrent access. It is used only
> * if the queue is shared among different
>- * pmd threads (see 'txq_needs_locking'). */
>+ * pmd threads (see 'concurrent_txq'). */
> int map; /* Mapping of configured vhost-user queues
> * to enabled by guest. */
> };
>@@ -349,13 +349,6 @@ struct netdev_dpdk {
> struct rte_eth_link link;
> int link_reset_cnt;
>
>- /* Caller of netdev_send() might want to use more txqs than the device has.
>- * For physical NICs, if the 'requested_n_txq' less or equal to 'up.n_txq',
>- * 'txq_needs_locking' is false, otherwise it is true and we will take a
>- * spinlock on transmission. For vhost devices, 'requested_n_txq' is
>- * always true. */
>- bool txq_needs_locking;
>-
> /* virtio-net structure for vhost device */
> OVSRCU_TYPE(struct virtio_net *) virtio_dev;
>
>@@ -778,10 +771,8 @@ netdev_dpdk_init(struct netdev *netdev, unsigned int port_no,
> goto unlock;
> }
> netdev_dpdk_alloc_txq(dev, netdev->n_txq);
>- dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;
> } else {
> netdev_dpdk_alloc_txq(dev, OVS_VHOST_MAX_QUEUE_NUM);
>- dev->txq_needs_locking = true;
> /* Enable DPDK_DEV_VHOST device and set promiscuous mode flag. */
> dev->flags = NETDEV_UP | NETDEV_PROMISC;
> }
>@@ -1468,7 +1459,7 @@ dpdk_do_tx_copy(struct netdev *netdev, int qid, struct dp_packet_batch *batch)
> static int
> netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
> struct dp_packet_batch *batch,
>- bool may_steal)
>+ bool may_steal, bool concurrent_txq OVS_UNUSED)
> {
>
> if (OVS_UNLIKELY(batch->packets[0]->source != DPBUF_DPDK)) {
>@@ -1484,9 +1475,10 @@ netdev_dpdk_vhost_send(struct netdev *netdev, int qid,
>
> static inline void
> netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq)
> {
>- if (OVS_UNLIKELY(dev->txq_needs_locking)) {
>+ if (OVS_UNLIKELY(concurrent_txq)) {
> qid = qid % dev->up.n_txq;
> rte_spinlock_lock(&dev->tx_q[qid].tx_lock);
> }
>@@ -1551,18 +1543,19 @@ netdev_dpdk_send__(struct netdev_dpdk *dev, int qid,
> }
> }
>
>- if (OVS_UNLIKELY(dev->txq_needs_locking)) {
>+ if (OVS_UNLIKELY(concurrent_txq)) {
> rte_spinlock_unlock(&dev->tx_q[qid].tx_lock);
> }
> }
>
> static int
> netdev_dpdk_eth_send(struct netdev *netdev, int qid,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq)
> {
> struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
>
>- netdev_dpdk_send__(dev, qid, batch, may_steal);
>+ netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);
> return 0;
> }
>
>@@ -2533,7 +2526,8 @@ dpdk_ring_open(const char dev_name[], unsigned int *eth_port_id)
>
> static int
> netdev_dpdk_ring_send(struct netdev *netdev, int qid,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq)
> {
> struct netdev_dpdk *dev = netdev_dpdk_cast(netdev);
> unsigned i;
>@@ -2546,7 +2540,7 @@ netdev_dpdk_ring_send(struct netdev *netdev, int qid,
> dp_packet_rss_invalidate(batch->packets[i]);
> }
>
>- netdev_dpdk_send__(dev, qid, batch, may_steal);
>+ netdev_dpdk_send__(dev, qid, batch, may_steal, concurrent_txq);
> return 0;
> }
>
>@@ -2823,8 +2817,6 @@ netdev_dpdk_reconfigure(struct netdev *netdev)
> err = dpdk_eth_dev_init(dev);
> netdev_dpdk_alloc_txq(dev, netdev->n_txq);
>
>- dev->txq_needs_locking = netdev->n_txq < dev->requested_n_txq;
>-
> out:
>
> ovs_mutex_unlock(&dev->mutex);
>diff --git a/lib/netdev-dummy.c b/lib/netdev-dummy.c
>index a95f7bb..813ce69 100644
>--- a/lib/netdev-dummy.c
>+++ b/lib/netdev-dummy.c
>@@ -1034,7 +1034,8 @@ netdev_dummy_rxq_drain(struct netdev_rxq *rxq_)
>
> static int
> netdev_dummy_send(struct netdev *netdev, int qid OVS_UNUSED,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq OVS_UNUSED)
> {
> struct netdev_dummy *dev = netdev_dummy_cast(netdev);
> int error = 0;
>diff --git a/lib/netdev-linux.c b/lib/netdev-linux.c
>index c71a3df..2da8c18 100644
>--- a/lib/netdev-linux.c
>+++ b/lib/netdev-linux.c
>@@ -1161,7 +1161,8 @@ netdev_linux_rxq_drain(struct netdev_rxq *rxq_)
> * expected to do additional queuing of packets. */
> static int
> netdev_linux_send(struct netdev *netdev_, int qid OVS_UNUSED,
>- struct dp_packet_batch *batch, bool may_steal)
>+ struct dp_packet_batch *batch, bool may_steal,
>+ bool concurrent_txq OVS_UNUSED)
> {
> int i;
> int error = 0;
>diff --git a/lib/netdev-provider.h b/lib/netdev-provider.h
>index 915a5a5..41fa9e7 100644
>--- a/lib/netdev-provider.h
>+++ b/lib/netdev-provider.h
>@@ -303,10 +303,6 @@ struct netdev_class {
> * otherwise a positive errno value.
> *
> * 'n_txq' specifies the exact number of transmission queues to create.
>- * The caller will call netdev_send() concurrently from 'n_txq' different
>- * threads (with different qid). The netdev provider is responsible for
>- * making sure that these concurrent calls do not create a race condition
>- * by using multiple hw queues or locking.
> *
> * The caller will call netdev_reconfigure() (if necessary) before using
> * netdev_send() on any of the newly configured queues, giving the provider
>@@ -328,6 +324,11 @@ struct netdev_class {
> * packets. If 'may_steal' is true, the caller transfers ownership of all
> * the packets to the network device, regardless of success.
> *
>+ * If 'concurrent_txq' is true, the caller may perform concurrent calls
>+ * to netdev_send() with the same 'qid'. The netdev provider is responsible
>+ * for making sure that these concurrent calls do not create a race
>+ * condition by using locking or other synchronization if required.
>+ *
> * The network device is expected to maintain one or more packet
> * transmission queues, so that the caller does not ordinarily have to
> * do additional queuing of packets. 'qid' specifies the queue to use
>@@ -341,7 +342,7 @@ struct netdev_class {
> * datapath". It will also prevent the OVS implementation of bonding from
> * working properly over 'netdev'.) */
> int (*send)(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
>- bool may_steal);
>+ bool may_steal, bool concurrent_txq);
>
> /* Registers with the poll loop to wake up from the next call to
> * poll_block() when the packet transmission queue for 'netdev' has
>diff --git a/lib/netdev.c b/lib/netdev.c
>index 31a6a46..a792eb6 100644
>--- a/lib/netdev.c
>+++ b/lib/netdev.c
>@@ -655,9 +655,6 @@ netdev_rxq_drain(struct netdev_rxq *rx)
> * otherwise a positive errno value.
> *
> * 'n_txq' specifies the exact number of transmission queues to create.
>- * If this function returns successfully, the caller can make 'n_txq'
>- * concurrent calls to netdev_send() (each one with a different 'qid' in the
>- * range [0..'n_txq'-1]).
> *
> * The change might not effective immediately. The caller must check if a
> * reconfiguration is required with netdev_is_reconf_required() and eventually
>@@ -694,6 +691,11 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)
> * If 'may_steal' is true, the caller transfers ownership of all the packets
> * to the network device, regardless of success.
> *
>+ * If 'concurrent_txq' is true, the caller may perform concurrent calls
>+ * to netdev_send() with the same 'qid'. The netdev provider is responsible
>+ * for making sure that these concurrent calls do not create a race condition
>+ * by using locking or other synchronization if required.
>+ *
> * The network device is expected to maintain one or more packet
> * transmission queues, so that the caller does not ordinarily have to
> * do additional queuing of packets. 'qid' specifies the queue to use
>@@ -704,14 +706,15 @@ netdev_set_tx_multiq(struct netdev *netdev, unsigned int n_txq)
> * cases this function will always return EOPNOTSUPP. */
> int
> netdev_send(struct netdev *netdev, int qid, struct dp_packet_batch *batch,
>- bool may_steal)
>+ bool may_steal, bool concurrent_txq)
> {
> if (!netdev->netdev_class->send) {
> dp_packet_delete_batch(batch, may_steal);
> return EOPNOTSUPP;
> }
>
>- int error = netdev->netdev_class->send(netdev, qid, batch, may_steal);
>+ int error = netdev->netdev_class->send(netdev, qid, batch, may_steal,
>+ concurrent_txq);
> if (!error) {
> COVERAGE_INC(netdev_sent);
> if (!may_steal) {
>diff --git a/lib/netdev.h b/lib/netdev.h
>index 591d861..dc7ede8 100644
>--- a/lib/netdev.h
>+++ b/lib/netdev.h
>@@ -149,7 +149,7 @@ int netdev_rxq_drain(struct netdev_rxq *);
>
> /* Packet transmission. */
> int netdev_send(struct netdev *, int qid, struct dp_packet_batch *,
>- bool may_steal);
>+ bool may_steal, bool concurrent_txq);
> void netdev_send_wait(struct netdev *, int qid);
>
> /* native tunnel APIs */
>--
>2.7.4