Changeset: 329fafa35a13 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=329fafa35a13
Modified Files:
monetdb5/optimizer/opt_partition.mx
Branch: default
Log Message:
Extend the subplans with remote joins
diffs (truncated from 308 to 300 lines):
diff --git a/monetdb5/optimizer/opt_partition.mx
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -38,10 +38,6 @@ address OPTpartitionMaterialize
comment "Implement the partition operation. Throw an exception if the
partition was empty,
because then the subquery should produce a NIL ";
-command partition.markH( b:bat[:any_1,:any_2] ) :bat[:oid,:any_2]
-address OPTmarkHead
-comment "Ignore a NIL bat";
-
pattern partition.vector(b:bat[:oid,:any_1]) :any_1...
address OPTvector
comment "Derive a series of slices values based on sampling";
@@ -57,7 +53,6 @@ comment "Derive a series of slices value
opt_export str OPTvector(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
opt_export str OPTpartitionMaterialize(int *result, int *bid, ptr low, ptr
high);
-opt_export str OPTmarkHead(int *result, int *bid);
/* #define DEBUG_DETAIL*/
#define _DEBUG_OPT_PARTITION_
@@ -81,6 +76,11 @@ typedef struct{
ValRecord bounds[MAXSITES];
} Slices;
+/*
+ * The query will be controlled from the coordinator with a plan
+ * geared at parallel execution
+ * TODO pack is expensive, move to mat.new
+*/
static MalBlkPtr
OPTplanCntrl(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices)
{
@@ -181,18 +181,13 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
pushInstruction(cmb,q);
}
/* put all mat.pack instructions into the program
- and make sure that they have contiguous void headed columns
+ make sure that they have contiguous void headed columns
*/
p = getInstrPtr(pmb,0);
if ( slices->column)
for ( k=0 ; k < nrpack; k++) {
pushInstruction(cmb, pack[k]);
getArg(pack[k],0)= getArg(p,k);
-/*
- q= newFcnCall(cmb,partitionRef,markHRef);
- getArg(q,0) = getArg(p,k);
- q= pushArgument(cmb,q, getArg(pack[k],0));
-*/
}
/* finalize the dataflow block */
@@ -218,7 +213,7 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
return cmb;
}
-/* prepare slicing a column by addition over the target */
+/* prepare access to partitions by injection of the materialize instructions */
static int
OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc)
{
@@ -256,6 +251,9 @@ OPTpreparePartition(MalBlkPtr nmb, Instr
return parallel;
}
+/*
+ * For bind instructions we have to inject materialize and semijoin instructions
+*/
static int
OPTsliceColumn(Client cntxt, MalBlkPtr nmb, MalBlkPtr mb, InstrPtr p, Slices
*slices, int pc)
{
@@ -307,12 +305,15 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
/*
* The plan is analysed for the maximal subplan that involves a partitioned
table
* and that does not require data exchanges.
- * This portion is extracted for possibly remote execution.
+ * Algebraic operators that can be executed on fragments are delegated too.
+ * For example, join(A,B) where A is fragmented and B is not can be done elsewhere.
+ * In all cases we should ensure that the result of the remote execution can be
+ * simply unioned together.
*/
#define BLOCKED 1
#define REQUIRED 2
-#define EXPORTED 3
-#define NEEDED 4
+#define SUPPORTIVE 3
+
static int
OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
{
@@ -358,9 +359,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr
(void) slices;
#endif
- /* Phase 1: determine all variables/instructions indirectly dependent
on a
- fragmented column
- */
+ /* Phase 1: determine all variables/instructions indirectly dependent on a fragmented column */
last = limit;
for ( i = 0; i < limit ; i++) {
p = old[i];
@@ -379,11 +378,24 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if (vars[getArg(p,j)] == BLOCKED)
plan[i] = BLOCKED;
- /* blocking instructions */
+ /* blocking instructions are those that require data exchange or a total view */
+ if ( getModuleId(p) == algebraRef && getFunctionId(p) ==
joinRef ) {
+ if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)]
== REQUIRED) {
+ /* not possible to delegate */
+ plan[i] = BLOCKED;
+ } else {
+ /* other variable is supportive */
+ if (vars[getArg(p,1)] != REQUIRED)
+ vars[getArg(p,1)] = SUPPORTIVE;
+ if (vars[getArg(p,2)] != REQUIRED)
+ vars[getArg(p,2)] = SUPPORTIVE;
+ plan[i] = SUPPORTIVE;
+ }
+ } else
if ( (getModuleId(p) == groupRef && (getFunctionId(p) ==
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) ||
getModuleId(p) == pqueueRef || getModuleId(p)
== aggrRef || getModuleId(p) == ioRef ||
(getModuleId(p) == sqlRef && (getFunctionId(p)
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
- (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef ||
getFunctionId(p)==markTRef)) ) {
+ (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) ) {
/* add the targets of its argument to the output */
plan[i] = BLOCKED;
}
@@ -393,18 +405,50 @@ OPTplanFragment(Client cntxt, MalBlkPtr
vars[getArg(p,j)] = BLOCKED;
} else {
for( j = 0; j < p->argc; j++)
- if (vars[getArg(p,j)] == REQUIRED)
+ if (vars[getArg(p,j)] == REQUIRED )
break;
- if ( j != p->argc) {
- for ( j= 0; j< p->retc; j++)
+ if ( j != p->argc)
+ plan[i]= REQUIRED;
+
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE )
+ break;
+ if ( j != p->argc && plan[i] != REQUIRED)
+ plan[i]= SUPPORTIVE;
+
+ if ( plan[i] == REQUIRED)
+ for ( j= 0; j< p->argc; j++)
vars[getArg(p,j)] = REQUIRED;
- plan[i] = REQUIRED;
- }
+ if ( plan[i] == SUPPORTIVE)
+ for ( j= 0; j< p->argc; j++)
+ if ( vars[getArg(p,j)] == 0)
+ vars[getArg(p,j)] = SUPPORTIVE;
}
}
- /* Phase 2: determine all variables/instructions contributing */
- mnstr_printf(cntxt->fdout,"#phase 2\n");
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 1\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
+#endif
+
+ /* Phase 2: determine all variables/instructions contributing.
+ Instructions based on supportive variables remain marked as supportive,
+ because we have to avoid common ancestor dependency on partitioned variables.
+ */
for ( i = limit -1; i >= 0 ; i--)
if ( plan[i] != BLOCKED ){
p = old[i];
@@ -412,12 +456,69 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if (vars[getArg(p,j)] == REQUIRED)
plan[i] = REQUIRED;
- if( plan[i] == REQUIRED)
- for ( j= p->retc; j< p->argc; j++)
+ if ( plan[i] == 0)
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE)
+ plan[i] = SUPPORTIVE;
+
+ if( plan[i]== REQUIRED )
+ for ( j= 0; j< p->argc; j++)
vars[getArg(p,j)] = REQUIRED;
}
- /* Phase 3: determine all variables to be exported */
- mnstr_printf(cntxt->fdout,"#phase 3\n");
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 2\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
+#endif
+ /* Phase 3: resolve all supportive instructions: one becomes required
+ when none of its arguments is still marked supportive,
+ otherwise it is blocked */
+ for ( i = 0; i < limit; i++)
+ if( plan[i] == SUPPORTIVE){
+ p = old[i];
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE)
+ break;
+ if( j == p->argc)
+ plan[i] = REQUIRED;
+ else
+ plan[i] = BLOCKED;
+ }
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 3\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
+#endif
+ /* Phase 4: determine all variables to be exported
+ this is limited to the partitioned fragments
+ supported variables remain local
+ */
ret= newInstruction(nmb,ASSIGNsymbol);
ret->barrier = RETURNsymbol;
ret->argc= ret->retc = 0;
@@ -425,7 +526,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if ( plan[i] != REQUIRED ){
p = old[i];
for( j = p->retc; j < p->argc; j++)
- if (vars[getArg(p,j)] == REQUIRED &&
isaBatType(getArgType(nmb,p,j)) ) {
+ if ( vars[getArg(p,j)] == REQUIRED &&
isaBatType(getArgType(nmb,p,j)) ) {
for ( k = 0; k < ret->retc; k++)
if (getArg(ret,k) == getArg(p,j))
break;
@@ -458,10 +559,8 @@ OPTplanFragment(Client cntxt, MalBlkPtr
for( j = 0; j< mb->stop; j++) {
p = getInstrPtr(mb,j);
for ( k = 0; k< p->retc; k++)
- if( getArg(p,k) == getArg(ret,i)){
- mnstr_printf(cntxt->fdout,"#adjust instruction
%d\n",j);
+ if( getArg(p,k) == getArg(ret,i))
getArg(p,k) = newTmpVariable(mb,
getArgType(mb,p,k));
- }
}
}
@@ -615,7 +714,9 @@ OPTpartitionImplementation(Client cntxt,
*vv = @1_nil;
vm = (ptr) &min.val.@2;
BATmin(b, vm);
- step.val.@2 = (max.val.@2 - min.val.@2) / (pci->retc - 2);
+ if ( pci->retc > 2)
+ step.val.@2 = (max.val.@2 - min.val.@2) / (pci->retc -
2);
+ else
if ( step.val.@2 == 0)
step.val.@2 = max.val.@2;
for ( i = 1; i < pci->retc-1; i++) {
@@ -680,13 +781,3 @@ str OPTpartitionMaterialize(int *result,
return ALGselectInclusive( result, bid, low, high, &bitlow, &bithigh);
}
-
-opt_export str OPTmarkHead(int *result, int *bid)
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list