Changeset: eb0a130ae7a2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/eb0a130ae7a2
Modified Files:
	monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Reworked allocation code in the dataflow. diffs (truncated from 401 to 300 lines): diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c --- a/monetdb5/mal/mal_dataflow.c +++ b/monetdb5/mal/mal_dataflow.c @@ -80,20 +80,19 @@ typedef struct DATAFLOW { bool set_qry_ctx; } *DataFlow, DataFlowRec; -static struct worker { +struct worker { MT_Id id; - enum { IDLE, WAITING, RUNNING, FREE, EXITED } flag; + enum { WAITING, RUNNING, FREE, EXITED } flag; ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */ - char *errbuf; /* GDKerrbuf so that we can allocate before fork */ MT_Sema s; - int self; - int next; -} workers[THREADS]; -/* heads of two mutually exclusive linked lists, both using the .next + struct worker *next; + char errbuf[GDKMAXERRLEN]; /* GDKerrbuf so that we can allocate before fork */ +}; +/* heads of three mutually exclusive linked lists, all using the .next * field in the worker struct */ -static int exited_workers = -1; /* to be joined threads */ -static int idle_workers = -1; /* idle workers (no thread associated) */ -static int free_workers = -1; /* free workers (thread doing nothing) */ +static struct worker *workers; /* "working" workers */ +static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */ +static struct worker *free_workers; /* free workers (.flag==FREE) */ static int free_count = 0; /* number of free threads */ static int free_max = 0; /* max number of spare free threads */ @@ -101,23 +100,6 @@ static Queue *todo = 0; /* pending ins static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0); static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock); -static void stopMALdataflow(void); - -void -mal_dataflow_reset(void) -{ - stopMALdataflow(); - memset((char *) workers, 0, sizeof(workers)); - idle_workers = -1; - exited_workers = -1; - if (todo) { - MT_lock_destroy(&todo->l); - MT_sema_destroy(&todo->s); - GDKfree(todo); - } - todo = 0; /* pending instructions */ - ATOMIC_SET(&exiting, 0); -} /* * 
Calculate the size of the dataflow dependency graph. @@ -126,9 +108,8 @@ static int DFLOWgraphSize(MalBlkPtr mb, int start, int stop) { int cnt = 0; - int i; - for (i = start; i < stop; i++) + for (int i = start; i < stop; i++) cnt += getInstrPtr(mb, i)->argc; return cnt; } @@ -267,10 +248,8 @@ DFLOWworker(void *T) #ifdef _MSC_VER srand((unsigned int) GDKusec()); #endif - assert(t->errbuf != NULL); - t->errbuf[0] = 0; GDKsetbuf(t->errbuf); /* where to leave errors */ - t->errbuf = NULL; + snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid()); for (;;) { DataFlow flow; @@ -437,24 +416,32 @@ DFLOWworker(void *T) break; } if (free_count >= free_max) { + struct worker **tp = &workers; + while (*tp && *tp != t) + tp = &(*tp)->next; + assert(*tp && *tp == t); + *tp = t->next; t->flag = EXITED; t->next = exited_workers; - exited_workers = t->self; + exited_workers = t; MT_lock_unset(&dataflowLock); break; } free_count++; + struct worker **tp = &workers; + while (*tp && *tp != t) + tp = &(*tp)->next; + assert(*tp && *tp == t); + *tp = t->next; t->flag = FREE; - assert(free_workers != t->self); t->next = free_workers; - free_workers = t->self; + free_workers = t; MT_lock_unset(&dataflowLock); MT_sema_down(&t->s); if (GDKexiting() || ATOMIC_GET(&exiting)) break; assert(t->flag == WAITING); } - GDKfree(GDKerrbuf); GDKsetbuf(NULL); } @@ -468,9 +455,8 @@ DFLOWworker(void *T) static int DFLOWinitialize(void) { - int i, limit; + int limit; int created = 0; - static bool first = true; MT_lock_set(&mal_contextLock); MT_lock_set(&dataflowLock); @@ -488,44 +474,27 @@ DFLOWinitialize(void) MT_lock_unset(&mal_contextLock); return -1; } - assert(idle_workers == -1); - for (i = 0; i < THREADS; i++) { - char name[MT_NAME_LEN]; - snprintf(name, sizeof(name), "DFLOWsema%d", i); - MT_sema_init(&workers[i].s, 0, name); - workers[i].flag = IDLE; - workers[i].self = i; - workers[i].id = 0; - workers[i].next = idle_workers; - idle_workers = i; - if (first) /* only initialize once 
*/ - ATOMIC_PTR_INIT(&workers[i].cntxt, NULL); - } - first = false; limit = GDKnr_threads ? GDKnr_threads - 1 : 0; - if (limit > THREADS) - limit = THREADS; while (limit > 0) { limit--; - i = idle_workers; - workers[i].errbuf = GDKmalloc(GDKMAXERRLEN); - if (workers[i].errbuf == NULL) { - TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer for worker"); + struct worker *t = GDKmalloc(sizeof(*t)); + if (t == NULL) { + TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker"); continue; } - idle_workers = workers[i].next; - workers[i].flag = RUNNING; - ATOMIC_PTR_SET(&workers[i].cntxt, NULL); - char name[MT_NAME_LEN]; - snprintf(name, sizeof(name), "DFLOWworker%d", i); - if (MT_create_thread(&workers[i].id, DFLOWworker, (void *)&workers[i], - MT_THR_JOINABLE, name) < 0) { - GDKfree(workers[i].errbuf); - workers[i].errbuf = NULL; - workers[i].flag = IDLE; - workers[i].next = idle_workers; - idle_workers = i; + *t = (struct worker) { + .flag = RUNNING, + }; + ATOMIC_PTR_INIT(&t->cntxt, NULL); + MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */ + if (MT_create_thread(&t->id, DFLOWworker, t, + MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) { + ATOMIC_PTR_DESTROY(&t->cntxt); + MT_sema_destroy(&t->s); + GDKfree(t); } else { + t->next = workers; + workers = t; created++; } } @@ -773,6 +742,19 @@ DFLOWscheduler(DataFlow flow, struct wor return ret; } +/* called and returns with dataflowLock locked, temporarily unlocks + * join the thread associated with the worker and destroy the structure */ +static inline void +finish_worker(struct worker *t) +{ + MT_lock_unset(&dataflowLock); + MT_join_thread(t->id); + MT_sema_destroy(&t->s); + ATOMIC_PTR_DESTROY(&t->cntxt); + GDKfree(t); + MT_lock_set(&dataflowLock); +} + /* We create a pool of GDKnr_threads-1 generic workers, that is, * workers that will take on jobs from any clients. In addition, we * create a single specific worker per client (i.e. 
each time we enter @@ -795,7 +777,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m str msg = MAL_SUCCEED; int size; bit *ret; - int i; + struct worker *t; if (stk == NULL) throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL"); @@ -819,81 +801,51 @@ runMALdataflow(Client cntxt, MalBlkPtr m * until all work is done */ MT_lock_set(&dataflowLock); /* join with already exited threads */ - while ((i = exited_workers) >= 0) { - assert(workers[i].flag == EXITED); - exited_workers = workers[i].next; - workers[i].flag = IDLE; - MT_lock_unset(&dataflowLock); - MT_join_thread(workers[i].id); - MT_lock_set(&dataflowLock); - workers[i].id = 0; - workers[i].next = idle_workers; - idle_workers = i; + while (exited_workers != NULL) { + assert(exited_workers->flag == EXITED); + struct worker *t = exited_workers; + exited_workers = exited_workers->next; + finish_worker(t); } assert(cntxt != NULL); - if ((i = free_workers) >= 0) { - assert(workers[i].flag == FREE); + if (free_workers != NULL) { + t = free_workers; + assert(t->flag == FREE); assert(free_count > 0); free_count--; - free_workers = workers[i].next; - workers[i].flag = WAITING; - ATOMIC_PTR_SET(&workers[i].cntxt, cntxt); - if (stk->calldepth > 1) { - MT_Id pid = MT_getpid(); - - /* doing a recursive call: copy specificity from - * current worker to new worker */ - for (int j = 0; j < THREADS; j++) { - if (workers[j].id == pid && workers[j].flag == RUNNING) { - ATOMIC_PTR_SET(&workers[i].cntxt, - ATOMIC_PTR_GET(&workers[j].cntxt)); - break; - } + free_workers = t->next; + t->next = workers; + workers = t; + t->flag = WAITING; + ATOMIC_PTR_SET(&t->cntxt, cntxt); + MT_sema_up(&t->s); + } else { + t = GDKmalloc(sizeof(*t)); + if (t != NULL) { + *t = (struct worker) { + .flag = WAITING, + }; + ATOMIC_PTR_INIT(&t->cntxt, cntxt); + MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */ + if (MT_create_thread(&t->id, DFLOWworker, t, + MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) { + ATOMIC_PTR_DESTROY(&t->cntxt); + 
MT_sema_destroy(&t->s); + GDKfree(t); + t = NULL; + } else { + t->next = workers; + workers = t; } } - MT_sema_up(&workers[i].s); - } else if ((i = idle_workers) >= 0) { - assert(workers[i].flag == IDLE); - ATOMIC_PTR_SET(&workers[i].cntxt, cntxt); - /* only create specific worker if we are not doing a - * recursive call */ - if (stk->calldepth > 1) { - MT_Id pid = MT_getpid(); - - /* doing a recursive call: copy specificity from - * current worker to new worker */ - for (int j = 0; j < THREADS; j++) { - if (workers[j].flag == RUNNING && workers[j].id == pid) { - ATOMIC_PTR_SET(&workers[i].cntxt, - ATOMIC_PTR_GET(&workers[j].cntxt)); - break; _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org