Changeset: c79022b55949 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c79022b55949
Modified Files:
Branch: default
Log Message:
merge diffs (truncated from 718 to 300 lines): diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx --- a/monetdb5/mal/mal_interpreter.mx +++ b/monetdb5/mal/mal_interpreter.mx @@ -1114,7 +1114,7 @@ static void DFLOWinit(DataFlow flow, Client cntxt, MalBlkPtr mb, MalStkPtr stk, int size) { - int pc, i, j, k, n, etop = 0; + int pc, i, j, k, l, n, etop = 0; int *assign; InstrPtr p; @@ -1135,6 +1135,7 @@ /* administer flow dependencies */ for (j = p->retc; j < p->argc; j++) { + /* list of instructions that wake n-th instruction up */ if (!isVarConstant(mb, getArg(p, j)) && (k = assign[getArg(p, j)])) { /* add edge to the target instruction for wakeup call */ k -= flow->start; @@ -1155,28 +1156,31 @@ flow->status[n].blocks++; } - /* be careful, watch out for garbage collection interference */ - /* those should be scheduled after all its other uses */ - k = getEndOfLife(mb, getArg(p, j)); - if (k != pc && k < flow->stop && k > flow->start) { - /* add edge to the target instruction for wakeup call */ - PARDEBUG mnstr_printf(GDKstdout, "forward %d -> %d\n", n + flow->start, k); - k -= flow->start; - if (flow->nodes[n]) { - /* add wakeup to tail of list */ - for (i = n; flow->edges[i] > 0; i = flow->edges[i]) - ; - flow->nodes[etop] = k; - flow->edges[etop] = -1; - flow->edges[i] = etop; - etop++; - (void)size; - assert(etop < size); - } else { - flow->nodes[n] = k; - flow->edges[n] = -1; + + /* list of instructions to be woken up explicitly */ + if ( !isVarConstant(mb, getArg(p, j)) ) { + /* be careful, watch out for garbage collection interference */ + /* those should be scheduled after all its other uses */ + l = getEndOfLife(mb, getArg(p, j)); + if (l != pc && l < flow->stop && l > flow->start) { + /* add edge to the target instruction for wakeup call */ + PARDEBUG mnstr_printf(GDKstdout, "endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p,j)), n + flow->start, l); + l -= flow->start; + if (flow->nodes[n]) { + /* add wakeup to tail of 
list */ + for (i = n; flow->edges[i] > 0; i = flow->edges[i]) + ; + flow->nodes[etop] = l; + flow->edges[etop] = -1; + flow->edges[i] = etop; + etop++; + assert(etop < size); + } else { + flow->nodes[n] = l; + flow->edges[n] = -1; + } + flow->status[l].blocks++; } - flow->status[k].blocks++; } } @@ -1184,10 +1188,11 @@ assign[getArg(p, j)] = pc; /* ensure recognition of dependency on first instruction and constant */ } GDKfree(assign); - PARDEBUG for (n = 0; n < flow->stop - flow->start; n++) { + PARDEBUG + for (n = 0; n < flow->stop - flow->start; n++) { mnstr_printf(GDKstdout, "#[%d] %d: ", flow->start + n, n); printInstruction(GDKstdout, mb, 0, getInstrPtr(mb, n + flow->start), LIST_MAL_STMT | LIST_MAPI); - mnstr_printf(GDKstdout, "#[%d]Dependents blocks %d:", flow->start + n, flow->status[n].blocks); + mnstr_printf(GDKstdout, "#[%d]Dependents block count %d wakeup", flow->start + n, flow->status[n].blocks); for (j = n; flow->edges[j]; j = flow->edges[j]) { mnstr_printf(GDKstdout, "%d ", flow->start + flow->nodes[j]); if (flow->edges[j] == -1) @@ -1206,15 +1211,31 @@ They take effect after we have ensured that the basic properties for execution hold. 
@c +static void showFlowStatus(DataFlow flow, int pc) +{ + int i; + FlowStatus fs= flow->status; + + mnstr_printf(GDKstdout, "#end of data flow %d done %d \n", pc, flow->stop - flow->start); + for (i = 0; i < flow->stop - flow->start; i++) + if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) { + mnstr_printf(GDKstdout, "#missed pc %d status %d %d blocks %d", fs[i].state, i, fs[i].pc, fs[i].blocks); + printInstruction(GDKstdout, fs[i].mb, 0, getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI); + } +} + static str DFLOWscheduler(DataFlow flow) { int queued = 0, oldq = 0, last; - int pc = 0, i, j; + int i,pc = 0; +#ifdef USE_DFLOW_ADMISSION + int j; + InstrPtr p; +#endif int todo = flow->stop - flow->start; str ret = MAL_SUCCEED; FlowStatus fs, f = 0; - InstrPtr p; if (todo == 0) throw(MAL, "dataflow", "Empty dataflow block"); @@ -1233,9 +1254,11 @@ MT_lock_set(&flow->todo->l, "q_enqueue"); for (i = 0; i < todo; i++) if (flow->status[i].blocks == 0) { +#ifdef USE_DFLOW_ADMISSION p = getInstrPtr(fs[0].mb, flow->start + i ); for (j = p->retc; j < p->argc; j++) flow->status[i].argclaim += getMemoryClaim(flow->status[0].mb, flow->status[0].stk, p, j, FALSE); +#endif queued++; flow->status[i].state = DFLOWrunning; PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d claim=%d queue %d\n", flow->status[i].pc, flow->status[i].argclaim, queued); @@ -1247,10 +1270,11 @@ /* consume the remainder */ PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow block\n", todo); - while (queued) { - PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued %d\n", queued); + while (queued || todo > 0 ) { + PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued %d todo %d\n", queued, todo); f = q_dequeue(flow->done); queued--; + todo = todo > 0 ? todo -1: 0; if (f->pc < 0) { PARDEBUG mnstr_printf(GDKstdout, "#errors encountered %s ", f->error ? f->error : "unknown"); @@ -1268,6 +1292,8 @@ ret = z; } } + /* first error terminates the batch a.s.a.p. 
*/ + todo = 0; } /* @@ -1285,6 +1311,7 @@ MT_lock_set(&flow->todo->l, "q_enqueue"); oldq = queued; + if ( f->pc > 0 ) for (; last >= 0 && (i = flow->nodes[last]) > 0; last = flow->edges[last]) if (flow->status[i].state == DFLOWpending) { flow->status[i].argclaim += f->hotclaim; @@ -1313,14 +1340,7 @@ while (oldq++ < queued) MT_sema_up(&flow->todo->s, "q_enqueue"); } - PARDEBUG { - mnstr_printf(GDKstdout, "#end of data flow %d todo %d \n", pc, flow->stop - flow->start); - for (i = 0; i < flow->stop - flow->start; i++) - if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) { - mnstr_printf(GDKstdout, "#missed %d %d %d ", i, fs[i].state, fs[i].pc); - printInstruction(GDKstdout, fs[i].mb, 0, getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI); - } - } + PARDEBUG showFlowStatus(flow,pc); return ret; } diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag --- a/monetdb5/optimizer/Makefile.ag +++ b/monetdb5/optimizer/Makefile.ag @@ -38,7 +38,6 @@ opt_constants.mx \ opt_costModel.mx \ opt_crack.mx \ - opt_datacell.mx \ opt_datacyclotron.mx \ opt_dataflow.mx \ opt_deadcode.mx \ @@ -92,7 +91,7 @@ opt_singleton.mx opt_costModel.mx opt_reduce.mx opt_macro.mx \ opt_accumulators.mx opt_qep.mx opt_mergetable.mx \ opt_remoteQueries.mx opt_joinselect.mx opt_partitions.mx \ - opt_datacell.mx opt_reorder.mx opt_prejoin.mx opt_compression.mx \ + opt_reorder.mx opt_prejoin.mx opt_compression.mx \ opt_evaluate.mx opt_inline.mx opt_pushranges.mx opt_derivepath.mx \ opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx opt_remap.mx \ opt_statistics.mx opt_trace.mx opt_recycler.mx opt_dataflow.mx \ diff --git a/monetdb5/optimizer/opt_commonTerms.mx b/monetdb5/optimizer/opt_commonTerms.mx --- a/monetdb5/optimizer/opt_commonTerms.mx +++ b/monetdb5/optimizer/opt_commonTerms.mx @@ -175,7 +175,7 @@ Like all optimizer decisions, it is safe to stop. 
@c barrier |= getFunctionId(p) == assertRef; - if (p->token == NOOPsymbol || p->token == ASSIGNsymbol || barrier || p->retc == p->argc) { + if (p->token == NOOPsymbol || p->token == ASSIGNsymbol || barrier /* || p->retc == p->argc */) { #ifdef DEBUG_OPT_COMMONTERMS_MORE mnstr_printf(cntxt->fdout, "COMMON SKIPPED[%d] %d %d\n",i, barrier, p->retc == p->argc); #endif @@ -184,7 +184,7 @@ /* from here we have a candidate to look for a match */ #ifdef DEBUG_OPT_COMMONTERMS_MORE - mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] last= %d ",i,last); + mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] ",i); printInstruction(cntxt->fdout, mb, 0, p, LIST_MAL_ALL); #endif prop = hasSideEffects(p,TRUE) || isUpdateInstruction(p); @@ -192,9 +192,9 @@ for (j = candidate; j ; j = list[j]) if ( (q=getInstrPtr(mb,j))->fcn == p->fcn && !isUnsafeFunction(q)){ #ifdef DEBUG_OPT_COMMONTERMS_MORE - mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d %d %d lookback %d ", i, j, + mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d %d %d ", i, j, hasSameSignature(mb, p, q, p->retc), - hasSameArguments(mb, p, q), last); + hasSameArguments(mb, p, q)); printInstruction(cntxt->fdout, mb, 0, q, LIST_MAL_ALL); mnstr_printf(cntxt->fdout," :%d %d %d=%d %d %d %d %d %d\n", q->token != ASSIGNsymbol , diff --git a/monetdb5/optimizer/opt_dataflow.mx b/monetdb5/optimizer/opt_dataflow.mx --- a/monetdb5/optimizer/opt_dataflow.mx +++ b/monetdb5/optimizer/opt_dataflow.mx @@ -173,7 +173,7 @@ if (p->token == ENDsymbol) break; - if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) || blockCntrl(p) || (!dumbcopy && blockExit(p)) || dflowAssignTest(span,p,i) ){ + if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) || blockCntrl(p) || (!dumbcopy && blockExit(p)) || (getModuleId(p) == sqlRef && isUpdateInstruction(p)) || dflowAssignTest(span,p,i) ){ @:flowblock@ pushInstruction(mb,p); continue; diff --git a/monetdb5/optimizer/opt_multiplex.mx b/monetdb5/optimizer/opt_multiplex.mx --- a/monetdb5/optimizer/opt_multiplex.mx +++ 
b/monetdb5/optimizer/opt_multiplex.mx @@ -311,6 +311,9 @@ if (mb->errors){ /* rollback */ } + if ( mb->errors == 0) + setLifespan(mb); /* leave a proper initialized life span behind */ + return mb->errors? 0: actions; } @include optimizerWrapper.mx diff --git a/monetdb5/optimizer/opt_pipes.mx b/monetdb5/optimizer/opt_pipes.mx --- a/monetdb5/optimizer/opt_pipes.mx +++ b/monetdb5/optimizer/opt_pipes.mx @@ -38,16 +38,19 @@ opt_export str getPipeDefinition(str name); opt_export str getPipeCatalog(int *ret); +opt_export str addPipeDefinition(str name, str pipe); #endif @c #include "monetdb_config.h" #include "opt_pipes.h" +#define MAXOPTPIPES 64 + struct PIPELINES{ char name[50]; char def[256]; -} pipes[] ={ +} pipes[MAXOPTPIPES] ={ /* The minimal pipeline necessary by the server to operate correctly*/ { "minimal_pipe", "inline,remap,deadcode,multiplex,garbageCollector"}, @@ -87,8 +90,6 @@ {"cracker_pipe", "inline,remap,evaluate,costModel,coercions,emptySet,aliases,selcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"}, {"sidcrack_pipe", "inline,remap,evaluate,costModel,coercions,emptySet,aliases,sidcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"}, -{"datacell_pipe", "inline,remap,evaluate,costModel,coercions,emptySet,aliases,deadcode,constants,commonTerms,joinPath,datacell,deadcode,reduce,dataflow,history,multiplex,garbageCollector"}, - /* * The Octopus pipeline for distributed processing (Merovingian enabled platforms only) */ @@ -104,9 +105,8 @@ {"dictionary_pipe", "inline,remap,dictionary,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,history,multiplex,garbageCollector"}, /* The default + compression */ -{"compression_pipe", 
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"}, +{"compression_pipe", "inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"} -{"",""} }; _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list