Changeset: e5e05e52087c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=e5e05e52087c
Modified Files:
monetdb5/extras/jaql/jaqlscenario.c
monetdb5/mal/mal_client.c
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_debugger.c
monetdb5/mal/mal_import.c
monetdb5/mal/mal_session.c
monetdb5/modules/mal/mdb.c
sql/backends/monet5/sql_scenario.c
sql/test/BugTracker-2013/Tests/nestedcalls.sql
sql/test/BugTracker-2013/Tests/nestedcalls.stable.out
Branch: default
Log Message:
Merge with Feb2013 branch.
diffs (truncated from 625 to 300 lines):
diff --git a/monetdb5/extras/jaql/jaqlscenario.c b/monetdb5/extras/jaql/jaqlscenario.c
--- a/monetdb5/extras/jaql/jaqlscenario.c
+++ b/monetdb5/extras/jaql/jaqlscenario.c
@@ -294,6 +294,7 @@ JAQLengine(Client c)
printtree(c->fdout, j->p, 0, j->planf);
mnstr_printf(c->fdout, "\n");
freetree(j->p);
+ c->glb = oldglb;
return MAL_SUCCEED; /* don't have a plan generated */
} else if (j->debug) {
msg = runMALDebugger(c, c->curprg);
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -171,7 +171,7 @@ MCnewClient(void)
Client
MCgetClient(int id)
{
- if (id < 0 || id > MAL_MAXCLIENTS)
+ if (id < 0 || id >= MAL_MAXCLIENTS)
return NULL;
return mal_clients + id;
}
@@ -430,7 +430,7 @@ MCcleanupClients(void)
str
MCsuspendClient(int id)
{
- if (id < 0 || id > MAL_MAXCLIENTS)
+ if (id < 0 || id >= MAL_MAXCLIENTS)
throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
mal_clients[id].itrace = 'S';
return MAL_SUCCEED;
@@ -439,7 +439,7 @@ MCsuspendClient(int id)
str
MCawakeClient(int id)
{
- if (id < 0 || id > MAL_MAXCLIENTS)
+ if (id < 0 || id >= MAL_MAXCLIENTS)
throw(INVCRED, "mal.clients", INVCRED_WRONG_ID);
mal_clients[id].itrace = 0;
return MAL_SUCCEED;
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -3,19 +3,19 @@
* Version 1.1 (the "License"); you may not use this file except in
* compliance with the License. You may obtain a copy of the License at
* http://www.monetdb.org/Legal/MonetDBLicense
- *
+ *
* Software distributed under the License is distributed on an "AS IS"
* basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
* License for the specific language governing rights and limitations
* under the License.
- *
+ *
* The Original Code is the MonetDB Database System.
- *
+ *
* The Initial Developer of the Original Code is CWI.
* Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
* Copyright August 2008-2013 MonetDB B.V.
* All Rights Reserved.
-*/
+ */
/*
* Out of order execution
@@ -37,7 +37,6 @@
*/
#include "monetdb_config.h"
#include "mal_dataflow.h"
-#include "mal_client.h"
#define DFLOWpending 0 /* runnable */
#define DFLOWrunning 1 /* currently in progress */
@@ -60,6 +59,7 @@ typedef struct FLOWEVENT {
typedef struct queue {
int size; /* size of queue */
int last; /* last element in the queue */
+ int exitcount; /* how many threads should exit */
FlowEvent *data;
MT_Lock l; /* it's a shared resource, ie we need locks */
MT_Sema s; /* threads wait on empty queues */
@@ -83,9 +83,11 @@ typedef struct DATAFLOW {
Queue *done; /* instructions handled */
} *DataFlow, DataFlowRec;
-#define MAXQ 256
-static Queue *todos[MAXQ] = {0}; /* pending instructions organized by dataflow block */
-static bit occupied[MAXQ]={0}; /* worker pool is in use? */
+static struct worker {
+ MT_Id id;
+ enum {IDLE, RUNNING, EXITED} flag;
+} workers[THREADS];
+static Queue *todo = 0; /* pending instructions */
static int volatile exiting = 0;
/*
@@ -122,6 +124,7 @@ q_create(int sz, const char *name)
GDKfree(q);
return NULL;
}
+ q->exitcount = 0;
(void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
MT_lock_init(&q->l, name);
@@ -211,6 +214,11 @@ q_dequeue(Queue *q)
if (exiting)
return NULL;
MT_lock_set(&q->l, "q_dequeue");
+ if (q->exitcount > 0) {
+ q->exitcount--;
+ MT_lock_unset(&q->l, "q_dequeue");
+ return NULL;
+ }
assert(q->last > 0);
if (q->last > 0) {
/* LIFO favors garbage collection */
@@ -253,13 +261,14 @@ q_dequeue(Queue *q)
*/
static void
-DFLOWworker(void *t)
+DFLOWworker(void *T)
{
+ struct worker *t = (struct worker *) T;
DataFlow flow;
FlowEvent fe = 0, fnxt = 0;
+ int id = (int) (t - workers);
Thread thr;
str error = 0;
- Queue *todo = *(Queue **) t;
int i,last;
thr = THRnew("DFLOWworker");
@@ -267,9 +276,10 @@ DFLOWworker(void *t)
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
while (1) {
- if (fnxt == 0)
- fe = q_dequeue(todo);
- else
+ if (fnxt == 0) {
+ if ((fe = q_dequeue(todo)) == NULL)
+ break;;
+ } else
fe = fnxt;
if (exiting) {
break;
@@ -297,8 +307,8 @@ DFLOWworker(void *t)
}
#endif
error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
- PARDEBUG mnstr_printf(GDKstdout, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
- fe->pc, (int)((Queue **)t - todos), fe->argclaim, fe->hotclaim, error ? error : "");
+ PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
+ fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
#ifdef USE_MAL_ADMISSION
/* release the memory claim */
MALadmission(-fe->argclaim, -fe->hotclaim);
@@ -308,7 +318,7 @@ DFLOWworker(void *t)
if (error) {
MT_lock_set(&flow->flowlock, "runMALdataflow");
/* only collect one error (from one thread, needed for stable testing) */
- if (!flow->error)
+ if (!flow->error)
flow->error = error;
MT_lock_unset(&flow->flowlock, "runMALdataflow");
/* after an error we skip the rest of the block */
@@ -334,7 +344,7 @@ DFLOWworker(void *t)
}
#endif
MT_lock_set(&flow->flowlock, "MALworker");
-
+
for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending && flow->status[i].blocks == 1) {
@@ -358,59 +368,36 @@ DFLOWworker(void *t)
GDKfree(GDKerrbuf);
GDKsetbuf(0);
THRdel(thr);
+ t->flag = EXITED;
}
-/*
+/*
* Create an interpreter pool.
* One worker will adaptively be available for each client.
* The remainder are taken from the GDKnr_threads argument and
- * typically is equal to the number of cores.
- * A recursive MAL function call would make for one worker less,
- * which limits the number of cores for parallel processing.
+ * typically is equal to the number of cores
* The workers are assembled in a local table to enable debugging.
- *
- * BEWARE, failure to create a new worker thread is not an error
- * but would lead to serial execution.
*/
-static int
+static void
DFLOWinitialize(void)
{
- int i, threads, grp;
- MT_Id worker;
+ int i, limit;
- threads = GDKnr_threads ? GDKnr_threads : 1;
MT_lock_set(&mal_contextLock, "DFLOWinitialize");
- for(grp = 0; grp< MAXQ; grp++)
- if ( occupied[grp] == FALSE){
- occupied[grp] = TRUE;
- break;
- }
+ if (todo) {
+ MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
+ return;
+ }
+ todo = q_create(2048, "DFLOWinitialize");
+ limit = GDKnr_threads ? GDKnr_threads : 1;
+ for (i = 0; i < limit; i++) {
+ workers[i].flag = RUNNING;
+ if (MT_create_thread(&workers[i].id, DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0)
+ workers[i].flag = IDLE;
+ }
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
- if (grp > THREADS) {
- // continue non-parallel
- return -1;
- }
- if ( todos[grp] )
- return grp;
+}
- todos[grp] = q_create(2048, "todo");
- if (todos[grp] == NULL)
- return -1;
-
- // associate a set of workers with the pool
- for (i = 0; grp>= 0 && i < threads; i++){
- if (MT_create_thread(&worker, DFLOWworker, (void *) &todos[grp], MT_THR_JOINABLE) < 0) {
- //Can not create interpreter thread
- grp = -1;
- }
- if (worker == 0) {
- //Failed to create interpreter thread
- grp = -1;
- }
- }
- return grp;
-}
-
/*
* The dataflow administration is based on administration of
* how many variables are still missing before it can be executed.
@@ -428,7 +415,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with flow == NULL");
if (mb == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Called with mb == NULL");
- PARDEBUG printf("Initialize dflow block\n");
+ PARDEBUG fprintf(stderr, "Initialize dflow block\n");
assign = (int *) GDKzalloc(mb->vtop * sizeof(int));
if (assign == NULL)
throw(MAL, "dataflow", "DFLOWinitBlk(): Failed to allocate assign");
@@ -479,7 +466,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
l = getEndOfLife(mb, getArg(p, j));
if (l != pc && l < flow->stop && l > flow->start) {
/* add edge to the target instruction for wakeup call */
- PARDEBUG mnstr_printf(GDKstdout, "endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p, j)), n + flow->start, l);
+ PARDEBUG fprintf(stderr, "endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p, j)), n + flow->start, l);
assert(pc < l); /* only dependencies on earlier instructions */
l -= flow->start;
if (flow->nodes[n]) {
@@ -504,17 +491,18 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
assign[getArg(p, j)] = pc; /* ensure recognition of dependency on first instruction and constant */
}
GDKfree(assign);
- PARDEBUG
- for (n = 0; n < flow->stop - flow->start; n++) {
- mnstr_printf(GDKstdout, "#[%d] %d: ", flow->start + n, n);
- printInstruction(GDKstdout, mb, 0, getInstrPtr(mb, n + flow->start), LIST_MAL_STMT | LIST_MAPI);
- mnstr_printf(GDKstdout, "#[%d]Dependents block count %d wakeup", flow->start + n, flow->status[n].blocks);
- for (j = n; flow->edges[j]; j = flow->edges[j]) {
- mnstr_printf(GDKstdout, "%d ", flow->start + flow->nodes[j]);
- if (flow->edges[j] == -1)
- break;
+ PARDEBUG {
+ for (n = 0; n < flow->stop - flow->start; n++) {
+ mnstr_printf(GDKstdout, "#[%d] %d: ", flow->start + n, n);
+ printInstruction(GDKstdout, mb, 0, getInstrPtr(mb, n + flow->start), LIST_MAL_STMT | LIST_MAPI);
+ mnstr_printf(GDKstdout, "#[%d]Dependents block count %d wakeup", flow->start + n, flow->status[n].blocks);
+ for (j = n; flow->edges[j]; j = flow->edges[j]) {
+ mnstr_printf(GDKstdout, "%d ", flow->start + flow->nodes[j]);
+ if (flow->edges[j] == -1)
+ break;
+ if (flow->edges[j] == -1)
+ break;
+ }
+ mnstr_printf(GDKstdout, "\n");
}
- mnstr_printf(GDKstdout, "\n");
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list