On Thu, 24 Mar 2016 14:10:14 +0800
yewgang <[email protected]> wrote:
> So basically, you replace ovs_mutex_rwlock (or something similar) with
> ovs_mutex_trylock in the loop of "other tasks after some time processing
> the RX queues". Is that right?
It isn't replacing anything, since the original locking remains the same.
But yeah, it tries to take the lock before it is actually needed, to make
sure it will not block later on.
Before:
-------
while () {
for() {
process_RX_queues()
}
other_tasks() /* can block on seq_mutex */
}
After:
------
while () {
for() {
process_RX_queues()
}
if (ovs_mutex_trylock()) {
other_tasks() /* can't block on seq_mutex */
}
}
fbl
>
> 2016-03-24 11:54 GMT+08:00 Flavio Leitner <[email protected]>:
>
> > The PMD thread needs to keep processing RX queues in order
> > to achieve maximum throughput. However, it also needs to run
> > other tasks after some time processing the RX queues, and
> > those tasks can block the PMD thread on a mutex. That causes
> > latency spikes and affects the throughput.
> >
> > Convert to a recursive mutex so that the PMD thread can try
> > the lock first and, if it gets the lock, continue as before;
> > otherwise it tries again later. There is additional logic to
> > make sure the PMD thread tries harder as attempts to get the
> > mutex continue to fail.
> >
> > Co-authored-by: Karl Rister <[email protected]>
> > Signed-off-by: Flavio Leitner <[email protected]>
> > ---
> > include/openvswitch/thread.h | 3 +++
> > lib/dpif-netdev.c | 33 ++++++++++++++++++++++-----------
> > lib/seq.c | 15 ++++++++++++++-
> > lib/seq.h | 3 +++
> > 4 files changed, 42 insertions(+), 12 deletions(-)
> >
> > diff --git a/include/openvswitch/thread.h b/include/openvswitch/thread.h
> > index af6f2bb..6d20720 100644
> > --- a/include/openvswitch/thread.h
> > +++ b/include/openvswitch/thread.h
> > @@ -44,6 +44,9 @@ struct OVS_LOCKABLE ovs_mutex {
> > #define OVS_ADAPTIVE_MUTEX_INITIALIZER OVS_MUTEX_INITIALIZER
> > #endif
> >
> > +#define OVS_RECURSIVE_MUTEX_INITIALIZER \
> > + { PTHREAD_RECURSIVE_MUTEX_INITIALIZER_NP, "<unlocked>" }
> > +
> > /* ovs_mutex functions analogous to pthread_mutex_*() functions.
> > *
> > * Most of these functions abort the process with an error message on any
> > diff --git a/lib/dpif-netdev.c b/lib/dpif-netdev.c
> > index 0f2385a..a10b2d1 100644
> > --- a/lib/dpif-netdev.c
> > +++ b/lib/dpif-netdev.c
> > @@ -2668,12 +2668,15 @@ static void *
> > pmd_thread_main(void *f_)
> > {
> > struct dp_netdev_pmd_thread *pmd = f_;
> > - unsigned int lc = 0;
> > + unsigned int lc_max = 1024;
> > + unsigned int lc_start;
> > + unsigned int lc;
> > struct rxq_poll *poll_list;
> > unsigned int port_seq = PMD_INITIAL_SEQ;
> > int poll_cnt;
> > int i;
> >
> > + lc_start = 0;
> > poll_cnt = 0;
> > poll_list = NULL;
> >
> > @@ -2698,24 +2701,32 @@ reload:
> > * reloading the updated configuration. */
> > dp_netdev_pmd_reload_done(pmd);
> >
> > + lc = lc_start;
> > for (;;) {
> > for (i = 0; i < poll_cnt; i++) {
> > dp_netdev_process_rxq_port(pmd, poll_list[i].port,
> > poll_list[i].rx);
> > }
> >
> > - if (lc++ > 1024) {
> > - unsigned int seq;
> > + if (lc++ > lc_max) {
> > + if (!seq_pmd_trylock()) {
> > + unsigned int seq;
> > + lc_start = 0;
> > + lc = 0;
> >
> > - lc = 0;
> > + emc_cache_slow_sweep(&pmd->flow_cache);
> > + coverage_try_clear();
> > + ovsrcu_quiesce();
> >
> > - emc_cache_slow_sweep(&pmd->flow_cache);
> > - coverage_try_clear();
> > - ovsrcu_quiesce();
> > + seq_pmd_unlock();
> >
> > - atomic_read_relaxed(&pmd->change_seq, &seq);
> > - if (seq != port_seq) {
> > - port_seq = seq;
> > - break;
> > + atomic_read_relaxed(&pmd->change_seq, &seq);
> > + if (seq != port_seq) {
> > + port_seq = seq;
> > + break;
> > + }
> > + } else {
> > + lc_start += (lc_start + lc_max)/2;
> > + lc = lc_start;
> > }
> > }
> > }
> > diff --git a/lib/seq.c b/lib/seq.c
> > index 9c3257c..198b2ce 100644
> > --- a/lib/seq.c
> > +++ b/lib/seq.c
> > @@ -55,7 +55,7 @@ struct seq_thread {
> > bool waiting OVS_GUARDED; /* True if latch_wait() already
> > called. */
> > };
> >
> > -static struct ovs_mutex seq_mutex = OVS_MUTEX_INITIALIZER;
> > +static struct ovs_mutex seq_mutex = OVS_RECURSIVE_MUTEX_INITIALIZER;
> >
> > static uint64_t seq_next OVS_GUARDED_BY(seq_mutex) = 1;
> >
> > @@ -68,6 +68,19 @@ static void seq_thread_woke(struct seq_thread *)
> > OVS_REQUIRES(seq_mutex);
> > static void seq_waiter_destroy(struct seq_waiter *)
> > OVS_REQUIRES(seq_mutex);
> > static void seq_wake_waiters(struct seq *) OVS_REQUIRES(seq_mutex);
> >
> > +
> > +int seq_pmd_trylock(void)
> > + OVS_TRY_LOCK(0, seq_mutex)
> > +{
> > + return ovs_mutex_trylock(&seq_mutex);
> > +}
> > +
> > +void seq_pmd_unlock(void)
> > + OVS_RELEASES(seq_mutex)
> > +{
> > + ovs_mutex_unlock(&seq_mutex);
> > +}
> > +
> > /* Creates and returns a new 'seq' object. */
> > struct seq * OVS_EXCLUDED(seq_mutex)
> > seq_create(void)
> > diff --git a/lib/seq.h b/lib/seq.h
> > index b0ec6bf..f6b6e1a 100644
> > --- a/lib/seq.h
> > +++ b/lib/seq.h
> > @@ -117,6 +117,9 @@
> > #include <stdint.h>
> > #include "util.h"
> >
> > +int seq_pmd_trylock(void);
> > +void seq_pmd_unlock(void);
> > +
> > /* For implementation of an object with a sequence number attached. */
> > struct seq *seq_create(void);
> > void seq_destroy(struct seq *);
> > --
> > 2.5.0
> >
> > _______________________________________________
> > dev mailing list
> > [email protected]
> > http://openvswitch.org/mailman/listinfo/dev
> >
> _______________________________________________
> dev mailing list
> [email protected]
> http://openvswitch.org/mailman/listinfo/dev
--
fbl
_______________________________________________
dev mailing list
[email protected]
http://openvswitch.org/mailman/listinfo/dev