William A. Rowe, Jr. wrote:

Anyway, I meant to add: nice work thus far.  If you have yet to do so,
please submit your CLA to our secretary Jim so that your implementation
can be considered for commit.  (We are not picky about a few-line patch
here and there, but for major works, it's necessary.)  Find it here:
http://www.apache.org/licenses/#clas

I'm happy to commit this to a sandbox or trunk for now to let everyone
begin iteratively improving it, for adoption with a 1.3, 1.4 or 2.0 APR
release, once we think it's baked.  Just need the CLA first, and your
final draft (I'd hold off just a bit for feedback from more of the
peanut gallery.)


Attached please find the latest patch, which incorporates the feedback and the bug fixes for adding a task.

I have faxed in the CLA.

Cheers,
Henry
--- /dev/null	2006-05-02 10:28:27.000000000 -0700
+++ include/apr_thread_pool.h	2006-05-01 18:34:54.427538000 -0700
@@ -0,0 +1,151 @@
+/* Copyright 2006 Sun Microsystems, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef APR_THREAD_POOL_H
+#define APR_THREAD_POOL_H
+
+#include "apr.h"
+#include "apr_thread_proc.h"
+
+/**
+ * @file apr_thread_pool.h
+ * @brief APR Thread Pool Library
+ *
+ * @remarks This library implements a thread pool using apr_thread_t. A thread
+ * pool is a set of threads that can be created in advance or on demand up to a
+ * maximum number. When a task is scheduled, the thread pool will find an idle
+ * thread to handle the task. In case all existing threads are busy, the pool
+ * will try to create a new thread to serve the task if the maximum number has
+ * not been reached. Otherwise, the task will be put into a queue based on
+ * priority, which can be valued from 0 to 255, with higher values served
+ * first. In case there are tasks with the same priority, the new task is put at
+ * the top or the bottom depending on which function is used to put the task.
+ *
+ * @remarks There may be the case that a thread pool uses up the maximum
+ * number of threads at peak load, but has those threads idle afterwards. A
+ * maximum number of idle threads can be set so that extra idle threads will
+ * be terminated to save system resources.
+ */
+#if APR_HAS_THREADS
+
+#ifdef __cplusplus
+extern "C"
+{
+#if 0
+};
+#endif
+#endif /* __cplusplus */
+
+/** Opaque Thread Pool structure. */
+typedef struct _apr_thread_pool apr_thread_pool_t;
+
+#define APR_THREAD_TASK_PRIORITY_LOWEST 0      /* bottom of the 0..255 range */
+#define APR_THREAD_TASK_PRIORITY_LOW 63
+#define APR_THREAD_TASK_PRIORITY_NORMAL 127
+#define APR_THREAD_TASK_PRIORITY_HIGH 191
+#define APR_THREAD_TASK_PRIORITY_HIGHEST 255   /* top of the 0..255 range */
+
+/**
+ * Create a thread pool
+ * @param me A pointer to the pointer that receives the created
+ * apr_thread_pool object. The returned value will be NULL if the thread pool
+ * could not be created.
+ * @param pool The pool to use
+ * @param init_threads The number of threads to be created initially, the number
+ * will also be used as the initial value for maximum number of idle threads. 
+ * @param max_threads The maximum number of threads that can be created
+ * @return APR_SUCCESS if the thread pool was created successfully. Otherwise,
+ * the error code.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
+                                                 apr_pool_t * pool,
+                                                 apr_size_t init_threads,
+                                                 apr_size_t max_threads);
+
+/**
+ * Destroy the thread pool and stop all the threads
+ * @return APR_SUCCESS if all threads are stopped.
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me);
+
+/**
+ * Schedule a task to the bottom of the tasks of same priority.
+ * @param me The thread pool
+ * @param func The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @return APR_SUCCESS if the task had been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
+                                               apr_thread_start_t func,
+                                               void *param,
+                                               apr_byte_t priority);
+
+/**
+ * Schedule a task to the top of the tasks of same priority.
+ * @param me The thread pool
+ * @param func The task function
+ * @param param The parameter for the task function
+ * @param priority The priority of the task.
+ * @return APR_SUCCESS if the task had been scheduled successfully
+ */
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
+                                              apr_thread_start_t func,
+                                              void *param,
+                                              apr_byte_t priority);
+
+/**
+ * Get current number of tasks waiting in the queue
+ * @param me The thread pool
+ * @return Number of tasks in the queue
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me);
+
+/**
+ * Get the current number of idle threads
+ * @param me The thread pool
+ * @return Number of idling threads
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me);
+
+/**
+ * Access function for the maximum number of idle threads. The number of
+ * current idle threads will be reduced to the new limit.
+ * @param me The thread pool
+ * @param cnt The new maximum number of idle threads
+ * @return The number of threads that were stopped.
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
+                                                     apr_size_t cnt);
+
+/**
+ * Access function for the maximum number of idle threads
+ * @param me The thread pool
+ * @return The current maximum number
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me);
+
+#ifdef __cplusplus
+#if 0
+{
+#endif
+}
+#endif
+
+#endif /* APR_HAS_THREADS */
+
+#endif /* APR_THREAD_POOL_H */
+
+/* vim: set ts=4 sw=4 et cin tw=80: */
--- /dev/null	2006-05-02 10:28:27.000000000 -0700
+++ misc/apr_thread_pool.c	2006-05-02 10:28:16.222289000 -0700
@@ -0,0 +1,467 @@
+/* Copyright 2006 Sun Microsystems, Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include <assert.h>
+#include "apr_thread_pool.h"
+#include "apr_ring.h"
+#include "apr_thread_cond.h"
+
+#if APR_HAS_THREADS
+
+#define TASK_PRIORITY_SEGS 4    /* number of entries in task_idx[] */
+#define TASK_PRIORITY_SEG(x) (((x)->priority & 0xFF) / 64)  /* 0..255 -> 0..3 */
+
+typedef struct _apr_thread_pool_task
+{
+    APR_RING_ENTRY(_apr_thread_pool_task) link;        /* ring linkage */
+    apr_thread_start_t func;    /* task function, called as func(thread, param) */
+    void *param;                /* argument handed to func */
+    apr_byte_t priority;        /* 0..255; selects the segment via TASK_PRIORITY_SEG */
+} apr_thread_pool_task_t;
+
+APR_RING_HEAD(_apr_thread_pool_tasks, _apr_thread_pool_task);
+
+struct _apr_thread_list_elt
+{
+    APR_RING_ENTRY(_apr_thread_list_elt) link; /* linkage in the idle or busy list */
+    apr_thread_t *thd;          /* the worker thread this element stands for */
+    volatile int stop;          /* set to 1 to make the waiting worker exit */
+};
+
+APR_RING_HEAD(_apr_thread_list, _apr_thread_list_elt);
+
+struct _apr_thread_pool
+{
+    apr_pool_t *parent;         /* NOTE(review): never assigned in this file */
+    apr_pool_t *pool;           /* APR pool used for every allocation here */
+    volatile apr_size_t cnt_max;        /* max_threads from creation; not read in this file */
+    volatile apr_size_t idle_max;       /* limit on the number of idle threads */
+    volatile apr_size_t busy_cnt;       /* number of elements on busy_thds */
+    volatile apr_size_t idle_cnt;       /* number of elements on idle_thds */
+    volatile apr_size_t task_cnt;       /* number of tasks on the tasks ring */
+    struct _apr_thread_pool_tasks *tasks;       /* queued tasks, popped from the front */
+    struct _apr_thread_list *busy_thds; /* workers that left the idle list to run tasks */
+    struct _apr_thread_list *idle_thds; /* workers waiting on cond */
+    apr_thread_mutex_t *lock;   /* nested mutex held around counter and ring updates */
+    apr_thread_mutex_t *cond_lock;      /* mutex used only together with cond */
+    apr_thread_cond_t *cond;    /* signalled per queued task, broadcast when trimming */
+    volatile int terminated;    /* set to 1 by apr_thread_pool_cleanup */
+    struct _apr_thread_pool_tasks *recycled_tasks;      /* finished task objects reused by task_new */
+    apr_thread_pool_task_t *task_idx[TASK_PRIORITY_SEGS];       /* per-segment first task, or NULL */
+};
+
+static apr_status_t apr_thread_pool_construct(apr_thread_pool_t * me,
+                                              apr_size_t init_threads,
+                                              apr_size_t max_threads)
+{
+    apr_status_t status;
+    int seg;
+    /* Fill in the pool object: limits, locks, empty rings, zeroed counters. */
+    me->cnt_max = max_threads;
+    me->idle_max = init_threads;    /* initial threads double as idle limit */
+    status = apr_thread_mutex_create(&me->lock, APR_THREAD_MUTEX_NESTED,
+                                     me->pool);
+    if (status != APR_SUCCESS) {
+        return status;
+    }
+    status = apr_thread_mutex_create(&me->cond_lock,
+                                     APR_THREAD_MUTEX_DEFAULT, me->pool);
+    if (status != APR_SUCCESS) {
+        apr_thread_mutex_destroy(me->lock);
+        return status;
+    }
+    status = apr_thread_cond_create(&me->cond, me->pool);
+    if (status != APR_SUCCESS) {
+        apr_thread_mutex_destroy(me->lock);
+        apr_thread_mutex_destroy(me->cond_lock);
+        return status;
+    }
+    me->tasks = apr_palloc(me->pool, sizeof(*me->tasks));
+    APR_RING_INIT(me->tasks, _apr_thread_pool_task, link);
+    me->recycled_tasks = apr_palloc(me->pool, sizeof(*me->recycled_tasks));
+    APR_RING_INIT(me->recycled_tasks, _apr_thread_pool_task, link);
+    me->busy_thds = apr_palloc(me->pool, sizeof(*me->busy_thds));
+    APR_RING_INIT(me->busy_thds, _apr_thread_list_elt, link);
+    me->idle_thds = apr_palloc(me->pool, sizeof(*me->idle_thds));
+    APR_RING_INIT(me->idle_thds, _apr_thread_list_elt, link);
+    me->busy_cnt = me->idle_cnt = me->task_cnt = 0;
+    me->terminated = 0;
+    for (seg = 0; seg != TASK_PRIORITY_SEGS; ++seg) {
+        me->task_idx[seg] = NULL;       /* every priority segment starts empty */
+    }
+    return APR_SUCCESS;
+}
+
+/*
+ * Detach and return the task at the head of the queue, or NULL when the queue
+ * is empty. NOTE: not thread safe by itself; the caller must hold me->lock.
+ */
+static apr_thread_pool_task_t *apr_thread_pool_tasks_pop(apr_thread_pool_t * me)
+{
+    apr_thread_pool_task_t *task;
+    int seg;
+
+    if (0 == me->task_cnt) {
+        return NULL;
+    }
+    task = APR_RING_FIRST(me->tasks);
+    --me->task_cnt;
+    seg = TASK_PRIORITY_SEG(task);
+    if (task == me->task_idx[seg]) {
+        /* The segment head leaves; its successor inherits only if same segment. */
+        me->task_idx[seg] = APR_RING_NEXT(task, link);
+        if (me->task_idx[seg] == APR_RING_SENTINEL(me->tasks,
+                                                   _apr_thread_pool_task,
+                                                   link)
+            || TASK_PRIORITY_SEG(me->task_idx[seg]) != seg) {
+            me->task_idx[seg] = NULL;
+        }
+    }
+    APR_RING_REMOVE(task, link);
+    return task;
+}
+
+/*
+ * Worker thread entry point. The thread first parks itself on the idle list
+ * and waits on me->cond. Each time it wakes it moves to the busy list, runs
+ * queued tasks until the queue is empty or the pool is terminated, and then
+ * goes back to the idle list to wait again. If idle_cnt has already reached
+ * idle_max (or the pool is terminated) it detaches itself and exits instead.
+ * A thread that wakes with elt->stop or me->terminated set exits undetached.
+ */
+static void *APR_THREAD_FUNC apr_thread_pool_func(apr_thread_t * t,
+                                                  void *param)
+{
+    apr_thread_pool_t *me = (apr_thread_pool_t *) param;
+    apr_thread_pool_task_t *task;
+    struct _apr_thread_list_elt *elt;
+    apr_status_t rv;            /* not used in this function */
+    /* Register this thread as idle before waiting for the first wake-up. */
+    elt = (struct _apr_thread_list_elt *) apr_pcalloc(me->pool, sizeof(*elt));
+    APR_RING_ELEM_INIT(elt, link);
+    elt->thd = t;
+    elt->stop = 0;
+    apr_thread_mutex_lock(me->lock);
+    ++me->idle_cnt;
+    APR_RING_INSERT_TAIL(me->idle_thds, elt, _apr_thread_list_elt, link);
+    apr_thread_mutex_unlock(me->lock);
+    /* NOTE(review): wait has no predicate; earlier signals may be lost */
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_wait(me->cond, me->cond_lock);
+    apr_thread_mutex_unlock(me->cond_lock);
+    /* Re-take the pool lock before touching the lists and counters. */
+    apr_thread_mutex_lock(me->lock);
+    while (!me->terminated && !elt->stop) {
+        APR_RING_REMOVE(elt, link);     /* idle list -> busy list */
+        --me->idle_cnt;
+        ++me->busy_cnt;
+        APR_RING_INSERT_TAIL(me->busy_thds, elt, _apr_thread_list_elt, link);
+        task = apr_thread_pool_tasks_pop(me);
+        while (NULL != task && !me->terminated) {
+            apr_thread_mutex_unlock(me->lock);  /* run the task unlocked */
+            task->func(t, task->param);
+            apr_thread_mutex_lock(me->lock);
+            APR_RING_INSERT_TAIL(me->recycled_tasks, task,
+                                 _apr_thread_pool_task, link);
+            task = apr_thread_pool_tasks_pop(me);
+        }       /* NOTE(review): a task popped once terminated is set is dropped */
+        APR_RING_REMOVE(elt, link);     /* leave the busy list */
+        --me->busy_cnt;
+        /* Exit detached when the idle list is full or the pool is terminated. */
+        if (me->idle_cnt >= me->idle_max || me->terminated) {
+            apr_thread_mutex_unlock(me->lock);
+            apr_thread_detach(t);
+            apr_thread_exit(t, APR_SUCCESS);
+            return NULL;        /* should not be here, safe net */
+        }
+        /* Otherwise park on the idle list again and wait for the next wake-up. */
+        ++me->idle_cnt;
+        APR_RING_INSERT_TAIL(me->idle_thds, elt, _apr_thread_list_elt, link);
+        apr_thread_mutex_unlock(me->lock);
+
+        apr_thread_mutex_lock(me->cond_lock);
+        apr_thread_cond_wait(me->cond, me->cond_lock);
+        apr_thread_mutex_unlock(me->cond_lock);
+
+        apr_thread_mutex_lock(me->lock);
+    }
+    /* Stopped or terminated while idle: exit without detaching. */
+    apr_thread_mutex_unlock(me->lock);
+    apr_thread_exit(t, APR_SUCCESS);
+    return NULL;                /* should not be here, safe net */
+}
+
+static apr_status_t apr_thread_pool_cleanup(void *me)
+{
+    apr_thread_pool_t *tp = (apr_thread_pool_t *) me;
+    /* Flag termination, stop the idle workers, then wait out the busy ones. */
+    tp->terminated = 1;
+    apr_thread_pool_idle_max_set(tp, 0);
+    while (0 != tp->busy_cnt) {
+        apr_sleep(20 * 1000);   /* poll every 20 ms */
+    }
+    apr_thread_mutex_destroy(tp->lock);
+    apr_thread_mutex_destroy(tp->cond_lock);
+    apr_thread_cond_destroy(tp->cond);
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_create(apr_thread_pool_t ** me,
+                                                 apr_pool_t * pool,
+                                                 apr_size_t init_threads,
+                                                 apr_size_t max_threads)
+{
+    /* Allocate the pool object from `pool`, set it up, register its cleanup
+     * and start init_threads workers. See apr_thread_pool.h for the contract. */
+    apr_thread_t *t;
+    apr_status_t rv = APR_SUCCESS;
+
+    if (!me) {
+        return APR_BADARG;
+    }
+    *me = (apr_thread_pool_t *) apr_pcalloc(pool, sizeof(**me));
+    if (!*me) {
+        return APR_ENOMEM;
+    }
+    (*me)->pool = pool;
+    rv = apr_thread_pool_construct(*me, init_threads, max_threads);
+    if (APR_SUCCESS != rv) {
+        *me = NULL;
+        return rv;
+    }
+    apr_pool_cleanup_register(pool, *me, apr_thread_pool_cleanup,
+                              apr_pool_cleanup_null);
+
+    /* Stop at the first thread that cannot be started instead of retrying
+     * forever; *me stays set and rv tells the caller what went wrong. */
+    while (init_threads) {
+        rv = apr_thread_create(&t, NULL, apr_thread_pool_func, *me,
+                               (*me)->pool);
+        if (APR_SUCCESS != rv) {
+            break;
+        }
+        --init_threads;
+    }
+    return rv;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_destroy(apr_thread_pool_t * me)
+{   /* Hands apr_thread_pool_cleanup to apr_pool_cleanup_run on me->pool. */
+    return apr_pool_cleanup_run(me->pool, me, apr_thread_pool_cleanup);
+}
+
+/*
+ * Obtain a task object, reusing a recycled one when available, and fill it in.
+ * NOTE: Not thread safe by itself; the caller must hold the pool lock.
+ */
+static apr_thread_pool_task_t *task_new(apr_thread_pool_t * me,
+                                        apr_thread_start_t func,
+                                        void *param, apr_byte_t priority)
+{
+    apr_thread_pool_task_t *task;
+
+    if (!APR_RING_EMPTY(me->recycled_tasks, _apr_thread_pool_task, link)) {
+        /* Take the first recycled task object off its ring. */
+        task = APR_RING_FIRST(me->recycled_tasks);
+        APR_RING_REMOVE(task, link);
+    }
+    else {
+        task = apr_pcalloc(me->pool, sizeof(*task));
+        if (!task) {
+            return NULL;
+        }
+    }
+
+    APR_RING_ELEM_INIT(task, link);
+    task->param = param;
+    task->priority = priority;
+    task->func = func;
+    return task;
+}
+
+/*
+ * If t's priority segment already holds tasks, return the first element with
+ * the same or lower priority (possibly the sentinel) and leave t unlinked.
+ * Otherwise link t into the queue, make it the segment head and return NULL.
+ * NOTE: This function is not thread safe by itself. Caller should hold the lock
+ */
+static apr_thread_pool_task_t *add_if_empty(apr_thread_pool_t * me,
+                                            apr_thread_pool_task_t * t)
+{
+    int seg, next;
+    apr_thread_pool_task_t *t_next;
+
+    seg = TASK_PRIORITY_SEG(t);
+    if (me->task_idx[seg]) {
+        assert(APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) !=
+               me->task_idx[seg]);
+        t_next = me->task_idx[seg];
+        while (t_next->priority > t->priority) {
+            t_next = APR_RING_NEXT(t_next, link);
+            if (APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) ==
+                t_next) {
+                return t_next;
+            }
+        }
+        return t_next;
+    }
+    /* The queue runs from high to low priority, so t belongs just ahead of
+     * the nearest non-empty LOWER segment, or at the tail if there is none. */
+    for (next = seg - 1; next >= 0; next--) {
+        if (me->task_idx[next]) {
+            APR_RING_INSERT_BEFORE(me->task_idx[next], t, link);
+            break;
+        }
+    }
+    if (next < 0) {
+        APR_RING_INSERT_TAIL(me->tasks, t, _apr_thread_pool_task, link);
+    }
+    me->task_idx[seg] = t;      /* t is now the first task of its segment */
+    return NULL;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_push(apr_thread_pool_t * me,
+                                               apr_thread_start_t func,
+                                               void *param,
+                                               apr_byte_t priority)
+{
+    apr_thread_pool_task_t *t, *t_loc;
+
+    apr_thread_mutex_lock(me->lock);
+    t = task_new(me, func, param, priority);
+    if (NULL == t) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+    t_loc = add_if_empty(me, t);
+    if (NULL != t_loc) {
+        /* Queue t behind every task of the same priority. */
+        while (APR_RING_SENTINEL(me->tasks, _apr_thread_pool_task, link) !=
+               t_loc && t_loc->priority >= t->priority) {
+            t_loc = APR_RING_NEXT(t_loc, link);
+        }
+        APR_RING_INSERT_BEFORE(t_loc, t, link);
+        /* t outranks the old segment head, so it becomes the new head. */
+        if (t_loc == me->task_idx[TASK_PRIORITY_SEG(t)]) {
+            me->task_idx[TASK_PRIORITY_SEG(t)] = t;
+        }
+    }
+    me->task_cnt++;
+    apr_thread_mutex_unlock(me->lock);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);   /* wake one waiting worker */
+    apr_thread_mutex_unlock(me->cond_lock);
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_status_t) apr_thread_pool_top(apr_thread_pool_t * me,
+                                              apr_thread_start_t func,
+                                              void *param,
+                                              apr_byte_t priority)
+{
+    apr_thread_pool_task_t *task;
+    apr_thread_pool_task_t *where;
+    int seg;
+
+    apr_thread_mutex_lock(me->lock);
+    task = task_new(me, func, param, priority);
+    if (!task) {
+        apr_thread_mutex_unlock(me->lock);
+        return APR_ENOMEM;
+    }
+    /* add_if_empty() links the task itself when it returns NULL; otherwise it
+     * reports the element the task has to be placed in front of. */
+    where = add_if_empty(me, task);
+    if (where) {
+        seg = TASK_PRIORITY_SEG(task);
+        APR_RING_INSERT_BEFORE(where, task, link);
+        if (me->task_idx[seg] == where) {
+            me->task_idx[seg] = task;
+        }
+    }
+    ++me->task_cnt;
+    apr_thread_mutex_unlock(me->lock);
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_signal(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+    return APR_SUCCESS;
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_tasks_count(apr_thread_pool_t * me)
+{
+    return me->task_cnt;        /* read without taking me->lock */
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_count(apr_thread_pool_t * me)
+{
+    return me->idle_cnt;        /* read without taking me->lock */
+}
+
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_get(apr_thread_pool_t * me)
+{
+    return me->idle_max;        /* read without taking me->lock */
+}
+
+/*
+ * Set the idle-thread limit to cnt, then stop and join the surplus idle
+ * threads. Returns the number of threads that were stopped.
+ * NOTE: There could be busy threads become idle during this function
+ */
+APR_DECLARE(apr_size_t) apr_thread_pool_idle_max_set(apr_thread_pool_t * me,
+                                                     apr_size_t cnt)
+{
+    apr_size_t n, n_dbg, i;
+    struct _apr_thread_list_elt *head, *tail, *elt;
+    apr_status_t rv;
+
+    apr_thread_mutex_lock(me->lock);
+    me->idle_max = cnt;         /* the new limit applies even if nothing is trimmed */
+    n = me->idle_cnt;
+    if (n <= cnt) {
+        apr_thread_mutex_unlock(me->lock);
+        return 0;
+    }
+    n -= cnt;
+    head = APR_RING_FIRST(me->idle_thds);
+    for (i = 0; i < cnt; i++) {
+        head = APR_RING_NEXT(head, link);
+    }
+    tail = APR_RING_LAST(me->idle_thds);
+    me->idle_cnt = cnt;
+    APR_RING_UNSPLICE(head, tail, link);
+    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+        elt->stop = 1;
+    }
+    tail->stop = 1;
+    apr_thread_mutex_unlock(me->lock);  /* stop flags are set before unlocking */
+
+    apr_thread_mutex_lock(me->cond_lock);
+    apr_thread_cond_broadcast(me->cond);
+    apr_thread_mutex_unlock(me->cond_lock);
+    n_dbg = 0;
+    for (elt = head; elt != tail; elt = APR_RING_NEXT(elt, link)) {
+        apr_thread_join(&rv, elt->thd);
+        n_dbg++;
+    }
+    apr_thread_join(&rv, tail->thd);
+    n_dbg++;
+    assert(n == n_dbg);
+    return n;
+}
+
+#endif /* APR_HAS_THREADS */
+
+/* vim: set ts=4 sw=4 et cin tw=80: */

Reply via email to