Changeset: 3e23519c40ce for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3e23519c40ce
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Create a pool of threads that the dataflow can use.
diffs (truncated from 492 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,16 +80,17 @@ typedef struct DATAFLOW {
static struct worker {
MT_Id id;
- enum {IDLE, RUNNING, JOINING, EXITED} flag;
+ enum {IDLE, WAITING, RUNNING, FREE } flag;
ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
char *errbuf; /* GDKerrbuf so that we can allocate before
fork */
MT_Sema s;
+ int self;
int next;
} workers[THREADS];
/* heads of two mutually exclusive linked lists, both using the .next
* field in the worker struct */
-static int exited_workers = -1; /* to be joined threads */
static int idle_workers = -1; /* idle workers (no thread associated) */
+static int free_workers = -1; /* free workers (thread doing nothing) */
static Queue *todo = 0; /* pending instructions */
@@ -102,7 +103,6 @@ mal_dataflow_reset(void)
{
stopMALdataflow();
memset((char*) workers, 0, sizeof(workers));
- exited_workers = -1;
idle_workers = -1;
if( todo) {
GDKfree(todo->data);
@@ -292,161 +292,176 @@ static void
DFLOWworker(void *T)
{
struct worker *t = (struct worker *) T;
- DataFlow flow;
- FlowEvent fe = 0, fnxt = 0;
- str error = 0;
- int i;
- lng claim;
- Client cntxt;
- InstrPtr p;
-
#ifdef _MSC_VER
srand((unsigned int) GDKusec());
#endif
assert(t->errbuf != NULL);
GDKsetbuf(t->errbuf); /* where to leave errors */
t->errbuf = NULL;
- GDKclrerr();
+
+ for (;;) {
+ DataFlow flow;
+ FlowEvent fe = 0, fnxt = 0;
+ str error = 0;
+ int i;
+ lng claim;
+ Client cntxt;
+ InstrPtr p;
+
+ GDKclrerr();
- cntxt = ATOMIC_PTR_GET(&t->cntxt);
- if (cntxt) {
- /* wait until we are allowed to start working */
- MT_sema_down(&t->s);
- }
- while (1) {
- if (fnxt == 0) {
- MT_thread_setworking(NULL);
- cntxt = ATOMIC_PTR_GET(&t->cntxt);
- fe = q_dequeue(todo, cntxt);
- if (fe == NULL) {
- if (cntxt) {
- /* we're not done yet with work for the
current
- * client (as far as we know), so give
up the CPU
- * and let the scheduler enter some
more work, but
- * first compensate for the down we did
in
- * dequeue */
- MT_sema_up(&todo->s);
- MT_sleep_ms(1);
- continue;
+ if (t->flag == WAITING) {
+ /* wait until we are allowed to start working */
+ MT_sema_down(&t->s);
+ t->flag = RUNNING;
+ }
+ assert(t->flag == RUNNING);
+ cntxt = ATOMIC_PTR_GET(&t->cntxt);
+ while (1) {
+ if (fnxt == 0) {
+ MT_thread_setworking(NULL);
+ cntxt = ATOMIC_PTR_GET(&t->cntxt);
+ fe = q_dequeue(todo, cntxt);
+ if (fe == NULL) {
+ if (cntxt) {
+ /* we're not done yet with work
for the current
+ * client (as far as we know),
so give up the CPU
+ * and let the scheduler enter
some more work, but
+ * first compensate for the
down we did in
+ * dequeue */
+ MT_sema_up(&todo->s);
+ MT_sleep_ms(1);
+ continue;
+ }
+ /* no more work to be done: exit */
+ break;
}
- /* no more work to be done: exit */
+ if (fe->flow->cntxt &&
fe->flow->cntxt->mythread)
+
MT_thread_setworking(fe->flow->cntxt->mythread->name);
+ } else
+ fe = fnxt;
+ if (ATOMIC_GET(&exiting)) {
break;
}
- if (fe->flow->cntxt && fe->flow->cntxt->mythread)
-
MT_thread_setworking(fe->flow->cntxt->mythread->name);
- } else
- fe = fnxt;
- if (ATOMIC_GET(&exiting)) {
- break;
- }
- fnxt = 0;
- assert(fe);
- flow = fe->flow;
- assert(flow);
+ fnxt = 0;
+ assert(fe);
+ flow = fe->flow;
+ assert(flow);
- /* whenever we have a (concurrent) error, skip it */
- if (ATOMIC_PTR_GET(&flow->error)) {
- q_enqueue(flow->done, fe);
- continue;
- }
-
- p= getInstrPtr(flow->mb,fe->pc);
- claim = fe->argclaim;
- if (MALadmission_claim(flow->cntxt, flow->mb, flow->stk, p,
claim)) {
- // never block on deblockdataflow()
- if( p->fcn != (MALfcn) deblockdataflow){
- fe->hotclaim = 0; /* don't assume priority
anymore */
- fe->maxclaim = 0;
- if (todo->last == 0)
- MT_sleep_ms(DELAYUNIT);
- q_requeue(todo, fe);
+ /* whenever we have a (concurrent) error, skip it */
+ if (ATOMIC_PTR_GET(&flow->error)) {
+ q_enqueue(flow->done, fe);
continue;
}
- }
- error = runMALsequence(flow->cntxt, flow->mb, fe->pc, fe->pc +
1, flow->stk, 0, 0);
- /* release the memory claim */
- MALadmission_release(flow->cntxt, flow->mb, flow->stk, p,
claim);
- MT_lock_set(&flow->flowlock);
- fe->state = DFLOWwrapup;
- MT_lock_unset(&flow->flowlock);
- if (error) {
- void *null = NULL;
- /* only collect one error (from one thread, needed for
stable testing) */
- if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
- freeException(error);
- /* after an error we skip the rest of the block */
- q_enqueue(flow->done, fe);
- continue;
- }
+ p= getInstrPtr(flow->mb,fe->pc);
+ claim = fe->argclaim;
+ if (MALadmission_claim(flow->cntxt, flow->mb,
flow->stk, p, claim)) {
+ // never block on deblockdataflow()
+ if( p->fcn != (MALfcn) deblockdataflow){
+ fe->hotclaim = 0; /* don't assume
priority anymore */
+ fe->maxclaim = 0;
+ if (todo->last == 0)
+ MT_sleep_ms(DELAYUNIT);
+ q_requeue(todo, fe);
+ continue;
+ }
+ }
+ error = runMALsequence(flow->cntxt, flow->mb, fe->pc,
fe->pc + 1, flow->stk, 0, 0);
+ /* release the memory claim */
+ MALadmission_release(flow->cntxt, flow->mb, flow->stk,
p, claim);
- /* see if you can find an eligible instruction that uses the
- * result just produced. Then we can continue with it right
away.
- * We are just looking forward for the last block, which means
we
- * are safe from concurrent actions. No other thread can steal
it,
- * because we hold the logical lock.
- * All eligible instructions are queued
- */
- {
- InstrPtr p = getInstrPtr(flow->mb, fe->pc);
- assert(p);
- fe->hotclaim = 0;
- fe->maxclaim = 0;
+ MT_lock_set(&flow->flowlock);
+ fe->state = DFLOWwrapup;
+ MT_lock_unset(&flow->flowlock);
+ if (error) {
+ void *null = NULL;
+ /* only collect one error (from one thread,
needed for stable testing) */
+ if (!ATOMIC_PTR_CAS(&flow->error, &null, error))
+ freeException(error);
+ /* after an error we skip the rest of the block
*/
+ q_enqueue(flow->done, fe);
+ continue;
+ }
- for (i = 0; i < p->retc; i++){
- lng footprint;
- footprint = getMemoryClaim(flow->mb, flow->stk, p, i,
FALSE);
- fe->hotclaim += footprint;
- if( footprint > fe->maxclaim) fe->maxclaim = footprint;
- }
- }
+ /* see if you can find an eligible instruction that
uses the
+ * result just produced. Then we can continue with it
right away.
+ * We are just looking forward for the last block,
which means we
+ * are safe from concurrent actions. No other thread
can steal it,
+ * because we hold the logical lock.
+ * All eligible instructions are queued
+ */
+ p = getInstrPtr(flow->mb, fe->pc);
+ assert(p);
+ fe->hotclaim = 0;
+ fe->maxclaim = 0;
+
+ for (i = 0; i < p->retc; i++){
+ lng footprint;
+ footprint = getMemoryClaim(flow->mb, flow->stk,
p, i, FALSE);
+ fe->hotclaim += footprint;
+ if( footprint > fe->maxclaim)
+ fe->maxclaim = footprint;
+ }
+
/* Try to get rid of the hot potato or locate an alternative to proceed.
*/
#define HOTPOTATO
#ifdef HOTPOTATO
- /* HOT potato choice */
- int last = 0, nxt = -1;
- lng nxtclaim = -1;
+ /* HOT potato choice */
+ int last = 0, nxt = -1;
+ lng nxtclaim = -1;
- MT_lock_set(&flow->flowlock);
- for (last = fe->pc - flow->start; last >= 0 && (i = flow->nodes[last])
> 0; last = flow->edges[last]){
- if (flow->status[i].state == DFLOWpending &&
flow->status[i].blocks == 1) {
- /* find the one with the largest footprint */
- if( nxt == -1){
- nxt = i;
- nxtclaim = flow->status[i].argclaim;
+ MT_lock_set(&flow->flowlock);
+ for (last = fe->pc - flow->start; last >= 0 && (i =
flow->nodes[last]) > 0; last = flow->edges[last]){
+ if (flow->status[i].state == DFLOWpending &&
flow->status[i].blocks == 1) {
+ /* find the one with the largest
footprint */
+ if( nxt == -1){
+ nxt = i;
+ nxtclaim =
flow->status[i].argclaim;
+ }
+ if( flow->status[i].argclaim >
nxtclaim){
+ nxt = i;
+ nxtclaim =
flow->status[i].argclaim;
+ }
+ }
}
- if( flow->status[i].argclaim > nxtclaim){
- nxt = i;
- nxtclaim = flow->status[i].argclaim;
+ /* hot potato can not be removed, use alternative to
proceed */
+ if( nxt >= 0){
+ flow->status[nxt].state = DFLOWrunning;
+ flow->status[nxt].blocks = 0;
+ flow->status[nxt].hotclaim = fe->hotclaim;
+ flow->status[nxt].argclaim += fe->hotclaim;
+ if( flow->status[nxt].maxclaim < fe->maxclaim)
+ flow->status[nxt].maxclaim =
fe->maxclaim;
+ fnxt = flow->status + nxt;
+ }
+ MT_lock_unset(&flow->flowlock);
+#endif
+
+ q_enqueue(flow->done, fe);
+ if ( fnxt == 0 && malProfileMode) {
+ profilerHeartbeatEvent("wait");
}
}
+ MT_lock_set(&dataflowLock);
+ if (GDKexiting() || ATOMIC_GET(&exiting)) {
+ MT_lock_unset(&dataflowLock);
+ break;
+ }
+ t->flag = FREE;
+ assert(free_workers != t->self);
+ t->next = free_workers;
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list