Changeset: 60bfed39bef9 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=60bfed39bef9
Modified Files:
gdk/gdk_mapreduce.c
monetdb5/mal/mal_client.c
monetdb5/mal/mal_interpreter.c
monetdb5/modules/mal/tablet.c
Branch: default
Log Message:
The string argument of MT_sema_init is the name of the semaphore; the string argument of MT_sema_{up,down} is the name of the calling function.
diffs (266 lines):
diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c
--- a/gdk/gdk_mapreduce.c
+++ b/gdk/gdk_mapreduce.c
@@ -54,19 +54,19 @@ MRqueueCreate(int sz)
MT_Id tid;
#ifdef NEED_MT_LOCK_INIT
- MT_lock_init(&mrqlock, "q_create");
+ MT_lock_init(&mrqlock, "mrqlock");
#endif
- MT_lock_set(&mrqlock, "q_create");
- MT_sema_init(&mrqsema, 0, "q_create");
+ MT_lock_set(&mrqlock, "MRqueueCreate");
+ MT_sema_init(&mrqsema, 0, "mrqsema");
if ( mrqueue ) {
- MT_lock_unset(&mrqlock, "q_create");
+ MT_lock_unset(&mrqlock, "MRqueueCreate");
GDKerror("One map-reduce queue allowed");
return;
}
sz *= 2;
mrqueue = (MRqueue *) GDKzalloc(sizeof(MRqueue) * sz);
if ( mrqueue == 0) {
- MT_lock_unset(&mrqlock, "q_create");
+ MT_lock_unset(&mrqlock, "MRqueueCreate");
GDKerror("Could not create the map-reduce queue");
return;
}
@@ -75,19 +75,19 @@ MRqueueCreate(int sz)
/* create a worker thread for each core as specified as system
parameter */
for (i = 0; i < GDKnr_threads; i++)
MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_DETACHED);
- MT_lock_unset(&mrqlock, "q_create");
+ MT_lock_unset(&mrqlock, "MRqueueCreate");
}
static void
MRenqueue(int taskcnt, MRtask ** tasks)
{
assert(taskcnt > 0);
- MT_lock_set(&mrqlock, "mrqlock");
+ MT_lock_set(&mrqlock, "MRenqueue");
if (mrqlast == mrqsize) {
mrqsize <<= 1;
mrqueue = (MRqueue *) GDKrealloc(mrqueue, sizeof(MRqueue) *
mrqsize);
if ( mrqueue == 0) {
- MT_lock_unset(&mrqlock, "mrqlock");
+ MT_lock_unset(&mrqlock, "MRenqueue");
GDKerror("Could not enlarge the map-reduce queue");
return;
}
@@ -96,10 +96,10 @@ MRenqueue(int taskcnt, MRtask ** tasks)
mrqueue[mrqlast].tasks = tasks;
mrqueue[mrqlast].size = taskcnt;
mrqlast++;
- MT_lock_unset(&mrqlock, "mrqlock");
+ MT_lock_unset(&mrqlock, "MRenqueue");
/* a task list is added for consumption */
while (taskcnt-- > 0)
- MT_sema_up(&mrqsema, "mrqsema");
+ MT_sema_up(&mrqsema, "MRenqueue");
}
static MRtask *
@@ -108,9 +108,9 @@ MRdequeue(void)
MRtask *r = NULL;
int idx;
- MT_sema_down(&mrqsema, "mrqsema");
+ MT_sema_down(&mrqsema, "MRdequeue");
assert(mrqlast);
- MT_lock_set(&mrqlock, "mrqlock");
+ MT_lock_set(&mrqlock, "MRdequeue");
if (mrqlast > 0) {
idx = mrqueue[mrqlast - 1].index;
r = mrqueue[mrqlast - 1].tasks[idx++];
@@ -119,7 +119,7 @@ MRdequeue(void)
else
mrqueue[mrqlast - 1].index = idx;
}
- MT_lock_unset(&mrqlock, "mrqlock");
+ MT_lock_unset(&mrqlock, "MRdequeue");
assert(r);
return r;
}
@@ -132,7 +132,7 @@ MRworker(void *arg)
do {
task = MRdequeue();
(task->cmd) (task);
- MT_sema_up(task->sema, "mrqsema");
+ MT_sema_up(task->sema, "MRworker");
} while (1);
}
@@ -147,7 +147,7 @@ MRschedule(int taskcnt, void **arg, void
if (mrqueue == 0)
MRqueueCreate(1024);
- MT_sema_init(&sema, 0, "q_create");
+ MT_sema_init(&sema, 0, "sema");
for (i = 0; i < taskcnt; i++) {
task[i]->sema = &sema;
task[i]->cmd = cmd;
@@ -155,5 +155,5 @@ MRschedule(int taskcnt, void **arg, void
MRenqueue(taskcnt, task);
/* waiting for all report result */
for (i = 0; i < taskcnt; i++)
- MT_sema_down(&sema, "mrqsema");
+ MT_sema_down(&sema, "MRschedule");
}
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -246,7 +246,7 @@ MCinitClientRecord(Client c, oid user, b
c->rcc = (RecPtr) GDKzalloc(sizeof(RecStat));
c->rcc->curQ = -1;
c->exception_buf_initialized = 0;
- MT_sema_init(&c->s, 0, "MCinitClient");
+ MT_sema_init(&c->s, 0, "Client->s");
return c;
}
diff --git a/monetdb5/mal/mal_interpreter.c b/monetdb5/mal/mal_interpreter.c
--- a/monetdb5/mal/mal_interpreter.c
+++ b/monetdb5/mal/mal_interpreter.c
@@ -438,7 +438,7 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS
* is determined by an environment variable. It is initially set equal to the
* number of cores, which may be too coarse.
*/
- MT_sema_down(&mal_parallelism,"mal_parallelism");
+ MT_sema_down(&mal_parallelism,"callMAL");
runtimeProfileInit(mb, &runtimeProfile, cntxt->flags & memoryFlag);
#ifdef DEBUG_CALLMAL
mnstr_printf(cntxt->fdout, "callMAL\n");
@@ -477,10 +477,10 @@ callMAL(Client cntxt, MalBlkPtr mb, MalS
case PATcall:
case CMDcall:
default:
- MT_sema_up(&mal_parallelism,"mal_parallelism");
+ MT_sema_up(&mal_parallelism,"callMAL");
throw(MAL, "mal.interpreter", RUNTIME_UNKNOWN_INSTRUCTION);
}
- MT_sema_up(&mal_parallelism,"mal_parallelism");
+ MT_sema_up(&mal_parallelism,"callMAL");
if (cntxt->qtimeout && time(NULL) - stk->clock.tv_usec >
cntxt->qtimeout)
throw(MAL, "mal.interpreter", RUNTIME_QRY_TIMEOUT);
return ret;
diff --git a/monetdb5/modules/mal/tablet.c b/monetdb5/modules/mal/tablet.c
--- a/monetdb5/modules/mal/tablet.c
+++ b/monetdb5/modules/mal/tablet.c
@@ -1330,14 +1330,14 @@ SQLloader(void *p)
mnstr_printf(GDKout, "SQLloader started\n");
#endif
while (task->ateof == 0) {
- MT_sema_down(&task->producer, "tablet loader");
+ MT_sema_down(&task->producer, "SQLloader");
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "SQL loader got buffer \n");
#endif
if (task->ateof) /* forced exit received */
break;
task->ateof = tablet_read_more(task->b, task->out,
task->b->size - (task->b->len - task->b->pos)) == EOF;
- MT_sema_up(&task->consumer, "tablet loader");
+ MT_sema_up(&task->consumer, "SQLloader");
}
}
@@ -1395,8 +1395,8 @@ SQLload_file(Client cntxt, Tablet *as, b
task->input = task->base + 1; /* wrap the buffer with null bytes */
task->base[b->size + 1] = 0;
- MT_sema_init(&task->consumer, 0, "tablet load");
- MT_sema_init(&task->producer, 0, "tablet load");
+ MT_sema_init(&task->consumer, 0, "task->consumer");
+ MT_sema_init(&task->producer, 0, "task->producer");
task->ateof = 0;
task->b = b;
task->out = out;
@@ -1443,8 +1443,8 @@ SQLload_file(Client cntxt, Tablet *as, b
#ifdef MLOCK_TST
mlock(ptask[j].cols, sizeof(char *) * task->limit);
#endif
- MT_sema_init(&ptask[j].sema, 0, "sqlworker");
- MT_sema_init(&ptask[j].reply, 0, "sqlworker");
+ MT_sema_init(&ptask[j].sema, 0, "ptask[j].sema");
+ MT_sema_init(&ptask[j].reply, 0, "ptask[j].reply");
MT_create_thread(&ptask[j].tid, SQLworker, (void *) &ptask[j],
MT_THR_JOINABLE);
}
@@ -1454,8 +1454,8 @@ SQLload_file(Client cntxt, Tablet *as, b
#endif
tio = GDKusec();
- MT_sema_up(&task->producer, "tablet load");
- MT_sema_down(&task->consumer, "tablet loader");
+ MT_sema_up(&task->producer, "SQLload_file");
+ MT_sema_down(&task->consumer, "SQLload_file");
tio = GDKusec() - tio;
t1 = GDKusec();
#ifdef MLOCK_TST
@@ -1590,7 +1590,7 @@ SQLload_file(Client cntxt, Tablet *as, b
}
}
/* start feeding new data */
- MT_sema_up(&task->producer, "tablet load");
+ MT_sema_up(&task->producer, "SQLload_file");
t1 = GDKusec() - t1;
total += t1;
iototal += tio;
@@ -1607,13 +1607,13 @@ SQLload_file(Client cntxt, Tablet *as, b
ptask[j].next = task->next;
ptask[j].fields = task->fields;
ptask[j].limit = task->limit;
- MT_sema_up(&ptask[j].sema, "SQLworker");
+ MT_sema_up(&ptask[j].sema, "SQLload_file");
}
}
if (task->next) {
/* await completion of line break phase */
for (j = 0; j < threads; j++) {
- MT_sema_down(&ptask[j].reply, "sqlreader");
+ MT_sema_down(&ptask[j].reply, "SQLload_file");
if (ptask[j].error)
res = -1;
}
@@ -1627,7 +1627,7 @@ SQLload_file(Client cntxt, Tablet *as, b
for (j = 0; j < threads; j++) {
/* stage two, update the BATs */
ptask[j].state = UPDATEBAT;
- MT_sema_up(&ptask[j].sema, "SQLworker");
+ MT_sema_up(&ptask[j].sema, "SQLload_file");
}
}
}
@@ -1642,11 +1642,11 @@ SQLload_file(Client cntxt, Tablet *as, b
if (res == 0 && task->next) {
/* await completion of the BAT updates */
for (j = 0; j < threads; j++)
- MT_sema_down(&ptask[j].reply, "sqlreader");
+ MT_sema_down(&ptask[j].reply, "SQLload_file");
}
if (task->ateof)
break;
- MT_sema_down(&task->consumer, "tablet loader");
+ MT_sema_down(&task->consumer, "SQLload_file");
}
if (task->b->pos < task->b->len && cnt < (BUN) maxrow && task->ateof) {
@@ -1673,14 +1673,14 @@ SQLload_file(Client cntxt, Tablet *as, b
}
task->ateof = 1;
- MT_sema_up(&task->producer, "tablet load");
+ MT_sema_up(&task->producer, "SQLload_file");
for (j = 0; j < threads; j++) {
ptask[j].next = -1;
- MT_sema_up(&ptask[j].sema, "SQLworker");
+ MT_sema_up(&ptask[j].sema, "SQLload_file");
}
/* wait for their death */
for (j = 0; j < threads; j++)
- MT_sema_down(&ptask[j].reply, "sqlreader");
+ MT_sema_down(&ptask[j].reply, "SQLload_file");
#ifdef _DEBUG_TABLET_
mnstr_printf(GDKout, "Kill the workers\n");
#endif
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list