Changeset: 937e0ab26eea for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=937e0ab26eea
Modified Files:
gdk/gdk_utils.c
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Merge with Feb2013 branch.
diffs (truncated from 322 to 300 lines):
diff --git a/gdk/gdk_utils.c b/gdk/gdk_utils.c
--- a/gdk/gdk_utils.c
+++ b/gdk/gdk_utils.c
@@ -1018,12 +1018,6 @@ GDKinit(opt *set, int setlen)
/* Mserver by default takes 80% of all memory as a default */
GDK_mem_maxsize = GDK_mem_maxsize_max = (size_t) ((double) MT_npages()
* (double) MT_pagesize() * 0.815);
-#ifdef NATIVE_WIN32
- GDK_mmap_minsize = GDK_mem_maxsize_max;
-#else
- GDK_mmap_minsize = MIN( 1<<30 , GDK_mem_maxsize_max/6 );
- /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6 ^
*/
-#endif
GDK_mem_bigsize = 1024*1024;
GDKremovedir(DELDIR);
BBPinit();
@@ -1066,6 +1060,10 @@ GDKinit(opt *set, int setlen)
GDKsetenv(n[i].name, n[i].value);
free(n);
+ GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0);
+ if (GDKnr_threads == 0)
+ GDKnr_threads = MT_check_nr_cores();
+
if ((p = GDKgetenv("gdk_dbpath")) != NULL &&
(p = strrchr(p, DIR_SEP)) != NULL) {
GDKsetenv("gdk_dbname", p + 1);
@@ -1091,6 +1089,13 @@ GDKinit(opt *set, int setlen)
}
if ((p = GDKgetenv("gdk_mmap_minsize"))) {
GDK_mmap_minsize = MAX(REMAP_PAGE_MAXSIZE, (size_t) strtoll(p,
NULL, 10));
+ } else {
+#ifdef NATIVE_WIN32
+ GDK_mmap_minsize = GDK_mem_maxsize_max / (GDKnr_threads ?
GDKnr_threads : 1);
+#else
+ GDK_mmap_minsize = MIN(1 << 30, (GDK_mem_maxsize_max / 6) /
(GDKnr_threads ? GDKnr_threads : 1));
+ /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6
*/
+#endif
}
if (GDKgetenv("gdk_mem_pagebits") == NULL) {
snprintf(buf, sizeof(buf), "%d", GDK_mem_pagebits);
@@ -1105,18 +1110,6 @@ GDKinit(opt *set, int setlen)
GDKsetenv("monet_pid", buf);
}
- GDKnr_threads = GDKgetenv_int("gdk_nr_threads", 0);
- if (GDKnr_threads == 0)
- GDKnr_threads = MT_check_nr_cores();
-#ifdef NATIVE_WIN32
- GDK_mmap_minsize /= (GDKnr_threads ? GDKnr_threads : 1);
-#else
- /* WARNING: This unconditionally overwrites above settings, */
- /* incl. setting via MonetDB env. var. "gdk_mmap_minsize" ! */
- GDK_mmap_minsize = MIN( 1<<30 , (GDK_mem_maxsize_max/6) /
(GDKnr_threads ? GDKnr_threads : 1) );
- /* per op: 2 args + 1 res, each with head & tail => (2+1)*2 = 6
^ */
-#endif
-
if ((p = mo_find_option(set, setlen, "gdk_vmtrim")) == NULL ||
strcasecmp(p, "yes") == 0)
MT_create_thread(&GDKvmtrim_id, GDKvmtrim, &GDK_mem_maxsize,
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -88,6 +88,8 @@ typedef struct DATAFLOW {
static struct worker {
MT_Id id;
enum {IDLE, RUNNING, EXITED} flag;
+ Client cntxt; /* client we do work for (NULL
-> any) */
+ MT_Sema s;
} workers[THREADS];
static Queue *todo = 0; /* pending instructions */
static int volatile exiting = 0;
@@ -207,16 +209,33 @@ q_requeue(Queue *q, FlowEvent d)
}
#endif
-static void *
-q_dequeue(Queue *q)
+static FlowEvent
+q_dequeue(Queue *q, Client cntxt)
{
- void *r = NULL;
+ FlowEvent r = NULL;
assert(q);
MT_sema_down(&q->s, "q_dequeue");
if (exiting)
return NULL;
MT_lock_set(&q->l, "q_dequeue");
+ if (cntxt) {
+ int i;
+
+ for (i = q->last - 1; i >= 0; i--) {
+ if (q->data[i]->flow->cntxt == cntxt) {
+ r = q->data[i];
+ q->last--;
+ while (i < q->last) {
+ q->data[i] = q->data[i + 1];
+ i++;
+ }
+ break;
+ }
+ }
+ MT_lock_unset(&q->l, "q_dequeue");
+ return r;
+ }
if (q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l, "q_dequeue");
@@ -228,7 +247,7 @@ q_dequeue(Queue *q)
assert(q->last > 0);
if (q->last > 0) {
/* LIFO favors garbage collection */
- r = (void*) q->data[--q->last];
+ r = q->data[--q->last];
q->data[q->last] = 0;
}
/* else: terminating */
@@ -281,10 +300,28 @@ DFLOWworker(void *T)
GDKsetbuf(GDKmalloc(GDKMAXERRLEN)); /* where to leave errors */
GDKerrbuf[0] = 0;
+ if (t->cntxt) {
+ /* wait until we are allowed to start working */
+ MT_sema_down(&t->s, "DFLOWworker");
+ }
while (1) {
if (fnxt == 0) {
- if ((fe = q_dequeue(todo)) == NULL)
- break;;
+ Client cntxt = t->cntxt;
+ fe = q_dequeue(todo, cntxt);
+ if (fe == NULL) {
+ if (cntxt) {
+ /* we're not done yet with work for the
current
+ * client (as far as we know), so give
up the CPU
+ * and let the scheduler enter some
more work, but
+ * first compensate for the down we did
in
+ * dequeue */
+ MT_sema_up(&todo->s, "DFLOWworker");
+ MT_sleep_ms(1);
+ continue;
+ }
+ /* no more work to be done: exit */
+ break;
+ }
} else
fe = fnxt;
if (exiting) {
@@ -399,9 +436,12 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return -1;
}
- limit = GDKnr_threads ? GDKnr_threads : 1;
+ for (i = 0; i < THREADS; i++)
+ MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
+ limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
for (i = 0; i < limit; i++) {
workers[i].flag = RUNNING;
+ workers[i].cntxt = NULL;
if (MT_create_thread(&workers[i].id, DFLOWworker, (void *)
&workers[i], MT_THR_JOINABLE) < 0)
workers[i].flag = IDLE;
else
@@ -563,7 +603,7 @@ static void showFlowEvent(DataFlow flow,
*/
static str
-DFLOWscheduler(DataFlow flow)
+DFLOWscheduler(DataFlow flow, struct worker *w)
{
int last;
int i;
@@ -600,11 +640,12 @@ DFLOWscheduler(DataFlow flow)
PARDEBUG fprintf(stderr, "#enqueue pc=%d claim=" LLFMT
"\n", flow->status[i].pc, flow->status[i].argclaim);
}
MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
+ MT_sema_up(&w->s, "DFLOWscheduler");
PARDEBUG fprintf(stderr, "#run %d instructions in dataflow block\n",
actions);
while (actions != tasks ) {
- f = q_dequeue(flow->done);
+ f = q_dequeue(flow->done, NULL);
if (exiting)
break;
if (f == NULL)
@@ -632,6 +673,9 @@ DFLOWscheduler(DataFlow flow)
}
MT_lock_unset(&flow->flowlock, "DFLOWscheduler");
}
+ /* release the worker from its specific task (turn it into a
+ * generic worker) */
+ w->cntxt = NULL;
/* wrap up errors */
assert(flow->done->last == 0);
if (flow->error ) {
@@ -641,6 +685,20 @@ DFLOWscheduler(DataFlow flow)
return ret;
}
+/* We create a pool of GDKnr_threads-1 generic workers, that is,
+ * workers that will take on jobs from any clients. In addition, we
+ * create a single specific worker per client (i.e. each time we enter
+ * here). This specific worker will only do work for the client for
+ * which it was started. In this way we can guarantee that there will
+ * always be progress for the client, even if all other workers are
+ * doing something big.
+ *
+ * When all jobs for a client have been done (there are no more
+ * entries for the client in the queue), the specific worker turns
+ * itself into a generic worker. At the same time, we signal that one
+ * generic worker should exit and this function returns. In this way
+ * we make sure that there are once again GDKnr_threads-1 generic
+ * workers. */
str
runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc, int stoppc, MalStkPtr
stk)
{
@@ -670,44 +728,64 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* check existence of workers */
if (todo == NULL) {
/* create thread pool */
- if (DFLOWinitialize() < 0) {
+ if (GDKnr_threads <= 1 || DFLOWinitialize() < 0) {
/* no threads created, run serially */
*ret = TRUE;
return MAL_SUCCEED;
}
i = THREADS; /* we didn't create an extra
thread */
- } else
- if( stk->calldepth){
- /* create one more worker to compensate for our waiting until
- * all work is done, provided we are about to perform a
recursive call */
- MT_lock_set(&mal_contextLock, "runMALdataflow");
- for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
- if (workers[i].flag == EXITED) {
- todo->exitedcount--;
- workers[i].flag = IDLE;
- MT_join_thread(workers[i].id);
+ }
+ assert(todo);
+ /* in addition, create one more worker that will only execute
+ * tasks for the current client to compensate for our waiting
+ * until all work is done */
+ MT_lock_set(&mal_contextLock, "runMALdataflow");
+ /* join with already exited threads */
+ for (i = 0; i < THREADS && todo->exitedcount > 0; i++) {
+ if (workers[i].flag == EXITED) {
+ todo->exitedcount--;
+ workers[i].flag = IDLE;
+ workers[i].cntxt = NULL;
+ MT_join_thread(workers[i].id);
+ }
+ }
+ for (i = 0; i < THREADS; i++) {
+ if (workers[i].flag == IDLE) {
+ /* only create specific worker if we are not doing a
+ * recursive call */
+ if (stk->calldepth > 1) {
+ int j;
+ MT_Id pid = MT_getpid();
+
+ /* doing a recursive call: copy specificity from
+ * current worker to new worker */
+ workers[i].cntxt = NULL;
+ for (j = 0; j < THREADS; j++) {
+ if (workers[j].flag == RUNNING &&
workers[j].id == pid) {
+ workers[i].cntxt =
workers[j].cntxt;
+ break;
+ }
+ }
+ } else {
+ /* not doing a recursive call: create specific
worker */
+ workers[i].cntxt = cntxt;
}
+ if (MT_create_thread(&workers[i].id, DFLOWworker, (void
*) &workers[i], MT_THR_JOINABLE) < 0) {
+ /* cannot start new thread, run serially */
+ *ret = TRUE;
+ MT_lock_unset(&mal_contextLock,
"runMALdataflow");
+ return MAL_SUCCEED;
+ }
+ workers[i].flag = RUNNING;
+ break;
}
- for (i = 0; i < THREADS; i++) {
- if (workers[i].flag == IDLE) {
- if (MT_create_thread(&workers[i].id,
DFLOWworker, (void *) &workers[i], MT_THR_JOINABLE) < 0) {
- /* cannot start new thread, run
serially */
- *ret = TRUE;
- MT_lock_unset(&mal_contextLock,
"runMALdataflow");
- return MAL_SUCCEED;
- }
- workers[i].flag = RUNNING;
- break;
- }
- }
- MT_lock_unset(&mal_contextLock, "runMALdataflow");
- if (i == THREADS) {
- /* no empty threads slots found, run serially */
- *ret = TRUE;
- return MAL_SUCCEED;
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list