Changeset: 85ab277c39bc for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=85ab277c39bc
Modified Files:
gdk/gdk_mapreduce.c
gdk/gdk_mapreduce.h
monetdb5/modules/mal/joinpath.c
monetdb5/optimizer/opt_joinpath.c
monetdb5/optimizer/opt_pipes.c
monetdb5/optimizer/opt_prelude.c
monetdb5/optimizer/opt_prelude.h
monetdb5/optimizer/optimizer.mal
Branch: default
Log Message:
Merging
diffs (292 lines):
diff --git a/gdk/gdk_mapreduce.c b/gdk/gdk_mapreduce.c
--- a/gdk/gdk_mapreduce.c
+++ b/gdk/gdk_mapreduce.c
@@ -45,6 +45,7 @@ static MT_Sema mrqsema; /* threads wait
static void MRworker(void *);
+/* There is just a single queue for the workers */
static void
MRqueueCreate(int sz)
{
@@ -54,14 +55,21 @@ MRqueueCreate(int sz)
MT_lock_init(&mrqlock, "q_create");
MT_lock_set(&mrqlock, "q_create");
MT_sema_init(&mrqsema, 0, "q_create");
- sz = ((sz << 1) >> 1); /* we want a multiple of 2 */
+ if ( mrqueue ) {
+ GDKerror("One map-reduce queue allowed");
+ return;
+ }
+ sz *= 2;
mrqueue = (MRqueue *) GDKzalloc(sizeof(MRqueue) * sz);
- assert(mrqueue);
+ if ( mrqueue == 0) {
+ GDKerror("Could not create the map-reduce queue");
+ return;
+ }
mrqsize = sz;
mrqlast = 0;
/* create a worker thread for each core as specified as system
parameter */
for (i = 0; i < GDKnr_threads; i++)
- MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_JOINABLE);
+ MT_create_thread(&tid, MRworker, (void *) 0, MT_THR_DETACHED);
MT_lock_unset(&mrqlock, "q_create");
}
diff --git a/gdk/gdk_mapreduce.h b/gdk/gdk_mapreduce.h
--- a/gdk/gdk_mapreduce.h
+++ b/gdk/gdk_mapreduce.h
@@ -20,8 +20,6 @@
#ifndef _GDK_MAPREDUCE_H_
#define _GDK_MAPREDUCE_H_
-#include <monet_options.h>
-
typedef struct {
MT_Sema *sema; /* micro scheduler handle */
void (*cmd) (void *); /* the function to be executed */
diff --git a/monetdb5/modules/mal/joinpath.c b/monetdb5/modules/mal/joinpath.c
--- a/monetdb5/modules/mal/joinpath.c
+++ b/monetdb5/modules/mal/joinpath.c
@@ -208,6 +208,9 @@ ALGjoinPathBody(Client cntxt, int top, B
break;
case 2:
b = BATsemijoin(joins[j], joins[j + 1]);
+ break;
+ case 3:
+ b = BATleftfetchjoin(joins[j], joins[j + 1],
BATcount(joins[j]));
}
if (b==NULL){
if ( postpone[j] && postpone[j+1]){
@@ -272,9 +275,10 @@ ALGjoinPath(Client cntxt, MalBlkPtr mb,
int i,*bid,top=0;
int *r = (int*) getArgReference(stk, pci, 0);
BAT *b, **joins = (BAT**)GDKmalloc(pci->argc*sizeof(BAT*));
+ int error = 0;
str joinPathRef = putName("joinPath",8);
+ str semijoinPathRef = putName("semijoinPath",12);
str leftjoinPathRef = putName("leftjoinPath",12);
- int error = 0;
if ( joins == NULL)
throw(MAL, "algebra.joinPath", MAL_MALLOC_FAIL);
@@ -304,7 +308,17 @@ ALGjoinPath(Client cntxt, MalBlkPtr mb,
fprintf(stderr,"#joinpath %s\n", ps ? ps : "");
GDKfree(ps);
}
- b= ALGjoinPathBody(cntxt,top,joins, (getFunctionId(pci)==
joinPathRef?1: (getFunctionId(pci) == leftjoinPathRef? 0:2)));
+ if ( getFunctionId(pci) == joinPathRef)
+ b= ALGjoinPathBody(cntxt,top,joins, 1);
+ else
+ if ( getFunctionId(pci) == leftjoinPathRef)
+ b= ALGjoinPathBody(cntxt,top,joins, 0);
+ else
+ if ( getFunctionId(pci) == semijoinPathRef)
+ b= ALGjoinPathBody(cntxt,top,joins, 2);
+ else
+ b= ALGjoinPathBody(cntxt,top,joins, 3);
+
GDKfree(joins);
if ( b)
BBPkeepref( *r = b->batCacheid);
diff --git a/monetdb5/optimizer/opt_joinpath.c b/monetdb5/optimizer/opt_joinpath.c
--- a/monetdb5/optimizer/opt_joinpath.c
+++ b/monetdb5/optimizer/opt_joinpath.c
@@ -52,9 +52,6 @@ static int
OPTjoinSubPath(Client cntxt, MalBlkPtr mb)
{
int i,j,k,top=0, actions =0;
- str joinPathRef = putName("joinPath",8);
- str leftjoinPathRef = putName("leftjoinPath",12);
- str semijoinPathRef = putName("semijoinPath",12);
InstrPtr q = NULL, p, *old;
int limit, slimit;
Candidate *candidate;
@@ -68,7 +65,7 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m
limit= mb->stop;
slimit= mb->ssize;
for(i=0, p= getInstrPtr(mb, i); i< limit; i++, p= getInstrPtr(mb, i))
- if ( getFunctionId(p)== joinPathRef || getFunctionId(p)==
leftjoinPathRef || getFunctionId(p) == semijoinPathRef)
+ if ( getFunctionId(p)== joinPathRef || getFunctionId(p)==
leftjoinPathRef || getFunctionId(p) == semijoinPathRef || getFunctionId(p) ==
leftfetchjoinPathRef)
for ( j= p->retc; j< p->argc-1; j++){
for (k= top-1; k >= 0 ; k--)
if ( candidate[k].lvar == getArg(p,j)
&& candidate[k].rvar == getArg(p,j+1) && candidate[k].fcn == getFunctionId(p)){
@@ -98,7 +95,7 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m
}
for(i=0, p= old[i]; i< limit; i++, p= old[i]) {
- if( getFunctionId(p)== joinPathRef || getFunctionId(p)==
leftjoinPathRef || getFunctionId(p) == semijoinPathRef)
+ if( getFunctionId(p)== joinPathRef || getFunctionId(p)==
leftjoinPathRef || getFunctionId(p) == semijoinPathRef || getFunctionId(p)==
leftfetchjoinPathRef)
for ( j= p->retc ; j< p->argc-1; j++){
for (k= top-1; k >= 0 ; k--)
if ( candidate[k].lvar == getArg(p,j)
&& candidate[k].rvar == getArg(p,j+1) && candidate[k].fcn == getFunctionId(p)
&& candidate[k].cnt > 1){
@@ -109,6 +106,8 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m
q= newStmt(mb,
algebraRef, leftjoinRef);
else if (
candidate[k].fcn == semijoinPathRef)
q= newStmt(mb,
algebraRef, semijoinRef);
+ else if (
candidate[k].fcn == leftfetchjoinPathRef)
+ q= newStmt(mb,
algebraRef, leftfetchjoinRef);
q= pushArgument(mb,q,
candidate[k].lvar);
q= pushArgument(mb,q,
candidate[k].rvar);
candidate[k].p = q;
@@ -122,6 +121,8 @@ OPTjoinSubPath(Client cntxt, MalBlkPtr m
setFunctionId(p, semijoinRef);
else if (
getFunctionId(p) == joinPathRef)
setFunctionId(p, joinRef);
+ else if (
getFunctionId(p) == leftfetchjoinPathRef)
+
setFunctionId(p, leftfetchjoinRef);
}
actions ++;
OPTDEBUGjoinPath {
@@ -154,9 +155,6 @@ OPTjoinPathImplementation(Client cntxt,
{
int i,j,k, actions=0;
int *pc;
- str joinPathRef = putName("joinPath",8);
- str leftjoinPathRef = putName("leftjoinPath",12);
- str semijoinPathRef = putName("semijoinPath",12);
InstrPtr q,r;
InstrPtr *old;
int *varcnt; /* use count */
@@ -182,7 +180,6 @@ OPTjoinPathImplementation(Client cntxt,
return 0;
}
/*
- * @-
* Count the variable use as arguments first.
*/
for (i = 0; i<limit; i++){
@@ -193,9 +190,8 @@ OPTjoinPathImplementation(Client cntxt,
for (i = 0; i<limit; i++){
p= old[i];
- if( getModuleId(p)== algebraRef && (getFunctionId(p)== joinRef
|| getFunctionId(p) == leftjoinRef || getFunctionId(p) == semijoinRef)){
+ if( getModuleId(p)== algebraRef && (getFunctionId(p)== joinRef
|| getFunctionId(p) == leftjoinRef || getFunctionId(p) == semijoinRef ||
getFunctionId(p) == leftfetchjoinRef)){
/*
- * @-
* Try to expand its argument list with what we have
found so far.
* This creates a series of join paths, many of which
will be removed during deadcode elimination.
*/
@@ -204,7 +200,6 @@ OPTjoinPathImplementation(Client cntxt,
for(j=p->retc; j<p->argc; j++){
r= getInstrPtr(mb,pc[getArg(p,j)]);
/*
- * @-
* Don't inject a pattern when it is used more
than once.
*/
if (r && varcnt[getArg(p,j)] > 1){
@@ -237,6 +232,12 @@ OPTjoinPathImplementation(Client cntxt,
q =
pushArgument(mb,q,getArg(r,k));
} else
q =
pushArgument(mb,q,getArg(p,j));
+ } else if ( getFunctionId(p) ==
leftfetchjoinRef){
+ if( r && getModuleId(r)== algebraRef
&& ( getFunctionId(r)== leftfetchjoinRef || getFunctionId(r)==
leftfetchjoinPathRef) ){
+ for(k= r->retc; k<r->argc; k++)
+ q =
pushArgument(mb,q,getArg(r,k));
+ } else
+ q =
pushArgument(mb,q,getArg(p,j));
}
}
OPTDEBUGjoinPath {
@@ -250,7 +251,6 @@ OPTjoinPathImplementation(Client cntxt,
goto wrapup;
}
/*
- * @-
* Final type check and hardwire the result type,
because that can not be inferred directly from the signature
*/
for(j=1; j<q->argc-1; j++)
@@ -273,6 +273,8 @@ OPTjoinPathImplementation(Client cntxt,
setFunctionId(q,leftjoinPathRef);
else if ( q->argc > 2 && getFunctionId(q) ==
semijoinRef)
setFunctionId(q,semijoinPathRef);
+ else if ( q->argc > 2 && getFunctionId(q) ==
leftfetchjoinRef)
+ setFunctionId(q,leftfetchjoinPathRef);
freeInstruction(p);
p= q;
actions++;
diff --git a/monetdb5/optimizer/opt_pipes.c b/monetdb5/optimizer/opt_pipes.c
--- a/monetdb5/optimizer/opt_pipes.c
+++ b/monetdb5/optimizer/opt_pipes.c
@@ -74,9 +74,9 @@ struct PIPELINES {
"optimizer.mergetable();"
"optimizer.deadcode();"
"optimizer.commonTerms();"
-// "optimizer.groups();"
-// "optimizer.joinPath();"
-// "optimizer.reorder();"
+ "optimizer.groups();"
+ "optimizer.joinPath();"
+ "optimizer.reorder();"
"optimizer.deadcode();"
"optimizer.reduce();"
"optimizer.dataflow();"
diff --git a/monetdb5/optimizer/opt_prelude.c b/monetdb5/optimizer/opt_prelude.c
--- a/monetdb5/optimizer/opt_prelude.c
+++ b/monetdb5/optimizer/opt_prelude.c
@@ -117,6 +117,7 @@ str kunionRef;
str kuniqueRef;
str languageRef;
str leftfetchjoinRef;
+str leftfetchjoinPathRef;
str leftjoinRef;
str leftjoinPathRef;
str likeselectRef;
@@ -198,6 +199,7 @@ str selectNotNilRef;
str selectRef;
str semaRef;
str semijoinRef;
+str semijoinPathRef;
str setAccessRef;
str setWriteModeRef;
str sliceRef;
@@ -363,6 +365,7 @@ void optimizerInit(void){
kuniqueRef= putName("kunique",7);
languageRef= putName("language",8);
leftfetchjoinRef = putName("leftfetchjoin",13);
+ leftfetchjoinPathRef = putName("leftfetchjoinPath",17);
leftjoinRef = putName("leftjoin",8);
leftjoinPathRef = putName("leftjoinPath",12);
likeselectRef = putName("like_select",11);
@@ -443,6 +446,7 @@ void optimizerInit(void){
selectRef = putName("select",6);
semaRef = putName("sema",4);
semijoinRef = putName("semijoin",8);
+ semijoinPathRef = putName("semijoinPath",12);
setAccessRef = putName("setAccess",9);
setWriteModeRef= putName("setWriteMode",12);
sliceRef = putName("slice",5);
diff --git a/monetdb5/optimizer/opt_prelude.h b/monetdb5/optimizer/opt_prelude.h
--- a/monetdb5/optimizer/opt_prelude.h
+++ b/monetdb5/optimizer/opt_prelude.h
@@ -116,6 +116,7 @@ opt_export str kunionRef;
opt_export str kuniqueRef;
opt_export str languageRef;
opt_export str leftfetchjoinRef;
+opt_export str leftfetchjoinPathRef;
opt_export str leftjoinRef;
opt_export str leftjoinPathRef;
opt_export str likeselectRef;
@@ -196,6 +197,7 @@ opt_export str selectNotNilRef;
opt_export str selectRef;
opt_export str semaRef;
opt_export str semijoinRef;
+opt_export str semijoinPathRef;
opt_export str setAccessRef;
opt_export str setWriteModeRef;
opt_export str sliceRef;
diff --git a/monetdb5/optimizer/optimizer.mal b/monetdb5/optimizer/optimizer.mal
--- a/monetdb5/optimizer/optimizer.mal
+++ b/monetdb5/optimizer/optimizer.mal
@@ -288,6 +288,10 @@ pattern algebra.leftjoinPath(l:bat[:any,
address ALGjoinPath
comment "Routine to handle join paths. The type analysis is rather tricky.";
+pattern algebra.leftfetchjoinPath(l:bat[:any,:any]...):bat[:any,:any]
+address ALGjoinPath
+comment "Routine to handle join paths. The type analysis is rather tricky.";
+
pattern algebra.semijoinPath(l:bat[:any,:any]...):bat[:any,:any]
address ALGjoinPath
comment "Routine to handle join paths. The type analysis is rather tricky.";
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list