Changeset: 338645776f2a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=338645776f2a
Modified Files:
        monetdb5/optimizer/opt_partition.c
        monetdb5/optimizer/opt_pipes.c
Branch: partition
Log Message:

Partition optimizer: first attempt.
The query `select count(*) from lineitem group by l_tax;`
can now be compiled correctly by applying hash partitioning first.


diffs (213 lines):

diff --git a/monetdb5/optimizer/opt_partition.c 
b/monetdb5/optimizer/opt_partition.c
--- a/monetdb5/optimizer/opt_partition.c
+++ b/monetdb5/optimizer/opt_partition.c
@@ -8,72 +8,134 @@
 
 /* example: select count(distinct userid) from hits_10m;
  * Massage aggregate operations using value based partitioning
-    X_16 := algebra.projection(X_4, X_15);
-    (X_17, X_18, X_19) := group.groupdone(X_16);
-    X_20 := algebra.projection(X_18, X_16);
-    X_21 := aggr.count(X_20, true);
+    (o,g,c):= group.groupdone(d);
+    a:= aggr.subcount(o,o,g,false);
 
-   The new could should become:
-     X_16 := algebra.projection(X_4, X_15);
-    (P1,P2,P3,P4) := bat.partition(X_16)
-    (O1, G1 , S1) := group.groupdone(P1);
-    (O2, G2 , S2) := group.groupdone(P2);
-    (O3, G3 , S3) := group.groupdone(P3);
-    (O4, G4 , S4) := group.groupdone(P4);
-    X_17 := mat.pack(O1,O3,O3,O4)
-    X_18 := mat.pack(G1,G3,G3,G4)
-    X_19 := mat.pack(S1,S3,S3,S4)
+    (p1,p2,p3,p4):= partition.hash(d);
+    (o1,g1,c1):= group.groupdone(p1);
+    (o2,g2,c2):= group.groupdone(p2);
+    (o3,g3,c3):= group.groupdone(p3);
+    (o4,g4,c4):= group.groupdone(p4);
+    o := mat.pack(o1,o2,o3,o4);
+    g := mat.pack(g1,g2,g3,g4);
+    c := mat.pack(c1,c2,c3,c4);
+    a1:= aggr.subcount(o1,o1,g1,false);
+    a2:= aggr.subcount(o2,o2,g2,false);
+    a3:= aggr.subcount(o3,o3,g3,false);
+    a4:= aggr.subcount(o4,o4,g4,false);
+    m := mat.pack(a1,a2,a3,a4);
 
-    Q1 := algebra.projection(G1, P1);
-    Q2 := algebra.projection(G2, P2);
-    Q3 := algebra.projection(G3, P3);
-    Q4 := algebra.projection(G4, P4);
-    X_20 := mat.pack(Q1,Q2,Q3,Q4);
-
-    X_21 := aggr.count(X_20, true);
  */
 #include "monetdb_config.h"
 #include "mal_instruction.h"
 #include "opt_partition.h"
 
+//#define _PARTITION_DEBUG_
+
 #define isCandidateList(M,P,I) ((M)->var[getArg(P,I)].id[0]== 'C')
+#define MAXPIECES 128
 
 str
 OPTpartitionImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr 
p)
 {
-       int i, slimit, limit, actions=0;
-       InstrPtr *old = 0;
+       int i, j, slimit, limit, actions=0;
+       InstrPtr qo, qg, qc, qh, q, *old = 0;
        lng usec = GDKusec();
        char buf[256];
+       int vlimit;
+               int pieces = 2; // GDKnr_threads ? (GDKnr_threads > MAXPIECES? 
MAXPIECES: GDKnr_threads) : 1;
+       int **vars; 
 
        (void) stk;
        (void) cntxt;
 
        limit = mb->stop;
        slimit = mb->ssize;
+       vlimit = mb->vsize;
        /* check for aggregates */
        for ( i = 0; i < limit; i++){
                p= getInstrPtr(mb,i);
-               if( getFunctionId(p) == groupdoneRef  && getModuleId(p)== 
groupRef && p->argc == 2){
+               if( getModuleId(p) == groupRef && ( getFunctionId(p) == 
groupRef || getFunctionId(p) == groupdoneRef))
                        actions ++;
-               }
        }
        if( actions == 0)
                goto finished;
 
