Hi,
Attached is a patch implementing the proposed thread pool API. It compiles
but has *NOT* been tested. I am posting it here in the hope of getting an
early review.
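
To give a feel for how I expect the API to be used, here is a rough sketch
(untested, like the patch itself; the task function, the thread counts and the
one-second sleep are only for illustration):

    #include <stdio.h>
    #include "apr_general.h"
    #include "apr_pools.h"
    #include "apr_time.h"
    #include "apr_thread_pool.h"

    /* a trivial task: print the value that was passed in when it was pushed */
    static void *APR_THREAD_FUNC print_task(apr_thread_t *thd, void *param)
    {
        printf("task %d\n", (int)(long)param);
        return NULL;
    }

    int main(int argc, const char *const *argv)
    {
        apr_pool_t *pool;
        apr_thread_pool_t *tp;
        apr_status_t rv;
        long i;

        apr_initialize();
        apr_thread_pool_init();
        apr_pool_create(&pool, NULL);

        /* two threads created up front, growing on demand up to eight */
        tp = apr_thread_pool_create(pool, 2, 8, &rv);
        if (!tp) {
            return 1;
        }

        /* queue ten tasks; idle threads pick them up in priority order */
        for (i = 0; i < 10; i++) {
            apr_thread_pool_push(tp, print_task, (void *)i,
                                 APR_THREAD_TASK_PRIORITY_NORMAL);
        }

        apr_sleep(apr_time_from_sec(1));   /* give the workers time to drain */
        apr_thread_pool_destroy(tp);       /* stops the threads */

        apr_pool_destroy(pool);
        apr_thread_pool_terminate();
        apr_terminate();
        return 0;
    }

apr_thread_pool_top() works the same way as apr_thread_pool_push(), except that
the new task is placed ahead of queued tasks of the same priority.
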
Please let me know if you have any comments. :-)
Cheers,
Henry
--- apr-util-svn/include/apr_thread_pool.h 1969-12-31 16:00:00.000000000
-0800
+++ apr-util/include/apr_thread_pool.h 2006-04-28 20:33:01.412857000 -0700
@@ -0,0 +1,168 @@
+/* Copyright 2006 Sun Microsystems, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APR_THREAD_POOL_H
+#define APR_THREAD_POOL_H
+
+#include "apr.h"
+#include "apr_thread_proc.h"
+
+/**
+ * @file apr_thread_pool.h
+ * @brief APR Thread Pool Library
+
+ * @remarks This library implements a thread pool using apr_thread_t. A thread
+ * pool is a set of threads that can be created in advance or on demand, up to
+ * a maximum number. When a task is scheduled, the thread pool will find an
+ * idle thread to handle the task. If all existing threads are busy and the
+ * maximum number of threads has not been reached, the pool will create a new
+ * thread to serve the task. Otherwise, the task is put into a queue based on
+ * priority, which can range from 0 to 255, with higher values being served
+ * first. If there are tasks with the same priority, the new task is put at
+ * the top or the bottom depending on which function is used to add it.
+ *
+ * @remarks It may happen that a thread pool uses up to the maximum number of
+ * threads at peak load, but leaves those threads idle afterwards. A maximum
+ * number of idle threads can be set so that extra idle threads will be
+ * terminated to save system resources.
+ */
+#if APR_HAS_THREADS
+
+#ifdef __cplusplus
+extern "C" {
+#if 0
+};
+#endif
+#endif /* __cplusplus */
+
+/** Opaque Thread Pool structure. */
+typedef struct _apr_thread_pool apr_thread_pool_t;
+
+/**
+ * The prototype for any APR thread pool task functions.
+ * @param apr_thread_t the apr_thread_t structure for the thread that is
+ * executing the task
+ * @param void * a pointer to the user data passed in when the task was
+ * scheduled
+ */
+typedef void* (APR_THREAD_FUNC *apr_thread_pool_task_t) (apr_thread_t *,
+                                                          void *);
+
+#define APR_THREAD_TASK_PRIORITY_LOWEST 0
+#define APR_THREAD_TASK_PRIORITY_LOW 63
+#define APR_THREAD_TASK_PRIORITY_NORMAL 127
+#define APR_THREAD_TASK_PRIORITY_HIGH 191
+#define APR_THREAD_TASK_PRIORITY_HIGHEST 255
+
+/**
+ * Set up all of the internal structures required to use thread pools
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_init(void);
+
+/**
+ * Tear down all of the internal structures required to use thread pools
+ */
+APR_DECLARE(void) apr_thread_pool_terminate(void);
+
+/**
+ * Create a thread pool
+ * @param pool The pool to use
+ * @param init_threads The number of threads to be created initially. This
+ * number is also used as the initial value for the maximum number of idle
+ * threads.
+ * @param max_threads The maximum number of threads that can be created
+ * @param err Receives the error code; can be NULL if not needed.
+ * @return The thread pool, or NULL if the thread pool could not be created.
+ * The error code is stored in err if err is not NULL.
+ */
+APR_DECLARE(apr_thread_pool_t*) apr_thread_pool_create(apr_pool_t *pool,
+                                                        apr_size_t init_threads,
+                                                        apr_size_t max_threads,
+                                                        apr_status_t *err);
+
+/**
+ * Destroy the thread pool and stop all the threads
+ * @param me The thread pool
+ * @return APR_SUCCESS if all threads are stopped.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t *me);
+
+/**
+ * Schedule a task to the bottom of the group of tasks with the same priority.
+ * @param me The thread pool
+ * @param task The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @return APR_SUCCESS if the task has been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
+ apr_thread_pool_task_t task,
+ void *param,
+ apr_byte_t priority);
+
+/**
+ * Schedule a task to the top of the group of tasks with the same priority.
+ * @param me The thread pool
+ * @param task The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @return APR_SUCCESS if the task has been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
+ apr_thread_pool_task_t task,
+ void *param,
+ apr_byte_t priority);
+
+/**
+ * Get current number of tasks waiting in the queue
+ * @param me The thread pool
+ * @return Number of tasks in the queue
+ */
+APR_DECLARE(int) apr_thread_pool_tasks_cnt_get(apr_thread_pool_t *me);
+
+/**
+ * Set the maximum number of idle threads
+ * @param me The thread pool
+ * @param cnt The maximum number of idle threads to keep
+ */
+APR_DECLARE(void) apr_thread_pool_unused_max_set(apr_thread_pool_t *me,
+ int cnt);
+
+/**
+ * Get the maximum number of idle threads
+ * @param me The thread pool
+ * @return The current maximum number of idle threads
+ */
+APR_DECLARE(int) apr_thread_pool_unused_max_get(apr_thread_pool_t *me);
+
+/**
+ * Get the current number of idle threads
+ * @param me The thread pool
+ * @return Number of idle threads
+ */
+APR_DECLARE(int) apr_thread_pool_unused_cnt_get(apr_thread_pool_t *me);
+
+/**
+ * Stop all unused threads, ignoring the maximum number of idle threads.
+ * @param me The thread pool
+ * @return The total number of threads stopped.
+ */
+APR_DECLARE(int) apr_thread_pool_stop_unused_threads(apr_thread_pool_t *me);
+
+#ifdef __cplusplus
+#if 0
+{
+#endif
+}
+#endif
+
+#endif /* APR_HAS_THREADS */
+
+#endif /* APR_THREAD_POOL_H */
--- apr-util-svn/misc/apr_thread_pool.c 1969-12-31 16:00:00.000000000 -0800
+++ apr-util/misc/apr_thread_pool.c 2006-05-01 00:09:46.804781000 -0700
@@ -0,0 +1,462 @@
+/* Copyright 2006 Sun Microsystems, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include "apr_thread_pool.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+
+#if APR_HAS_THREADS
+
+#define TASK_PRIORITY_SEGS 4
+#define TASK_PRIORITY_SEG(x) (((x)->priority & 0xFF) / 64)
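+
+/*
+ * The task queue is a single ring kept in descending priority order, so the
+ * highest-priority task is always at the front.  The 0-255 priority range is
+ * divided into TASK_PRIORITY_SEGS segments of 64 (0-63, 64-127, 128-191,
+ * 192-255), and task_idx[] caches the first queued task of each segment so
+ * that push/top can find an insertion point without walking the whole queue.
+ */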
+
+struct _apr_thread_pool_task {
+ APR_RING_ENTRY(_apr_thread_pool_task) link;
+ apr_thread_pool_task_t func;
+ void *param;
+ apr_byte_t priority;
+};
+
+APR_RING_HEAD(_apr_thread_pool_tasks, _apr_thread_pool_task);
+
+struct _apr_thread_list_elt {
+ APR_RING_ENTRY(_apr_thread_list_elt) link;
+ apr_thread_t *thd;
+ volatile int stop;
+};
+
+APR_RING_HEAD(_apr_thread_list, _apr_thread_list_elt);
+
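+/*
+ * The pool structure.  'lock' protects the task queue, the busy/unused thread
+ * lists and the counters.  Idle worker threads block on 'cond' (guarded by
+ * 'cond_lock') and are woken up when a task is added or when the pool is
+ * being shut down.
+ */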
+struct _apr_thread_pool {
+ apr_pool_t *parent;
+ apr_pool_t *pool;
+ volatile apr_size_t cnt_max;
+ volatile apr_size_t unused_max;
+ volatile apr_size_t busy_cnt;
+ volatile apr_size_t unused_cnt;
+ volatile apr_size_t task_cnt;
+ struct _apr_thread_pool_tasks *tasks;
+ struct _apr_thread_list *busy_thds;
+ struct _apr_thread_list *unused_thds;
+ apr_thread_mutex_t *lock;
+ apr_thread_mutex_t *cond_lock;
+ apr_thread_cond_t *cond;
+ volatile int terminated;
+ struct _apr_thread_pool_tasks *recycled_tasks;
+ struct _apr_thread_pool_task *task_idx[TASK_PRIORITY_SEGS];
+};
+
+APR_DECLARE(apr_status_t) apr_thread_pool_init(void)
+{
+ return APR_SUCCESS;
+}
+
+APR_DECLARE(void) apr_thread_pool_terminate(void)
+{
+ return;
+}
+
+static apr_status_t apr_thread_pool_construct(apr_thread_pool_t *me,
+ apr_size_t init_threads,
+ apr_size_t max_threads)
+{
+ apr_status_t rv;
+ int i;
+
+ me->cnt_max = max_threads;
+ me->unused_max = init_threads;
+ rv = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED, me->pool);
+ if (APR_SUCCESS != rv) {
+ return rv;
+ }
+ rv = apr_thread_mutex_create(&me->cond_lock, APR_THREAD_MUTEX_DEFAULT,
+ me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_mutex_destroy(me->lock);
+ return rv;
+ }
+ rv = apr_thread_cond_create(&me->cond, me->pool);
+ if (APR_SUCCESS != rv) {
+ apr_thread_mutex_destroy(me->lock);
+ apr_thread_mutex_destroy(me->cond_lock);
+ return rv;
+ }
+ me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+ APR_RING_INIT(me->tasks, _apr_thread_pool_task, link);
+ me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+ APR_RING_INIT(me->recycled_tasks, _apr_thread_pool_task, link);
+ me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+ APR_RING_INIT(me->busy_thds, _apr_thread_list_elt, link);
+ me->unused_thds = apr_palloc(me->pool, sizeof(*me->unused_thds));
+ APR_RING_INIT(me->unused_thds, _apr_thread_list_elt, link);
+ me->busy_cnt = me->unused_cnt = me->task_cnt = 0;
+ me->terminated = 0;
+ for (i = 0; i < TASK_PRIORITY_SEGS; i++) {
+ me->task_idx[i] = NULL;
+ }
+ return APR_SUCCESS;
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. The caller should hold
+ * the lock.
+ */
+static struct _apr_thread_pool_task *
+apr_thread_pool_tasks_pop(apr_thread_pool_t *me)
+{
+ struct _apr_thread_pool_task *task;
+ int seg;
+
+ if (0 == me->task_cnt) {
+ return NULL;
+ }
+
+ task = APR_RING_FIRST(me->tasks);
+ --me->task_cnt;
+ seg = TASK_PRIORITY_SEG(task);
+ if (task == me->task_idx[seg]) {
+ me->task_idx[seg] = APR_RING_NEXT(task, link);
+ if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+ _apr_thread_pool_task, link) ||
+ TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+ me->task_idx[seg] = NULL;
+ }
+ }
+
+ APR_RING_REMOVE(task, link);
+ return task;
+}
+
+/*
+ * The worker thread function. It takes a task from the queue and performs it
+ * if there is one. Otherwise, it puts itself into the unused thread list and
+ * waits for a signal to wake up.
+ * The thread terminates directly by detaching and exiting when it is asked to
+ * stop after finishing a task. Otherwise, the thread stays in the unused
+ * thread list and is joined by apr_thread_pool_stop_unused_threads().
+ */
+static void *APR_THREAD_FUNC apr_thread_pool_func(apr_thread_t *t, void *param)
+{
+ apr_thread_pool_t *me = (apr_thread_pool_t*) param;
+ struct _apr_thread_pool_task *task;
+ struct _apr_thread_list_elt *elt;
+ apr_status_t rv;
+
+ elt = (struct _apr_thread_list_elt*) apr_pcalloc(me->pool, sizeof(*elt));
+ APR_RING_ELEM_INIT(elt, link);
+ elt->thd = t;
+ elt->stop = 0;
+ apr_thread_mutex_lock(me->lock);
+ ++me->unused_cnt;
+ APR_RING_INSERT_TAIL(me->unused_thds, elt, _apr_thread_list_elt, link);
+ apr_thread_mutex_unlock(me->lock);
+
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_wait(me->cond, me->cond_lock);
+ apr_thread_mutex_unlock(me->cond_lock);
+
+ apr_thread_mutex_lock(me->lock);
+ while (!me->terminated && !elt->stop) {
+ APR_RING_REMOVE(elt, link);
+ --me->unused_cnt;
+ ++me->busy_cnt;
+ APR_RING_INSERT_TAIL(me->busy_thds, elt, _apr_thread_list_elt, link);
+ task = apr_thread_pool_tasks_pop(me);
+ while (NULL != task && !me->terminated) {
+ apr_thread_mutex_unlock(me->lock);
+ task->func(t, task->param);
+ apr_thread_mutex_lock(me->lock);
+ APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+ _apr_thread_pool_task, link);
+ task = apr_thread_pool_tasks_pop(me);
+ }
+ APR_RING_REMOVE(elt, link);
+ --me->busy_cnt;
+
+ if (me->unused_cnt >= me->unused_max || me->terminated) {
+ apr_thread_mutex_unlock(me->lock);
+ apr_thread_detach(t);
+ apr_thread_exit(t, APR_SUCCESS);
+            return NULL;        /* should not get here, safety net */
+ }
+
+ ++me->unused_cnt;
+ APR_RING_INSERT_TAIL(me->unused_thds, elt, _apr_thread_list_elt, link);
+ apr_thread_mutex_unlock(me->lock);
+
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_wait(me->cond, me->cond_lock);
+ apr_thread_mutex_unlock(me->cond_lock);
+
+ apr_thread_mutex_lock(me->lock);
+ }
+
+ apr_thread_mutex_unlock(me->lock);
+    apr_thread_exit(t, APR_SUCCESS);
+    return NULL;                /* not reached */
+}
+
+static apr_status_t apr_thread_pool_cleanup(void *me)
+{
+ apr_thread_pool_t *_self = me;
+
+ _self->terminated = 1;
+ apr_thread_pool_stop_unused_threads(_self);
+ while (_self->busy_cnt) {
+        apr_sleep(20 * 1000);   /* poll every 20 ms until busy threads finish */
+ }
+ apr_thread_mutex_destroy(_self->lock);
+ apr_thread_mutex_destroy(_self->cond_lock);
+ apr_thread_cond_destroy(_self->cond);
+ return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_thread_pool_t*) apr_thread_pool_create(apr_pool_t *pool,
+                                                        apr_size_t init_threads,
+                                                        apr_size_t max_threads,
+                                                        apr_status_t *err)
+{
+ apr_thread_pool_t *me = NULL;
+ apr_thread_t *t;
+ apr_status_t rv;
+ apr_pool_t *p;
+
+ me = (apr_thread_pool_t*) apr_pcalloc(pool, sizeof(*me));
+ if (!me) {
+ rv = APR_ENOMEM;
+ goto FINAL_EXIT;
+ }
+
+ me->pool = pool;
+
+ rv = apr_thread_pool_construct(me, init_threads, max_threads);
+ if (APR_SUCCESS != rv) {
+ me = NULL;
+ goto FINAL_EXIT;
+ }
+ apr_pool_cleanup_register(pool, me, apr_thread_pool_cleanup,
+ apr_pool_cleanup_null);
+
+    while (init_threads) {
+        rv = apr_thread_create(&t, NULL, apr_thread_pool_func, me, me->pool);
+        if (APR_SUCCESS != rv) {
+            break;              /* do not loop forever if thread creation fails */
+        }
+        --init_threads;
+    }
+
+FINAL_EXIT:
+ if (err) {
+ *err = rv;
+ }
+
+ return me;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t *me)
+{
+ return apr_pool_cleanup_run(me->pool, me, apr_thread_pool_cleanup);
+}
+
+/*
+ * NOTE: This function is not thread safe by itself. The caller should hold
+ * the lock.
+ */
+static struct _apr_thread_pool_task * task_new(apr_thread_pool_t *me,
+ apr_thread_pool_task_t func,
+ void *param,
+ apr_byte_t priority)
+{
+ struct _apr_thread_pool_task *t;
+
+ if (APR_RING_EMPTY(me->recycled_tasks, _apr_thread_pool_task, link)) {
+ t = apr_pcalloc(me->pool, sizeof(*t));
+ if (NULL == t) {
+ return NULL;
+ }
+ } else {
+ t = APR_RING_FIRST(me->recycled_tasks);
+ APR_RING_REMOVE(t, link);
+ }
+
+ APR_RING_ELEM_INIT(t, link);
+ t->func = func;
+ t->priority = priority;
+ t->param = param;
+
+ return t;
+}
+
+/*
+ * Test whether the task is the only one within its priority segment.
+ * If it is not, return the first element with the same or lower priority.
+ * Otherwise, add the task to the queue and return NULL.
+ *
+ * NOTE: This function is not thread safe by itself. The caller should hold
+ * the lock.
+ */
+static struct _apr_thread_pool_task *
+add_if_empty(apr_thread_pool_t *me, struct _apr_thread_pool_task *t)
+{
+ int seg;
+ int next;
+ struct _apr_thread_pool_task *t_next;
+
+ seg = TASK_PRIORITY_SEG(t);
+ if (me->task_idx[seg]) {
+ assert(APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) !=
+ me->task_idx[seg]);
+ t_next = me->task_idx[seg];
+ while (t_next->priority > t->priority) {
+ t_next = APR_RING_NEXT(t_next, link);
+ if (APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) ==
+ t_next) {
+ return t_next;
+ }
+ }
+ return t_next;
+ }
+
+    /* the segment is empty: insert just before the next non-empty segment of
+     * lower priority, or at the tail if there is none, and record this task
+     * as the head of its segment */
+    for (next = seg - 1; next >= 0; next--) {
+        if (me->task_idx[next]) {
+            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+            break;
+        }
+    }
+    if (0 > next) {
+        APR_RING_INSERT_TAIL(me->tasks, t, _apr_thread_pool_task, link);
+    }
+    me->task_idx[seg] = t;
+    return NULL;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t *me,
+ apr_thread_pool_task_t task,
+ void *param,
+ apr_byte_t priority)
+{
+ struct _apr_thread_pool_task *t;
+ struct _apr_thread_pool_task *t_loc;
+
+ apr_thread_mutex_lock(me->lock);
+ t = task_new(me, task, param, priority);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+
+    t_loc = add_if_empty(me, t);
+    if (NULL == t_loc) {
+        goto FINAL_EXIT;
+    }
+
+    while (APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) !=
+           t_loc && t_loc->priority >= t->priority) {
+        t_loc = APR_RING_NEXT(t_loc, link);
+    }
+    APR_RING_INSERT_BEFORE(t_loc, t, link);
+
+FINAL_EXIT:
+    apr_thread_mutex_unlock(me->lock);
+
+    /* wake up an idle thread, if any, to serve the new task */
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t *me,
+ apr_thread_pool_task_t task,
+ void *param,
+ apr_byte_t priority)
+{
+ struct _apr_thread_pool_task *t;
+ struct _apr_thread_pool_task *t_loc;
+
+ apr_thread_mutex_lock(me->lock);
+ t = task_new(me, task, param, priority);
+ if (NULL == t) {
+ apr_thread_mutex_unlock(me->lock);
+ return APR_ENOMEM;
+ }
+
+    t_loc = add_if_empty(me, t);
+    if (NULL == t_loc) {
+        goto FINAL_EXIT;
+    }
+
+    APR_RING_INSERT_BEFORE(t_loc, t, link);
+    if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+        me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+    }
+
+FINAL_EXIT:
+    apr_thread_mutex_unlock(me->lock);
+
+    /* wake up an idle thread, if any, to serve the new task */
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(int) apr_thread_pool_tasks_cnt_get(apr_thread_pool_t *me)
+{
+ return me->task_cnt;
+}
+
+APR_DECLARE(void) apr_thread_pool_unused_max_set(apr_thread_pool_t *me,
+ int cnt)
+{
+ me->unused_max = cnt;
+}
+
+APR_DECLARE(int) apr_thread_pool_unused_max_get(apr_thread_pool_t *me)
+{
+ return me->unused_max;
+}
+
+APR_DECLARE(int) apr_thread_pool_unused_cnt_get(apr_thread_pool_t *me)
+{
+ return me->unused_cnt;
+}
+
+/*
+ * This function stops all threads that are unused at the moment it is called.
+ * NOTE: Busy threads may become unused while this function is running; they
+ * will not be stopped.
+ */
+APR_DECLARE(int) apr_thread_pool_stop_unused_threads(apr_thread_pool_t *me)
+{
+ int n, n_dbg;
+ struct _apr_thread_list_elt *head, *tail, *elt;
+ apr_status_t rv;
+
+ apr_thread_mutex_lock(me->lock);
+ n = me->unused_cnt;
+ if (0 == n) {
+ apr_thread_mutex_unlock(me->lock);
+ return 0;
+ }
+
+ head = APR_RING_FIRST(me->unused_thds);
+ tail = APR_RING_LAST(me->unused_thds);
+ me->unused_cnt = 0;
+ APR_RING_INIT(me->unused_thds, _apr_thread_list_elt, link);
+ apr_thread_mutex_unlock(me->lock);
+
+ for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+ elt->stop = 1;
+ }
+ elt->stop = 1;
+
+ apr_thread_mutex_lock(me->cond_lock);
+ apr_thread_cond_broadcast(me->cond);
+ apr_thread_mutex_unlock(me->cond_lock);
+
+ n_dbg = 0;
+ for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+ apr_thread_join(&rv, elt->thd);
+ n_dbg++;
+ }
+ apr_thread_join(&rv, elt->thd);
+ n_dbg++;
+    assert(n == n_dbg);
+
+    return n_dbg;
+}
+
+#endif /* APR_HAS_THREADS */