Changeset: 0166c4155d05 for MonetDB URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0166c4155d05 Added Files: MonetDB5/mal/mal_dataflow.c Branch: tracer Log Message:
Revert "Reverted to 71063:8ed9e57e46cc" This reverts commit 9070164a3dc6bd5c977cb517384f32d86193858d. diffs (truncated from 1006 to 300 lines): diff --git a/MonetDB5/mal/mal_dataflow.c b/MonetDB5/mal/mal_dataflow.c new file mode 100644 --- /dev/null +++ b/MonetDB5/mal/mal_dataflow.c @@ -0,0 +1,1001 @@ +/* + * This Source Code Form is subject to the terms of the Mozilla Public + * License, v. 2.0. If a copy of the MPL was not distributed with this + * file, You can obtain one at http://mozilla.org/MPL/2.0/. + * + * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V. + */ + +/* + * (author) M Kersten + * Out of order execution + * The alternative is to execute the instructions out of order + * using dataflow dependencies and as an independent process. + * Dataflow processing only works on a code + * sequence that does not include additional (implicit) flow of control + * statements and, ideally, consist of expensive BAT operations. + * The dataflow interpreter selects cheap instructions + * using a simple costfunction based on the size of the BATs involved. + * + * The dataflow portion is identified as a guarded block, + * whose entry is controlled by the function language.dataflow(); + * This way the function can inform the caller to skip the block + * when dataflow execution was performed. + * + * The flow graphs should be organized such that parallel threads can + * access it mostly without expensive locking. + */ +#include "monetdb_config.h" +#include "mal_dataflow.h" +#include "mal_private.h" +#include "mal_runtime.h" +#include "mal_resource.h" + +#define DFLOWpending 0 /* runnable */ +#define DFLOWrunning 1 /* currently in progress */ +#define DFLOWwrapup 2 /* done! 
*/ +#define DFLOWretry 3 /* reschedule */ +#define DFLOWskipped 4 /* due to errors */ + +/* The per instruction status of execution */ +typedef struct FLOWEVENT { + struct DATAFLOW *flow;/* execution context */ + int pc; /* pc in underlying malblock */ + int blocks; /* awaiting variables */ + sht state; /* of execution */ + lng clk; + sht cost; + lng hotclaim; /* memory foot print of result variables */ + lng argclaim; /* memory foot print of arguments */ + lng maxclaim; /* memory foot print of largest argument, could be used to indicate result size */ +} *FlowEvent, FlowEventRec; + +typedef struct queue { + int size; /* size of queue */ + int last; /* last element in the queue */ + int exitcount; /* how many threads should exit */ + FlowEvent *data; + MT_Lock l; /* it's a shared resource, ie we need locks */ + MT_Sema s; /* threads wait on empty queues */ +} Queue; + +/* + * The dataflow dependency is administered in a graph list structure. + * For each instruction we keep the list of instructions that + * should be checked for eligibility once we are finished with it. 
+ */ +typedef struct DATAFLOW { + Client cntxt; /* for debugging and client resolution */ + MalBlkPtr mb; /* carry the context */ + MalStkPtr stk; + int start, stop; /* guarded block under consideration*/ + FlowEvent status; /* status of each instruction */ + ATOMIC_PTR_TYPE error; /* error encountered */ + int *nodes; /* dependency graph nodes */ + int *edges; /* dependency graph */ + MT_Lock flowlock; /* lock to protect the above */ + Queue *done; /* instructions handled */ +} *DataFlow, DataFlowRec; + +static struct worker { + MT_Id id; + enum {IDLE, RUNNING, JOINING, EXITED} flag; + ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */ + MT_Sema s; +} workers[THREADS]; + +static Queue *todo = 0; /* pending instructions */ + +static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0); +static MT_Lock dataflowLock = MT_LOCK_INITIALIZER("dataflowLock"); +static void stopMALdataflow(void); + +void +mal_dataflow_reset(void) +{ + stopMALdataflow(); + memset((char*) workers, 0, sizeof(workers)); + if( todo) { + GDKfree(todo->data); + MT_lock_destroy(&todo->l); + MT_sema_destroy(&todo->s); + GDKfree(todo); + } + todo = 0; /* pending instructions */ + ATOMIC_SET(&exiting, 0); +} + +/* + * Calculate the size of the dataflow dependency graph. + */ +static int +DFLOWgraphSize(MalBlkPtr mb, int start, int stop) +{ + int cnt = 0; + int i; + + for (i = start; i < stop; i++) + cnt += getInstrPtr(mb, i)->argc; + return cnt; +} + +/* + * The dataflow execution is confined to a barrier block. + * Within the block there are multiple flows, which, in principle, + * can be executed in parallel. 
+ */ + +static Queue* +q_create(int sz, const char *name) +{ + Queue *q = (Queue*)GDKmalloc(sizeof(Queue)); + + if (q == NULL) + return NULL; + *q = (Queue) { + .size = ((sz << 1) >> 1), /* we want a multiple of 2 */ + }; + q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size); + if (q->data == NULL) { + GDKfree(q); + return NULL; + } + + MT_lock_init(&q->l, name); + MT_sema_init(&q->s, 0, name); + return q; +} + +static void +q_destroy(Queue *q) +{ + assert(q); + MT_lock_destroy(&q->l); + MT_sema_destroy(&q->s); + GDKfree(q->data); + GDKfree(q); +} + +/* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue is possible */ +/* we might actually sort it for better scheduling behavior */ +static void +q_enqueue_(Queue *q, FlowEvent d) +{ + assert(q); + assert(d); + if (q->last == q->size) { + q->size <<= 1; + q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size); + assert(q->data); + } + q->data[q->last++] = d; +} +static void +q_enqueue(Queue *q, FlowEvent d) +{ + assert(q); + assert(d); + MT_lock_set(&q->l); + q_enqueue_(q, d); + MT_lock_unset(&q->l); + MT_sema_up(&q->s); +} + +/* + * A priority queue over the hot claims of memory may + * be more effective. 
It prioritizes those instructions + that want to use a big recent result + */ + +#ifdef USE_MAL_ADMISSION +static void +q_requeue_(Queue *q, FlowEvent d) +{ + int i; + + assert(q); + assert(d); + if (q->last == q->size) { + /* enlarge buffer */ + q->size <<= 1; + q->data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * q->size); + assert(q->data); + } + for (i = q->last; i > 0; i--) + q->data[i] = q->data[i - 1]; + q->data[0] = d; + q->last++; +} +static void +q_requeue(Queue *q, FlowEvent d) +{ + assert(q); + assert(d); + MT_lock_set(&q->l); + q_requeue_(q, d); + MT_lock_unset(&q->l); + MT_sema_up(&q->s); +} +#endif + +static FlowEvent +q_dequeue(Queue *q, Client cntxt) +{ + FlowEvent r = NULL, s = NULL; + //int i; + + assert(q); + MT_sema_down(&q->s); + if (ATOMIC_GET(&exiting)) + return NULL; + MT_lock_set(&q->l); + if (cntxt) { + int i, minpc = -1; + + for (i = q->last - 1; i >= 0; i--) { + if (q->data[i]->flow->cntxt == cntxt) { + if (q->last > 1024) { + /* for long "queues", just grab the first eligible + * entry we encounter */ + minpc = i; + break; + } + /* for shorter "queues", find the oldest eligible entry */ + if (minpc < 0) { + minpc = i; + s = q->data[i]; + } + r = q->data[i]; + if (s && r && s->pc > r->pc) { + minpc = i; + s = r; + } + } + } + if (minpc >= 0) { + r = q->data[minpc]; + i = minpc; + q->last--; + memmove(q->data + i, q->data + i + 1, (q->last - i) * sizeof(q->data[0])); + } + + MT_lock_unset(&q->l); + return r; + } + if (q->exitcount > 0) { + q->exitcount--; + MT_lock_unset(&q->l); + return NULL; + } + assert(q->last > 0); + if (q->last > 0) { + /* LIFO favors garbage collection */ + r = q->data[--q->last]; +/* Line coverage test shows it is an expensive loop that hardly ever leads to adjustment + for(i= q->last-1; r && i>=0; i--){ + s= q->data[i]; + if( s && s->flow && s->flow->stk && + r && r->flow && r->flow->stk && + s->flow->stk->tag < r->flow->stk->tag){ + q->data[i]= r; + r = s; + } + } +*/ + q->data[q->last] = 0; + 
/* else: terminating */ + /* try out random draw * + { + int i; + i = rand() % q->last; + r = q->data[i]; + for (i++; i < q->last; i++) + q->data[i - 1] = q->data[i]; + q->last--; i _______________________________________________ checkin-list mailing list checkin-list@monetdb.org https://www.monetdb.org/mailman/listinfo/checkin-list