Hi Aaron,
Part 2...
On Thu, May 22, 2025 at 06:28:52PM -0400, Aaron Merey wrote:
> +typedef enum {
> + /* pthread_create has not been called. */
> + NOT_STARTED,
> +
> + /* pthread_create has been called. */
> + STARTED,
> +
> + /* The thread has finished running the job but has not been joined. */
> + DONE,
> +
> + /* pthread_join has been called. */
> + JOINED
> +} thread_state_t;
Do we have to think about the state of other possibly failed jobs?
Or does a failed job just call exit and that terminates everything?
> +struct job_t {
> + /* A job consists of calling this function then printing any output
> + to stdout. This function is run from thread_start_job, which also
> + initializes the FILE *. */
> + void *(*start_routine)(void *, FILE *);
> +
> + /* Arg passed to start_routine. */
> + void *arg;
> +
> + /* Thread to run start_routine. */
> + pthread_t thread;
> +
> + /* See thread_state_t. */
> + _Atomic thread_state_t state;
> +
> + /* Dynamic buffer for output generated during start_routine.
> + Contents will get printed to stdout when a job finishes. */
> + output_stream_t stream;
> +
> + /* Next job in the linked list. */
> + struct job_t *next;
> +};
OK.
> +typedef struct {
> + struct job_t *head;
> + struct job_t *tail;
> +} job_queue_t;
> +
> +static job_queue_t jobs = { NULL, NULL };
OK. Maybe add what it means for head and/or tail to be NULL or not.
head is the first job to run next. tail is the last job to run?
> +void
> +add_job (void *(*start_routine)(void *, FILE *), void *arg)
> +{
> + struct job_t *job = malloc (sizeof (struct job_t));
> +
> + if (job == NULL)
> + error (1, 0, _("cannot create job"));
> +
> + job->start_routine = start_routine;
> + job->arg = arg;
> + job->next = NULL;
> + atomic_store (&job->state, NOT_STARTED);
> +
> + /* Insert job into the job queue. */
> + if (jobs.head == NULL)
> + {
> + assert (jobs.tail == NULL);
> + jobs.head = job;
> + jobs.tail = job;
> + }
> + else
> + {
> + assert (jobs.tail != NULL);
> + jobs.tail->next = job;
> + jobs.tail = job;
> + }
> +}
OK.
> +
> +/* Thread entry point. */
> +static void *
> +thread_start_job (void *arg)
> +{
> + struct job_t *job = (struct job_t *) arg;
> +
> + init_thread_output_stream (&job->stream);
> + job->start_routine (job->arg, job->stream.file);
> + atomic_store (&job->state, DONE);
> +
> + return NULL;
> +}
OK. Will be called when job->state has been set to STARTED.
> +/* Run all jobs that have been added to the job queue by add_job. */
> +void
> +run_jobs (int max_threads)
> +{
> + if (jobs.head == NULL)
> + {
> + assert (jobs.tail == NULL);
> + return;
> + }
> + assert (jobs.tail != NULL);
OK. Immediately done if not jobs were ever added.
> + /* jobs.tail was only needed to facilitate adding jobs. */
> + jobs.tail = NULL;
> + int num_threads = 0;
> +
> + /* Start no more than MAX_THREAD jobs. */
> + for (struct job_t *job = jobs.head;
> + job != NULL && num_threads < max_threads;
> + job = job->next)
> + {
> + assert (job->start_routine != NULL);
> + atomic_store (&job->state, STARTED);
> +
> + if (pthread_create (&job->thread, NULL,
> + thread_start_job, (void *) job) != 0)
> + error(1, 0, _("cannot create thread"));
> + num_threads++;
> + }
> +
> + int available_threads = max_threads - num_threads;
> + assert (available_threads >= 0);
> +
> + /* Iterate over the jobs until all have completed and all output has
> + been printed. */
> + while (jobs.head != NULL)
> + {
> + /* Job output should be printed in the same order that the jobs
> + were added. Track whether there is at least one previous job
> + whose output hasn't been printed yet. If true, then defer
> + printing for the current job. */
> + bool wait_to_print = false;
> +
> + struct job_t *job = jobs.head;
> + struct job_t *prev = NULL;
> + while (job != NULL)
> + {
> + /* Check whether this job should be started. */
> + if (atomic_load (&job->state) == NOT_STARTED)
> + {
> + /* Start this job if there is an available thread. */
> + if (available_threads > 0)
> + {
> + atomic_store (&job->state, STARTED);
> + if (pthread_create (&job->thread, NULL,
> + thread_start_job, (void *) job) != 0)
> + error (1, 0, _("cannot create thread"));
> +
> + available_threads--;
> + }
> + }
> +
> + /* Join thread if we haven't done so already. */
> + if (atomic_load (&job->state) == DONE)
> + {
> + if (pthread_join (job->thread, NULL) != 0)
> + error (1, 0, _("cannot join thread"));
> +
> + atomic_store (&job->state, JOINED);
> + available_threads++;
> + }
> +
> + /* Print job output if it hasn't already been printed and
> + there is no unprinted output from a previous job.
> +
> + Once a job's output has been printed all resources for
> + the job can be freed and it can be removed from the
> + job queue. */
> + if (atomic_load (&job->state) == JOINED && !wait_to_print)
> + {
> + print_thread_output_stream (&job->stream);
> +
> + /* Remove this job from the queue. */
> + if (prev == NULL)
> + {
> + /* This job is at the beginning of the queue. */
> + jobs.head = job->next;
> +
> + free (job);
> + job = jobs.head;
> + }
> + else
> + {
> + prev->next = job->next;
> +
> + free (job);
> + job = prev->next;
> + }
> +
> + continue;
> + }
> +
> + prev = job;
> + job = job->next;
> + wait_to_print = true;
> + }
> + }
This looks like a busy loop. Where the thread is constantly checking
job states. It would be more efficient if there was a mutex/condition
on which to pthread_cond_wait in this loop and each job thread would
pthread_cond_signal when changing the state.
> +}
> diff --git a/src/threadlib.h b/src/threadlib.h
> new file mode 100644
> index 00000000..33cff946
> --- /dev/null
> +++ b/src/threadlib.h
> @@ -0,0 +1,34 @@
> +/* Copyright (C) 2025 Red Hat, Inc.
> + This file is part of elfutils.
> +
> + This file is free software; you can redistribute it and/or modify
> + it under the terms of the GNU General Public License as published by
> + the Free Software Foundation; either version 3 of the License, or
> + (at your option) any later version.
> +
> + elfutils is distributed in the hope that it will be useful, but
> + WITHOUT ANY WARRANTY; without even the implied warranty of
> + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
> + GNU General Public License for more details.
> +
> + You should have received a copy of the GNU General Public License
> + along with this program. If not, see <http://www.gnu.org/licenses/>. */
> +
> +#ifndef _THREADLIB_H
> +#define _THREADLIB_H 1
> +
> +/* Add a job to the job queue. When the job is run using run_job, it will
> + consist of start_routine called with ARG as well as a FILE *. The
> + contents of the FILE will be printed to stdout once start_routine
> + finishes. */
> +extern void add_job (void *(*start_routine)(void *, FILE *), void *arg);
> +
> +/* Run all jobs that have been added by add_job. Jobs run concurrently
> + using at most MAX_THREADS threads.
> +
> + run_jobs returns when all jobs have finished and any output from the
> + jobs has been printed to stdout. Output from each job is printed in
> + the order which jobs were added using add_job. */
> +extern void run_jobs (int max_threads);
> +
> +#endif /* threadlib.h */
Looks good. Maybe clarify add_job cannot be called after run_jobs has
been called?
Cheers,
Mark