Changeset: 4bf19a7c4e2c for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=4bf19a7c4e2c
Modified Files:
        monetdb5/optimizer/opt_dataflow.c
        monetdb5/optimizer/opt_prelude.c
        monetdb5/optimizer/opt_prelude.h
Branch: Feb2013
Log Message:

Additional stuff for sink()


diffs (208 lines):

diff --git a/monetdb5/optimizer/opt_dataflow.c 
b/monetdb5/optimizer/opt_dataflow.c
--- a/monetdb5/optimizer/opt_dataflow.c
+++ b/monetdb5/optimizer/opt_dataflow.c
@@ -28,7 +28,11 @@
  * dataflow processing incurs overhead and is only
  * relevant if multiple tasks kan be handled at the same time.
  * Also simple expressions dont had to be done in parallel.
-*/
+ *
+ * The garbagesink takes multiple variables whose endoflife is within
+ * a dataflow block and who are used multiple times. They should be
+ * garbage collected outside the parallel block.
+ */
 static int
 simpleFlow(InstrPtr *old, int start, int last)
 {
@@ -128,15 +132,37 @@ dflowInstruction(InstrPtr p) {
        return FALSE;
 }
 
+static InstrPtr
+dflowGarbagesink(MalBlkPtr mb, InstrPtr *old, int start, int last, int var, 
int *usage){
+       InstrPtr p, sink;
+       int j,k;
+       if ( usage[var] == 0  || isVarConstant(mb, var) )
+               return NULL;
+       sink= newInstruction(mb,ASSIGNsymbol); 
+       getModuleId(sink) = languageRef;
+       getFunctionId(sink) = sinkRef;
+       getArg(sink,0)= newTmpVariable(mb,TYPE_void);
+       sink= pushArgument(mb, sink, var);
+       for ( j= start; j< last; j++){
+               p = old[j];
+               if ( p )
+               for (k = p->retc; k< p->argc; k++)
+                       if ( getArg(p,k)== var)
+                               sink= pushArgument(mb,sink, getArg(p,0));
+       }
+       return sink;
+}
+
 int
 OPTdataflowImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
p)
 {
        int i,j,k, cnt, start=1,entries=0, actions=0;
        int flowblock= 0, dumbcopy=0;
-       InstrPtr *old, q;
-       int limit, slimit;
+       InstrPtr *sink, *old, q;
+       int limit, slimit, top = 0;
        Lifespan span;
        char *init;
+       int *usage;
 
        /* don't use dataflow on single processor systems */
        if (GDKnr_threads <= 1)
@@ -155,12 +181,28 @@ OPTdataflowImplementation(Client cntxt, 
                GDKfree(span);
                return 0;
        }
+       usage= (int*) GDKzalloc(mb->vtop * sizeof(int));
+       if ( usage == NULL){
+               GDKfree(span);
+               GDKfree(init);
+               return 0;
+       }
+       sink= (InstrPtr*) GDKzalloc(mb->stop * sizeof(InstrPtr));
+       if ( usage == NULL){
+               GDKfree(span);
+               GDKfree(init);
+               GDKfree(sink);
+               return 0;
+       }
+
        limit= mb->stop;
        slimit= mb->ssize;
        old = mb->stmt;
        if ( newMalBlkStmt(mb, mb->ssize+20) <0 ){
                GDKfree(span);
                GDKfree(init);
+               GDKfree(usage);
+               GDKfree(sink);
                return 0;
        }
        pushInstruction(mb,old[0]);
@@ -182,7 +224,7 @@ OPTdataflowImplementation(Client cntxt, 
                                int sf = simpleFlow(old,start,i);
                                if (!sf && entries > 1){
                                        for( j=start ; j<i; j++)
-                                       if (old[j]) 
+                                       if (old[j]) {
                                                for( k=0; k<old[j]->retc; k++)
                                                if( 
getBeginLifespan(span,getArg(old[j],k)) >= start && 
getEndLifespan(span,getArg(old[j],k)) >= i && init[getArg(old[j],k)]==0){
                                                        InstrPtr r= 
newAssignment(mb);
@@ -190,6 +232,16 @@ OPTdataflowImplementation(Client cntxt, 
                                                        
pushNil(mb,r,getArgType(mb,old[j],k));
                                                        
init[getArg(old[j],k)]=1;
                                                }
+                                               /* collect variables garbage 
collected within the block */
+                                               for( k=old[j]->retc; 
k<old[j]->argc; k++)
+                                               if( 
getEndLifespan(span,getArg(old[j],k)) == j) {
+                                                       sink[top] = 
dflowGarbagesink(mb,old, start, i, getArg(old[j],k), usage);
+                                                       top += sink[top] != 
NULL;
+                                               }
+                                               else
+                                               if( 
getEndLifespan(span,getArg(old[j],k)) < i)
+                                                       
usage[getArg(old[j],k)]++;
+                                       }
                                        q= 
newFcnCall(mb,languageRef,dataflowRef);
                                        q->barrier= BARRIERsymbol;
                                        getArg(q,0)= flowblock;
@@ -200,11 +252,18 @@ OPTdataflowImplementation(Client cntxt, 
                                for( j=start ; j<i; j++)
                                        if (old[j])
                                                pushInstruction(mb,old[j]);
+                               for( j=0; j<top; j++)
+                                               pushInstruction(mb,sink[j]);
+                               if ( top ) {
+                                       top = 0;
+                                       memset( (char*) sink, 0, limit * 
sizeof(InstrPtr));
+                               }
                                if (!sf && entries>1){
                                        q= newAssignment(mb);
                                        q->barrier= EXITsymbol;
                                        getArg(q,0) = flowblock;
                                }
+                               /* inject the optional garbage sink statement */
                                entries = 0;
                                flowblock = 0;
                                actions++;
@@ -221,7 +280,7 @@ OPTdataflowImplementation(Client cntxt, 
                                        int sf = simpleFlow(old,start,i);
                                        if (!sf && entries > 1){
                                                for( j=start ; j<i; j++)
-                                               if (old[j]) 
+                                               if (old[j]) {
                                                        for( k=0; 
k<old[j]->retc; k++)
                                                        if( 
getBeginLifespan(span,getArg(old[j],k)) >= start && 
getEndLifespan(span,getArg(old[j],k)) >= i && init[getArg(old[j],k)]==0){
                                                                InstrPtr r= 
newAssignment(mb);
@@ -229,6 +288,16 @@ OPTdataflowImplementation(Client cntxt, 
                                                                
pushNil(mb,r,getArgType(mb,old[j],k));
                                                                
init[getArg(old[j],k)]=1;
                                                        }
+                                                       /* collect variables 
garbagecollected in the block */
+                                                       for( k=old[j]->retc; 
k<old[j]->argc; k++)
+                                                               if( 
getEndLifespan(span,getArg(old[j],k)) == i) {
+                                                                       
sink[top] = dflowGarbagesink(mb, old, start, i, getArg(old[j],k), usage);
+                                                                       top += 
sink[top] != NULL;
+                                                               }
+                                                               else
+                                                               if( 
getEndLifespan(span,getArg(old[j],k)) < i) 
+                                                                       
usage[getArg(old[j],k)]++;
+                                               }
                                                q= 
newFcnCall(mb,languageRef,dataflowRef);
                                                q->barrier= BARRIERsymbol;
                                                getArg(q,0)= flowblock;
@@ -239,6 +308,13 @@ OPTdataflowImplementation(Client cntxt, 
                                        for( j=start ; j<i; j++)
                                                if (old[j])
                                                        
pushInstruction(mb,old[j]);
+                                               /* inject the optional garbage 
sink statement */
+                                               for( j=0; j<top; j++)
+                                                               
pushInstruction(mb,sink[j]);
+                                               if ( top) {
+                                                       top = 0;
+                                                       memset( (char*) sink, 
0, mb->stop * sizeof(InstrPtr));
+                                               }
                                        if (!sf && entries>1){
                                                q= newAssignment(mb);
                                                q->barrier= EXITsymbol;
@@ -320,5 +396,7 @@ OPTdataflowImplementation(Client cntxt, 
        GDKfree(old);
        GDKfree(span);
        GDKfree(init);
+       GDKfree(sink);
+       GDKfree(usage);
        return actions;
 }
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -207,6 +207,7 @@ str semijoinRef;
 str semijoinPathRef;
 str setAccessRef;
 str setWriteModeRef;
+str sinkRef;
 str sliceRef;
 str subsliceRef;
 str sortHRef;
@@ -461,6 +462,7 @@ void optimizerInit(void){
                semijoinPathRef = putName("semijoinPath",12);
                setAccessRef = putName("setAccess",9);
                setWriteModeRef= putName("setWriteMode",12);
+               sinkRef = putName("sink",4);
                sliceRef = putName("slice",5);
                subsliceRef = putName("subslice",8);
                singleRef = putName("single",6);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -205,6 +205,7 @@ opt_export  str semijoinRef;
 opt_export  str semijoinPathRef;
 opt_export  str setAccessRef;
 opt_export  str setWriteModeRef;
+opt_export  str sinkRef;
 opt_export  str sliceRef;
 opt_export  str subsliceRef;
 opt_export  str singleRef;
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to