Changeset: a3863d5bf0a8 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=a3863d5bf0a8
Modified Files:
monetdb5/scheduler/mut_policy.c
monetdb5/scheduler/mut_transforms.c
monetdb5/scheduler/mut_transforms.h
Branch: mutation
Log Message:
left-fetch-join parallelization code + new mutation policy with partition
propagation to join operator when join is not parallelized yet and is the
dependency operator from previous mat.pack
diffs (truncated from 409 to 300 lines):
diff --git a/monetdb5/scheduler/mut_policy.c b/monetdb5/scheduler/mut_policy.c
--- a/monetdb5/scheduler/mut_policy.c
+++ b/monetdb5/scheduler/mut_policy.c
@@ -43,6 +43,11 @@ mutationCandidate(MalBlkPtr mb, InstrPtr
if (getFunctionId(p) == joinRef)
return 1;
}
+ if ( getModuleId(p) == algebraRef) {
+ if (getFunctionId(p) == leftfetchjoinRef)
+ return 1;
+ }
+
/*
if ( getModuleId(p) == aggrRef){
@@ -105,6 +110,8 @@ MUTpolicy(Client cntxt, Mutant m)
mutationJoin(cntxt,m);
else if(getFunctionId(p) == subselectRef)
mutationSelect(cntxt,m);
+ else if(getFunctionId(p) == leftfetchjoinRef)
+ mutationLeftFetchJoin(cntxt,m);
else // proxy function
mutationJoinDouble(cntxt,m);
}
diff --git a/monetdb5/scheduler/mut_transforms.c b/monetdb5/scheduler/mut_transforms.c
--- a/monetdb5/scheduler/mut_transforms.c
+++ b/monetdb5/scheduler/mut_transforms.c
@@ -246,7 +246,6 @@ mutationJoin_(MalBlkPtr mb, MalStkPtr st
pushInstruction(mb,q);
// inherit profiling
mb->profiler[mb->stop-1].trace = profiler;
-
}
void
@@ -394,7 +393,7 @@ mutationSelect(Client cntxt, Mutant m){
if ( newMalBlkStmt(m->src, m->src->ssize) < 0)
return;
- assert( m->src->profiler);
+ assert( m->src->profiler);
pushInstruction(m->src, old[0]);
for (i = 1; i < limit; i++) {
p= old[i];
@@ -488,3 +487,346 @@ mutationSum(Client cntxt, Mutant m){
GDKfree(old);
}
+/*
+ * Emit one slice of a partitioned algebra.leftfetchjoin:
+ *   b1 := bat.partition(<first argument of p>, partitions, slice);
+ *   v  := algebra.leftfetchjoin(b1, <remaining arguments of p>);
+ * Both new instructions are appended to mb and inherit the `profiler`
+ * trace flag. Because the plan is rewritten while it is being executed,
+ * the two integer constants are also written into the live stack `stk`.
+ * NOTE(review): assumes the caller has verified that stk has room for
+ * the variables allocated here.
+ * The result variable of the new leftfetchjoin is returned via v1Ptr.
+ */
+static void
+mutationLeftFetchJoin_(MalBlkPtr mb, MalStkPtr stk, InstrPtr p, int partitions, int slice, int profiler, int *v1Ptr)
+{
+	int b1;
+	InstrPtr q;
+
+	q = newStmt(mb, batRef, partitionRef);
+	setVarType(mb, getArg(q, 0), getArgType(mb, p, p->retc));
+	b1 = getArg(q, 0);
+	q = pushArgument(mb, q, getArg(p, p->retc));
+	// The plan constant and the stack slot must carry the same value,
+	// otherwise the plan text and the running interpreter disagree.
+	q = pushInt(mb, q, partitions);
+	stk->stk[getArg(q, q->argc - 1)].val.ival = partitions;
+	q = pushInt(mb, q, slice);
+	stk->stk[getArg(q, q->argc - 1)].val.ival = slice;
+	// inherit profiling
+	mb->profiler[mb->stop - 1].trace = profiler;
+
+	// Clone the original leftfetchjoin and let it read the slice in place
+	// of exactly the argument that was partitioned above.
+	q = copyInstruction(p);
+	getArg(q, p->retc) = b1;
+	*v1Ptr = getArg(q, 0) = newTmpVariable(mb, TYPE_any);
+	pushInstruction(mb, q);
+	// inherit profiling
+	mb->profiler[mb->stop - 1].trace = profiler;
+}
+
+/*
+ * Mutation: split the algebra.leftfetchjoin at m->target into two halves.
+ *
+ *   x1 := algebra.leftfetchjoin(b, Y)
+ * becomes
+ *   b1 := bat.partition(b, 2, 0);
+ *   b2 := bat.partition(b, 2, 1);
+ *   v1 := algebra.leftfetchjoin(b1, Y);
+ *   v2 := algebra.leftfetchjoin(b2, Y);
+ *   x1 := mat.pack(v1, v2);
+ *
+ * No partitioning intelligence is used: the input is simply cut in half.
+ * Later mat.pack instructions that take x1 as an argument get v1, v2
+ * spliced in directly; x1 itself is still (re)defined by a mat.pack when
+ * no pack absorbed it or when some other instruction still reads it.
+ *
+ * The plan in m->src is rebuilt instruction by instruction. The live stack
+ * m->stk must not grow (it cannot easily be handed back to the running
+ * interpreter), so the mutation is skipped when the stack lacks slack.
+ */
+void
+mutationLeftFetchJoin(Client cntxt, Mutant m){
+	int pc = m->target, i, j, k, limit, v1, v2;
+	InstrPtr p = 0, *old = m->src->stmt, q;
+	int matpc = 0, otheruse = 0, profiler;
+	/* Conservative upper bound on the variables created below: each
+	 * mutationLeftFetchJoin_ call may allocate up to four (partition result,
+	 * two integer constants, join result), plus one each for the mat.pack
+	 * and language.pass statements. NOTE(review): assumes newStmt and
+	 * pushInt may each allocate a fresh variable. */
+	const int newvars = 2 * 4 + 2;
+
+	(void) cntxt;
+	limit = m->src->stop;
+	if (newMalBlkStmt(m->src, m->src->ssize) < 0)
+		return;
+
+	pushInstruction(m->src, old[0]);
+	for (i = 1; i < limit; i++) {
+		p = old[i];
+		if (i != pc) {
+			pushInstruction(m->src, p);
+			continue;
+		}
+		// Not enough spare stack slots: keep the instruction unchanged.
+		if (m->stk->stksize < m->src->vtop + newvars) {
+			pushInstruction(m->src, p);
+			continue;
+		}
+
+		profiler = m->src->profiler[i].trace;
+
+		mutationLeftFetchJoin_(m->src, m->stk, p, 2, 0, profiler, &v1);
+		mutationLeftFetchJoin_(m->src, m->stk, p, 2, 1, profiler, &v2);
+
+		// Splice v1,v2 into later mat.pack instructions that consume x1,
+		// and note whether any other instruction also reads x1.
+		for (j = i + 1; j < limit; j++) {
+			q = old[j];
+			if (getModuleId(q) == matRef && getFunctionId(q) == packRef) {
+				for (k = q->retc; k < q->argc; k++)
+					if (getArg(q, k) == getArg(p, 0)) {
+						matpc++;
+						delArgument(q, k);
+						q = setArgument(m->src, q, k, v2);
+						q = setArgument(m->src, q, k, v1);
+						old[j] = q;	// setArgument may reallocate
+						break;
+					}
+			} else {
+				for (k = q->retc; k < q->argc; k++)
+					if (getArg(q, k) == getArg(p, 0)) {
+						otheruse = 1;
+						break;
+					}
+			}
+		}
+
+		// x1 must stay defined unless every reader was a rewritten mat.pack.
+		if (matpc == 0 || otheruse) {
+			q = newStmt(m->src, matRef, packRef);
+			getArg(q, 0) = getArg(p, 0);
+			q = pushArgument(m->src, q, v1);
+			q = pushArgument(m->src, q, v2);
+			m->src->profiler[m->src->stop - 1].trace = profiler;
+		}
+		if (matpc == 0) {
+			q = newStmt(m->src, languageRef, passRef);
+			q = pushArgument(m->src, q, getArg(p, p->retc + 1));
+			// inherit profiling
+			m->src->profiler[m->src->stop - 1].trace = profiler;
+		}
+
+		m->comment = GDKstrdup("mutationLeftFetchJoin");
+	}
+	GDKfree(old);
+}
+
+/*
+ * Push a partitioned mat.pack through the algebra.join that consumes it.
+ *
+ *   M := mat.pack(A1, ..., An);       -- instrMatPack
+ *   (X, Z) := algebra.join(.., M, ..); -- instruction matPackRefInstr,
+ *                                        M at argument matPackRefInstrArg
+ * becomes one join per mat.pack input:
+ *   (Xi, Zi) := algebra.join(.., Ai, ..);
+ * The partial results are spliced into later mat.pack instructions that
+ * consume X or Z (those between stmtLoop+1 and the plan end at entry).
+ * A result that no mat.pack absorbs is packed again under its original
+ * name. The original join is then turned into a no-op.
+ *
+ * Nothing is changed when the referenced instruction is not an
+ * algebra.join, or when the mat.pack has no inputs or more than
+ * MAX_PARTITIONS inputs (the local result arrays have that capacity).
+ *
+ * NOTE(review): the new joins are appended at m->src->stop, after the
+ * instructions scanned below, and no stack-size check is done here.
+ * Confirm with the caller that both are intended. Readers of X/Z other
+ * than mat.pack are not rewritten.
+ */
+static void
+mutateNonPartitionedOperators(Mutant m, int stmtLoop, int matPackRefInstr, int matPackRefInstrArg, InstrPtr instrMatPack, int profiler)
+{
+	int j, k, l, limit, v1[MAX_PARTITIONS], z1[MAX_PARTITIONS];
+	int packed0 = 0, packed1 = 0, mat_pack_partitions;
+	InstrPtr join, q;
+
+	join = getInstrPtr(m->src, matPackRefInstr);
+	if (getModuleId(join) != algebraRef || getFunctionId(join) != joinRef)
+		return;
+
+	// One new join per mat.pack input; never overflow v1/z1.
+	mat_pack_partitions = instrMatPack->argc - instrMatPack->retc;
+	if (mat_pack_partitions <= 0 || mat_pack_partitions > MAX_PARTITIONS)
+		return;
+
+	limit = m->src->stop;
+
+	for (k = 0; k < mat_pack_partitions; k++) {
+		q = copyInstruction(join);
+		getArg(q, matPackRefInstrArg) = getArg(instrMatPack, instrMatPack->retc + k);
+		v1[k] = getArg(q, 0) = newTmpVariable(m->src, TYPE_any);
+		z1[k] = getArg(q, 1) = newTmpVariable(m->src, TYPE_any);
+		pushInstruction(m->src, q);
+		// inherit profiling
+		m->src->profiler[m->src->stop - 1].trace = profiler;
+	}
+
+	// Splice the partial results into later mat.pack instructions. The
+	// statement array is read afresh here because pushInstruction above
+	// may have reallocated it.
+	for (j = stmtLoop + 1; j < limit; j++) {
+		q = m->src->stmt[j];
+		if (getModuleId(q) != matRef || getFunctionId(q) != packRef)
+			continue;
+		for (k = q->retc; k < q->argc; k++)
+			if (getArg(q, k) == getArg(join, 0)) {
+				packed0++;
+				delArgument(q, k);
+				for (l = mat_pack_partitions - 1; l >= 0; l--)
+					q = setArgument(m->src, q, k, v1[l]);
+				break;
+			}
+		for (k = q->retc; k < q->argc; k++)
+			if (getArg(q, k) == getArg(join, 1)) {
+				packed1++;
+				delArgument(q, k);
+				for (l = mat_pack_partitions - 1; l >= 0; l--)
+					q = setArgument(m->src, q, k, z1[l]);
+				break;
+			}
+		m->src->stmt[j] = q;	// setArgument may reallocate
+	}
+
+	// Results that no mat.pack absorbed must still be defined.
+	if (packed0 == 0) {
+		q = newStmt(m->src, matRef, packRef);
+		getArg(q, 0) = getArg(join, 0);
+		for (k = 0; k < mat_pack_partitions; k++)
+			q = pushArgument(m->src, q, v1[k]);
+		m->src->profiler[m->src->stop - 1].trace = profiler;
+	}
+	if (packed1 == 0) {
+		q = newStmt(m->src, matRef, packRef);
+		getArg(q, 0) = getArg(join, 1);
+		for (k = 0; k < mat_pack_partitions; k++)
+			q = pushArgument(m->src, q, z1[k]);
+		m->src->profiler[m->src->stop - 1].trace = profiler;
+	}
+	if (packed0 == 0 && packed1 == 0) {
+		q = newStmt(m->src, languageRef, passRef);
+		q = pushArgument(m->src, q, getArg(join, join->retc + 1));
+		// inherit profiling
+		m->src->profiler[m->src->stop - 1].trace = profiler;
+	}
+
+	// mask the original algebra.join instruction
+	join->token = NOOPsymbol;
+}
+
+
+
+void
+mutationMatPack(Client cntxt, Mutant m) {
+ int pc = m->target, i, j, limit, profiler;
+ InstrPtr p=0, *old= m->src->stmt, instrMatPack;
+
+ int stmtLoop, matPackRefInstr, matPackRefInstrArg;
+
+
+ // search for the mat.pack outuput variable
+ // check in which instruction this mat.pack output appears in the
dependency instructions.
+
+ // If the referenced instruction is a non-bat-ref instruction, and its
cost is relatively
+ // comparable to the cost of mat.pack operator, then count the number
of partitions in the
+ // mat.pack instruction and introduce those many partitions for the new
dependency operator
+ // which we just found. That is partition the input of the dependency
operator (This dependency operator could be
+ // any for time being consider it to be a join operator and mat.pack
combines a select operator
+ // Then we have join(A,x) propagate to
+ // join(s1,x), join(s2,x), join(s3,x) that is propagate the input to
mat.pack to the newly introduced join operator
+ // and remove the old join operator. also remove the old mat.pack
operator from the select output, and introduce a new
+ // mat.pack operator to combine the join output
+
+ // Case 1
+ //
+ // A1 := algebra.subselect(....);
+ // A2 := algebra.subselect(....);
+ // M1 := mat.pack(A1, A2);
+ // J := algebra.join(M1, b);
+ //
+ //
+ // Morphed into new plan
+ //
+ // A1 := algebra.subselect(....);
+ // A2 := algebra.subselect(....);
+ // J1 := algebra.join(A1,b);
+ // J2 := algebra.join(A2,b);
+ // M := mat.pack(J1, J2);
+
+
+ // Case 2
+ //
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list