Changeset: eb0a130ae7a2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/eb0a130ae7a2
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Reworked allocation code in the dataflow.
diffs (truncated from 401 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,20 +80,19 @@ typedef struct DATAFLOW {
bool set_qry_ctx;
} *DataFlow, DataFlowRec;
-static struct worker {
+struct worker {
MT_Id id;
- enum { IDLE, WAITING, RUNNING, FREE, EXITED } flag;
+ enum { WAITING, RUNNING, FREE, EXITED } flag;
ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
- char *errbuf; /* GDKerrbuf so that we can allocate before fork */
MT_Sema s;
- int self;
- int next;
-} workers[THREADS];
-/* heads of two mutually exclusive linked lists, both using the .next
+ struct worker *next;
+ char errbuf[GDKMAXERRLEN]; /* GDKerrbuf so that we can allocate before fork */
+};
+/* heads of three mutually exclusive linked lists, all using the .next
* field in the worker struct */
-static int exited_workers = -1; /* to be joined threads */
-static int idle_workers = -1; /* idle workers (no thread associated) */
-static int free_workers = -1; /* free workers (thread doing nothing) */
+static struct worker *workers; /* "working" workers */
+static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
+static struct worker *free_workers; /* free workers (.flag==FREE) */
static int free_count = 0; /* number of free threads */
static int free_max = 0; /* max number of spare free threads */
@@ -101,23 +100,6 @@ static Queue *todo = 0; /* pending ins
static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
-static void stopMALdataflow(void);
-
-void
-mal_dataflow_reset(void)
-{
- stopMALdataflow();
- memset((char *) workers, 0, sizeof(workers));
- idle_workers = -1;
- exited_workers = -1;
- if (todo) {
- MT_lock_destroy(&todo->l);
- MT_sema_destroy(&todo->s);
- GDKfree(todo);
- }
- todo = 0; /* pending instructions */
- ATOMIC_SET(&exiting, 0);
-}
/*
* Calculate the size of the dataflow dependency graph.
@@ -126,9 +108,8 @@ static int
DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
{
int cnt = 0;
- int i;
- for (i = start; i < stop; i++)
+ for (int i = start; i < stop; i++)
cnt += getInstrPtr(mb, i)->argc;
return cnt;
}
@@ -267,10 +248,8 @@ DFLOWworker(void *T)
#ifdef _MSC_VER
srand((unsigned int) GDKusec());
#endif
- assert(t->errbuf != NULL);
- t->errbuf[0] = 0;
GDKsetbuf(t->errbuf); /* where to leave errors */
- t->errbuf = NULL;
+ snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
for (;;) {
DataFlow flow;
@@ -437,24 +416,32 @@ DFLOWworker(void *T)
break;
}
if (free_count >= free_max) {
+ struct worker **tp = &workers;
+ while (*tp && *tp != t)
+ tp = &(*tp)->next;
+ assert(*tp && *tp == t);
+ *tp = t->next;
t->flag = EXITED;
t->next = exited_workers;
- exited_workers = t->self;
+ exited_workers = t;
MT_lock_unset(&dataflowLock);
break;
}
free_count++;
+ struct worker **tp = &workers;
+ while (*tp && *tp != t)
+ tp = &(*tp)->next;
+ assert(*tp && *tp == t);
+ *tp = t->next;
t->flag = FREE;
- assert(free_workers != t->self);
t->next = free_workers;
- free_workers = t->self;
+ free_workers = t;
MT_lock_unset(&dataflowLock);
MT_sema_down(&t->s);
if (GDKexiting() || ATOMIC_GET(&exiting))
break;
assert(t->flag == WAITING);
}
- GDKfree(GDKerrbuf);
GDKsetbuf(NULL);
}
@@ -468,9 +455,8 @@ DFLOWworker(void *T)
static int
DFLOWinitialize(void)
{
- int i, limit;
+ int limit;
int created = 0;
- static bool first = true;
MT_lock_set(&mal_contextLock);
MT_lock_set(&dataflowLock);
@@ -488,44 +474,27 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock);
return -1;
}
- assert(idle_workers == -1);
- for (i = 0; i < THREADS; i++) {
- char name[MT_NAME_LEN];
- snprintf(name, sizeof(name), "DFLOWsema%d", i);
- MT_sema_init(&workers[i].s, 0, name);
- workers[i].flag = IDLE;
- workers[i].self = i;
- workers[i].id = 0;
- workers[i].next = idle_workers;
- idle_workers = i;
- if (first) /* only initialize once */
- ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
- }
- first = false;
limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
- if (limit > THREADS)
- limit = THREADS;
while (limit > 0) {
limit--;
- i = idle_workers;
- workers[i].errbuf = GDKmalloc(GDKMAXERRLEN);
- if (workers[i].errbuf == NULL) {
- TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer for worker");
+ struct worker *t = GDKmalloc(sizeof(*t));
+ if (t == NULL) {
+ TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
continue;
}
- idle_workers = workers[i].next;
- workers[i].flag = RUNNING;
- ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
- char name[MT_NAME_LEN];
- snprintf(name, sizeof(name), "DFLOWworker%d", i);
- if (MT_create_thread(&workers[i].id, DFLOWworker, (void *)&workers[i],
- MT_THR_JOINABLE, name) < 0) {
- GDKfree(workers[i].errbuf);
- workers[i].errbuf = NULL;
- workers[i].flag = IDLE;
- workers[i].next = idle_workers;
- idle_workers = i;
+ *t = (struct worker) {
+ .flag = RUNNING,
+ };
+ ATOMIC_PTR_INIT(&t->cntxt, NULL);
+ MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
+ if (MT_create_thread(&t->id, DFLOWworker, t,
+ MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
+ ATOMIC_PTR_DESTROY(&t->cntxt);
+ MT_sema_destroy(&t->s);
+ GDKfree(t);
} else {
+ t->next = workers;
+ workers = t;
created++;
}
}
@@ -773,6 +742,19 @@ DFLOWscheduler(DataFlow flow, struct wor
return ret;
}
+/* called and returns with dataflowLock locked, temporarily unlocks
+ * join the thread associated with the worker and destroy the structure */
+static inline void
+finish_worker(struct worker *t)
+{
+ MT_lock_unset(&dataflowLock);
+ MT_join_thread(t->id);
+ MT_sema_destroy(&t->s);
+ ATOMIC_PTR_DESTROY(&t->cntxt);
+ GDKfree(t);
+ MT_lock_set(&dataflowLock);
+}
+
/* We create a pool of GDKnr_threads-1 generic workers, that is,
* workers that will take on jobs from any clients. In addition, we
* create a single specific worker per client (i.e. each time we enter
@@ -795,7 +777,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
str msg = MAL_SUCCEED;
int size;
bit *ret;
- int i;
+ struct worker *t;
if (stk == NULL)
throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
@@ -819,81 +801,51 @@ runMALdataflow(Client cntxt, MalBlkPtr m
* until all work is done */
MT_lock_set(&dataflowLock);
/* join with already exited threads */
- while ((i = exited_workers) >= 0) {
- assert(workers[i].flag == EXITED);
- exited_workers = workers[i].next;
- workers[i].flag = IDLE;
- MT_lock_unset(&dataflowLock);
- MT_join_thread(workers[i].id);
- MT_lock_set(&dataflowLock);
- workers[i].id = 0;
- workers[i].next = idle_workers;
- idle_workers = i;
+ while (exited_workers != NULL) {
+ assert(exited_workers->flag == EXITED);
+ struct worker *t = exited_workers;
+ exited_workers = exited_workers->next;
+ finish_worker(t);
}
assert(cntxt != NULL);
- if ((i = free_workers) >= 0) {
- assert(workers[i].flag == FREE);
+ if (free_workers != NULL) {
+ t = free_workers;
+ assert(t->flag == FREE);
assert(free_count > 0);
free_count--;
- free_workers = workers[i].next;
- workers[i].flag = WAITING;
- ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
- if (stk->calldepth > 1) {
- MT_Id pid = MT_getpid();
-
- /* doing a recursive call: copy specificity from
- * current worker to new worker */
- for (int j = 0; j < THREADS; j++) {
- if (workers[j].id == pid && workers[j].flag == RUNNING) {
- ATOMIC_PTR_SET(&workers[i].cntxt,
- ATOMIC_PTR_GET(&workers[j].cntxt));
- break;
- }
+ free_workers = t->next;
+ t->next = workers;
+ workers = t;
+ t->flag = WAITING;
+ ATOMIC_PTR_SET(&t->cntxt, cntxt);
+ MT_sema_up(&t->s);
+ } else {
+ t = GDKmalloc(sizeof(*t));
+ if (t != NULL) {
+ *t = (struct worker) {
+ .flag = WAITING,
+ };
+ ATOMIC_PTR_INIT(&t->cntxt, cntxt);
+ MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
+ if (MT_create_thread(&t->id, DFLOWworker, t,
+ MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
+ ATOMIC_PTR_DESTROY(&t->cntxt);
+ MT_sema_destroy(&t->s);
+ GDKfree(t);
+ t = NULL;
+ } else {
+ t->next = workers;
+ workers = t;
}
}
- MT_sema_up(&workers[i].s);
- } else if ((i = idle_workers) >= 0) {
- assert(workers[i].flag == IDLE);
- ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
- /* only create specific worker if we are not doing a
- * recursive call */
- if (stk->calldepth > 1) {
- MT_Id pid = MT_getpid();
-
- /* doing a recursive call: copy specificity from
- * current worker to new worker */
- for (int j = 0; j < THREADS; j++) {
- if (workers[j].flag == RUNNING && workers[j].id == pid) {
- ATOMIC_PTR_SET(&workers[i].cntxt,
- ATOMIC_PTR_GET(&workers[j].cntxt));
- break;
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]