Changeset: c79022b55949 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=c79022b55949
Modified Files:
        
Branch: default
Log Message:

merge


diffs (truncated from 718 to 300 lines):

diff --git a/monetdb5/mal/mal_interpreter.mx b/monetdb5/mal/mal_interpreter.mx
--- a/monetdb5/mal/mal_interpreter.mx
+++ b/monetdb5/mal/mal_interpreter.mx
@@ -1114,7 +1114,7 @@
 static void
 DFLOWinit(DataFlow flow, Client cntxt, MalBlkPtr mb, MalStkPtr stk, int size)
 {
-       int pc, i, j, k, n, etop = 0;
+       int pc, i, j, k, l, n, etop = 0;
        int *assign;
        InstrPtr p;
 
@@ -1135,6 +1135,7 @@
 
                /* administer flow dependencies */
                for (j = p->retc; j < p->argc; j++) {
+                       /* list of instructions that wake n-th instruction up */
                        if (!isVarConstant(mb, getArg(p, j)) && (k = 
assign[getArg(p, j)])) {
                                /* add edge to the target instruction for 
wakeup call */
                                k -= flow->start;
@@ -1155,28 +1156,31 @@
 
                                flow->status[n].blocks++;
                        }
-                       /* be careful, watch out for garbage collection 
interference */
-                       /* those should be scheduled after all its other uses */
-                       k = getEndOfLife(mb, getArg(p, j));
-                       if (k != pc && k < flow->stop && k > flow->start) {
-                               /* add edge to the target instruction for 
wakeup call */
-                               PARDEBUG mnstr_printf(GDKstdout, "forward %d -> 
%d\n", n + flow->start, k);
-                               k -= flow->start;
-                               if (flow->nodes[n]) {
-                                       /* add wakeup to tail of list */
-                                       for (i = n; flow->edges[i] > 0; i = 
flow->edges[i])
-                                               ;
-                                       flow->nodes[etop] = k;
-                                       flow->edges[etop] = -1;
-                                       flow->edges[i] = etop;
-                                       etop++;
-                                       (void)size;
-                                       assert(etop < size);
-                               } else {
-                                       flow->nodes[n] = k;
-                                       flow->edges[n] = -1;
+                       
+                       /* list of instructions to be woken up explicitly */
+                       if ( !isVarConstant(mb, getArg(p, j)) ) {
+                               /* be careful, watch out for garbage collection 
interference */
+                               /* those should be scheduled after all its 
other uses */
+                               l = getEndOfLife(mb, getArg(p, j));
+                               if (l != pc && l < flow->stop && l > 
flow->start) {
+                                       /* add edge to the target instruction 
for wakeup call */
+                                       PARDEBUG mnstr_printf(GDKstdout, 
"endoflife for %s is %d -> %d\n", getVarName(mb, getArg(p,j)), n + flow->start, 
l);
+                                       l -= flow->start;
+                                       if (flow->nodes[n]) {
+                                               /* add wakeup to tail of list */
+                                               for (i = n; flow->edges[i] > 0; 
i = flow->edges[i])
+                                                       ;
+                                               flow->nodes[etop] = l;
+                                               flow->edges[etop] = -1;
+                                               flow->edges[i] = etop;
+                                               etop++;
+                                               assert(etop < size);
+                                       } else {
+                                               flow->nodes[n] = l;
+                                               flow->edges[n] = -1;
+                                       }
+                                       flow->status[l].blocks++;
                                }
-                               flow->status[k].blocks++;
                        }
                }
 
@@ -1184,10 +1188,11 @@
                        assign[getArg(p, j)] = pc;  /* ensure recognition of 
dependency on first instruction and constant */
        }
        GDKfree(assign);
-       PARDEBUG for (n = 0; n < flow->stop - flow->start; n++) {
+       PARDEBUG 
+        for (n = 0; n < flow->stop - flow->start; n++) {
                mnstr_printf(GDKstdout, "#[%d] %d: ", flow->start + n, n);
                printInstruction(GDKstdout, mb, 0, getInstrPtr(mb, n + 
flow->start), LIST_MAL_STMT | LIST_MAPI);
-               mnstr_printf(GDKstdout, "#[%d]Dependents blocks %d:", 
flow->start + n, flow->status[n].blocks);
+               mnstr_printf(GDKstdout, "#[%d]Dependents block count %d 
wakeup", flow->start + n, flow->status[n].blocks);
                for (j = n; flow->edges[j]; j = flow->edges[j]) {
                        mnstr_printf(GDKstdout, "%d ", flow->start + 
flow->nodes[j]);
                        if (flow->edges[j] == -1)
@@ -1206,15 +1211,31 @@
 They take effect after we have ensured that the basic properties for
 execution hold.
 @c
+/*
+ * Debugging aid: dump the state of a dataflow block to GDKstdout.
+ *
+ * flow  the dataflow administration; flow->status holds one entry per
+ *       instruction in the range [flow->start, flow->stop)
+ * pc    value echoed in the header line (supplied by the caller)
+ *
+ * After the header line, every status entry whose state is not
+ * DFLOWwrapup and whose pc is non-negative is reported as "missed",
+ * showing its slot index, pc, state and remaining blocks count,
+ * followed by the instruction itself.
+ */
+static void showFlowStatus(DataFlow flow, int pc)
+{
+       int i;
+       FlowStatus fs = flow->status;
+
+       mnstr_printf(GDKstdout, "#end of data flow %d done %d \n", pc, flow->stop - flow->start);
+       for (i = 0; i < flow->stop - flow->start; i++)
+               if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) {
+                       /* keep the argument order in step with the labels; the trailing space separates the counters from the instruction printed next */
+                       mnstr_printf(GDKstdout, "#missed [%d] pc %d status %d blocks %d ", i, fs[i].pc, fs[i].state, fs[i].blocks);
+                       printInstruction(GDKstdout, fs[i].mb, 0, getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI);
+               }
+}
+
 static str
 DFLOWscheduler(DataFlow flow)
 {
        int queued = 0, oldq = 0, last;
-       int pc = 0, i, j;
+       int i,pc = 0;
+#ifdef USE_DFLOW_ADMISSION
+       int j;
+       InstrPtr p;
+#endif
        int todo = flow->stop - flow->start;
        str ret = MAL_SUCCEED;
        FlowStatus fs, f = 0;
-       InstrPtr p;
 
        if (todo == 0)
                throw(MAL, "dataflow", "Empty dataflow block");
@@ -1233,9 +1254,11 @@
        MT_lock_set(&flow->todo->l, "q_enqueue");
        for (i = 0; i < todo; i++)
                if (flow->status[i].blocks == 0) {
+#ifdef USE_DFLOW_ADMISSION
                        p = getInstrPtr(fs[0].mb, flow->start + i );
                        for (j = p->retc; j < p->argc; j++)
                                flow->status[i].argclaim += 
getMemoryClaim(flow->status[0].mb, flow->status[0].stk, p, j, FALSE);
+#endif
                        queued++;
                        flow->status[i].state = DFLOWrunning;
                        PARDEBUG mnstr_printf(GDKstdout, "#enqueue pc=%d 
claim=%d queue %d\n", flow->status[i].pc, flow->status[i].argclaim, queued);
@@ -1247,10 +1270,11 @@
 
        /* consume the remainder */
        PARDEBUG mnstr_printf(GDKstdout, "#run %d instructions in dataflow 
block\n", todo);
-       while (queued) {
-               PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued 
%d\n", queued);
+       while (queued || todo > 0 ) {
+               PARDEBUG mnstr_printf(GDKstdout, "#waiting for results, queued 
%d todo %d\n", queued, todo);
                f = q_dequeue(flow->done);
                queued--;
+               todo = todo > 0 ? todo -1: 0;
 
                if (f->pc < 0) {
                        PARDEBUG mnstr_printf(GDKstdout, "#errors encountered 
%s ", f->error ? f->error : "unknown");
@@ -1268,6 +1292,8 @@
                                        ret = z;
                                }
                        }
+                       /* first error terminates the batch a.s.a.p. */
+                       todo = 0;
                }
 
                /*
@@ -1285,6 +1311,7 @@
                MT_lock_set(&flow->todo->l, "q_enqueue");
 
                oldq = queued;
+               if ( f->pc > 0 )
                for (; last >= 0 && (i = flow->nodes[last]) > 0; last = 
flow->edges[last])
                        if (flow->status[i].state == DFLOWpending) {
                                flow->status[i].argclaim += f->hotclaim;
@@ -1313,14 +1340,7 @@
                        while (oldq++ < queued)
                                MT_sema_up(&flow->todo->s, "q_enqueue");
        }
-       PARDEBUG {
-               mnstr_printf(GDKstdout, "#end of data flow %d todo %d \n", pc, 
flow->stop - flow->start);
-               for (i = 0; i < flow->stop - flow->start; i++)
-                       if (fs[i].state != DFLOWwrapup && fs[i].pc >= 0) {
-                               mnstr_printf(GDKstdout, "#missed %d %d %d ", i, 
fs[i].state, fs[i].pc);
-                               printInstruction(GDKstdout, fs[i].mb, 0, 
getInstrPtr(fs[i].mb, fs[i].pc), LIST_MAL_STMT | LIST_MAPI);
-                       }
-       }
+       PARDEBUG showFlowStatus(flow,pc);
        return ret;
 }
 
diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag
--- a/monetdb5/optimizer/Makefile.ag
+++ b/monetdb5/optimizer/Makefile.ag
@@ -38,7 +38,6 @@
                opt_constants.mx \
                opt_costModel.mx \
                opt_crack.mx \
-               opt_datacell.mx \
                opt_datacyclotron.mx \
                opt_dataflow.mx \
                opt_deadcode.mx \
@@ -92,7 +91,7 @@
                opt_singleton.mx opt_costModel.mx opt_reduce.mx opt_macro.mx \
                opt_accumulators.mx opt_qep.mx opt_mergetable.mx \
                opt_remoteQueries.mx opt_joinselect.mx opt_partitions.mx \
-               opt_datacell.mx opt_reorder.mx opt_prejoin.mx 
opt_compression.mx \
+               opt_reorder.mx opt_prejoin.mx opt_compression.mx \
                opt_evaluate.mx opt_inline.mx opt_pushranges.mx 
opt_derivepath.mx \
                opt_accessmode.mx opt_joinpath.mx opt_heuristics.mx 
opt_remap.mx \
                opt_statistics.mx opt_trace.mx  opt_recycler.mx opt_dataflow.mx 
\
diff --git a/monetdb5/optimizer/opt_commonTerms.mx 
b/monetdb5/optimizer/opt_commonTerms.mx
--- a/monetdb5/optimizer/opt_commonTerms.mx
+++ b/monetdb5/optimizer/opt_commonTerms.mx
@@ -175,7 +175,7 @@
 Like all optimizer decisions, it is safe to stop.
 @c
                barrier |= getFunctionId(p) == assertRef;
-               if (p->token == NOOPsymbol || p->token == ASSIGNsymbol || 
barrier || p->retc == p->argc) {
+               if (p->token == NOOPsymbol || p->token == ASSIGNsymbol || 
barrier /* || p->retc == p->argc */) {
 #ifdef DEBUG_OPT_COMMONTERMS_MORE
                                mnstr_printf(cntxt->fdout, "COMMON SKIPPED[%d] 
%d %d\n",i, barrier, p->retc == p->argc);
 #endif
@@ -184,7 +184,7 @@
 
                /* from here we have a candidate to look for a match */
 #ifdef DEBUG_OPT_COMMONTERMS_MORE
-               mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] last= %d ",i,last);
+               mnstr_printf(cntxt->fdout,"#CANDIDATE[%d] ",i);
                printInstruction(cntxt->fdout, mb, 0, p, LIST_MAL_ALL);
 #endif
                prop = hasSideEffects(p,TRUE) || isUpdateInstruction(p);
@@ -192,9 +192,9 @@
                for (j = candidate; j ; j = list[j]) 
                        if ( (q=getInstrPtr(mb,j))->fcn == p->fcn  && 
!isUnsafeFunction(q)){
 #ifdef DEBUG_OPT_COMMONTERMS_MORE
-                       mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d  %d %d 
lookback %d ", i, j, 
+                       mnstr_printf(cntxt->fdout,"#CANDIDATE %d, %d  %d %d ", 
i, j, 
                                hasSameSignature(mb, p, q, p->retc), 
-                               hasSameArguments(mb, p, q), last);
+                               hasSameArguments(mb, p, q));
                                printInstruction(cntxt->fdout, mb, 0, q, 
LIST_MAL_ALL);
                                mnstr_printf(cntxt->fdout," :%d %d %d=%d %d %d 
%d %d %d\n", 
                                        q->token != ASSIGNsymbol ,
diff --git a/monetdb5/optimizer/opt_dataflow.mx 
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -173,7 +173,7 @@
 
                if (p->token == ENDsymbol)
                        break;
-               if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) || 
blockCntrl(p) || (!dumbcopy && blockExit(p)) || dflowAssignTest(span,p,i) ){
+               if (hasSideEffects(p,TRUE) || isUnsafeFunction(p) || 
blockCntrl(p) || (!dumbcopy && blockExit(p)) || (getModuleId(p) == sqlRef && 
isUpdateInstruction(p)) || dflowAssignTest(span,p,i) ){
                        @:flowblock@
                        pushInstruction(mb,p);
                        continue;
diff --git a/monetdb5/optimizer/opt_multiplex.mx 
b/monetdb5/optimizer/opt_multiplex.mx
--- a/monetdb5/optimizer/opt_multiplex.mx
+++ b/monetdb5/optimizer/opt_multiplex.mx
@@ -311,6 +311,9 @@
        if (mb->errors){
                /* rollback */
        }
+       if ( mb->errors == 0)
+               setLifespan(mb); /* leave a proper initialized life span behind 
*/
+
        return mb->errors? 0: actions;
 }
 @include optimizerWrapper.mx
diff --git a/monetdb5/optimizer/opt_pipes.mx b/monetdb5/optimizer/opt_pipes.mx
--- a/monetdb5/optimizer/opt_pipes.mx
+++ b/monetdb5/optimizer/opt_pipes.mx
@@ -38,16 +38,19 @@
 
 opt_export str getPipeDefinition(str name);
 opt_export str getPipeCatalog(int *ret);
+opt_export str addPipeDefinition(str name, str pipe);
 
 #endif
 @c
 #include "monetdb_config.h"
 #include "opt_pipes.h"
 
+#define MAXOPTPIPES 64
+
 struct PIPELINES{
        char name[50];
        char def[256];
-} pipes[] ={
+} pipes[MAXOPTPIPES] ={
 /* The minimal pipeline necessary by the server to operate correctly*/
 { "minimal_pipe",      "inline,remap,deadcode,multiplex,garbageCollector"},
 
@@ -87,8 +90,6 @@
 {"cracker_pipe",       
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,selcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
 {"sidcrack_pipe",      
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,sidcrack,deadcode,commonTerms,joinPath,reorder,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
 
-{"datacell_pipe",      
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,deadcode,constants,commonTerms,joinPath,datacell,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
-
 /*
  * The Octopus pipeline for distributed processing (Merovingian enabled 
platforms only)
 */
@@ -104,9 +105,8 @@
 {"dictionary_pipe",    
"inline,remap,dictionary,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,history,multiplex,garbageCollector"},
 
 /* The default + compression */
-{"compression_pipe",   
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"},
+{"compression_pipe",   
"inline,remap,evaluate,costModel,coercions,emptySet,aliases,mergetable,deadcode,constants,commonTerms,joinPath,deadcode,reduce,dataflow,compression,dataflow,history,multiplex,garbageCollector"}
 
-{"",""}
 };
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to