Changeset: cc2956f1a6eb for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=cc2956f1a6eb
Modified Files:
Branch: default
Log Message:
merging
diffs (truncated from 354 to 300 lines):
diff --git a/monetdb5/mal/mal_resolve.mx b/monetdb5/mal/mal_resolve.mx
--- a/monetdb5/mal/mal_resolve.mx
+++ b/monetdb5/mal/mal_resolve.mx
@@ -160,6 +160,10 @@ findFunctionType(Module scope, MalBlkPtr
m = scope;
s = m->subscope[(int)(getSubScope(getFunctionId(p)))];
if (s == 0) return -1;
+
+ returntype= (int*) GDKzalloc(p->retc * sizeof(int));
+ if ( returntype == 0) return -1;
+
while (s != NULL) { /* single scope element check */
if (getFunctionId(p) != s->name) {
s = s->skip; continue;
@@ -357,7 +361,6 @@ findFunctionType(Module scope, MalBlkPtr
* the resulting type can not be determined.
*/
s1 = 0;
- returntype= (int*) GDKzalloc(p->retc * sizeof(int));
if (sig->polymorphic)
for (k = i = 0; i < p->retc; k++, i++) {
int actual = getArgType(mb, p, i);
diff --git a/monetdb5/optimizer/opt_dataflow.mx
b/monetdb5/optimizer/opt_dataflow.mx
--- a/monetdb5/optimizer/opt_dataflow.mx
+++ b/monetdb5/optimizer/opt_dataflow.mx
@@ -110,6 +110,11 @@ opt_export void removeDataflow(InstrPtr
#include "mal_instruction.h"
#include "mal_interpreter.h"
+/*
+ * dataflow processing incurs overhead and is only
+ * relevant if multiple tasks can be handled at the same time.
+ * Also simple expressions don't have to be done in parallel.
+*/
static int
simpleFlow(InstrPtr *old, int start, int last)
{
@@ -125,6 +130,8 @@ simpleFlow(InstrPtr *old, int start, int
if( getArg(p,0) == getArg(q,j))
simple= TRUE;
if( !simple)
+ simple = getModuleId(p) == calcRef || getModuleId(p) ==
mtimeRef || getModuleId(p) == strRef || getModuleId(p)== mmathRef;
+ if( !simple)
return 0;
p = q;
}
diff --git a/monetdb5/optimizer/opt_partition.mx
b/monetdb5/optimizer/opt_partition.mx
--- a/monetdb5/optimizer/opt_partition.mx
+++ b/monetdb5/optimizer/opt_partition.mx
@@ -38,10 +38,6 @@ address OPTpartitionMaterialize
comment "Implement the partition operation. Throw an exception if the
partition was empty,
because then the subquery should produce a NIL ";
-command partition.markH( b:bat[:any_1,:any_2] ) :bat[:oid,:any_2]
-address OPTmarkHead
-comment "Ignore a NIL bat";
-
pattern partition.vector(b:bat[:oid,:any_1]) :any_1...
address OPTvector
comment "Derive a series of slices values based on sampling";
@@ -57,7 +53,6 @@ comment "Derive a series of slices value
opt_export str OPTvector(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr
pci);
opt_export str OPTpartitionMaterialize(int *result, int *bid, ptr low, ptr
high);
-opt_export str OPTmarkHead(int *result, int *bid);
/* #define DEBUG_DETAIL*/
#define _DEBUG_OPT_PARTITION_
@@ -81,6 +76,11 @@ typedef struct{
ValRecord bounds[MAXSITES];
} Slices;
+/*
+ * The query will be controlled from the coordinator with a plan
+ * geared at parallel execution
+ * TODO pack is expensive, move to mat.new
+*/
static MalBlkPtr
OPTplanCntrl(Client cntxt, MalBlkPtr mb, MalBlkPtr pmb, Slices *slices)
{
@@ -181,18 +181,13 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
pushInstruction(cmb,q);
}
/* put all mat.pack instructions into the program
- and make sure that they have contiguous void headed columns
+ make sure that they have contiguous void headed columns
*/
p = getInstrPtr(pmb,0);
if ( slices->column)
for ( k=0 ; k < nrpack; k++) {
pushInstruction(cmb, pack[k]);
getArg(pack[k],0)= getArg(p,k);
-/*
- q= newFcnCall(cmb,partitionRef,markHRef);
- getArg(q,0) = getArg(p,k);
- q= pushArgument(cmb,q, getArg(pack[k],0));
-*/
}
/* finalize the dataflow block */
@@ -218,7 +213,7 @@ OPTplanCntrl(Client cntxt, MalBlkPtr mb,
return cmb;
}
-/* prepare slicing a column by addition over the target */
+/* prepare access to partitions by injection of the materialize instructions */
static int
OPTpreparePartition(MalBlkPtr nmb, InstrPtr p, Slices *slices, int pc)
{
@@ -256,6 +251,9 @@ OPTpreparePartition(MalBlkPtr nmb, Instr
return parallel;
}
+/*
+ * For bind instructions we have to inject materialize and semijoin
instructions
+*/
static int
OPTsliceColumn(Client cntxt, MalBlkPtr nmb, MalBlkPtr mb, InstrPtr p, Slices
*slices, int pc)
{
@@ -307,12 +305,15 @@ OPTsliceColumn(Client cntxt, MalBlkPtr n
/*
* The plan is analysed for the maximal subplan that involves a partitioned
table
* and that does not require data exchanges.
- * This portion is extracted for possibly remote execution.
+ * Algebraic operators that can be executed on fragments are delegated too.
+ * For example join(A,B) where A is fragmented and B is not can be done
elsewhere.
+ * In all cases we should ensure that the result of the remote execution can be
+ * simply unioned together.
*/
#define BLOCKED 1
#define REQUIRED 2
-#define EXPORTED 3
-#define NEEDED 4
+#define SUPPORTIVE 3
+
static int
OPTplanFragment(Client cntxt, MalBlkPtr mb, Slices *slices)
{
@@ -358,9 +359,7 @@ OPTplanFragment(Client cntxt, MalBlkPtr
(void) slices;
#endif
- /* Phase 1: determine all variables/instructions indirectly dependent
on a
- fragmented column
- */
+ /* Phase 1: determine all variables/instructions indirectly dependent
on a fragmented column */
last = limit;
for ( i = 0; i < limit ; i++) {
p = old[i];
@@ -379,11 +378,24 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if (vars[getArg(p,j)] == BLOCKED)
plan[i] = BLOCKED;
- /* blocking instructions */
+ /* blocking instructions are those that require data exchange
or total view */
+ if ( getModuleId(p) == algebraRef && getFunctionId(p) ==
joinRef ) {
+ if (vars[getArg(p,1)] == REQUIRED && vars[getArg(p,2)]
== REQUIRED) {
+ /* not possible to delegate */
+ plan[i] = BLOCKED;
+ } else {
+ /* other variable is supportive */
+ if (vars[getArg(p,1)] != REQUIRED)
+ vars[getArg(p,1)] = SUPPORTIVE;
+ if (vars[getArg(p,2)] != REQUIRED)
+ vars[getArg(p,2)] = SUPPORTIVE;
+ plan[i] = SUPPORTIVE;
+ }
+ } else
if ( (getModuleId(p) == groupRef && (getFunctionId(p) ==
doneRef || getFunctionId(p) == newRef ||getFunctionId(p) == deriveRef) ) ||
getModuleId(p) == pqueueRef || getModuleId(p)
== aggrRef || getModuleId(p) == ioRef ||
(getModuleId(p) == sqlRef && (getFunctionId(p)
== resultSetRef || getFunctionId(p) == putName("exportValue",11) )) ||
- (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p) == joinRef ||
getFunctionId(p)==markTRef)) ) {
+ (getModuleId(p) == algebraRef
&&(getFunctionId(p) == sliceRef || getFunctionId(p)==markTRef)) ) {
/* add the targets of its argument to the output */
plan[i] = BLOCKED;
}
@@ -393,18 +405,50 @@ OPTplanFragment(Client cntxt, MalBlkPtr
vars[getArg(p,j)] = BLOCKED;
} else {
for( j = 0; j < p->argc; j++)
- if (vars[getArg(p,j)] == REQUIRED)
+ if (vars[getArg(p,j)] == REQUIRED )
break;
- if ( j != p->argc) {
- for ( j= 0; j< p->retc; j++)
+ if ( j != p->argc)
+ plan[i]= REQUIRED;
+
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE )
+ break;
+ if ( j != p->argc && plan[i] != REQUIRED)
+ plan[i]= SUPPORTIVE;
+
+ if ( plan[i] == REQUIRED)
+ for ( j= 0; j< p->argc; j++)
vars[getArg(p,j)] = REQUIRED;
- plan[i] = REQUIRED;
- }
+ if ( plan[i] == SUPPORTIVE)
+ for ( j= 0; j< p->argc; j++)
+ if ( vars[getArg(p,j)] == 0)
+ vars[getArg(p,j)] = SUPPORTIVE;
}
}
- /* Phase 2: determine all variables/instructions contributing */
- mnstr_printf(cntxt->fdout,"#phase 2\n");
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 1\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
+#endif
+
+ /* Phase 2: determine all variables/instructions contributing.
+ Instructions based on supportive variables remain marked as supportive,
+ because we have to avoid common ancestor dependency on partitioned variables.
+ */
for ( i = limit -1; i >= 0 ; i--)
if ( plan[i] != BLOCKED ){
p = old[i];
@@ -412,12 +456,69 @@ OPTplanFragment(Client cntxt, MalBlkPtr
if (vars[getArg(p,j)] == REQUIRED)
plan[i] = REQUIRED;
- if( plan[i] == REQUIRED)
- for ( j= p->retc; j< p->argc; j++)
+ if ( plan[i] == 0)
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE)
+ plan[i] = SUPPORTIVE;
+
+ if( plan[i]== REQUIRED )
+ for ( j= 0; j< p->argc; j++)
vars[getArg(p,j)] = REQUIRED;
}
- /* Phase 3: determine all variables to be exported */
- mnstr_printf(cntxt->fdout,"#phase 3\n");
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 2\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
+#endif
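+ /* Phase 3: turn all supportive instructions into required ones if it helps
+ to produce the required intermediate, i.e. none of their arguments is still
+ supportive; the remaining supportive instructions become blocked */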
+ for ( i = 0; i < limit; i++)
+ if( plan[i] == SUPPORTIVE){
+ p = old[i];
+ for( j = 0; j < p->argc; j++)
+ if (vars[getArg(p,j)] == SUPPORTIVE)
+ break;
+ if( j == p->argc)
+ plan[i] = REQUIRED;
+ else
+ plan[i] = BLOCKED;
+ }
+#ifdef _DEBUG_OPT_PARTITION_
+ mnstr_printf(cntxt->fdout,"\n#phase 3\n");
+ for( i= 0; i< limit; i++)
+ if (plan[i] ) {
+ switch (plan[i]) {
+ case BLOCKED:
+ mnstr_printf(cntxt->fdout,"#blocked ");
+ break;
+ case REQUIRED:
+ mnstr_printf(cntxt->fdout,"#required ");
+ break;
+ case SUPPORTIVE:
+ mnstr_printf(cntxt->fdout,"#support ");
+ }
+ if( old[i])
+ printInstruction(cntxt->fdout,
mb,0,old[i],LIST_MAL_STMT);
+ }
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list