+       vars = (int **) GDKzalloc(sizeof(int*) * vlimit);
+       if( vars == NULL)
+               throw(MAL,"optimizer.postfix", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+
        old = mb->stmt;
-       if (newMalBlkStmt(mb, mb->ssize) < 0)
+       if (newMalBlkStmt(mb, mb->ssize + 32) < 0){
+               GDKfree(vars);
                throw(MAL,"optimizer.postfix", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+       }
 
        /* construct the new plan */
        for ( i = 0; i < limit; i++)
        {
                p= old[i];
-               pushInstruction(mb,p);
+               if( getModuleId(p) == groupRef && ( getFunctionId(p) == 
groupRef || getFunctionId(p) == groupdoneRef)){
+#ifdef _PARTITION_DEBUG_
+                       fprintf(stderr,"Found partition candidate %d\n", p->pc);
+#endif
+                       qh = newStmt(mb, partitionRef, hashRef);
+                       qh->retc =0;
+                       qh->argc =0;
+                       for( j = 0; j < pieces; j++)
+                               qh = pushReturn(mb, qh, newTmpVariable(mb, 
getArgType(mb, p, p->retc)));
+                       pushArgument(mb, qh, getArg(p,p->retc));
+                       qo = newInstruction(mb, matRef, packRef);
+                       getArg(qo,0) =  getArg(p,0);
+                       qg = newInstruction(mb, matRef, packRef);
+                       getArg(qg,0) = getArg(p,1);
+                       qc = newInstruction(mb, matRef, packRef);
+                       getArg(qc,0) = getArg(p,2);
+
+                       vars[getArg(p,0)]= (int*) GDKzalloc(sizeof(int) * 
MAXPIECES);
+                       vars[getArg(p,1)]= (int*) GDKzalloc(sizeof(int) * 
MAXPIECES);
+                       vars[getArg(p,2)]= (int*) GDKzalloc(sizeof(int) * 
MAXPIECES);
+                       for( j = 0; j < pieces; j++){
+                               q = newStmt(mb, groupRef, getFunctionId(p));
+                               vars[getArg(p,0)][j] = newTmpVariable(mb, 
getArgType(mb,p,0));
+                               vars[getArg(p,1)][j] = newTmpVariable(mb, 
getArgType(mb,p,1));
+                               vars[getArg(p,2)][j] = newTmpVariable(mb, 
getArgType(mb,p,2));
+                               getArg(q,0) = vars[getArg(p,0)][j];
+                               q = pushReturn(mb, q, vars[getArg(p,1)][j]);
+                               q = pushReturn(mb, q, vars[getArg(p,2)][j]);
+                               pushArgument(mb, q, getArg(qh, j));
+                               qo = pushArgument(mb, qo, getArg(q,0));
+                               qg = pushArgument(mb, qg, getArg(q,1));
+                               qc = pushArgument(mb, qc, getArg(q,2));
+                       }
+                       pushInstruction(mb, qo);
+                       pushInstruction(mb, qg);
+                       pushInstruction(mb, qc);
+               } else
+               if( getModuleId(p) == aggrRef && getFunctionId(p) == 
subcountRef && vars[getArg(p, p->retc)] != NULL){
+                       /* subcount over the individual groups */
+                       qo = newInstruction(mb, matRef, packRef);
+                       getArg(qo,0) =  getArg(p,0);
+                       vars[getArg(p,0)]= (int*) GDKzalloc(sizeof(int) * 
MAXPIECES);
+                       for( j = 0; j < pieces; j++){
+                               q = newStmt(mb, aggrRef, subcountRef);
+                               q = pushArgument(mb, q, vars[getArg(p,1)][j]);
+                               q = pushArgument(mb, q, vars[getArg(p,2)][j]);
+                               q = pushArgument(mb, q, vars[getArg(p,3)][j]);
+                               q = pushArgument(mb, q, getArg(p,p->argc-1));
+                               qo = pushArgument(mb, qo, getArg(q,0));
+                       }
+                       pushInstruction(mb, qo);
+               } else
+                       pushInstruction(mb,p);
        }
-       for( ;i < slimit; i++)
+
+       for( ; i < slimit; i++)
                if( old[i])
                        freeInstruction(old[i]);
+#ifdef _PARTITION_DEBUG_
+       fprintFunction(stderr,mb, 0, LIST_MAL_ALL);
+#endif
 
        /* Defense line against incorrect plans */
        if( actions ){
@@ -83,9 +145,13 @@ OPTpartitionImplementation(Client cntxt,
        }
        /* keep all actions taken as a post block comment and update statics */
        GDKfree(old);
+       for( i = 0; i< vlimit; i++)
+               if( vars[i])
+                       GDKfree(vars[i]);
+       GDKfree(vars);
 finished:
        usec= GDKusec() - usec;
-       snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec", "postfix", 
actions, usec);
+       snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec", "partition", 
actions, usec);
        newComment(mb,buf);
        addtoMalBlkHistory(mb);
 
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -97,7 +97,7 @@ static struct PIPELINES {
         "optimizer.garbageCollector();",
         "stable", NULL, NULL, 1},
 // Experiment with partition
-       {"default_pipe",
+       {"partition_pipe",
         "optimizer.inline();"
         "optimizer.remap();"
         "optimizer.costModel();"
@@ -107,8 +107,8 @@ static struct PIPELINES {
         "optimizer.pushselect();"
         "optimizer.aliases();"
         "optimizer.partition();"
-        "optimizer.mitosis();"
-        "optimizer.mergetable();"
+//      "optimizer.mitosis();" not used now
+//      "optimizer.mergetable();" not used now
         "optimizer.deadcode();"
         "optimizer.aliases();"
         "optimizer.constants();"
@@ -118,7 +118,7 @@ static struct PIPELINES {
         "optimizer.reorder();"
 //      "optimizer.reduce();" deprecated
         "optimizer.matpack();"
-        "optimizer.dataflow();"
+//      "optimizer.dataflow();" not used now
         "optimizer.querylog();"
         "optimizer.multiplex();"
         "optimizer.generator();"
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to