Changeset: f185e3fa6c50 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=f185e3fa6c50
Modified Files:
monetdb5/mal/mal.c
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Merge with Feb2013 branch.
diffs (123 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -64,6 +64,7 @@ typedef struct queue {
FlowEvent *data;
MT_Lock l; /* it's a shared resource, ie we need locks */
MT_Sema s; /* threads wait on empty queues */
+ MT_Sema e; /* synchronize exiting of thread */
} Queue;
/*
@@ -130,6 +131,7 @@ q_create(int sz, const char *name)
(void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
MT_lock_init(&q->l, name);
MT_sema_init(&q->s, 0, name);
+ MT_sema_init(&q->e, 0, name);
return q;
}
@@ -218,6 +220,7 @@ q_dequeue(Queue *q)
if (q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l, "q_dequeue");
+ MT_sema_up(&q->e, "q_dequeue");
return NULL;
}
assert(q->last > 0);
@@ -379,24 +382,40 @@ DFLOWworker(void *T)
* typically is equal to the number of cores
* The workers are assembled in a local table to enable debugging.
*/
-static void
+static int
DFLOWinitialize(void)
{
int i, limit;
+ int created = 0;
MT_lock_set(&mal_contextLock, "DFLOWinitialize");
if (todo) {
+ /* somebody else beat us to it */
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- return;
+ return 0;
}
todo = q_create(2048, "DFLOWinitialize");
+ if (todo == NULL) {
+ MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ return -1;
+ }
limit = GDKnr_threads ? GDKnr_threads : 1;
for (i = 0; i < limit; i++) {
workers[i].flag = RUNNING;
if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0)
workers[i].flag = IDLE;
+ else
+ created++;
+ }
+ if (created == 0) {
+ /* no threads created */
+ q_destroy(todo);
+ todo = NULL;
+ MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ return -1;
}
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ return 0;
}
/*
@@ -632,6 +651,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
if (stk == NULL)
throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
+ *ret = FALSE;
if (stk->cmd) {
*ret = TRUE;
return MAL_SUCCEED;
@@ -641,7 +661,12 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* check existence of workers */
if (todo == NULL) {
- DFLOWinitialize(); /* create the whole pool */
+ /* create thread pool */
+ if (DFLOWinitialize() < 0) {
+ /* no threads created, run serially */
+ *ret = TRUE;
+ return MAL_SUCCEED;
+ }
i = THREADS; /* we didn't create an extra thread */
} else {
/* create one more worker to compensate for our waiting until
@@ -650,11 +675,21 @@ runMALdataflow(Client cntxt, MalBlkPtr m
for (i = 0; i < THREADS; i++) {
if (workers[i].flag == IDLE) {
workers[i].flag = RUNNING;
- MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE);
+ if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
+ /* cannot start new thread, run serially */
+ *ret = TRUE;
+ MT_lock_unset(&mal_contextLock, "runMALdataflow");
+ return MAL_SUCCEED;
+ }
break;
}
}
MT_lock_unset(&mal_contextLock, "runMALdataflow");
+ if (i == THREADS) {
+ /* no empty threads slots found, run serially */
+ *ret = TRUE;
+ return MAL_SUCCEED;
+ }
}
assert(todo);
@@ -723,6 +758,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
todo->exitcount++;
MT_lock_unset(&todo->l, "runMALdataflow");
MT_sema_up(&todo->s, "runMALdataflow");
+ MT_sema_down(&todo->e, "runMALdataflow");
MT_lock_set(&mal_contextLock, "runMALdataflow");
for (i = 0; i < THREADS; i++) {
if (workers[i].flag == EXITED) {
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list