Hi Aaron, Part 2...
On Thu, May 22, 2025 at 06:28:52PM -0400, Aaron Merey wrote: > +typedef enum { > + /* pthread_create has not been called. */ > + NOT_STARTED, > + > + /* pthread_create has been called. */ > + STARTED, > + > + /* The thread has finished running the job but has not been joined. */ > + DONE, > + > + /* pthread_join has been called. */ > + JOINED > +} thread_state_t; Do we have to think about the state of other possibly failed jobs? Or does a failed job just call exit and that terminates everything? > +struct job_t { > + /* A job consists of calling this function then printing any output > + to stdout. This function is run from thread_start_job, which also > + initializes the FILE *. */ > + void *(*start_routine)(void *, FILE *); > + > + /* Arg passed to start_routine. */ > + void *arg; > + > + /* Thread to run start_routine. */ > + pthread_t thread; > + > + /* See thread_state_t. */ > + _Atomic thread_state_t state; > + > + /* Dynamic buffer for output generated during start_routine. > + Contents will get printed to stdout when a job finishes. */ > + output_stream_t stream; > + > + /* Next job in the linked list. */ > + struct job_t *next; > +}; OK. > +typedef struct { > + struct job_t *head; > + struct job_t *tail; > +} job_queue_t; > + > +static job_queue_t jobs = { NULL, NULL }; OK. Maybe add what it means for head and/or tail to be NULL or not. head is the first job to run next. tail is the last job to run? > +void > +add_job (void *(*start_routine)(void *, FILE *), void *arg) > +{ > + struct job_t *job = malloc (sizeof (struct job_t)); > + > + if (job == NULL) > + error (1, 0, _("cannot create job")); > + > + job->start_routine = start_routine; > + job->arg = arg; > + job->next = NULL; > + atomic_store (&job->state, NOT_STARTED); > + > + /* Insert job into the job queue. 
*/ > + if (jobs.head == NULL) > + { > + assert (jobs.tail == NULL); > + jobs.head = job; > + jobs.tail = job; > + } > + else > + { > + assert (jobs.tail != NULL); > + jobs.tail->next = job; > + jobs.tail = job; > + } > +} OK. > + > +/* Thread entry point. */ > +static void * > +thread_start_job (void *arg) > +{ > + struct job_t *job = (struct job_t *) arg; > + > + init_thread_output_stream (&job->stream); > + job->start_routine (job->arg, job->stream.file); > + atomic_store (&job->state, DONE); > + > + return NULL; > +} OK. Will be called when job->state has been set to STARTED. > +/* Run all jobs that have been added to the job queue by add_job. */ > +void > +run_jobs (int max_threads) > +{ > + if (jobs.head == NULL) > + { > + assert (jobs.tail == NULL); > + return; > + } > + assert (jobs.tail != NULL); OK. Immediately done if no jobs were ever added. > + /* jobs.tail was only needed to facilitate adding jobs. */ > + jobs.tail = NULL; > + int num_threads = 0; > + > + /* Start no more than MAX_THREAD jobs. */ > + for (struct job_t *job = jobs.head; > + job != NULL && num_threads < max_threads; > + job = job->next) > + { > + assert (job->start_routine != NULL); > + atomic_store (&job->state, STARTED); > + > + if (pthread_create (&job->thread, NULL, > + thread_start_job, (void *) job) != 0) > + error(1, 0, _("cannot create thread")); > + num_threads++; > + } > + > + int available_threads = max_threads - num_threads; > + assert (available_threads >= 0); > + > + /* Iterate over the jobs until all have completed and all output has > + been printed. */ > + while (jobs.head != NULL) > + { > + /* Job output should be printed in the same order that the jobs > + were added. Track whether there is at least one previous job > + whose output hasn't been printed yet. If true, then defer > + printing for the current job. 
*/ > + bool wait_to_print = false; > + > + struct job_t *job = jobs.head; > + struct job_t *prev = NULL; > + while (job != NULL) > + { > + /* Check whether this job should be started. */ > + if (atomic_load (&job->state) == NOT_STARTED) > + { > + /* Start this job if there is an available thread. */ > + if (available_threads > 0) > + { > + atomic_store (&job->state, STARTED); > + if (pthread_create (&job->thread, NULL, > + thread_start_job, (void *) job) != 0) > + error (1, 0, _("cannot create thread")); > + > + available_threads--; > + } > + } > + > + /* Join thread if we haven't done so already. */ > + if (atomic_load (&job->state) == DONE) > + { > + if (pthread_join (job->thread, NULL) != 0) > + error (1, 0, _("cannot join thread")); > + > + atomic_store (&job->state, JOINED); > + available_threads++; > + } > + > + /* Print job output if it hasn't already been printed and > + there is no unprinted output from a previous job. > + > + Once a job's output has been printed all resources for > + the job can be freed and it can be removed from the > + job queue. */ > + if (atomic_load (&job->state) == JOINED && !wait_to_print) > + { > + print_thread_output_stream (&job->stream); > + > + /* Remove this job from the queue. */ > + if (prev == NULL) > + { > + /* This job is at the beginning of the queue. */ > + jobs.head = job->next; > + > + free (job); > + job = jobs.head; > + } > + else > + { > + prev->next = job->next; > + > + free (job); > + job = prev->next; > + } > + > + continue; > + } > + > + prev = job; > + job = job->next; > + wait_to_print = true; > + } > + } This looks like a busy loop, where the thread is constantly checking job states. It would be more efficient if there were a mutex/condition variable on which to pthread_cond_wait in this loop, with each job thread calling pthread_cond_signal when it changes the state. 
> +} > diff --git a/src/threadlib.h b/src/threadlib.h > new file mode 100644 > index 00000000..33cff946 > --- /dev/null > +++ b/src/threadlib.h > @@ -0,0 +1,34 @@ > +/* Copyright (C) 2025 Red Hat, Inc. > + This file is part of elfutils. > + > + This file is free software; you can redistribute it and/or modify > + it under the terms of the GNU General Public License as published by > + the Free Software Foundation; either version 3 of the License, or > + (at your option) any later version. > + > + elfutils is distributed in the hope that it will be useful, but > + WITHOUT ANY WARRANTY; without even the implied warranty of > + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the > + GNU General Public License for more details. > + > + You should have received a copy of the GNU General Public License > + along with this program. If not, see <http://www.gnu.org/licenses/>. */ > + > +#ifndef _THREADLIB_H > +#define _THREADLIB_H 1 > + > +/* Add a job to the job queue. When the job is run using run_job, it will > + consist of start_routine called with ARG as well as a FILE *. The > + contents of the FILE will be printed to stdout once start_routine > + finishes. */ > +extern void add_job (void *(*start_routine)(void *, FILE *), void *arg); > + > +/* Run all jobs that have been added by add_job. Jobs run concurrently > + using at most MAX_THREADS threads. > + > + run_jobs returns when all jobs have finished and any output from the > + jobs has been printed to stdout. Output from each job is printed in > + the order which jobs were added using add_job. */ > +extern void run_jobs (int max_threads); > + > +#endif /* threadlib.h */ Looks good. Maybe clarify add_job cannot be called after run_jobs has been called? Cheers, Mark