Changeset: c79022b55949 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c79022b55949
Modified Files:
Branch: default
Log Message:
merge
diffs (truncated from 718 to 300 lines):
diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -1114,7 +1114,7 @@
static void
DFLOWinit(DataFlow flow, Client cntxt, MalBlkPtr mb, MalStkPtr stk, int size)
{
- int pc, i, j, k, n, etop = 0;
+ int pc, i, j, k, l, n, etop = 0;
int *assign;
InstrPtr p;
@@ -1135,6 +1135,7 @@
/* administer flow dependencies */
for (j = p->retc; j < p->argc; j++) {
+ /* list of instructions that wake n-th instruction up */
if (!isVarConstant(mb, getArg(p, j)) && (k =
assign[getArg(p, j)])) {
/* add edge to the target instruction for
wakeup call */
k -= flow->start;
@@ -1155,28 +1156,31 @@
flow->status[n].blocks++;
}
- /* be careful, watch out for garbage collection
interference */
- /* those should be scheduled after all its other uses */
- k = getEndOfLife(mb, getArg(p, j));
- if (k != pc && k < flow->stop && k > flow->start) {
- /* add edge to the target instruction for
wakeup call */
- PARDEBUG mnstr_printf(GDKstdout, "forward %d ->
%d\n", n + flow->start, k);
- k -= flow->start;
- if (flow->nodes[n]) {
- /* add wakeup to tail of list */
- for (i = n; flow->edges[i] > 0; i =
flow->edges[i])
- ;
- flow->nodes[etop] = k;
- flow->edges[etop] = -1;
- flow->edges[i] = etop;
- etop++;
- (void)size;
- assert(etop < size);
- } else {
- flow->nodes[n] = k;
- flow->edges[n] = -1;
+
+ /* list of instructions to be woken up explicitly */
+ if ( !isVarConstant(mb, getArg(p, j)) ) {
+ /* be careful, watch out for garbage collection
interference */
+ /* those should be scheduled after all its
other uses */
+ l = getEndOfLife(mb, getArg(p, j));
+ if (l != pc && l < flow->stop && l >
flow->start) {
+ /* add edge to the target instruction
for wakeup call */
+ PARDEBUG mnstr_printf(GDKstdout,
"endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p,j)), n + flow->start,
l);
+ l -= flow->start;
+ if (flow->nodes[n]) {
+ /* add wakeup to tail of list */
+ for (i = n; flow->edges[i] > 0;
i = flow->edges[i])
+ ;
+ flow->nodes[etop] = l;
+ flow->edges[etop] = -1;
+ flow->edges[i] = etop;
+ etop++;
+ assert(etop < size);
+ } else {
+ flow->nodes[n] = l;
+ flow->edges[n] = -1;
+ }
+ flow->status[l].blocks++;
}
- flow->status[k].blocks++;
}
}
@@ -1184,10 +1188,11 @@
assign[getArg(p, j)] = pc; /* ensure recognition of
dependency on first instruction and constant */
}
GDKfree(assign);
- PARDEBUG for (n = 0; n < flow->stop - flow->start; n++) {
+ PARDEBUG
+ for (n = 0; n < flow->stop - flow->start; n++) {
mnstr_printf(GDKstdout, "#[%d] %d: ", flow->start + n, n);
printInstruction(GDKstdout, mb, 0, getInstrPtr(mb, n +
flow->start), LIST_MAL_STMT | LIST_MAPI);
- mnstr_printf(GDKstdout, "#[%d]Dependents blocks %d:",
flow->start + n, flow->status[n].blocks);
+ mnstr_printf(GDKstdout, "#[%d]Dependents block count %d
wakeup", flow->start + n, flow->status[n].blocks);
for (j = n; flow->edges[j]; j = flow->edges[j]) {
mnstr_printf(GDKstdout, "%d ", flow->start +
flow->nodes[j]);
if (flow->edges[j] == -1)
@@ -1206,15 +1211,31 @@
They take effect after we have ensured that the basic properties for
execution hold.
@c
+static void showFlowStatus(DataFlow flow, int pc)
+{
+ int i;
+ FlowStatus fs= flow->status;
+
+ mnstr_printf(GDKstdout, "#end of data flow %d done %d \n", pc,
flow->stop - flow->start);
+ for (i = 0; i < flow->stop - flow->start; i++)
+ if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) {
+ mnstr_printf(GDKstdout, "#missed pc %d status
%d %d blocks %d", fs[i].state, i, fs[i].pc, fs[i].blocks);
+ printInstruction(GDKstdout, fs[i].mb, 0,
getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI);
+ }
+}
+
static str
DFLOWscheduler(DataFlow flow)
{
int queued = 0, oldq = 0, last;
- int pc = 0, i, j;
+ int i,pc = 0;
+#ifdef USE_DFLOW_ADMISSION
+ int j;
+ InstrPtr p;
+#endif
int todo = flow->stop - flow->start;
str ret = MAL_SUCCEED;
FlowStatus fs, f = 0;
- InstrPtr p;
if (todo == 0)
throw(MAL, "dataflow", "Empty dataflow block");
@@ -1233,9 +1254,11 @@
MT_lock_set(&flow->todo->l, "q_enqueue");
for (i = 0; i < todo; i++)
if (flow->status[i].blocks == 0) {
+#ifdef USE_DFLOW_ADMISSION
p = getInstrPtr(fs[0].mb, flow->start + i );
for (j = p->retc; j < p->argc; j++)
flow->status[i].argclaim +=
getMemoryClaim(flow->status[0].mb, flow->status[0].stk, p, j, FALSE);
+#endif
queued++;
flow->status[i].state = DFLOWrunning;
PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d
claim=%d queue %d\n", flow->status[i].pc, flow->status[i].argclaim, queued);
@@ -1247,10 +1270,11 @@
/* consume the remainder */
PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow
block\n", todo);
- while (queued) {
- PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued
%d\n", queued);
+ while (queued || todo > 0 ) {
+ PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued
%d todo %d\n", queued, todo);
f = q_dequeue(flow->done);
queued--;
+ todo = todo > 0 ? todo -1: 0;
if (f->pc < 0) {
PARDEBUG mnstr_printf(GDKstdout, "#errors encountered
%s ", f->error ? f->error : "unknown");
@@ -1268,6 +1292,8 @@
ret = z;
}
}
+ /* first error terminates the batch a.s.a.p. */
+ todo = 0;
}
/*
@@ -1285,6 +1311,7 @@
MT_lock_set(&flow->todo->l, "q_enqueue");
oldq = queued;
+ if ( f->pc > 0 )
for (; last >= 0 && (i = flow->nodes[last]) > 0; last =
flow->edges[last])
if (flow->status[i].state == DFLOWpending) {
flow->status[i].argclaim += f->hotclaim;
@@ -1313,14 +1340,7 @@
while (oldq++ < queued)
MT_sema_up(&flow->todo->s, "q_enqueue");
}
- PARDEBUG {
- mnstr_printf(GDKstdout, "#end of data flow %d todo %d \n", pc,
flow->stop - flow->start);
- for (i = 0; i < flow->stop - flow->start; i++)
- if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) {
- mnstr_printf(GDKstdout, "#missed %d %d %d ", i,
fs[i].state, fs[i].pc);
- printInstruction(GDKstdout, fs[i].mb, 0,
getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI);
- }
- }
+ PARDEBUG showFlowStatus(flow,pc);
return ret;
}
diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag
--- a/monetdb5/optimizer/Makefile.ag
+++ b/monetdb5/optimizer/Makefile.ag
@@ -38,7 +38,6 @@
opt_constants.mx \
opt_costModel.mx \
opt_crack.mx \
- opt_datacell.mx \
opt_datacyclotron.mx \
opt_dataflow.mx \
opt_deadcode.mx \
@@ -92,7 +91,7 @@
opt_singleton.mx opt_costModel.mx opt_reduce.mx opt_macro.mx \
opt_accumulators.mx opt_qep.mx opt_mergetable.mx \
opt_remoteQueries.mx opt_joinselect.mx opt_partitions.mx \
- opt_datacell.mx opt_reorder.mx opt_prejoin.mx
opt_compression.mx \
+ opt_reorder.mx opt_prejoin.mx opt_compression.mx \
opt_evaluate.mx opt_inline.mx opt_pushranges.mx
opt_derivepath.mx \
opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx
opt_remap.mx \
opt_statistics.mx opt_trace.mx opt_recycler.mx opt_dataflow.mx
\
diff --git a/monetdb5/optimizer/opt_commonTerms.mx
b/monetdb5/optimizer/opt_commonTerms.mx
--- a/monetdb5/optimizer/opt_commonTerms.mx
+++ b/monetdb5/optimizer/opt_commonTerms.mx
@@ -175,7 +175,7 @@
Like all optimizer decisions, it is safe to stop.
@c
barrier |= getFunctionId(p) == assertRef;
- if (p->token == NOOPsymbol || p->token == ASSIGNsymbol ||
barrier || p->retc == p->argc) {
+ if (p->token == NOOPsymbol || p->token == ASSIGNsymbol ||
barrier /* || p->retc == p->argc */) {
#ifdef DEBUG_OPT_COMMONTERMS_MORE
mnstr_printf(cntxt->fdout, "COMMON SKIPPED[%d]
%d %d\n",i, barrier, p->retc == p->argc);
#endif
@@ -184,7 +184,7 @@
/* from here we have a candidate to look for a match */
#ifdef DEBUG_OPT_COMMONTERMS_MORE
- mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] last= %d ",i,last);
+ mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] ",i);
printInstruction(cntxt->fdout, mb, 0, p, LIST_MAL_ALL);
#endif
prop = hasSideEffects(p,TRUE) || isUpdateInstruction(p);
@@ -192,9 +192,9 @@
for (j = candidate; j ; j = list[j])
if ( (q=getInstrPtr(mb,j))->fcn == p->fcn &&
!isUnsafeFunction(q)){
#ifdef DEBUG_OPT_COMMONTERMS_MORE
- mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d %d %d
lookback %d ", i, j,
+ mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d %d %d ",
i, j,
hasSameSignature(mb, p, q, p->retc),
- hasSameArguments(mb, p, q), last);
+ hasSameArguments(mb, p, q));
printInstruction(cntxt->fdout, mb, 0, q,
LIST_MAL_ALL);
mnstr_printf(cntxt->fdout," :%d %d %d=%d %d %d
%d %d %d\n",
q->token != ASSIGNsymbol ,
diff --git a/monetdb5/optimizer/opt_dataflow.mx
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -173,7 +173,7 @@
if (p->token == ENDsymbol)
break;
- if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) ||
blockCntrl(p) || (!dumbcopy && blockExit(p)) || dflowAssignTest(span,p,i) ){
+ if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) ||
blockCntrl(p) || (!dumbcopy && blockExit(p)) || (getModuleId(p) == sqlRef &&
isUpdateInstruction(p)) || dflowAssignTest(span,p,i) ){
@:flowblock@
pushInstruction(mb,p);
continue;
diff --git a/monetdb5/optimizer/opt_multiplex.mx
b/monetdb5/optimizer/opt_multiplex.mx
--- a/monetdb5/optimizer/opt_multiplex.mx
+++ b/monetdb5/optimizer/opt_multiplex.mx
@@ -311,6 +311,9 @@
if (mb->errors){
/* rollback */
}
+ if ( mb->errors == 0)
+ setLifespan(mb); /* leave a proper initialized life span behind
*/
+
return mb->errors? 0: actions;
}
@include optimizerWrapper.mx
diff --git a/monetdb5/optimizer/opt_pipes.mx b/monetdb5/optimizer/opt_pipes.mx
--- a/monetdb5/optimizer/opt_pipes.mx
+++ b/monetdb5/optimizer/opt_pipes.mx
@@ -38,16 +38,19 @@
opt_export str getPipeDefinition(str name);
opt_export str getPipeCatalog(int *ret);
+opt_export str addPipeDefinition(str name, str pipe);
#endif
@c
#include "monetdb_config.h"
#include "opt_pipes.h"
+#define MAXOPTPIPES 64
+
struct PIPELINES{
char name[50];
char def[256];
-} pipes[] ={
+} pipes[MAXOPTPIPES] ={
/* The minimal pipeline necessary by the server to operate correctly*/
{ "minimal_pipe", "inline,remap,deadcode,multiplex,garbageCollector"},
@@ -87,8 +90,6 @@
{"cracker_pipe",
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,selcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
{"sidcrack_pipe",
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,sidcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
-{"datacell_pipe",
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,deadcode,constants,commonTerms,joinPath,datacell,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
-
/*
* The Octopus pipeline for distributed processing (Merovingian enabled
platforms only)
*/
@@ -104,9 +105,8 @@
{"dictionary_pipe",
"inline,remap,dictionary,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
/* The default + compression */
-{"compression_pipe",
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"},
+{"compression_pipe",
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"}
-{"",""}
};
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list