Changeset: 8a77f78a4fe4 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=8a77f78a4fe4
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: Feb2013
Log Message:

Revert creating dataflow pools per client and fix problem differently.

When a MAL function calls language.dataflow(), the thread executing
the call waits until the whole dataflow block is executed by the other
threads in the dataflow pool.  If this is done recursively, we go
through all available threads and all threads end up waiting for their
dataflow block to finish, which doesn't happen since there are no
worker threads available anymore.  The solution that was tried before
was to create N threads whenever language.dataflow() was called, and
those threads never exited.  This can very quickly cause very many
threads to be created (I have seen over 1300 threads on a system with
many cores).  The current solution instead creates a single extra
thread whenever a thread is blocked waiting for the dataflow block to
be finished, and when the block is finished, it stops a single thread
(possibly a different one, but who cares: the result is the same).

This may also fix bug 3258 in a different way than the original fix.


diffs (284 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -60,6 +60,7 @@ typedef struct FLOWEVENT {
 typedef struct queue {
        int size;       /* size of queue */
        int last;       /* last element in the queue */
+       int exitcount;  /* how many threads should exit */
        FlowEvent *data;
        MT_Lock l;      /* it's a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
@@ -83,9 +84,11 @@ typedef struct DATAFLOW {
        Queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-#define MAXQ 256
-static Queue *todos[MAXQ] = {0};       /* pending instructions organized by 
dataflow block */
-static bit occupied[MAXQ]={0};                 /* worker pool is in use? */
+static struct worker {
+       MT_Id id;
+       enum {IDLE, RUNNING, EXITED} flag;
+} workers[THREADS];
+static Queue *todo = 0;        /* pending instructions */
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -121,6 +124,7 @@ q_create(int sz, const char *name)
                GDKfree(q);
                return NULL;
        }
+       q->exitcount = 0;
 
        (void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
        MT_lock_init(&q->l, name);
@@ -208,6 +212,11 @@ q_dequeue(Queue *q)
        assert(q);
        MT_sema_down(&q->s, "q_dequeue");
        MT_lock_set(&q->l, "q_dequeue");
+       if (q->exitcount > 0) {
+               q->exitcount--;
+               MT_lock_unset(&q->l, "q_dequeue");
+               return NULL;
+       }
        assert(q->last > 0);
        if (q->last > 0) {
                /* LIFO favors garbage collection */
@@ -250,13 +259,14 @@ q_dequeue(Queue *q)
  */
 
 static void
-DFLOWworker(void *t)
+DFLOWworker(void *T)
 {
+       struct worker *t = (struct worker *) T;
        DataFlow flow;
        FlowEvent fe = 0, fnxt = 0;
+       int id = (int) (t - workers);
        Thread thr;
        str error = 0;
-       Queue *todo = *(Queue **) t;
        int i,last;
 
        thr = THRnew("DFLOWworker");
@@ -264,9 +274,10 @@ DFLOWworker(void *t)
        GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
        GDKerrbuf[0] = 0;
        while (1) {
-               if (fnxt == 0)
-                       fe = q_dequeue(todo);
-               else
+               if (fnxt == 0) {
+                       if ((fe = q_dequeue(todo)) == NULL)
+                               break;;
+               } else
                        fe = fnxt;
                fnxt = 0;
                assert(fe);
@@ -292,7 +303,7 @@ DFLOWworker(void *t)
 #endif
                        error = runMALsequence(flow->cntxt, flow->mb, fe->pc, 
fe->pc + 1, flow->stk, 0, 0);
                        PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= 
%d claim= " LLFMT "," LLFMT " %s\n",
-                                                                 fe->pc, 
(int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
+                                                        fe->pc, id, 
fe->argclaim, fe->hotclaim, error ? error : "");
 #ifdef USE_MAL_ADMISSION
                        /* release the memory claim */
                        MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -345,57 +356,34 @@ DFLOWworker(void *t)
        GDKfree(GDKerrbuf);
        GDKsetbuf(0);
        THRdel(thr);
+       t->flag = EXITED;
 }
 
 /* 
  * Create an interpreter pool.
  * One worker will adaptively be available for each client.
  * The remainder are taken from the GDKnr_threads argument and
- * typically is equal to the number of cores.
- * A recursive MAL function call would make for one worker less,
- * which limits the number of cores for parallel processing.
+ * typically is equal to the number of cores
  * The workers are assembled in a local table to enable debugging.
- *
- * BEWARE, failure to create a new worker thread is not an error
- * but would lead to serial execution.
  */
-static int
+static void
 DFLOWinitialize(void)
 {
-       int i, threads, grp;
-       MT_Id worker;
+       int i, limit;
 
-       threads = GDKnr_threads ? GDKnr_threads : 1;
        MT_lock_set(&mal_contextLock, "DFLOWinitialize");
-       for(grp = 0; grp< MAXQ; grp++)
-               if ( occupied[grp] == FALSE){
-                       occupied[grp] = TRUE;
-                       break;
-               }
+       if (todo) {
+               MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+               return;
+       }
+       todo = q_create(2048, "DFLOWinitialize");
+       limit = GDKnr_threads ? GDKnr_threads : 1;
+       for (i = 0; i < limit; i++) {
+               workers[i].flag = RUNNING;
+               if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) 
&workers[i], MT_THR_JOINABLE) < 0)
+                       workers[i].flag = IDLE;
+       }
        MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
-       if (grp >= MAXQ) {
-               // continue non-parallel
-               return -1;
-       }
-       if ( todos[grp] )
-               return grp;
-
-       todos[grp] = q_create(2048, "todo");
-       if (todos[grp] == NULL) 
-               return -1;
-
-       // associate a set of workers with the pool
-       for (i = 0; grp>= 0 && i < threads; i++){
-               if (MT_create_thread(&worker, DFLOWworker, (void *) 
&todos[grp], MT_THR_JOINABLE) < 0) {
-                       //Can not create interpreter thread
-                       grp = -1;
-               }
-               if (worker == 0) {
-                       //Failed to create interpreter thread
-                       grp = -1;
-               }
-       }
-       return grp;
 }
  
 /*
@@ -534,7 +522,7 @@ static void showFlowEvent(DataFlow flow,
 */
 
 static str
-DFLOWscheduler(DataFlow flow, Queue *todo)
+DFLOWscheduler(DataFlow flow)
 {
        int last;
        int i;
@@ -613,8 +601,9 @@ runMALdataflow(Client cntxt, MalBlkPtr m
 {
        DataFlow flow = NULL;
        str msg = MAL_SUCCEED;
-       int size, pool;
+       int size;
        int *ret;
+       int i;
 
 #ifdef DEBUG_FLOW
        mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc, 
stoppc);
@@ -625,23 +614,31 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (stk == NULL)
                throw(MAL, "dataflow", "runMALdataflow(): Called with stk == 
NULL");
        ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
-       if (stk->cmd){
-               *ret = TRUE;
-               return MAL_SUCCEED;
-       }
-       /* too many threads turns dataflow processing off */
-       if ( cntxt->idx > MAXQ){
+       if (stk->cmd) {
                *ret = TRUE;
                return MAL_SUCCEED;
        }
 
        assert(stoppc > startpc);
 
-       /* check existence of free worker group, resort to sequential upon 
failure */
-       if( (pool= DFLOWinitialize()) < 0){
-               *ret = TRUE;
-               return MAL_SUCCEED;
+       /* check existence of workers */
+       if (todo == NULL) {
+               DFLOWinitialize();              /* create the whole pool */
+               i = THREADS;                    /* we didn't create an extra 
thread */
+       } else {
+               /* create one more worker to compensate for our waiting until
+                * all work is done */
+               MT_lock_set(&mal_contextLock, "runMALdataflow");
+               for (i = 0; i < THREADS; i++) {
+                       if (workers[i].flag == IDLE) {
+                               workers[i].flag = RUNNING;
+                               MT_create_thread(&workers[i].id, DFLOWworker, 
(void *) &workers[i], MT_THR_JOINABLE);
+                               break;
+                       }
+               }
+               MT_lock_unset(&mal_contextLock, "runMALdataflow");
        }
+       assert(todo);
 
        flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
        if (flow == NULL)
@@ -661,7 +658,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        if (flow->done == NULL) {
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
-               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to create 
flow->done queue");
        }
 
@@ -670,7 +666,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                q_destroy(flow->done);
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
-               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate 
flow->status");
        }
        size = DFLOWgraphSize(mb, startpc, stoppc);
@@ -681,7 +676,6 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                q_destroy(flow->done);
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
-               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate 
flow->nodes");
        }
        flow->edges = (int*)GDKzalloc(sizeof(int) * size);
@@ -691,20 +685,35 @@ runMALdataflow(Client cntxt, MalBlkPtr m
                q_destroy(flow->done);
                MT_lock_destroy(&flow->flowlock);
                GDKfree(flow);
-               occupied[pool]= FALSE;
                throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate 
flow->edges");
        }
        msg = DFLOWinitBlk(flow, mb, size);
 
        if (msg == MAL_SUCCEED)
-               msg = DFLOWscheduler(flow,todos[pool]);
+               msg = DFLOWscheduler(flow);
 
-       occupied[pool]= FALSE;
        GDKfree(flow->status);
        GDKfree(flow->edges);
        GDKfree(flow->nodes);
        q_destroy(flow->done);
        MT_lock_destroy(&flow->flowlock);
        GDKfree(flow);
+
+       if (i != THREADS) {
+               /* we created one worker, now tell one worker to exit again */
+               MT_lock_set(&todo->l, "runMALdataflow");
+               todo->exitcount++;
+               MT_lock_unset(&todo->l, "runMALdataflow");
+               MT_sema_up(&todo->s, "runMALdataflow");
+               MT_lock_set(&mal_contextLock, "runMALdataflow");
+               for (i = 0; i < THREADS; i++) {
+                       if (workers[i].flag == EXITED) {
+                               MT_join_thread(workers[i].id);
+                               workers[i].flag = IDLE;
+                               break;
+                       }
+               }
+               MT_lock_unset(&mal_contextLock, "runMALdataflow");
+       }
        return msg;
 }
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to