Changeset: 63a670bf0ccd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=63a670bf0ccd
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Keep the administration on events consistent
diffs (77 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -253,6 +253,7 @@ DFLOWworker(void *t)
/* whenever we have a (concurrent) error, skip it */
if (flow->error) {
+ fe->state = DFLOWwrapup;
q_enqueue(flow->done, fe);
continue;
}
@@ -277,7 +278,6 @@ DFLOWworker(void *t)
MALadmission(-fe->argclaim, -fe->hotclaim);
#endif
- fe->state = DFLOWwrapup;
if (error) {
MT_lock_set(&flow->flowlock, "runMALdataflow");
if (flow->error) {
@@ -294,6 +294,7 @@ DFLOWworker(void *t)
flow->error = error;
MT_lock_unset(&flow->flowlock, "runMALdataflow");
/* after an error we skip the rest of the block */
+ fe->state = DFLOWwrapup;
q_enqueue(flow->done, fe);
continue;
}
@@ -301,10 +302,9 @@ DFLOWworker(void *t)
/* see if you can find an eligible instruction that uses the
* result just produced. Then we can continue with it right away.
- * We are just looking forward for the last block, which means we
- * are safe from concurrent actions. No other thread can steal it,
- * because we hold the logical lock.
- * All eligible instructions are queued
+ * We are just looking forward and hold the logical locks on the
+ * target variables just produced.
+ * It means we are safe from concurrent actions. No other thread can steal it.
*/
#ifdef USE_MAL_ADMISSION
fe->hotclaim = 0;
@@ -312,7 +312,6 @@ DFLOWworker(void *t)
for (i = 0; i < p->retc; i++)
fe->hotclaim += getMemoryClaim(flow->mb, flow->stk,
fe->pc, i, FALSE);
#endif
- MT_lock_set(&flow->flowlock, "MALworker");
for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last])
if (flow->status[i].state == DFLOWpending &&
flow->status[i].blocks == 1) {
@@ -320,16 +319,11 @@ DFLOWworker(void *t)
flow->status[i].blocks = 0;
flow->status[i].hotclaim = fe->hotclaim;
flow->status[i].argclaim += fe->hotclaim;
- if (fnxt) {
- if ( flow->error)
- q_enqueue(flow->done, fnxt);
- else
- q_enqueue(todo, fnxt);
- }
fnxt = flow->status + i;
+ break;
}
- MT_lock_unset(&flow->flowlock, "MALworker");
+ fe->state = DFLOWwrapup;
q_enqueue(flow->done, fe);
MALresourceFairness(flow->cntxt, flow->mb, usec);
}
@@ -535,7 +529,7 @@ DFLOWscheduler(DataFlow flow)
flow->status[i].argclaim += f->hotclaim;
if (flow->status[i].blocks == 1 ) {
flow->status[i].state = DFLOWrunning;
- flow->status[i].blocks--;
+ flow->status[i].blocks= 0;
q_enqueue(todo, flow->status + i);
PARDEBUG
mnstr_printf(GDKstdout, "#enqueue pc=%d claim= " LLFMT "\n", flow->status[i].pc, flow->status[i].argclaim);
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list