Changeset: c66263db594c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c66263db594c
Modified Files:
monetdb5/mal/mal_instruction.h
monetdb5/mal/mal_profiler.c
monetdb5/mal/mal_runtime.c
monetdb5/modules/mal/recycle.c
monetdb5/optimizer/opt_centipede.c
Branch: mutation
Log Message:
merge with default
diffs (truncated from 533 to 300 lines):
diff --git a/monetdb5/mal/mal_instruction.h b/monetdb5/mal/mal_instruction.h
--- a/monetdb5/mal/mal_instruction.h
+++ b/monetdb5/mal/mal_instruction.h
@@ -108,9 +108,9 @@ typedef struct PERF {
#endif
struct timeval clock; /* clock */
lng clk; /* microseconds clock */
- lng ticks; /* micro seconds spent
*/
- int counter; /* accumulate statistics */
- lng totalticks;
+ lng ticks; /* micro seconds spent
on last call */
+ lng totalticks; /* accumulate micro seconds
send on this call */
+ int calls; /* number of calls seen
*/
bit trace; /* facilitate
filter-based profiling */
lng rbytes; /* bytes read by an
instruction */
lng wbytes; /* bytes written by an
instruction */
diff --git a/monetdb5/mal/mal_profiler.c b/monetdb5/mal/mal_profiler.c
--- a/monetdb5/mal/mal_profiler.c
+++ b/monetdb5/mal/mal_profiler.c
@@ -239,9 +239,9 @@ static void logsent(char *logbuffer)
MT_lock_set(&mal_profileLock, "profileLock");
eventcounter++;
if (profileCounter[PROFevent].status && eventcounter)
- mnstr_printf(eventstream,"[ %d,\t%s ]\n", eventcounter,
logbuffer);
+ mnstr_printf(eventstream,"[ %d,\t%s", eventcounter,
logbuffer);
else
- mnstr_printf(eventstream,"[ %s ]\n", logbuffer);
+ mnstr_printf(eventstream,"[ %s", logbuffer);
mnstr_flush(eventstream);
MT_lock_unset(&mal_profileLock, "profileLock");
}
diff --git a/monetdb5/mal/mal_runtime.c b/monetdb5/mal/mal_runtime.c
--- a/monetdb5/mal/mal_runtime.c
+++ b/monetdb5/mal/mal_runtime.c
@@ -175,7 +175,7 @@ runtimeProfileExit(Client cntxt, MalBlkP
if (stk != NULL && prof->stkpc >= 0 && mb->profiler != NULL &&
mb->profiler[stkpc].trace ) {
gettimeofday(&mb->profiler[stkpc].clock, NULL);
- mb->profiler[stkpc].counter++;
+ mb->profiler[stkpc].calls++;
mb->profiler[stkpc].totalticks += mb->profiler[stkpc].ticks;
mb->profiler[stkpc].clk += mb->profiler[stkpc].ticks;
if (pci) {
diff --git a/monetdb5/modules/mal/recycle.c b/monetdb5/modules/mal/recycle.c
--- a/monetdb5/modules/mal/recycle.c
+++ b/monetdb5/modules/mal/recycle.c
@@ -138,7 +138,7 @@ RECYCLEdump(stream *s)
else mnstr_printf(s,"# ");
mnstr_printf(s,"%4d\t"LLFMT"\t%d\t"LLFMT"\t"LLFMT"\t"LLFMT"\t%s\n", i,
recycleBlk->profiler[i].clk,
- recycleBlk->profiler[i].counter,
+ recycleBlk->profiler[i].calls,
recycleBlk->profiler[i].ticks,
recycleBlk->profiler[i].rbytes,
recycleBlk->profiler[i].wbytes,
@@ -266,7 +266,7 @@ RECYCLErunningStat(Client cntxt, MalBlkP
#ifdef _DEBUG_CACHE_
if ( getInstrPtr(recycleBlk,i)->token != NOOPsymbol )
#endif
- if ( recycleBlk->profiler[i].counter >1)
+ if ( recycleBlk->profiler[i].calls >1)
reusedmem += recycleBlk->profiler[i].wbytes;
mnstr_printf(s,"%d\t %7.2f\t ", ++q,
(GDKusec()-cntxt->rcc->time0)/1000.0);
diff --git a/monetdb5/optimizer/opt_centipede.c
b/monetdb5/optimizer/opt_centipede.c
--- a/monetdb5/optimizer/opt_centipede.c
+++ b/monetdb5/optimizer/opt_centipede.c
@@ -43,7 +43,6 @@ typedef struct{
int type, slice;
int lslices, hslices; /* variables holding the range bound */
lng rowcnt;
- ValRecord bounds[MAXSITES];
} Slices;
static int nrservers;
@@ -51,7 +50,6 @@ static int nrservers;
/*
* The query will be controlled from the coordinator with a plan
* geared at parallel execution
- * TODO pack is expensive, use incremental pack
*/
static MalBlkPtr
OPTexecController(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices,
oid plantag)
@@ -257,8 +255,8 @@ OPTexecController(Client cntxt, MalBlkPt
chkProgram(cntxt->fdout, cntxt->nspace, cmb);
#ifdef _DEBUG_OPT_CENTIPEDE_
- mnstr_printf(cntxt->fdout,"#rough cntrl plan %d \n", cmb->errors);
- printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT);
+ //mnstr_printf(cntxt->fdout,"#rough cntrl plan %d \n", cmb->errors);
+ //printFunction(cntxt->fdout, cmb, 0, LIST_MAL_STMT);
#endif
GDKfree(alias);
GDKfree(pack);
@@ -370,17 +368,21 @@ static void
OPTmaterializePartition(MalBlkPtr mb, InstrPtr p, int low, int hgh)
{
int v,oldvar;
+ int i;
+ InstrPtr q;
- oldvar = getArg(p,0);
- getArg(p,0) = v = newTmpVariable(mb, getVarType(mb,oldvar));
- setVarUDFtype(mb, v);
- setVarFixed(mb,v);
+ for(i=0; i< p->retc; i++ ){
+ oldvar = getArg(p,i);
+ getArg(p,i) = v = newTmpVariable(mb, getVarType(mb,oldvar));
+ setVarUDFtype(mb, v);
+ setVarFixed(mb,v);
- p = newStmt(mb, algebraRef, sliceRef);
- p = pushArgument(mb, p, v);
- p = pushArgument(mb, p, low);
- p = pushArgument(mb, p, hgh);
- getArg(p,0)= oldvar;
+ q = newStmt(mb, algebraRef, sliceRef);
+ q = pushArgument(mb, q, v);
+ q = pushArgument(mb, q, low);
+ q = pushArgument(mb, q, hgh);
+ getArg(q,0)= oldvar;
+ }
}
@@ -405,7 +407,7 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
}
if ( slices->slice == 0){
- slices->slice = newTmpVariable(nmb, getVarType(nmb,
getArg(slices->target,0)));
+ slices->slice = newTmpVariable(nmb, slices->type);
setVarUDFtype(nmb, slices->slice);
setVarUsed(nmb, slices->slice);
nmb->stmt[0] = pushArgument(nmb, nmb->stmt[0], slices->lslices);
@@ -429,13 +431,14 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
* when a connection is re-used by different client sessions.
*/
#define BLOCKED 1
-#define PARTITION 2
-#define SUPPORTIVE 3
-#define EXPORTED 4
-#define KEEPLOCAL 5
+#define PARTITION 2 // phase 1 result
+#define PIVOT 3 // phase 2 result
+#define SUPPORTIVE 4 // phase 2 result
+#define EXPORTED 5
+#define KEEPLOCAL 6
#ifdef _DEBUG_OPT_CENTIPEDE_
-static char *statusname[6]= {"", "blocked ", "partition ", "support ",
"exported ", "keeplocal "};
+static char *statusname[7]= {"", "blocked ", "partition ", "pivot ",
"support ", "exported ", "keeplocal "};
#endif
static void
@@ -491,10 +494,8 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb,
#define VALS 2
/* Phase 1: determine all variables/instructions indirectly dependent
on a fragmented column */
- /* Keep track on passing the OID or VALues around */
/* Instructions are marked as PARTITION if the have to be propagated */
last = limit;
- // Calling instruction and arguments are supportive to the partitioning
status[0]= PARTITION;
for ( j = old[0]->retc; j < old[0]->argc; j++)
vars[getArg(old[0],j)]= SUPPORTIVE;
@@ -506,110 +507,60 @@ OPTbakePlans(Client cntxt, MalBlkPtr mb,
last = i;
} else
// incorporate both single/double target sql.bind operations
- if ( getModuleId(p) == sqlRef && (getFunctionId(p) == bindRef
|| getFunctionId(p) == bindidxRef) &&
- strcmp(slices->schema, getVarConstant(mb, getArg(p,
p->retc + 1)).val.sval) == 0 &&
- strcmp(slices->table, getVarConstant(mb, getArg(p,
p->retc + 2)).val.sval) == 0 ) {
- status[i] = PARTITION;
- if ( p->retc == 1) {
- head[getArg(p,0)] = OIDS;
- tail[getArg(p,0)] = VALS;
- } else {
- tail[getArg(p,0)] = OIDS;
- tail[getArg(p,1)] = VALS;
- }
- } else
- if ( getModuleId(p) == sqlRef && getFunctionId(p) == deltaRef ){
- if ( head[getArg(p,1)] ){
- status[i] = PARTITION;
- tail[getArg(p,0)] = VALS;
- }
- } else
- if ( getModuleId(p) == sqlRef && getFunctionId(p) == tidRef &&
+ if ( getModuleId(p) == sqlRef &&
+ ( getFunctionId(p) == tidRef || getFunctionId(p) ==
bindRef || getFunctionId(p) == bindidxRef) &&
strcmp(slices->schema, getVarConstant(mb, getArg(p,
p->retc + 1)).val.sval) == 0 &&
strcmp(slices->table, getVarConstant(mb, getArg(p,
p->retc + 2)).val.sval) == 0 ) {
status[i] = PARTITION;
- tail[getArg(p,0)] = OIDS;
- }
+ vars[getArg(p,0)] = PARTITION;
+ } else
+ if ( getModuleId(p) == sqlRef && getFunctionId(p) == deltaRef ){
+ if ( vars[getArg(p,1)] ){
+ status[i] = PARTITION;
+ vars[getArg(p,0)] = PARTITION;
+ }
+ }
/* blocking instructions are those that require data exchange,
aggregation or total view */
- if ( getModuleId(p) == algebraRef && getFunctionId(p) ==
joinRef ) {
- if ( head[getArg(p,p->retc)] ){
+ if ( getModuleId(p) == algebraRef && (getFunctionId(p) ==
joinRef || getFunctionId(p) == leftjoinRef || getFunctionId(p) ==
leftfetchjoinRef) ) {
+ if ( vars[getArg(p,p->retc)] ||
vars[getArg(p,p->retc+1)] ){
status[i] = PARTITION;
- head[getArg(p,0)] = head[getArg(p,p->retc)];
- head[getArg(p,1)] = head[getArg(p,p->retc)];
+ vars[getArg(p,0)] = PARTITION;
}
- if ( tail[getArg(p,p->retc)] ){
+ } else
+ if ( getModuleId(p) == algebraRef && (getFunctionId(p)==
thetaselectRef || getFunctionId(p) == selectRef || getFunctionId(p) ==
subselectRef)){
+ if ( vars[getArg(p,p->retc)] ){
status[i] = PARTITION;
- tail[getArg(p,0)] = tail[getArg(p,p->retc)];
- tail[getArg(p,1)] = tail[getArg(p,p->retc)];
+ vars[getArg(p,0)] = PARTITION;
}
} else
- if ( getModuleId(p) == algebraRef && (getFunctionId(p) ==
leftjoinRef || getFunctionId(p) == leftfetchjoinRef) ) {
- if ( tail[getArg(p,1)] ){
+ if ( getModuleId(p) == batRef && getFunctionId(p) ==
mirrorRef ) {
+ if ( vars[getArg(p,p->retc)] ){
status[i] = PARTITION;
- head[getArg(p,0)] = tail[getArg(p,1)];
- }
- if ( tail[getArg(p,2)] ){
- status[i] = PARTITION;
- tail[getArg(p,0)] = tail[getArg(p,p->retc+1)];
- }
- } else
- if ( getModuleId(p) == algebraRef && (getFunctionId(p)==
thetauselectRef || getFunctionId(p) == uselectRef || getFunctionId(p) ==
selectRef ) ) {
- if (head[getArg(p,p->retc)] ) {
- head[getArg(p,0)] = head[getArg(p,p->retc)];
- tail[getArg(p,0)] = tail[getArg(p,p->retc)];
- status[i] = PARTITION;
- }
- } else
-/*
- if ( getModuleId(p) == algebraRef && (getFunctionId(p) ==
subsortRef) ) {
- if (tail[getArg(p,1)] ){
- tail[getArg(p,0)] = tail[getArg(p,1)];
- status[i] = PARTITION;
- }
- } else
-*/
- if ( getModuleId(p) == batRef && getFunctionId(p) ==
mirrorRef ) {
- if (head[getArg(p,1)]){
- head[getArg(p,0)] = head[getArg(p,1)];
- tail[getArg(p,0)] = head[getArg(p,1)];
- status[i] = PARTITION;
+ vars[getArg(p,0)] = PARTITION;
}
} else
if ( getModuleId(p) == batRef &&
getFunctionId(p)==reverseRef ) {
- if (head[getArg(p,1)] || tail[getArg(p,1)] ){
- head[getArg(p,0)] = tail[getArg(p,p->retc)];
- tail[getArg(p,0)] = head[getArg(p,p->retc)];
+ if ( vars[getArg(p,p->retc)] ){
status[i] = PARTITION;
+ vars[getArg(p,0)] = PARTITION;
}
} else
- if ( getModuleId(p) == groupRef && ( getFunctionId(p) ==
subgroupRef || getFunctionId(p) == subgroupdoneRef) ){
- if ( head[getArg(p, p->retc)] ){
- /* groups against the partition column is
allowed.
- It calls for a proper group reconstruction
at the receiver
- */
- head[getArg(p,0)] = OIDS;
- tail[getArg(p,0)] = 0;
- head[getArg(p,1)] = OIDS;
- tail[getArg(p,1)] = OIDS;
- status[i] = PARTITION;
+ if ( getModuleId(p) == groupRef && ( getFunctionId(p) ==
subgroupRef || getFunctionId(p) == subgroupdoneRef) && p->retc== 3){
+ if ( vars[getArg(p,p->retc)] ){
+ status[i] = PIVOT;
+ vars[getArg(p,0)] = PARTITION;
+ vars[getArg(p,1)] = PARTITION;
+ vars[getArg(p,2)] = PARTITION;
}
- }else
- if ( (getModuleId(p) == sqlRef && (getFunctionId(p) ==
resultSetRef || getFunctionId(p) == putName("exportValue",11) ) ) ||
getModuleId(p) == ioRef )
+ } else
+ if ((getModuleId(p) == sqlRef && (getFunctionId(p) ==
resultSetRef || getFunctionId(p) == putName("exportValue",11))) ||
getModuleId(p) == ioRef )
status[i] = BLOCKED;
else
if ( getModuleId(p) == batcalcRef ){
- if ( p->argc == 2 /* coercions and unaries */ &&
vars[getArg(p,1)] == PARTITION ) {
- status[i]= PARTITION;
- head[getArg(p,0)] = head[getArg(p,1)];
- tail[getArg(p,0)] = tail[getArg(p,1)];
+ if ( vars[getArg(p,p->retc)] ||
vars[getArg(p,p->retc+1)] ){
status[i] = PARTITION;
- }
- if ( p->argc == 3 /* binaries */ && (vars[getArg(p,1)]
== PARTITION || vars[getArg(p,2)] == PARTITION)) {
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list