Stefan Beller <sbel...@google.com> writes:

> This adds functionality to do work in parallel.
>
> The whole life cycle of such a thread pool would look like
>
>     struct task_queue * tq = create_task_queue(32); // no of threads
>     for (...)
>         add_task(tq, process_one_item_function, item); // non blocking
>     ...
>     int ret = finish_task_queue(tq); // blocks until all tasks are done
>     if (!tq)
>         die ("Not all items were be processed");
>
> The caller must take care of handling the output.
>
> Signed-off-by: Stefan Beller <sbel...@google.com>
> ---
>
> I sent this a while ago to the list, no comments on it :(

The primary reason, I suspect, is that you sent it to the wrong set of
people.  Submodule folks have largely been working in the scripted
ones, and may not necessarily be the ones who are most familiar with
the run-command infrastructure.

"shortlog --no-merges" tells me that the obvious suspects are j6t
and peff.

> The core functionality stayed the same, but I hope to improved naming and
> location of the code.
>
> The WIP is only for the NO_PTHREADS case.

>  run-command.c | 212 ++++++++++++++++++++++++++++++++++++++++++++++++++++++----
>  run-command.h |  30 +++++++++
>  2 files changed, 230 insertions(+), 12 deletions(-)
>
> diff --git a/run-command.c b/run-command.c
> index 28e1d55..4029011 100644
> --- a/run-command.c
> +++ b/run-command.c
> @@ -4,6 +4,21 @@
>  #include "sigchain.h"
>  #include "argv-array.h"
>  
> +#ifdef NO_PTHREADS
> +
> +#else
> +
> +#include "thread-utils.h"
> +
> +#include <pthread.h>
> +#include <semaphore.h>
> +#include <stdio.h>
> +#include <unistd.h>
> +
> +#endif
> +
> +#include "git-compat-util.h"
> +

This goes against the way we have been organizing the header files.

http://thread.gmane.org/gmane.comp.version-control.git/276241/focus=276265

> @@ -668,6 +683,22 @@ int git_atexit(void (*handler)(void))
>  
>  #endif
>  
> +void setup_main_thread()

void setup_main_thread(void)

> @@ -852,3 +872,171 @@ int capture_command(struct child_process *cmd, struct strbuf *buf, size_t hint)
>       close(cmd->out);
>       return finish_command(cmd);
>  }
> +
> +#ifndef NO_PTHREADS
> +struct job_list {
> +     int (*fct)(struct task_queue *aq, void *task);
> +     void *task;
> +     struct job_list *next;
> +};
> +#endif
> +
> +struct task_queue {
> +#ifndef NO_PTHREADS
> +     /*
> +      * To avoid deadlocks always aquire the semaphores with lowest priority

acquire.

> +      * first, priorites are in descending order as listed.
> +      *
> +      * The `mutex` is a general purpose lock for modifying data in the async
> +      * queue, such as adding a new task or adding a return value from
> +      * an already run task.
> +      *
> +      * `workingcount` and `freecount` are opposing semaphores, the sum of
> +      * their values should equal `max_threads` at any time while the `mutex`
> +      * is available.
> +      */
> +     sem_t mutex;
> +     sem_t workingcount;
> +     sem_t freecount;
> +
> +     pthread_t *threads;
> +     unsigned max_threads;
> +
> +     struct job_list *first;
> +     struct job_list *last;
> +#endif
> +     int early_return;
> +};
> +
> +#ifndef NO_PTHREADS
> +
> +static void get_task(struct task_queue *aq,
> +                  int (**fct)(struct task_queue *aq, void *task),
> +                  void **task,
> +                  int *early_return)
> +{
> +     struct job_list *job;
> +
> +     sem_wait(&aq->workingcount);
> +     sem_wait(&aq->mutex);
> +
> +     if (!aq->first)
> +             die("BUG: internal error with dequeuing jobs for threads");
> +     job = aq->first;
> +     *fct = job->fct;
> +     *task = job->task;
> +     aq->early_return |= *early_return;
> +     *early_return = aq->early_return;
> +     aq->first = job->next;
> +     if (!aq->first)
> +             aq->last = NULL;
> +
> +     sem_post(&aq->freecount);
> +     sem_post(&aq->mutex);
> +
> +     free(job);
> +}
> +
> +static void* dispatcher(void *args)

static void *dispatcher(....)

> +{
> +     void *task;
> +     int (*fct)(struct task_queue *aq, void *data);

s/data/task/?

> +     int early_return = 0;
> +     struct task_queue *aq = args;
> +
> +     get_task(aq, &fct, &task, &early_return);
> +     while (fct || early_return != 0) {
> +             early_return = fct(aq, task);
> +             get_task(aq, &fct, &task, &early_return);
> +     }

If the func said "we are done, you may stop dispatching now", do you
still want to do another get_task()?

> +     pthread_exit(0);
> +}
> +#endif
> +
> +struct task_queue *create_task_queue(unsigned max_threads)
> +{
> +     struct task_queue *aq = xmalloc(sizeof(*aq));
> +
> +#ifndef NO_PTHREADS
> +     int i;
> +     if (!max_threads)
> +             aq->max_threads = online_cpus();
> +     else
> +             aq->max_threads = max_threads;
> +
> +     sem_init(&aq->mutex, 0, 1);
> +     sem_init(&aq->workingcount, 0, 0);
> +     sem_init(&aq->freecount, 0, aq->max_threads);
> +     aq->threads = xmalloc(aq->max_threads * sizeof(pthread_t));
> +
> +     for (i = 0; i < aq->max_threads; i++)
> +             pthread_create(&aq->threads[i], 0, &dispatcher, aq);
> +
> +     aq->first = NULL;
> +     aq->last = NULL;


Shouldn't these be initialized before letting threads call into
dispatcher?  The workingcount semaphore that is initialized to 0 may
prevent them from peeking into these pointers and barfing, but still...

> +
> +     setup_main_thread();
> +#endif
> +     aq->early_return = 0;
> +
> +     return aq;
> +}
--
To unsubscribe from this list: send the line "unsubscribe git" in
the body of a message to majord...@vger.kernel.org
More majordomo info at  http://vger.kernel.org/majordomo-info.html

Reply via email to