Changeset: 338645776f2a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=338645776f2a
Modified Files:
monetdb5/optimizer/opt_partition.c
monetdb5/optimizer/opt_pipes.c
Branch: partition
Log Message:
Partition optimizer: first attempt.
The query `select count(*) from lineitem group by l_tax;`
can now be compiled correctly with hash partitioning applied first.
diffs (213 lines):
diff --git a/monetdb5/optimizer/opt_partition.c b/monetdb5/optimizer/opt_partition.c
--- a/monetdb5/optimizer/opt_partition.c
+++ b/monetdb5/optimizer/opt_partition.c
@@ -8,72 +8,134 @@
/* example: select count(distinct userid) from hits_10m;
* Massage aggregate operations using value based partitioning
- X_16 := algebra.projection(X_4, X_15);
- (X_17, X_18, X_19) := group.groupdone(X_16);
- X_20 := algebra.projection(X_18, X_16);
- X_21 := aggr.count(X_20, true);
+ (o,g,c):= group.groupdone(d);
+ a:= aggr.subcount(o,o,g,false);
- The new could should become:
- X_16 := algebra.projection(X_4, X_15);
- (P1,P2,P3,P4) := bat.partition(X_16)
- (O1, G1 , S1) := group.groupdone(P1);
- (O2, G2 , S2) := group.groupdone(P2);
- (O3, G3 , S3) := group.groupdone(P3);
- (O4, G4 , S4) := group.groupdone(P4);
- X_17 := mat.pack(O1,O3,O3,O4)
- X_18 := mat.pack(G1,G3,G3,G4)
- X_19 := mat.pack(S1,S3,S3,S4)
+ (p1,p2,p3,p4):= partition.hash(d);
+ (o1,g1,c1):= group.groupdone(p1);
+ (o2,g2,c2):= group.groupdone(p2);
+ (o3,g3,c3):= group.groupdone(p3);
+ (o4,g4,c4):= group.groupdone(p4);
+ o := mat.pack(o1,o2,o3,o4);
+ g := mat.pack(g1,g2,g3,g4);
+ c := mat.pack(c1,c2,c3,c4);
+ a1:= aggr.subcount(o1,o1,g1,false);
+ a2:= aggr.subcount(o2,o2,g2,false);
+ a3:= aggr.subcount(o3,o3,g3,false);
+ a4:= aggr.subcount(o4,o4,g4,false);
+ m := mat.pack(a1,a2,a3,a4);
- Q1 := algebra.projection(G1, P1);
- Q2 := algebra.projection(G2, P2);
- Q3 := algebra.projection(G3, P3);
- Q4 := algebra.projection(G4, P4);
- X_20 := mat.pack(Q1,Q2,Q3,Q4);
-
- X_21 := aggr.count(X_20, true);
*/
#include "monetdb_config.h"
#include "mal_instruction.h"
#include "opt_partition.h"
+//#define _PARTITION_DEBUG_
+
#define isCandidateList(M,P,I) ((M)->var[getArg(P,I)].id[0]== 'C')
+#define MAXPIECES 128
str
OPTpartitionImplementation(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
p)
{
- int i, slimit, limit, actions=0;
- InstrPtr *old = 0;
+ int i, j, slimit, limit, actions=0;
+ InstrPtr qo, qg, qc, qh, q, *old = 0;
lng usec = GDKusec();
char buf[256];
+ int vlimit;
+ int pieces = 2; // GDKnr_threads ? (GDKnr_threads > MAXPIECES?
MAXPIECES: GDKnr_threads) : 1;
+ int **vars;
(void) stk;
(void) cntxt;
limit = mb->stop;
slimit = mb->ssize;
+ vlimit = mb->vsize;
/* check for aggregates */
for ( i = 0; i < limit; i++){
p= getInstrPtr(mb,i);
- if( getFunctionId(p) == groupdoneRef && getModuleId(p)==
groupRef && p->argc == 2){
+ if( getModuleId(p) == groupRef && ( getFunctionId(p) ==
groupRef || getFunctionId(p) == groupdoneRef))
actions ++;
- }
}
if( actions == 0)
goto finished;
+ vars = (int **) GDKzalloc(sizeof(int*) * vlimit);
+ if( vars == NULL)
+ throw(MAL,"optimizer.postfix", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+
old = mb->stmt;
- if (newMalBlkStmt(mb, mb->ssize) < 0)
+ if (newMalBlkStmt(mb, mb->ssize + 32) < 0){
+ GDKfree(vars);
throw(MAL,"optimizer.postfix", SQLSTATE(HY001) MAL_MALLOC_FAIL);
+ }
/* construct the new plan */
for ( i = 0; i < limit; i++)
{
p= old[i];
- pushInstruction(mb,p);
+ if( getModuleId(p) == groupRef && ( getFunctionId(p) ==
groupRef || getFunctionId(p) == groupdoneRef)){
+#ifdef _PARTITION_DEBUG_
+ fprintf(stderr,"Found partition candidate %d\n", p->pc);
+#endif
+ qh = newStmt(mb, partitionRef, hashRef);
+ qh->retc =0;
+ qh->argc =0;
+ for( j = 0; j < pieces; j++)
+ qh = pushReturn(mb, qh, newTmpVariable(mb,
getArgType(mb, p, p->retc)));
+ pushArgument(mb, qh, getArg(p,p->retc));
+ qo = newInstruction(mb, matRef, packRef);
+ getArg(qo,0) = getArg(p,0);
+ qg = newInstruction(mb, matRef, packRef);
+ getArg(qg,0) = getArg(p,1);
+ qc = newInstruction(mb, matRef, packRef);
+ getArg(qc,0) = getArg(p,2);
+
+ vars[getArg(p,0)]= (int*) GDKzalloc(sizeof(int) *
MAXPIECES);
+ vars[getArg(p,1)]= (int*) GDKzalloc(sizeof(int) *
MAXPIECES);
+ vars[getArg(p,2)]= (int*) GDKzalloc(sizeof(int) *
MAXPIECES);
+ for( j = 0; j < pieces; j++){
+ q = newStmt(mb, groupRef, getFunctionId(p));
+ vars[getArg(p,0)][j] = newTmpVariable(mb,
getArgType(mb,p,0));
+ vars[getArg(p,1)][j] = newTmpVariable(mb,
getArgType(mb,p,1));
+ vars[getArg(p,2)][j] = newTmpVariable(mb,
getArgType(mb,p,2));
+ getArg(q,0) = vars[getArg(p,0)][j];
+ q = pushReturn(mb, q, vars[getArg(p,1)][j]);
+ q = pushReturn(mb, q, vars[getArg(p,2)][j]);
+ pushArgument(mb, q, getArg(qh, j));
+ qo = pushArgument(mb, qo, getArg(q,0));
+ qg = pushArgument(mb, qg, getArg(q,1));
+ qc = pushArgument(mb, qc, getArg(q,2));
+ }
+ pushInstruction(mb, qo);
+ pushInstruction(mb, qg);
+ pushInstruction(mb, qc);
+ } else
+ if( getModuleId(p) == aggrRef && getFunctionId(p) ==
subcountRef && vars[getArg(p, p->retc)] != NULL){
+ /* subcount over the individual groups */
+ qo = newInstruction(mb, matRef, packRef);
+ getArg(qo,0) = getArg(p,0);
+ vars[getArg(p,0)]= (int*) GDKzalloc(sizeof(int) *
MAXPIECES);
+ for( j = 0; j < pieces; j++){
+ q = newStmt(mb, aggrRef, subcountRef);
+ q = pushArgument(mb, q, vars[getArg(p,1)][j]);
+ q = pushArgument(mb, q, vars[getArg(p,2)][j]);
+ q = pushArgument(mb, q, vars[getArg(p,3)][j]);
+ q = pushArgument(mb, q, getArg(p,p->argc-1));
+ qo = pushArgument(mb, qo, getArg(q,0));
+ }
+ pushInstruction(mb, qo);
+ } else
+ pushInstruction(mb,p);
}
- for( ;i < slimit; i++)
+
+ for( ; i < slimit; i++)
if( old[i])
freeInstruction(old[i]);
+#ifdef _PARTITION_DEBUG_
+ fprintFunction(stderr,mb, 0, LIST_MAL_ALL);
+#endif
/* Defense line against incorrect plans */
if( actions ){
@@ -83,9 +145,13 @@ OPTpartitionImplementation(Client cntxt,
}
/* keep all actions taken as a post block comment and update statics */
GDKfree(old);
+ for( i = 0; i< vlimit; i++)
+ if( vars[i])
+ GDKfree(vars[i]);
+ GDKfree(vars);
finished:
usec= GDKusec() - usec;
- snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec", "postfix",
actions, usec);
+ snprintf(buf,256,"%-20s actions=%2d time=" LLFMT " usec", "partition",
actions, usec);
newComment(mb,buf);
addtoMalBlkHistory(mb);
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -97,7 +97,7 @@ static struct PIPELINES {
"optimizer.garbageCollector();",
"stable", NULL, NULL, 1},
// Experiment with partition
- {"default_pipe",
+ {"partition_pipe",
"optimizer.inline();"
"optimizer.remap();"
"optimizer.costModel();"
@@ -107,8 +107,8 @@ static struct PIPELINES {
"optimizer.pushselect();"
"optimizer.aliases();"
"optimizer.partition();"
- "optimizer.mitosis();"
- "optimizer.mergetable();"
+// "optimizer.mitosis();" not used now
+// "optimizer.mergetable();" not used now
"optimizer.deadcode();"
"optimizer.aliases();"
"optimizer.constants();"
@@ -118,7 +118,7 @@ static struct PIPELINES {
"optimizer.reorder();"
// "optimizer.reduce();" deprecated
"optimizer.matpack();"
- "optimizer.dataflow();"
+// "optimizer.dataflow();" not used now
"optimizer.querylog();"
"optimizer.multiplex();"
"optimizer.generator();"
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list