Changeset: df0b118accb2 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=df0b118accb2
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Fixed a race condition in the dataflow scheduler.
If one of the dataflow threads came across an error, it would cause
DFLOWscheduler to return. This triggered a cleanup of various
dataflow structures. However, other dataflow threads were still using
those structures and could then cause a crash.
The problem is solved by signalling all threads that they should
terminate, and then joining those threads before cleanup.
diffs (162 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -86,6 +86,8 @@ typedef struct DataFlow {
int nway; /* number of workers */
FlowTask *worker; /* worker threads for the client */
struct DataFlow *free; /* free list */
+ int terminate; /* set if we need to terminate */
+ MT_Lock termlock; /* lock to protect the above */
} *DataFlow, DataFlowRec;
/* does not seem to have a major impact */
@@ -249,14 +251,12 @@ q_create(int sz)
return q;
}
-/*
static void
q_destroy(queue *q)
{
GDKfree(q->data);
GDKfree(q);
}
-*/
/* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue
is possible */
/* we might actually sort it for better scheduling behavior */
@@ -314,9 +314,10 @@ q_dequeue(queue *q)
MT_sema_down(&q->s, "q_dequeue");
MT_lock_set(&q->l, "q_dequeue");
- assert(q->last > 0);
- /* LIFO favors garbage collection */
- r = q->data[--q->last];
+ if (q->last > 0)
+ /* LIFO favors garbage collection */
+ r = q->data[--q->last];
+ /* else: terminating */
/* try out random draw *
{
int i;
@@ -824,6 +825,12 @@ runDFLOWworker(void *t)
local = nxtfs != 0;
if (nxtfs == 0) {
fs = (FlowStatus)q_dequeue(task->todo);
+ MT_lock_set(&task->flow->termlock, "runDFLOWworker");
+ if (task->flow->terminate) {
+ MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
+ break;
+ }
+ MT_lock_unset(&task->flow->termlock, "runDFLOWworker");
#ifdef USE_DFLOW_ADMISSION
if (DFLOWadmission(fs->argclaim, fs->hotclaim)) {
@@ -1168,7 +1175,6 @@ DFLOWscheduler(DataFlow flow)
return ret;
}
-static DataFlow flows = NULL;
static int workerid = 0;
str runMALdataflow(Client cntxt, MalBlkPtr mb, int startpc,
@@ -1177,6 +1183,7 @@ str runMALdataflow(Client cntxt, MalBlkP
DataFlow flow = NULL;
str ret = MAL_SUCCEED;
int size;
+ int i;
#ifdef DEBUG_FLOW
mnstr_printf(GDKstdout, "runMALdataflow for block %d - %d\n", startpc,
stoppc);
@@ -1191,37 +1198,33 @@ str runMALdataflow(Client cntxt, MalBlkP
return MAL_SUCCEED;
assert(stoppc > startpc);
- mal_set_lock(mal_contextLock, "runMALdataflow");
- flow = flows;
- if (flow) {
- flows = flow->free;
- } else {
- int i;
+ flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+ MT_lock_init(&flow->termlock, "runMALdataflow");
- flow = (DataFlow)GDKzalloc(sizeof(DataFlowRec));
+ /* seems enough for the time being */
+ flow->done = q_create(2048);
+ flow->todo = q_create(2048);
- /* seems enough for the time being */
- flow->done = q_create(2048);
- flow->todo = q_create(2048);
+ /* queues are available? */
+ if (flow->done == NULL || flow->todo == NULL) {
+ return MAL_SUCCEED;
+ }
- /* queues are available? */
- if (flow->done == NULL || flow->todo == NULL) {
- mal_unset_lock(mal_contextLock, "runMALdataflow");
- return MAL_SUCCEED;
- }
+ flow->worker = NULL;
+ flow->nway = GDKnr_threads ? GDKnr_threads : 1;
+ if (flow->nway > stoppc - startpc)
+ flow->nway = stoppc - startpc;
+ flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
+ for (i = 0; i < flow->nway; i++) {
+ flow->worker[i].id = workerid++;
+ flow->worker[i].todo = flow->todo;
+ flow->worker[i].flow = flow;
+ /* create the thread and let it wait */
+ MT_create_thread(&flow->worker[i].tid, runDFLOWworker,
+ flow->worker + i, MT_THR_JOINABLE);
+ }
- flow->worker = NULL;
- flow->nway = GDKnr_threads ? GDKnr_threads : 1;
- flow->worker = (FlowTask *)GDKzalloc(sizeof(FlowTask) * flow->nway);
- for (i = 0; i < flow->nway; i++) {
- flow->worker[i].id = workerid++;
- flow->worker[i].todo = flow->todo;
- flow->worker[i].flow = flow;
- /* create the thread and let it wait */
- MT_create_thread(&flow->worker[i].tid, runDFLOWworker, flow->worker + i, MT_THR_DETACHED);
- }
- }
/* keep real block count, exclude brackets */
flow->start = startpc + 1;
flow->stop = stoppc;
@@ -1231,18 +1234,23 @@ str runMALdataflow(Client cntxt, MalBlkP
flow->nodes = (int*)GDKzalloc(sizeof(int) * size);
flow->edges = (int*)GDKzalloc(sizeof(int) * size);
DFLOWinit(flow, cntxt, mb, stk, size);
- mal_unset_lock(mal_contextLock, "runMALdataflow");
ret = DFLOWscheduler(flow);
+
+ MT_lock_set(&flow->termlock, "runMALdataflow");
+ flow->terminate = 1;
+ MT_lock_unset(&flow->termlock, "runMALdataflow");
+ for (i = 0; i < flow->nway; i++)
+ MT_sema_up(&flow->todo->s, "runMALdataflow");
+ for (i = 0; i < flow->nway; i++)
+ MT_join_thread(flow->worker[i].tid);
+
+ q_destroy(flow->done);
+ q_destroy(flow->todo);
+ GDKfree(flow->worker);
GDKfree(flow->status);
- flow->status = 0;
GDKfree(flow->edges);
- flow->edges = 0;
GDKfree(flow->nodes);
- flow->nodes = 0;
- mal_set_lock(mal_contextLock, "runMALdataflow");
- flow->free = flows;
- flows = flow;
- mal_unset_lock(mal_contextLock, "runMALdataflow");
+ GDKfree(flow);
return ret;
}
_______________________________________________
Checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list