Changeset: 6739ad5dff63 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6739ad5dff63
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Oct2014
Log Message:
Remove superfluous test, and properly lock concurrent access.
diffs (80 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -343,47 +343,47 @@ DFLOWworker(void *T)
assert(flow);
/* whenever we have a (concurrent) error, skip it */
+ MT_lock_set(&flow->flowlock, "DFLOWworker");
if (flow->error) {
+ MT_lock_unset(&flow->flowlock, "DFLOWworker");
q_enqueue(flow->done, fe);
continue;
}
+ MT_lock_unset(&flow->flowlock, "DFLOWworker");
- /* skip all instructions when we have encontered an error */
- if (flow->error == 0) {
#ifdef USE_MAL_ADMISSION
- if (MALadmission(fe->argclaim, fe->hotclaim)) {
- fe->hotclaim = 0; /* don't assume priority anymore */
- if (todo->last == 0)
- MT_sleep_ms(DELAYUNIT);
- q_requeue(todo, fe);
- continue;
- }
+ if (MALadmission(fe->argclaim, fe->hotclaim)) {
+ fe->hotclaim = 0; /* don't assume priority anymore */
+ if (todo->last == 0)
+ MT_sleep_ms(DELAYUNIT);
+ q_requeue(todo, fe);
+ continue;
+ }
#endif
- error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
- PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
- fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
+ error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc + 1, flow->stk, 0, 0);
+ PARDEBUG fprintf(stderr, "#executed pc= %d wrk= %d claim= " LLFMT "," LLFMT " %s\n",
+ fe->pc, id, fe->argclaim, fe->hotclaim, error ? error : "");
#ifdef USE_MAL_ADMISSION
- /* release the memory claim */
- MALadmission(-fe->argclaim, -fe->hotclaim);
+ /* release the memory claim */
+ MALadmission(-fe->argclaim, -fe->hotclaim);
#endif
- /* update the numa information. keep the thread-id producing the value */
- p= getInstrPtr(flow->mb,fe->pc);
- for( i = 0; i < p->argc; i++)
- flow->mb->var[getArg(p,i)]->worker = thr->tid;
+ /* update the numa information. keep the thread-id producing the value */
+ p= getInstrPtr(flow->mb,fe->pc);
+ for( i = 0; i < p->argc; i++)
+ flow->mb->var[getArg(p,i)]->worker = thr->tid;
+ MT_lock_set(&flow->flowlock, "DFLOWworker");
+ fe->state = DFLOWwrapup;
+ MT_lock_unset(&flow->flowlock, "DFLOWworker");
+ if (error) {
MT_lock_set(&flow->flowlock, "DFLOWworker");
- fe->state = DFLOWwrapup;
+ /* only collect one error (from one thread, needed for stable testing) */
+ if (!flow->error)
+ flow->error = error;
MT_lock_unset(&flow->flowlock, "DFLOWworker");
- if (error) {
- MT_lock_set(&flow->flowlock, "DFLOWworker");
- /* only collect one error (from one thread, needed for stable testing) */
- if (!flow->error)
- flow->error = error;
- MT_lock_unset(&flow->flowlock, "DFLOWworker");
- /* after an error we skip the rest of the block */
- q_enqueue(flow->done, fe);
- continue;
- }
+ /* after an error we skip the rest of the block */
+ q_enqueue(flow->done, fe);
+ continue;
}
/* see if you can find an eligible instruction that uses the
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list