Changeset: 21892a4f04a1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=21892a4f04a1
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:
Fix for bug 3258.
We now maintain a pool of N-1 generic worker threads which is extended
by one client-specific worker thread for each client that enters the
dataflow code.
diffs (truncated from 338 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -88,6 +88,8 @@ typedef struct DATAFLOW {
static struct worker {
MT_Id id;
enum {IDLE, RUNNING, EXITED} flag;
+ Client cntxt; /* client we do work for (NULL -> any) */
+ MT_Sema s;
} workers[THREADS];
static Queue *todo = 0; /* pending instructions */
static int volatile exiting = 0;
@@ -207,16 +209,33 @@ q_requeue(Queue *q, FlowEvent d)
}
#endif
-static void *
-q_dequeue(Queue *q)
+static FlowEvent
+q_dequeue(Queue *q, Client cntxt)
{
- void *r = NULL;
+ FlowEvent r = NULL;
assert(q);
MT_sema_down(&q->s, "q_dequeue");
if (exiting)
return NULL;
MT_lock_set(&q->l, "q_dequeue");
+ if (cntxt) {
+ int i;
+
+ for (i = q->last - 1; i >= 0; i--) {
+ if (q->data[i]->flow->cntxt == cntxt) {
+ r = q->data[i];
+ q->last--;
+ while (i < q->last) {
+ q->data[i] = q->data[i + 1];
+ i++;
+ }
+ break;
+ }
+ }
+ MT_lock_unset(&q->l, "q_dequeue");
+ return r;
+ }
if (q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l, "q_dequeue");
@@ -228,7 +247,7 @@ q_dequeue(Queue *q)
assert(q->last > 0);
if (q->last > 0) {
/* LIFO favors garbage collection */
- r = (void*) q->data[--q->last];
+ r = q->data[--q->last];
q->data[q->last] = 0;
}
/* else: terminating */
@@ -281,10 +300,28 @@ DFLOWworker(void *T)
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
+ if (t->cntxt) {
+ /* wait until we are allowed to start working */
+ MT_sema_down(&t->s, "DFLOWworker");
+ }
while (1) {
if (fnxt == 0) {
- if ((fe = q_dequeue(todo)) == NULL)
- break;;
+ Client cntxt = t->cntxt;
+ fe = q_dequeue(todo, cntxt);
+ if (fe == NULL) {
+ if (cntxt) {
+ /* we're not done yet with work for the current
+ * client (as far as we know), so give up the CPU
+ * and let the scheduler enter some more work, but
+ * first compensate for the down we did in
+ * dequeue */
+ MT_sema_up(&todo->s, "DFLOWworker");
+ MT_sleep_ms(1);
+ continue;
+ }
+ /* no more work to be done: exit */
+ break;
+ }
} else
fe = fnxt;
if (exiting) {
@@ -322,11 +359,11 @@ DFLOWworker(void *T)
fe->state = DFLOWwrapup;
if (error) {
- MT_lock_set(&flow->flowlock, "runMALdataflow");
+ MT_lock_set(&flow->flowlock, "DFLOWworker");
/* only collect one error (from one thread, needed for stable testing) */
if (!flow->error)
flow->error = error;
- MT_lock_unset(&flow->flowlock, "runMALdataflow");
+ MT_lock_unset(&flow->flowlock, "DFLOWworker");
/* after an error we skip the rest of the block */
q_enqueue(flow->done, fe);
continue;
@@ -344,7 +381,7 @@ DFLOWworker(void *T)
assert(getInstrPtr(flow->mb, fe->pc));
fe->hotclaim = 0;
#endif
- MT_lock_set(&flow->flowlock, "MALworker");
+ MT_lock_set(&flow->flowlock, "DFLOWworker");
for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending &&
@@ -356,7 +393,7 @@ DFLOWworker(void *T)
fnxt = flow->status + i;
break;
}
- MT_lock_unset(&flow->flowlock, "MALworker");
+ MT_lock_unset(&flow->flowlock, "DFLOWworker");
q_enqueue(flow->done, fe);
if ( fnxt == 0) {
@@ -389,14 +426,17 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return 0;
}
- todo = q_create(2048, "DFLOWinitialize");
+ todo = q_create(2048, "todo");
if (todo == NULL) {
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return -1;
}
- limit = GDKnr_threads ? GDKnr_threads : 1;
+ for (i = 0; i < THREADS; i++)
+ MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
+ limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
for (i = 0; i < limit; i++) {
workers[i].flag = RUNNING;
+ workers[i].cntxt = NULL;
if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0)
workers[i].flag = IDLE;
else
@@ -550,7 +590,7 @@ static void showFlowEvent(DataFlow flow,
*/
static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, struct worker *w)
{
int last;
int i;
@@ -569,13 +609,13 @@ DFLOWscheduler(DataFlow flow)
/* initialize the eligible statements */
fe = flow->status;
- MT_lock_set(&flow->flowlock, "MALworker");
+ MT_lock_set(&flow->flowlock, "DFLOWscheduler");
for (i = 0; i < actions; i++)
if (fe[i].blocks == 0) {
#ifdef USE_MAL_ADMISSION
p = getInstrPtr(flow->mb,fe[i].pc);
if (p == NULL) {
- MT_lock_unset(&flow->flowlock, "MALworker");
+ MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
}
#endif
@@ -583,12 +623,13 @@ DFLOWscheduler(DataFlow flow)
flow->status[i].state = DFLOWrunning;
PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
}
- MT_lock_unset(&flow->flowlock, "MALworker");
+ MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
+ MT_sema_up(&w->s, "DFLOWscheduler");
PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n", actions);
while (actions != tasks ) {
- f = q_dequeue(flow->done);
+ f = q_dequeue(flow->done, NULL);
if (exiting)
break;
if (f == NULL)
@@ -600,7 +641,7 @@ DFLOWscheduler(DataFlow flow)
* drops to zero we can scheduler it we do it here instead of the scheduler
*/
- MT_lock_set(&flow->flowlock, "MALworker");
+ MT_lock_set(&flow->flowlock, "DFLOWscheduler");
tasks++;
for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending) {
@@ -614,8 +655,11 @@ DFLOWscheduler(DataFlow flow)
flow->status[i].blocks--;
}
}
- MT_lock_unset(&flow->flowlock, "MALworker");
+ MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
}
+ /* release the worker from its specific task (turn it into a
+ * generic worker) */
+ w->cntxt = NULL;
/* wrap up errors */
assert(flow->done->last == 0);
if (flow->error ) {
@@ -625,6 +669,20 @@ DFLOWscheduler(DataFlow flow)
return ret;
}
+/* We create a pool of GDKnr_threads-1 generic workers, that is,
+ * workers that will take on jobs from any clients. In addition, we
+ * create a single specific worker per client (i.e. each time we enter
+ * here). This specific worker will only do work for the client for
+ * which it was started. In this way we can guarantee that there will
+ * always be progress for the client, even if all other workers are
+ * doing something big.
+ *
+ * When all jobs for a client have been done (there are no more
+ * entries for the client in the queue), the specific worker turns
+ * itself into a generic worker. At the same time, we signal that one
+ * generic worker should exit and this function returns. In this way
+ * we make sure that there are once again GDKnr_threads-1 generic
+ * workers. */
str
runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
{
@@ -654,43 +712,64 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* check existence of workers */
if (todo == NULL) {
/* create thread pool */
- if (DFLOWinitialize() < 0) {
+ if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
/* no threads created, run serially */
*ret = TRUE;
return MAL_SUCCEED;
}
i = THREADS; /* we didn't create an extra thread */
- } else {
- /* create one more worker to compensate for our waiting until
- * all work is done */
- MT_lock_set(&mal_contextLock, "runMALdataflow");
- for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
- if (workers[i].flag == EXITED) {
- todo->exitedcount--;
- workers[i].flag = IDLE;
- MT_join_thread(workers[i].id);
- }
- }
- for (i = 0; i < THREADS; i++) {
- if (workers[i].flag == IDLE) {
- workers[i].flag = RUNNING;
- if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
- /* cannot start new thread, run serially */
- *ret = TRUE;
- MT_lock_unset(&mal_contextLock, "runMALdataflow");
- return MAL_SUCCEED;
- }
- break;
- }
- }
- MT_lock_unset(&mal_contextLock, "runMALdataflow");
- if (i == THREADS) {
- /* no empty threads slots found, run serially */
- *ret = TRUE;
- return MAL_SUCCEED;
+ }
+ assert(todo);
+ /* in addition, create one more worker that will only execute
+ * tasks for the current client to compensate for our waiting
+ * until all work is done */
+ MT_lock_set(&mal_contextLock, "runMALdataflow");
+ /* join with already exited threads */
+ for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
+ if (workers[i].flag == EXITED) {
+ todo->exitedcount--;
+ workers[i].flag = IDLE;
+ workers[i].cntxt = NULL;
+ MT_join_thread(workers[i].id);
}
}
- assert(todo);
+ for (i = 0; i < THREADS; i++) {
+ if (workers[i].flag == IDLE) {
+ /* only create specific worker if we are not doing a
+ * recursive call */
+ if (stk->calldepth > 1) {
+ int j;
+ MT_Id pid = MT_getpid();
+
+ /* doing a recursive call: copy specificity from
+ * current worker to new worker */
+ workers[i].cntxt = NULL;
+ for (j = 0; j < THREADS; j++) {
+ if (workers[j].flag == RUNNING && workers[j].id == pid) {
+ workers[i].cntxt = workers[j].cntxt;
+ break;
+ }
+ }
+ } else {
+ /* not doing a recursive call: create specific worker */
+ workers[i].cntxt = cntxt;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list