Changeset: 419a75b94b86 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=419a75b94b86
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Be protected against concurrent access
diffs (115 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -186,9 +186,11 @@ q_dequeue(queue *q)
MT_sema_down(&q->s, "q_dequeue");
assert(q->last);
MT_lock_set(&q->l, "q_dequeue");
- if (q->last > 0)
+ if (q->last > 0) {
/* LIFO favors garbage collection */
r = q->data[--q->last];
+ q->data[q->last] = 0;
+ }
/* else: terminating */
/* try out random draw *
{
@@ -310,6 +312,7 @@ DFLOWworker(void *t)
for (i = 0; i < p->retc; i++)
fe->hotclaim += getMemoryClaim(flow->mb, flow->stk,
fe->pc, i, FALSE);
#endif
+ MT_lock_set(&flow->flowlock, "MALworker");
for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending &&
flow->status[i].blocks == 1) {
@@ -317,10 +320,15 @@ DFLOWworker(void *t)
flow->status[i].blocks = 0;
flow->status[i].hotclaim = fe->hotclaim;
flow->status[i].argclaim += fe->hotclaim;
- if (fnxt)
- q_enqueue(todo, fnxt);
+ if (fnxt) {
+ if ( flow->error)
+ q_enqueue(flow->done, fnxt);
+ else
+ q_enqueue(todo, fnxt);
+ }
fnxt = flow->status + i;
}
+ MT_lock_unset(&flow->flowlock, "MALworker");
q_enqueue(flow->done, fe);
MALresourceFairness(flow->cntxt, flow->mb, usec);
@@ -461,6 +469,7 @@ DFLOWinitBlk(DataFlow flow, MalBlkPtr mb
* They take effect after we have ensured that the basic properties for
* execution hold.
*/
+/*
static void showFlowEvent(DataFlow flow, int pc)
{
int i;
@@ -473,17 +482,18 @@ static void showFlowEvent(DataFlow flow,
printInstruction(GDKstdout, fe[i].flow->mb, 0,
getInstrPtr(fe[i].flow->mb, fe[i].pc), LIST_MAL_STMT | LIST_MAPI);
}
}
+*/
static str
DFLOWscheduler(DataFlow flow)
{
int last;
- int i, pc = 0;
+ int i;
#ifdef USE_MAL_ADMISSION
int j;
InstrPtr p;
#endif
- int actions = flow->stop - flow->start;
+ int tasks=0, actions = flow->stop - flow->start;
str ret = MAL_SUCCEED;
FlowEvent fe, f = 0;
@@ -509,9 +519,9 @@ DFLOWscheduler(DataFlow flow)
PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow block\n", actions);
- while (actions > 0 ) {
+ while (actions != tasks ) {
f = q_dequeue(flow->done);
- actions--;
+ tasks++;
/*
* When an instruction is finished we have to reduce the blocked
@@ -519,26 +529,29 @@ DFLOWscheduler(DataFlow flow)
* drops to zero we can scheduler it we do it here instead of the scheduler
*/
+ MT_lock_set(&flow->flowlock, "MALworker");
for (last = f->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending) {
flow->status[i].argclaim += f->hotclaim;
if (flow->status[i].blocks == 1 ) {
- q_enqueue(todo, flow->status + i);
flow->status[i].state = DFLOWrunning;
flow->status[i].blocks--;
+ q_enqueue(todo, flow->status + i);
PARDEBUG
mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
} else {
flow->status[i].blocks--;
}
}
+ MT_lock_unset(&flow->flowlock, "MALworker");
}
/* wrap up errors */
+ assert(todo->last == 0);
+ assert(flow->done->last == 0);
if (flow->error ) {
PARDEBUG mnstr_printf(GDKstdout, "#errors encountered %s ",
flow->error ? flow->error : "unknown");
ret = flow->error;
}
- PARDEBUG showFlowEvent(flow, pc);
return ret;
}
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list