Changeset: 6ce816ead8f0 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6ce816ead8f0
Modified Files:
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/mal/mal_interpreter.h
monetdb5/optimizer/opt_commonTerms.c
monetdb5/optimizer/opt_joinpath.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/opt_support.c
monetdb5/optimizer/opt_support.h
Branch: headless
Log Message:
Merge with default
Manually patch the changed scheduler and optimizers.
diffs (truncated from 1538 to 300 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -56,11 +56,6 @@
static lng memorypool; /* memory claimed by concurrent threads */
static lng memoryused; /* memory used for intermediates */
static int memoryclaims = 0; /* number of threads active with expensive operations */
-static struct{
- lng claim; /* actual claim on memory*/
- int bid;
-} hotpotatoes[MAXHOT];
-static int hottop = 0;
#define DFLOWpending 0 /* runnable */
#define DFLOWrunning 1 /* currently in progress */
@@ -81,17 +76,27 @@
int id;
queue *todo; /* pending actions for this client */
lng clk;
+ struct DataFlow *flow;
} FlowTask;
-typedef struct DataFlow {
+typedef struct FLOWSTATUS {
Client cntxt; /* for debugging and client resolution */
MalBlkPtr mb; /* carry the context */
MalStkPtr stk;
+ int pc; /* pc in underlying malblock */
+ int blocks; /* awaiting for variables */
+ sht state; /* of execution */
+ sht cost;
+ lng hotclaim; /* memory foot print of result variables */
+ lng argclaim; /* memory foot print of arguments */
+ str error;
+} *FlowStatus, FlowStatusRec;
+
+typedef struct DataFlow {
int start, stop; /* guarded block under consideration*/
- char *status; /* statements can be blocked on other statements */
- char *blocked; /* blocked, should be created first */
- int *assign; /* first assignment of variable */
- int *inuse; /* inuse in parallel threads reference count */
+ FlowStatus status; /* status of each instruction */
+ int *nodes; /* dependency graph nodes */
+ int *edges; /* dependency graph */
queue *done; /* work finished */
queue *todo; /* pending actions for this client */
int nway; /* number of workers */
@@ -99,17 +104,8 @@
struct DataFlow *free; /* free list */
} *DataFlow, DataFlowRec;
-typedef struct {
- int pc; /* pc in underlying malblock */
- sht status;
- sht cost;
- str error;
- DataFlow flow;
-} *FlowStep, FlowStepRec;
-static DataFlow flows = NULL;
-static int workerid = 0;
/*
* Running all eligible instructions in parallel creates
* resource contention. This means we should implement
@@ -144,225 +140,81 @@
* suspended instructions.
*/
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop){
+ int cnt = 0;
+ int i;
+
+ for(i= start; i < stop; i++)
+ cnt += getInstrPtr(mb,i)->argc;
+ return cnt;
+}
+
#define heapinfo(X) if((X) && (X)->base) vol = (X)->free; else vol = 0;
#define hashinfo(X) if((X) && (X)->mask) vol = ((X)->mask+(X)->lim+1)*sizeof(int) + sizeof(*(X)); else vol = 0;
static lng
-calcclaim(MalStkPtr stk, InstrPtr pci, int i)
-{
- lng total = 0,vol;
+getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag){
+ lng total=0, vol = 0;
+ oid cnt = 0;
COL *b;
- assert(i < pci->argc);
+ (void) mb;
if (stk->stk[getArg(pci,i)].vtype == TYPE_bat){
b = CBPquickdesc(stk->stk[getArg(pci,i)].val.bval,TRUE);
if (b==NULL)
- return total;
+ return 0;
+ if ( flag && isVIEW(b)){
+ CBPreleaseref( b);
+ return 0;
+ }
heapinfo(&b->heap); total += vol;
heapinfo(b->vheap); total += vol;
hashinfo(b->hash); total += vol;
if ( COLtype(b) == TYPE_oid && !COLordered(b) && !COLdense(b) ){
/* assume we may have to do random IO, punish it by increasing the claim with hash elem size*/
- total += 4 * COLcount(b) ;
- total = total > (lng) (MEMORY_THRESHOLD * monet_memory)? (lng) (MEMORY_THRESHOLD * monet_memory) : total;
+ total += 4 * cnt;
+ total = total >(lng) (MEMORY_THRESHOLD * monet_memory)? (lng) (MEMORY_THRESHOLD * monet_memory):total;
}
}
return total;
}
-static lng
-getHotClaim()
+int
+DFLOWadmission(lng argclaim, lng hotclaim)
{
- lng total = 0;
- int i;
- for ( i = 0; i < hottop; i++)
- total += hotpotatoes[i].claim;
- return total;
-}
-
-/*
- * Calculate the memory need of a single instruction and also determine how
much of the
- * hot potatoes it will eat.
-*/
-void
-getMemoryClaim(MalStkPtr stk, InstrPtr pci, lng *argclaim, lng *retclaim, lng
*hotclaim)
-{
- int i, h = int_nil;
- lng guess=0, t=0, total = 0;
- oid cnt = 0;
-
- total = 0;
- if (retclaim)
- *retclaim =0;
- if (hotclaim)
- *hotclaim =0;
- if (argclaim){
- for(i= pci->retc; i< pci->argc; i++) {
- h = int_nil;
- total+= calcclaim(stk,pci,i);
- if ( guess == 0)
- guess = total -t;
- /* claims on hotpotatoes are not counted */
- if (hotclaim)
- for (t =0; t< hottop; t++)
- if (hotpotatoes[t].bid == h)
- *hotclaim += hotpotatoes[t].claim;
- }
- /* make sure you can afford one hash when you have propCheck
enabled */
- if ( GDKdebug & 10 ){
- total += cnt * sizeof(oid);
- }
- *argclaim = total;
- }
- if ( retclaim && *retclaim == 0)
- *retclaim = guess * pci->retc;
-#ifdef DEBUG_MEMORY_CLAIM
- if ( total )
- mnstr_printf(GDKout,"#DFLOWgetMemoryClaim pool " LLFMT "
claims " LLFMT "," LLFMT "," LLFMT"\n",
memorypool,total,(retclaim?*retclaim:0),(hotclaim?*hotclaim:0));
-#endif
-}
-
-/*
- * After we have executed the instruction, we should release any hotpotatoe
claim (it is hot only once)
- * and make more precise claims for the return arguments. Moreover, we should
reduce the hot potatoe set,
- * due to other arguments being loaded.
-*/
-
-void
-updMemoryUsedPart(MalStkPtr stk, InstrPtr pci, int start, int stop, lng
argclaim)
-{
- /* remove the result arguments from the hot set */
- int i,j,h,bid;
-
- if ( hottop >= MAXHOT || memoryused > (lng) (MEMORY_THRESHOLD *
monet_memory) ){
- /* forget everything returning memory to pool */
- mal_set_lock(mal_contextLock, "DFLOWdelay");
- for ( i =0; i< MAXHOT; i++){
- hotpotatoes[i].claim=0;
- hotpotatoes[i].bid=0;
- }
- memoryused = 0;
- hottop = 0;
- mal_unset_lock(mal_contextLock, "DFLOWdelay");
-#ifdef DEBUG_MEMORY_CLAIM
- mnstr_printf(GDKout,"#DFLOWhotpotatoes reset\n");
-#endif
- }
-
- if (memorypool == 0 ){
- /* not initialized */
- return;
- }
- mal_set_lock(mal_contextLock, "DFLOWdelay");
- for ( i = start; i< stop; i++)
- if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid =
stk->stk[getArg(pci,i)].val.bval) && bid)
- {
- for ( h = j= 0; j< hottop; j++)
- if ( hotpotatoes[j].bid != bid)
- hotpotatoes[h++]= hotpotatoes[j];
- else{
-#ifdef DEBUG_MEMORY_CLAIM
- if ( hotpotatoes[j].claim){
- str cv = NULL;
- ATOMformat(TYPE_bat,
&hotpotatoes[hottop].bid, &cv);
-
mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d] drops [%s]" LLFMT "\n", j, cv,
hotpotatoes[j].claim);
- if (cv) GDKfree(cv);
- }
-#endif
- }
- hottop = h;
- }
- /* input also invalidates part of the hot set */
- argclaim = argclaim * (memorypool / (memorypool+memoryused));
- for ( h = j= 0; j< hottop; j++)
- if ( argclaim > 0)
- argclaim -= hotpotatoes[j].claim;
- else hotpotatoes[h++]= hotpotatoes[j];
- hottop = h;
- mal_unset_lock(mal_contextLock, "DFLOWdelay");
-}
-
-void
-updMemoryUsed(MalStkPtr stk, InstrPtr pci, lng argclaim)
-{
- lng action=0,total,t;
- int i,h,bid;
-
- for ( i= 0; i< pci->retc && hottop < MAXHOT; i++)
- if (stk->stk[getArg(pci,i)].vtype == TYPE_bat && (bid =
stk->stk[getArg(pci,i)].val.bval) && bid)
- {
- total= calcclaim(stk,pci,i);
- (void) t;
- (void) h;
- if ( total ) {
- mal_set_lock(mal_contextLock, "DFLOWdelay");
- hotpotatoes[hottop].bid =
stk->stk[getArg(pci,i)].val.bval;
- hotpotatoes[hottop].claim = total;
-#ifdef DEBUG_MEMORY_CLAIM
- if ( total ) {
- str cv = NULL;
- ATOMformat(TYPE_bat, &hotpotatoes[hottop].bid,
&cv);
- mnstr_printf(GDKout,"#DFLOWhotpotatoes[%d]
claims [%s]" LLFMT "\n", hottop, cv, total);
- GDKfree(cv);
- }
-#endif
- hottop++;
- action++;
- mal_unset_lock(mal_contextLock, "DFLOWdelay");
- }
- }
- updMemoryUsedPart(stk,pci, pci->retc,pci->argc,argclaim);
-#ifdef DEBUG_MEMORY_CLAIM
- if ( total && action )
- mnstr_printf(GDKout,"#DFLOWhotpotatoes pool " LLFMT " used "
LLFMT "\n", memorypool, memoryused);
-#endif
-}
-
-int
-DFLOWadmission(lng argclaim, lng retclaim, lng hotclaim)
-{
- lng hot;
/* optimistically set memory */
- if ( argclaim + retclaim == 0)
+ if ( argclaim == 0)
return 0;
+ if( argclaim >= 0)
+ return 0; /* admission control not used */
mal_set_lock(mal_contextLock, "DFLOWdelay");
if (memorypool <= 0 && memoryclaims == 0) {
memorypool = (lng) (MEMORY_THRESHOLD * monet_memory);
- hottop = 0;
}
- if ( argclaim + retclaim > 0 ) {
- if (memoryclaims == 0 || memorypool - memoryused > argclaim +
retclaim -hotclaim){
- hot = getHotClaim();
- if ( memoryclaims && hotclaim == 0 && argclaim +
retclaim > memorypool - memoryused - hot) {
- /* don't start unless you can eat the hot
potatoes first */
-#ifdef DEBUG_MEMORY_CLAIM
- mnstr_printf(GDKerr,"#Delayed due to hot
potatoes pool " LLFMT " used " LLFMT " hot " LLFMT "\n", memorypool,
memoryused,hot);
-#endif
- mal_unset_lock(mal_contextLock, "DFLOWdelay");
- return -1;
- }
- memorypool -= (argclaim + retclaim);
+ if ( argclaim > 0 ) {
+ if (memoryclaims == 0 || memorypool - memoryused > argclaim + hotclaim){
+ memorypool -= (argclaim + hotclaim);
memoryclaims ++;
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list