[Mesa-dev] [PATCH 4/7] util/queue: add ability to kill a subset of threads
From: Marek Olšák for ARB_parallel_shader_compile --- src/util/u_queue.c | 52 ++ src/util/u_queue.h | 5 ++--- 2 files changed, 36 insertions(+), 21 deletions(-) diff --git a/src/util/u_queue.c b/src/util/u_queue.c index 48c5c79552d..cfd2a08e3c8 100644 --- a/src/util/u_queue.c +++ b/src/util/u_queue.c @@ -26,42 +26,43 @@ #include "u_queue.h" #include #include "util/os_time.h" #include "util/u_string.h" #include "util/u_thread.h" #include "u_process.h" -static void util_queue_killall_and_wait(struct util_queue *queue); +static void +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads); / * Wait for all queues to assert idle when exit() is called. * * Otherwise, C++ static variable destructors can be called while threads * are using the static variables. */ static once_flag atexit_once_flag = ONCE_FLAG_INIT; static struct list_head queue_list; static mtx_t exit_mutex = _MTX_INITIALIZER_NP; static void atexit_handler(void) { struct util_queue *iter; mtx_lock(_mutex); /* Wait for all queues to assert idle. 
*/ LIST_FOR_EACH_ENTRY(iter, _list, head) { - util_queue_killall_and_wait(iter); + util_queue_kill_threads(iter, 0); } mtx_unlock(_mutex); } static void global_init(void) { LIST_INITHEAD(_list); atexit(atexit_handler); } @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) u_thread_setname(name); } while (1) { struct util_queue_job job; mtx_lock(>lock); assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); /* wait if the queue is empty */ - while (!queue->kill_threads && queue->num_queued == 0) + while (thread_index < queue->num_threads && queue->num_queued == 0) cnd_wait(>has_queued_cond, >lock); - if (queue->kill_threads) { + /* only kill threads that are above "num_threads" */ + if (thread_index >= queue->num_threads) { mtx_unlock(>lock); break; } job = queue->jobs[queue->read_idx]; memset(>jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; queue->num_queued--; cnd_signal(>has_space_cond); mtx_unlock(>lock); if (job.job) { job.execute(job.job, thread_index); util_queue_fence_signal(job.fence); if (job.cleanup) job.cleanup(job.job, thread_index); } } - /* signal remaining jobs before terminating */ + /* signal remaining jobs if all threads are being terminated */ mtx_lock(>lock); - for (unsigned i = queue->read_idx; i != queue->write_idx; -i = (i + 1) % queue->max_jobs) { - if (queue->jobs[i].job) { - util_queue_fence_signal(queue->jobs[i].fence); - queue->jobs[i].job = NULL; + if (queue->num_threads == 0) { + for (unsigned i = queue->read_idx; i != queue->write_idx; + i = (i + 1) % queue->max_jobs) { + if (queue->jobs[i].job) { +util_queue_fence_signal(queue->jobs[i].fence); +queue->jobs[i].job = NULL; + } } + queue->read_idx = queue->write_idx; + queue->num_queued = 0; } - queue->read_idx = queue->write_idx; - queue->num_queued = 0; mtx_unlock(>lock); return 0; } static bool util_queue_create_thread(struct util_queue *queue, unsigned index) { struct thread_input *input = 
(struct thread_input *) malloc(sizeof(struct thread_input)); input->queue = queue; @@ -418,60 +422,72 @@ fail: cnd_destroy(&queue->has_queued_cond); mtx_destroy(&queue->lock); free(queue->jobs); } /* also util_queue_is_initialized can be used to check for success */ memset(queue, 0, sizeof(*queue)); return false; } static void -util_queue_killall_and_wait(struct util_queue *queue) +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) { unsigned i; /* Signal all threads to terminate. */ + mtx_lock(&queue->finish_lock); + + if (keep_num_threads >= queue->num_threads) { + mtx_unlock(&queue->finish_lock); + return; + } + mtx_lock(&queue->lock); - queue->kill_threads = 1; + unsigned old_num_threads = queue->num_threads; + /* Setting num_threads is what causes the threads to terminate. +* Then cnd_broadcast wakes them up and they will exit their function. +*/ + queue->num_threads = keep_num_threads; cnd_broadcast(&queue->has_queued_cond); mtx_unlock(&queue->lock); - for (i = 0; i < queue->num_threads; i++) + for (i = keep_num_threads; i < old_num_threads; i++) thrd_join(queue->threads[i], NULL); - queue->num_threads = 0; + + mtx_unlock(&queue->finish_lock); } void util_queue_destroy(struct util_queue *queue) { - util_queue_killall_and_wait(queue); +
Re: [Mesa-dev] [PATCH 4/7] util/queue: add ability to kill a subset of threads
On Thu, Jan 3, 2019 at 3:01 PM Ian Romanick wrote: > On 11/28/18 6:59 PM, Marek Olšák wrote: > > From: Marek Olšák > > > > for ARB_parallel_shader_compile > > --- > > src/util/u_queue.c | 49 +- > > src/util/u_queue.h | 5 ++--- > > 2 files changed, 33 insertions(+), 21 deletions(-) > > > > diff --git a/src/util/u_queue.c b/src/util/u_queue.c > > index 48c5c79552d..5aaf60ae78e 100644 > > --- a/src/util/u_queue.c > > +++ b/src/util/u_queue.c > > @@ -26,42 +26,43 @@ > > > > #include "u_queue.h" > > > > #include > > > > #include "util/os_time.h" > > #include "util/u_string.h" > > #include "util/u_thread.h" > > #include "u_process.h" > > > > -static void util_queue_killall_and_wait(struct util_queue *queue); > > +static void > > +util_queue_kill_threads(struct util_queue *queue, unsigned > keep_num_threads); > > > > > / > > * Wait for all queues to assert idle when exit() is called. > > * > > * Otherwise, C++ static variable destructors can be called while > threads > > * are using the static variables. > > */ > > > > static once_flag atexit_once_flag = ONCE_FLAG_INIT; > > static struct list_head queue_list; > > static mtx_t exit_mutex = _MTX_INITIALIZER_NP; > > > > static void > > atexit_handler(void) > > { > > struct util_queue *iter; > > > > mtx_lock(_mutex); > > /* Wait for all queues to assert idle. 
*/ > > LIST_FOR_EACH_ENTRY(iter, _list, head) { > > - util_queue_killall_and_wait(iter); > > + util_queue_kill_threads(iter, 0); > > } > > mtx_unlock(_mutex); > > } > > > > static void > > global_init(void) > > { > > LIST_INITHEAD(_list); > > atexit(atexit_handler); > > } > > @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) > >u_thread_setname(name); > > } > > > > while (1) { > >struct util_queue_job job; > > > >mtx_lock(>lock); > >assert(queue->num_queued >= 0 && queue->num_queued <= > queue->max_jobs); > > > >/* wait if the queue is empty */ > > - while (!queue->kill_threads && queue->num_queued == 0) > > + while (thread_index < queue->num_threads && queue->num_queued == > 0) > > cnd_wait(>has_queued_cond, >lock); > > > > - if (queue->kill_threads) { > > + /* only kill threads that are above "num_threads" */ > > + if (thread_index >= queue->num_threads) { > > mtx_unlock(>lock); > > break; > >} > > > >job = queue->jobs[queue->read_idx]; > >memset(>jobs[queue->read_idx], 0, sizeof(struct > util_queue_job)); > >queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; > > > >queue->num_queued--; > >cnd_signal(>has_space_cond); > >mtx_unlock(>lock); > > > >if (job.job) { > > job.execute(job.job, thread_index); > > util_queue_fence_signal(job.fence); > > if (job.cleanup) > > job.cleanup(job.job, thread_index); > >} > > } > > > > - /* signal remaining jobs before terminating */ > > + /* signal remaining jobs if all threads are being terminated */ > > mtx_lock(>lock); > > - for (unsigned i = queue->read_idx; i != queue->write_idx; > > -i = (i + 1) % queue->max_jobs) { > > - if (queue->jobs[i].job) { > > - util_queue_fence_signal(queue->jobs[i].fence); > > - queue->jobs[i].job = NULL; > > + if (queue->num_threads == 0) { > > + for (unsigned i = queue->read_idx; i != queue->write_idx; > > + i = (i + 1) % queue->max_jobs) { > > + if (queue->jobs[i].job) { > > +util_queue_fence_signal(queue->jobs[i].fence); > > +queue->jobs[i].job = NULL; > > + } > >} > > + 
queue->read_idx = queue->write_idx; > > + queue->num_queued = 0; > > } > > - queue->read_idx = queue->write_idx; > > - queue->num_queued = 0; > > mtx_unlock(>lock); > > return 0; > > } > > > > static bool > > util_queue_create_thread(struct util_queue *queue, unsigned index) > > { > > struct thread_input *input = > >(struct thread_input *) malloc(sizeof(struct thread_input)); > > input->queue = queue; > > @@ -418,60 +422,69 @@ fail: > >cnd_destroy(>has_queued_cond); > >mtx_destroy(>lock); > >free(queue->jobs); > > } > > /* also util_queue_is_initialized can be used to check for success */ > > memset(queue, 0, sizeof(*queue)); > > return false; > > } > > > > static void > > -util_queue_killall_and_wait(struct util_queue *queue) > > +util_queue_kill_threads(struct util_queue *queue, unsigned > keep_num_threads) > > { > > unsigned i; > > > > /* Signal all threads to terminate. */ > > + mtx_lock(>finish_lock); > > + > > + if (keep_num_threads >= queue->num_threads) { > > + mtx_unlock(>finish_lock); > > + return; > > + } > > + > >
Re: [Mesa-dev] [PATCH 4/7] util/queue: add ability to kill a subset of threads
On 11/28/18 6:59 PM, Marek Olšák wrote: > From: Marek Olšák > > for ARB_parallel_shader_compile > --- > src/util/u_queue.c | 49 +- > src/util/u_queue.h | 5 ++--- > 2 files changed, 33 insertions(+), 21 deletions(-) > > diff --git a/src/util/u_queue.c b/src/util/u_queue.c > index 48c5c79552d..5aaf60ae78e 100644 > --- a/src/util/u_queue.c > +++ b/src/util/u_queue.c > @@ -26,42 +26,43 @@ > > #include "u_queue.h" > > #include > > #include "util/os_time.h" > #include "util/u_string.h" > #include "util/u_thread.h" > #include "u_process.h" > > -static void util_queue_killall_and_wait(struct util_queue *queue); > +static void > +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads); > > / > * Wait for all queues to assert idle when exit() is called. > * > * Otherwise, C++ static variable destructors can be called while threads > * are using the static variables. > */ > > static once_flag atexit_once_flag = ONCE_FLAG_INIT; > static struct list_head queue_list; > static mtx_t exit_mutex = _MTX_INITIALIZER_NP; > > static void > atexit_handler(void) > { > struct util_queue *iter; > > mtx_lock(_mutex); > /* Wait for all queues to assert idle. 
*/ > LIST_FOR_EACH_ENTRY(iter, _list, head) { > - util_queue_killall_and_wait(iter); > + util_queue_kill_threads(iter, 0); > } > mtx_unlock(_mutex); > } > > static void > global_init(void) > { > LIST_INITHEAD(_list); > atexit(atexit_handler); > } > @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) >u_thread_setname(name); > } > > while (1) { >struct util_queue_job job; > >mtx_lock(>lock); >assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); > >/* wait if the queue is empty */ > - while (!queue->kill_threads && queue->num_queued == 0) > + while (thread_index < queue->num_threads && queue->num_queued == 0) > cnd_wait(>has_queued_cond, >lock); > > - if (queue->kill_threads) { > + /* only kill threads that are above "num_threads" */ > + if (thread_index >= queue->num_threads) { > mtx_unlock(>lock); > break; >} > >job = queue->jobs[queue->read_idx]; >memset(>jobs[queue->read_idx], 0, sizeof(struct > util_queue_job)); >queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; > >queue->num_queued--; >cnd_signal(>has_space_cond); >mtx_unlock(>lock); > >if (job.job) { > job.execute(job.job, thread_index); > util_queue_fence_signal(job.fence); > if (job.cleanup) > job.cleanup(job.job, thread_index); >} > } > > - /* signal remaining jobs before terminating */ > + /* signal remaining jobs if all threads are being terminated */ > mtx_lock(>lock); > - for (unsigned i = queue->read_idx; i != queue->write_idx; > -i = (i + 1) % queue->max_jobs) { > - if (queue->jobs[i].job) { > - util_queue_fence_signal(queue->jobs[i].fence); > - queue->jobs[i].job = NULL; > + if (queue->num_threads == 0) { > + for (unsigned i = queue->read_idx; i != queue->write_idx; > + i = (i + 1) % queue->max_jobs) { > + if (queue->jobs[i].job) { > +util_queue_fence_signal(queue->jobs[i].fence); > +queue->jobs[i].job = NULL; > + } >} > + queue->read_idx = queue->write_idx; > + queue->num_queued = 0; > } > - queue->read_idx = queue->write_idx; > - queue->num_queued = 0; > 
mtx_unlock(>lock); > return 0; > } > > static bool > util_queue_create_thread(struct util_queue *queue, unsigned index) > { > struct thread_input *input = >(struct thread_input *) malloc(sizeof(struct thread_input)); > input->queue = queue; > @@ -418,60 +422,69 @@ fail: >cnd_destroy(>has_queued_cond); >mtx_destroy(>lock); >free(queue->jobs); > } > /* also util_queue_is_initialized can be used to check for success */ > memset(queue, 0, sizeof(*queue)); > return false; > } > > static void > -util_queue_killall_and_wait(struct util_queue *queue) > +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) > { > unsigned i; > > /* Signal all threads to terminate. */ > + mtx_lock(>finish_lock); > + > + if (keep_num_threads >= queue->num_threads) { > + mtx_unlock(>finish_lock); > + return; > + } > + > mtx_lock(>lock); > - queue->kill_threads = 1; > + unsigned old_num_threads = queue->num_threads; > + queue->num_threads = keep_num_threads; Shouldn't this still be set below, after the threads are joined? > cnd_broadcast(>has_queued_cond); > mtx_unlock(>lock); > > - for (i = 0; i < queue->num_threads; i++) >
[Mesa-dev] [PATCH 4/7] util/queue: add ability to kill a subset of threads
From: Marek Olšák for ARB_parallel_shader_compile --- src/util/u_queue.c | 49 +- src/util/u_queue.h | 5 ++--- 2 files changed, 33 insertions(+), 21 deletions(-) diff --git a/src/util/u_queue.c b/src/util/u_queue.c index 48c5c79552d..5aaf60ae78e 100644 --- a/src/util/u_queue.c +++ b/src/util/u_queue.c @@ -26,42 +26,43 @@ #include "u_queue.h" #include #include "util/os_time.h" #include "util/u_string.h" #include "util/u_thread.h" #include "u_process.h" -static void util_queue_killall_and_wait(struct util_queue *queue); +static void +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads); / * Wait for all queues to assert idle when exit() is called. * * Otherwise, C++ static variable destructors can be called while threads * are using the static variables. */ static once_flag atexit_once_flag = ONCE_FLAG_INIT; static struct list_head queue_list; static mtx_t exit_mutex = _MTX_INITIALIZER_NP; static void atexit_handler(void) { struct util_queue *iter; mtx_lock(_mutex); /* Wait for all queues to assert idle. 
*/ LIST_FOR_EACH_ENTRY(iter, _list, head) { - util_queue_killall_and_wait(iter); + util_queue_kill_threads(iter, 0); } mtx_unlock(_mutex); } static void global_init(void) { LIST_INITHEAD(_list); atexit(atexit_handler); } @@ -259,55 +260,58 @@ util_queue_thread_func(void *input) u_thread_setname(name); } while (1) { struct util_queue_job job; mtx_lock(>lock); assert(queue->num_queued >= 0 && queue->num_queued <= queue->max_jobs); /* wait if the queue is empty */ - while (!queue->kill_threads && queue->num_queued == 0) + while (thread_index < queue->num_threads && queue->num_queued == 0) cnd_wait(>has_queued_cond, >lock); - if (queue->kill_threads) { + /* only kill threads that are above "num_threads" */ + if (thread_index >= queue->num_threads) { mtx_unlock(>lock); break; } job = queue->jobs[queue->read_idx]; memset(>jobs[queue->read_idx], 0, sizeof(struct util_queue_job)); queue->read_idx = (queue->read_idx + 1) % queue->max_jobs; queue->num_queued--; cnd_signal(>has_space_cond); mtx_unlock(>lock); if (job.job) { job.execute(job.job, thread_index); util_queue_fence_signal(job.fence); if (job.cleanup) job.cleanup(job.job, thread_index); } } - /* signal remaining jobs before terminating */ + /* signal remaining jobs if all threads are being terminated */ mtx_lock(>lock); - for (unsigned i = queue->read_idx; i != queue->write_idx; -i = (i + 1) % queue->max_jobs) { - if (queue->jobs[i].job) { - util_queue_fence_signal(queue->jobs[i].fence); - queue->jobs[i].job = NULL; + if (queue->num_threads == 0) { + for (unsigned i = queue->read_idx; i != queue->write_idx; + i = (i + 1) % queue->max_jobs) { + if (queue->jobs[i].job) { +util_queue_fence_signal(queue->jobs[i].fence); +queue->jobs[i].job = NULL; + } } + queue->read_idx = queue->write_idx; + queue->num_queued = 0; } - queue->read_idx = queue->write_idx; - queue->num_queued = 0; mtx_unlock(>lock); return 0; } static bool util_queue_create_thread(struct util_queue *queue, unsigned index) { struct thread_input *input = 
(struct thread_input *) malloc(sizeof(struct thread_input)); input->queue = queue; @@ -418,60 +422,69 @@ fail: cnd_destroy(&queue->has_queued_cond); mtx_destroy(&queue->lock); free(queue->jobs); } /* also util_queue_is_initialized can be used to check for success */ memset(queue, 0, sizeof(*queue)); return false; } static void -util_queue_killall_and_wait(struct util_queue *queue) +util_queue_kill_threads(struct util_queue *queue, unsigned keep_num_threads) { unsigned i; /* Signal all threads to terminate. */ + mtx_lock(&queue->finish_lock); + + if (keep_num_threads >= queue->num_threads) { + mtx_unlock(&queue->finish_lock); + return; + } + mtx_lock(&queue->lock); - queue->kill_threads = 1; + unsigned old_num_threads = queue->num_threads; + queue->num_threads = keep_num_threads; cnd_broadcast(&queue->has_queued_cond); mtx_unlock(&queue->lock); - for (i = 0; i < queue->num_threads; i++) + for (i = keep_num_threads; i < old_num_threads; i++) thrd_join(queue->threads[i], NULL); - queue->num_threads = 0; + + mtx_unlock(&queue->finish_lock); } void util_queue_destroy(struct util_queue *queue) { - util_queue_killall_and_wait(queue); + util_queue_kill_threads(queue, 0); remove_from_atexit_list(queue); cnd_destroy(&queue->has_space_cond); cnd_destroy(&queue->has_queued_cond);