Changeset: 21892a4f04a1 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=21892a4f04a1
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:

Fix for bug 3258.
We now maintain a pool of N-1 generic worker threads which is extended
by one client-specific worker thread for each client that enters the
dataflow code.


diffs (truncated from 338 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -88,6 +88,8 @@ typedef struct DATAFLOW {
 static struct worker {
        MT_Id id;
        enum {IDLE, RUNNING, EXITED} flag;
+       Client cntxt;                           /* client we do work for (NULL -> any) */
+       MT_Sema s;
 } workers[THREADS];
 static Queue *todo = 0;        /* pending instructions */
 static int volatile exiting = 0;
@@ -207,16 +209,33 @@ q_requeue(Queue *q, FlowEvent d)
 }
 #endif
 
-static void *
-q_dequeue(Queue *q)
+static FlowEvent
+q_dequeue(Queue *q, Client cntxt)
 {
-       void *r = NULL;
+       FlowEvent r = NULL;
 
        assert(q);
        MT_sema_down(&q->s, "q_dequeue");
        if (exiting)
                return NULL;
        MT_lock_set(&q->l, "q_dequeue");
+       if (cntxt) {
+               int i;
+
+               for (i = q->last - 1; i >= 0; i--) {
+                       if (q->data[i]->flow->cntxt == cntxt) {
+                               r = q->data[i];
+                               q->last--;
+                               while (i < q->last) {
+                                       q->data[i] = q->data[i + 1];
+                                       i++;
+                               }
+                               break;
+                       }
+               }
+               MT_lock_unset(&q->l, "q_dequeue");
+               return r;
+       }
        if (q->exitcount > 0) {
                q->exitcount--;
                MT_lock_unset(&q->l, "q_dequeue");
@@ -228,7 +247,7 @@ q_dequeue(Queue *q)
        assert(q->last > 0);
        if (q->last > 0) {
                /* LIFO favors garbage collection */
-               r = (void*) q->data[--q->last];
+               r = q->data[--q->last];
                q->data[q->last] = 0;
        }
        /* else: terminating */
@@ -281,10 +300,28 @@ DFLOWworker(void *T)
 
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
+       if (t->cntxt) {
+               /* wait until we are allowed to start working */
+               MT_sema_down(&t->s, "DFLOWworker");
+       }
        while (1) {
                if (fnxt == 0) {
-                       if ((fe = q_dequeue(todo)) == NULL)
-                               break;;
+                       Client cntxt = t->cntxt;
+                       fe = q_dequeue(todo, cntxt);
+                       if (fe == NULL) {
+                               if (cntxt) {
+                                       /* we're not done yet with work for the current
+                                        * client (as far as we know), so give up the CPU
+                                        * and let the scheduler enter some more work, but
+                                        * first compensate for the down we did in
+                                        * dequeue */
+                                       MT_sema_up(&todo->s, "DFLOWworker");
+                                       MT_sleep_ms(1);
+                                       continue;
+                               }
+                               /* no more work to be done: exit */
+                               break;
+                       }
                } else
                        fe = fnxt;
                if (exiting) {
@@ -322,11 +359,11 @@ DFLOWworker(void *T)
 
                        fe->state = DFLOWwrapup;
                        if (error) {
-                               MT_lock_set(&flow->flowlock, "runMALdataflow");
+                               MT_lock_set(&flow->flowlock, "DFLOWworker");
                                /* only collect one error (from one thread, needed for stable testing) */
                                if (!flow->error)
                                        flow->error = error;
-                               MT_lock_unset(&flow->flowlock, "runMALdataflow");
+                               MT_lock_unset(&flow->flowlock, "DFLOWworker");
                                /* after an error we skip the rest of the block */
                                q_enqueue(flow->done, fe);
                                continue;
@@ -344,7 +381,7 @@ DFLOWworker(void *T)
                assert(getInstrPtr(flow->mb, fe->pc));
                fe->hotclaim = 0;
 #endif
-               MT_lock_set(&flow->flowlock, "MALworker");
+               MT_lock_set(&flow->flowlock, "DFLOWworker");
 
                for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending &&
@@ -356,7 +393,7 @@ DFLOWworker(void *T)
                                fnxt = flow->status + i;
                                break;
                        }
-               MT_lock_unset(&flow->flowlock, "MALworker");
+               MT_lock_unset(&flow->flowlock, "DFLOWworker");
 
                q_enqueue(flow->done, fe);
                if ( fnxt == 0) {
@@ -389,14 +426,17 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return 0;
        }
-       todo = q_create(2048, "DFLOWinitialize");
+       todo = q_create(2048, "todo");
        if (todo == NULL) {
                MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
                return -1;
        }
-       limit = GDKnr_threads ? GDKnr_threads : 1;
+       for (i = 0; i < THREADS; i++)
+               MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
+       limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
        for (i = 0; i < limit; i++) {
                workers[i].flag = RUNNING;
+               workers[i].cntxt = NULL;
                if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0)
                        workers[i].flag = IDLE;
                else
@@ -550,7 +590,7 @@ static void showFlowEvent(DataFlow flow,
 */
 
 static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, struct worker *w)
 {
        int last;
        int i;
@@ -569,13 +609,13 @@ DFLOWscheduler(DataFlow flow)
        /* initialize the eligible statements */
        fe = flow->status;
 
-       MT_lock_set(&flow->flowlock, "MALworker");
+       MT_lock_set(&flow->flowlock, "DFLOWscheduler");
        for (i = 0; i < actions; i++)
                if (fe[i].blocks == 0) {
 #ifdef USE_MAL_ADMISSION
                        p = getInstrPtr(flow->mb,fe[i].pc);
                        if (p == NULL) {
-                               MT_lock_unset(&flow->flowlock, "MALworker");
+                               MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
                                throw(MAL, "dataflow", "DFLOWscheduler(): getInstrPtr(flow->mb,fe[i].pc) returned NULL");
                        }
 #endif
@@ -583,12 +623,13 @@ DFLOWscheduler(DataFlow flow)
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
                }
-       MT_lock_unset(&flow->flowlock, "MALworker");
+       MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
+       MT_sema_up(&w->s, "DFLOWscheduler");
 
        PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n", actions);
 
        while (actions != tasks ) {
-               f = q_dequeue(flow->done);
+               f = q_dequeue(flow->done, NULL);
                if (exiting)
                        break;
                if (f == NULL)
@@ -600,7 +641,7 @@ DFLOWscheduler(DataFlow flow)
                 * drops to zero we can scheduler it we do it here instead of the scheduler
                 */
 
-               MT_lock_set(&flow->flowlock, "MALworker");
+               MT_lock_set(&flow->flowlock, "DFLOWscheduler");
                tasks++;
                for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
                        if (flow->status[i].state == DFLOWpending) {
@@ -614,8 +655,11 @@ DFLOWscheduler(DataFlow flow)
                                        flow->status[i].blocks--;
                                }
                        }
-               MT_lock_unset(&flow->flowlock, "MALworker");
+               MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
        }
+       /* release the worker from its specific task (turn it into a
+        * generic worker) */
+       w->cntxt = NULL;
        /* wrap up errors */
        assert(flow->done->last == 0);
        if (flow->error ) {
@@ -625,6 +669,20 @@ DFLOWscheduler(DataFlow flow)
        return ret;
 }
 
+/* We create a pool of GDKnr_threads-1 generic workers, that is,
+ * workers that will take on jobs from any clients.  In addition, we
+ * create a single specific worker per client (i.e. each time we enter
+ * here).  This specific worker will only do work for the client for
+ * which it was started.  In this way we can guarantee that there will
+ * always be progress for the client, even if all other workers are
+ * doing something big.
+ *
+ * When all jobs for a client have been done (there are no more
+ * entries for the client in the queue), the specific worker turns
+ * itself into a generic worker.  At the same time, we signal that one
+ * generic worker should exit and this function returns.  In this way
+ * we make sure that there are once again GDKnr_threads-1 generic
+ * workers. */
 str
runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr stk)
 {
@@ -654,43 +712,64 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        /* check existence of workers */
        if (todo == NULL) {
                /* create thread pool */
-               if (DFLOWinitialize() < 0) {
+               if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
                        /* no threads created, run serially */
                        *ret = TRUE;
                        return MAL_SUCCEED;
                }
                i = THREADS;                    /* we didn't create an extra thread */
-       } else {
-               /* create one more worker to compensate for our waiting until
-                * all work is done */
-               MT_lock_set(&mal_contextLock, "runMALdataflow");
-               for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
-                       if (workers[i].flag == EXITED) {
-                               todo->exitedcount--;
-                               workers[i].flag = IDLE;
-                               MT_join_thread(workers[i].id);
-                       }
-               }
-               for (i = 0; i < THREADS; i++) {
-                       if (workers[i].flag == IDLE) {
-                               workers[i].flag = RUNNING;
-                               if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
-                                       /* cannot start new thread, run serially */
-                                       *ret = TRUE;
-                                       MT_lock_unset(&mal_contextLock, "runMALdataflow");
-                                       return MAL_SUCCEED;
-                               }
-                               break;
-                       }
-               }
-               MT_lock_unset(&mal_contextLock, "runMALdataflow");
-               if (i == THREADS) {
-                       /* no empty threads slots found, run serially */
-                       *ret = TRUE;
-                       return MAL_SUCCEED;
+       }
+       assert(todo);
+       /* in addition, create one more worker that will only execute
+        * tasks for the current client to compensate for our waiting
+        * until all work is done */
+       MT_lock_set(&mal_contextLock, "runMALdataflow");
+       /* join with already exited threads */
+       for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
+               if (workers[i].flag == EXITED) {
+                       todo->exitedcount--;
+                       workers[i].flag = IDLE;
+                       workers[i].cntxt = NULL;
+                       MT_join_thread(workers[i].id);
                }
        }
-       assert(todo);
+       for (i = 0; i < THREADS; i++) {
+               if (workers[i].flag == IDLE) {
+                       /* only create specific worker if we are not doing a
+                        * recursive call */
+                       if (stk->calldepth > 1) {
+                               int j;
+                               MT_Id pid = MT_getpid();
+
+                               /* doing a recursive call: copy specificity from
+                                * current worker to new worker */
+                               workers[i].cntxt = NULL;
+                               for (j = 0; j < THREADS; j++) {
+                                       if (workers[j].flag == RUNNING && workers[j].id == pid) {
+                                               workers[i].cntxt = workers[j].cntxt;
+                                               break;
+                                       }
+                               }
+                       } else {
+                               /* not doing a recursive call: create specific worker */
+                               workers[i].cntxt = cntxt;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to