Changeset: 7ab711b55897 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=7ab711b55897
Added Files:
sql/test/BugTracker-2013/Tests/nestedcalls.stable.err
sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Modified Files:
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/modules/mal/language.c
monetdb5/modules/mal/language.h
sql/test/BugTracker-2013/Tests/All
sql/test/BugTracker-2013/Tests/nestedcalls.sql
Branch: default
Log Message:
Merged with Feb2013 branch.
diffs (truncated from 527 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -83,10 +83,9 @@ typedef struct DATAFLOW {
Queue *done; /* instructions handled */
} *DataFlow, DataFlowRec;
-#define MAXQ 1024
-static MT_Id workers[THREADS] = {0};
-static int workerqueue[THREADS] = {0}; /* maps workers towards the todo queues
*/
-static Queue *todo[MAXQ] = {0}; /* pending instructions organized by
user MAXTODO > #users */
+#define MAXQ 256
+static Queue *todos[MAXQ] = {0}; /* pending instructions organized by
dataflow block */
+static bit occupied[MAXQ]={0}; /* worker pool is in use? */
static int volatile exiting = 0;
/*
@@ -258,22 +257,18 @@ DFLOWworker(void *t)
{
DataFlow flow;
FlowEvent fe = 0, fnxt = 0;
- int id = (int) ((MT_Id *) t - workers), last = 0;
- int wq;
Thread thr;
str error = 0;
-
- int i;
+ Queue *todo = *(Queue **) t;
+ int i,last;
thr = THRnew("DFLOWworker");
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
while (1) {
- assert(workerqueue[id] > 0);
- wq = workerqueue[id] - 1;
if (fnxt == 0)
- fe = q_dequeue(todo[wq]);
+ fe = q_dequeue(todo);
else
fe = fnxt;
if (exiting) {
@@ -295,16 +290,15 @@ DFLOWworker(void *t)
#ifdef USE_MAL_ADMISSION
if (MALadmission(fe->argclaim, fe->hotclaim)) {
fe->hotclaim = 0; /* don't assume priority
anymore */
- assert(todo[wq]);
- if (todo[wq]->last == 0)
+ if (todo->last == 0)
MT_sleep_ms(DELAYUNIT);
- q_requeue(todo[wq], fe);
+ q_requeue(todo, fe);
continue;
}
#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc,
fe->pc + 1, flow->stk, 0, 0);
PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk=
%d claim= " LLFMT "," LLFMT " %s\n",
- fe->pc, id,
fe->argclaim, fe->hotclaim, error ? error : "");
+ fe->pc,
(int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
#ifdef USE_MAL_ADMISSION
/* release the memory claim */
MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -357,69 +351,64 @@ DFLOWworker(void *t)
MALresourceFairness(GDKusec()-
flow->mb->starttime);
q_enqueue(flow->done, fe);
if ( fnxt == 0) {
- assert(todo[wq]);
- if (todo[wq]->last == 0)
+ if (todo->last == 0)
profilerHeartbeatEvent("wait");
}
}
GDKfree(GDKerrbuf);
GDKsetbuf(0);
- workerqueue[wq] = 0;
- workers[wq] = 0;
THRdel(thr);
}
/*
- * Create a set of DFLOW interpreters.
+ * Create an interpreter pool.
* One worker will adaptively be available for each client.
* The remainder are taken from the GDKnr_threads argument and
* typically is equal to the number of cores.
* A recursive MAL function call would make for one worker less,
* which limits the number of cores for parallel processing.
* The workers are assembled in a local table to enable debugging.
+ *
+ * BEWARE, failure to create a new worker thread is not an error
+ * but would lead to serial execution.
*/
-static str
-DFLOWinitialize(int index)
+static int
+DFLOWinitialize(void)
{
- int i, worker, limit;
+ int i, threads, grp;
+ MT_Id worker;
- assert(index >= 0);
- assert(index < THREADS);
+ threads = GDKnr_threads ? GDKnr_threads : 1;
MT_lock_set(&mal_contextLock, "DFLOWinitialize");
- if (todo[index]) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- return MAL_SUCCEED;
+ for(grp = 0; grp< MAXQ; grp++)
+ if ( occupied[grp] == FALSE){
+ occupied[grp] = TRUE;
+ break;
+ }
+ MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ if (grp > THREADS) {
+ // continue non-parallel
+ return -1;
}
- todo[index] = q_create(2048, "todo");
- if (todo[index] == NULL) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- throw(MAL, "dataflow", "DFLOWinitialize(): Failed to create
todo queue");
+ if ( todos[grp] )
+ return grp;
+
+ todos[grp] = q_create(2048, "todo");
+ if (todos[grp] == NULL)
+ return -1;
+
+ // associate a set of workers with the pool
+ for (i = 0; grp>= 0 && i < threads; i++){
+ if (MT_create_thread(&worker, DFLOWworker, (void *)
&todos[grp], MT_THR_JOINABLE) < 0) {
+ //Can not create interpreter thread
+ grp = -1;
+ }
+ if (worker == 0) {
+ //Failed to create interpreter thread
+ grp = -1;
+ }
}
- limit = GDKnr_threads ? GDKnr_threads : 1;
- if (limit > THREADS) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- throw(MAL, "dataflow", "DFLOWinitialize(): using more threads
than thread slots: %d > %d", limit, THREADS);
- }
- for (worker = 0, i = 0; i < limit; i++){
- for (; worker < THREADS; worker++)
- if( workers[worker] == 0)
- break;
- if (worker >= THREADS || workers[worker] > 0) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- throw(MAL, "dataflow", "No free worker slot found");
- }
- if (MT_create_thread(&workers[worker], DFLOWworker, (void *)
&workers[worker], MT_THR_JOINABLE) < 0) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- throw(MAL, "dataflow", "Can not create interpreter
thread");
- }
- if (workers[worker] == 0) {
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- throw(MAL, "dataflow", "Failed to create interpreter
thread");
- }
- workerqueue[worker] = index + 1;
- }
- MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- return MAL_SUCCEED;
+ return grp;
}
/*
@@ -558,7 +547,7 @@ static void showFlowEvent(DataFlow flow,
*/
static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, Queue *todo)
{
int last;
int i;
@@ -569,7 +558,6 @@ DFLOWscheduler(DataFlow flow)
int tasks=0, actions;
str ret = MAL_SUCCEED;
FlowEvent fe, f = 0;
- int wq;
if (flow == NULL)
throw(MAL, "dataflow", "DFLOWscheduler(): Called with flow ==
NULL");
@@ -580,7 +568,6 @@ DFLOWscheduler(DataFlow flow)
fe = flow->status;
MT_lock_set(&flow->flowlock, "MALworker");
- wq = flow->cntxt->idx;
for (i = 0; i < actions; i++)
if (fe[i].blocks == 0) {
#ifdef USE_MAL_ADMISSION
@@ -592,7 +579,7 @@ DFLOWscheduler(DataFlow flow)
for (j = p->retc; j < p->argc; j++)
fe[i].argclaim = getMemoryClaim(fe[0].flow->mb,
fe[0].flow->stk, p, j, FALSE);
#endif
- q_enqueue(todo[wq], flow->status + i);
+ q_enqueue(todo, flow->status + i);
flow->status[i].state = DFLOWrunning;
PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d
claim=" LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
}
@@ -621,7 +608,7 @@ DFLOWscheduler(DataFlow flow)
if (flow->status[i].blocks == 1 ) {
flow->status[i].state = DFLOWrunning;
flow->status[i].blocks--;
- q_enqueue(todo[wq], flow->status + i);
+ q_enqueue(todo, flow->status + i);
PARDEBUG
mnstr_printf(GDKstdout, "#enqueue pc=%d
claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
} else {
@@ -643,8 +630,9 @@ str
runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr
stk)
{
DataFlow flow = NULL;
- str ret = MAL_SUCCEED;
- int size;
+ str msg = MAL_SUCCEED;
+ int size, pool;
+ int *ret;
#ifdef DEBUG_FLOW
mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc,
stoppc);
@@ -654,19 +642,24 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* in debugging mode we should not start multiple threads */
if (stk == NULL)
throw(MAL, "dataflow", "runMALdataflow(): Called with stk ==
NULL");
- if (stk->cmd)
+ ret = (int*) getArgReference(stk,getInstrPtr(mb,startpc),0);
+ if (stk->cmd){
+ *ret = TRUE;
return MAL_SUCCEED;
+ }
/* too many threads turns dataflow processing off */
- if ( cntxt->idx > MAXQ)
+ if ( cntxt->idx > MAXQ){
+ *ret = TRUE;
return MAL_SUCCEED;
+ }
assert(stoppc > startpc);
- /* check existence of workers */
- if (todo[cntxt->idx] == 0)
- ret = DFLOWinitialize(cntxt->idx);
- if ( ret != MAL_SUCCEED)
- return ret;
+ /* check existence of free worker group, resort to sequential upon
failure */
+ if( (pool= DFLOWinitialize()) < 0){
+ *ret = TRUE;
+ return MAL_SUCCEED;
+ }
flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
if (flow == NULL)
@@ -686,6 +679,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
if (flow->done == NULL) {
MT_lock_destroy(&flow->flowlock);
GDKfree(flow);
+ occupied[pool]= FALSE;
throw(MAL, "dataflow", "runMALdataflow(): Failed to create
flow->done queue");
}
@@ -694,6 +688,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
q_destroy(flow->done);
MT_lock_destroy(&flow->flowlock);
GDKfree(flow);
+ occupied[pool]= FALSE;
throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate
flow->status");
}
size = DFLOWgraphSize(mb, startpc, stoppc);
@@ -704,6 +699,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
q_destroy(flow->done);
MT_lock_destroy(&flow->flowlock);
GDKfree(flow);
+ occupied[pool]= FALSE;
throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate
flow->nodes");
}
flow->edges = (int*)GDKzalloc(sizeof(int) * size);
@@ -713,37 +709,26 @@ runMALdataflow(Client cntxt, MalBlkPtr m
q_destroy(flow->done);
MT_lock_destroy(&flow->flowlock);
GDKfree(flow);
+ occupied[pool]= FALSE;
throw(MAL, "dataflow", "runMALdataflow(): Failed to allocate
flow->edges");
}
- ret = DFLOWinitBlk(flow, mb, size);
+ msg = DFLOWinitBlk(flow, mb, size);
- if (ret == MAL_SUCCEED)
- ret = DFLOWscheduler(flow);
+ if (msg == MAL_SUCCEED)
+ msg = DFLOWscheduler(flow,todos[pool]);
+ occupied[pool]= FALSE;
GDKfree(flow->status);
GDKfree(flow->edges);
GDKfree(flow->nodes);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list