Changeset: 608619e3fb60 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=608619e3fb60
Modified Files:
	MonetDB5/src/optimizer/opt_mapreduce.mx
Branch: default
Log Message:
cleanup diffs (211 lines): diff -r 0e39b0c870c8 -r 608619e3fb60 MonetDB5/src/optimizer/opt_mapreduce.mx --- a/MonetDB5/src/optimizer/opt_mapreduce.mx Sun May 23 11:37:43 2010 +0200 +++ b/MonetDB5/src/optimizer/opt_mapreduce.mx Tue May 25 10:34:09 2010 +0200 @@ -19,9 +19,12 @@ @a F. Groffen, M. Kersten @- Map-Reduce The Map-Reduce infrastructure requires a little optimizer to turn -an arbitrary query into a plan to be executed on the elements of the Cloud. +an arbitrary query into a plan to be executed on the systems in the cloud. Each cloud consists of a series of named servers, managed by Merovingian -with the pattern "*/Cloudname/node/*". +with the pattern "*/cloudname/node/*". The cloudname is detected from +the schema in which an SQL table is stored. Only schemas starting with +"mr_" are considered to be mapreduce schemas on the query node. The +cloudname is the schema name without the leading "mr_" prefix. Determining the clould is an expensive operation and for the time being performed each time when a query is compiled. @@ -30,92 +33,6 @@ We simply assume that the complete query can be executed and that only the result sets should be assembled. -[OUTOFDATE] - -Consider part of the query plan for 'select * from tables' -...@verbatim -function user.s0_0{autoCommit=true}():void; - _23:bat[:oid,:sht] := sql.bind("sys","_tables","type",1); - _24:bat[:oid,:oid] := sql.bind_dbat("sys","_tables",1); - _25 := bat.reverse(_24); -... 
- _96:bat[:oid,:bte] := bat.new(nil:oid,nil:bte); - _98 := bat.append(_96,_95,true); - _96:bat[:oid,:bte] := nil:BAT; - _99 := bat.append(_98,_93,true); - _100 := sql.resultSet(8,1,_33); - sql.rsColumn(_100,".tables","id","int",32,0,_33); - sql.rsColumn(_100,".tables","name","varchar",1024,0,_44); - sql.rsColumn(_100,".tables","schema_id","int",32,0,_54); - sql.rsColumn(_100,".tables","query","varchar",2048,0,_64); - sql.rsColumn(_100,".tables","type","smallint",16,0,_70); - sql.rsColumn(_100,".tables","system","boolean",1,0,_81); - sql.rsColumn(_100,".tables","commit_action","smallint",16,0,_91); - sql.rsColumn(_100,".tables","temporary","tinyint",8,0,_99); - _121 := io.stdout(); - sql.exportResult(_121,_100); -end s0_0; -...@end verbatim -This plan is turned into two routines. One to be executed -on the individual nodes and one to assemble the results. -...@verbatim -function user.s0_0mp() (s0_0:void,X61:bat[:oid,:int],X85:bat[:oid,:str],X109:bat[:oid,:int],X134:bat[:oid,:str],X142:bat[:oid,:sht],X168:bat[:oid,:bit],X191:bat[:oid,:sht],X201:bat[:oid,:bte]); - _23:bat[:oid,:sht] := sql.bind("sys","_tables","type",1); - _24:bat[:oid,:oid] := sql.bind_dbat("sys","_tables",1); - _25 := bat.reverse(_24); -... 
- _96:bat[:oid,:bte] := bat.new(nil:oid,nil:bte); - _98 := bat.append(_96,_95,true); - _96:bat[:oid,:bte] := nil:BAT; - _99 := bat.append(_98,_93,true); - return (s0_0,X61,X85,X109,X134,X142,X168,X191,X201); -end s0_0mp; -function user.s0_0():void; - s0_0 := nil:void; - X61 := nil:bat[:oid,:int]; - X85 := nil:bat[:oid,:str]; - X109 := nil:bat[:oid,:int]; - X134 := nil:bat[:oid,:str]; - X142 := nil:bat[:oid,:sht]; - X168 := nil:bat[:oid,:bit]; - X191 := nil:bat[:oid,:sht]; - X201 := nil:bat[:oid,:bte]; - (_253,_254,_255,_256,_257,_258,_259,_260,_261) := mapreduce.exec(0,"user","s0_0mp"); - (_263,_264,_265,_266,_267,_268,_269,_270,_271) := mapreduce.exec(1,"user","s0_0mp"); - (_273,_274,_275,_276,_277,_278,_279,_280,_281) := mapreduce.exec(2,"user","s0_0mp"); - X61 := mat.pack(_254,_264,_274); - X85 := mat.pack(_255,_265,_275); - X109 := mat.pack(_256,_266,_276); - X134 := mat.pack(_257,_267,_277); - X142 := mat.pack(_258,_268,_278); - X168 := mat.pack(_259,_269,_279); - X191 := mat.pack(_260,_270,_280); - X201 := mat.pack(_261,_271,_281); -exit _250; - X202 := sql.resultSet(8,1,X61); - sql.rsColumn(X202,".tables","id","int",32,0,X61); - sql.rsColumn(X202,".tables","name","varchar",1024,0,X85); - sql.rsColumn(X202,".tables","schema_id","int",32,0,X109); - sql.rsColumn(X202,".tables","query","varchar",2048,0,X134); - sql.rsColumn(X202,".tables","type","smallint",16,0,X142); - sql.rsColumn(X202,".tables","system","boolean",1,0,X168); - sql.rsColumn(X202,".tables","commit_action","smallint",16,0,X191); - sql.rsColumn(X202,".tables","temporary","tinyint",8,0,X201); - X232 := io.stdout(); - sql.exportResult(X232,X202); -end s0_0; -function mapreduce.exec_3(conn:str, mod:str, fcn:str):any_1...; - remote.register(conn, mod, fcn); - (X201,X202,X203) := remote.exec(conn, mod, fcn); - Y201 := remote.get(conn, X201); - Y202 := remote.get(conn, X202); - Y203 := remote.get(conn, X203); - return (Y201,Y202,Y203); -end exec; -...@end verbatim -...@{ -The code can be considered a 
refinement of the Octopus. -The mal primitives are meant for debugging. @mal pattern optimizer.mapreduce():str address OPTmapreduce; @@ -133,7 +50,6 @@ #include "opt_prelude.h" #include "opt_support.h" -opt_export str MRexec(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci); opt_export str MRgetCloud(int *ret, str *nme); @c #include "mal_config.h" @@ -141,14 +57,6 @@ #include "mal_interpreter.h" #include "remote.h" -...@- -The work distribution assumes that we know at compile time -the number of nodes participating in the cloud setting. -It calls the map-reduce executor to produce a result -possible with the aid of a replica. -The mapnode is an auxilary structure. -...@c - typedef struct _mapnode { str uri; str user; @@ -163,7 +71,7 @@ { int i; - mal_set_lock(mal_contextLock,"mapreduce"); + mal_set_lock(mal_contextLock, "mapreduce"); for (i = 0; mapnodes[i].uri; i++) { if (mapnodes[i].uri != NULL) GDKfree(mapnodes[i].uri); @@ -173,10 +81,10 @@ GDKfree(mapnodes[i].pass); mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0; } - if ( cloud) + if (cloud) BBPreleaseref(cloud->batCacheid); cloud = 0; - mal_unset_lock(mal_contextLock,"mapreduce"); + mal_unset_lock(mal_contextLock, "mapreduce"); } str @@ -195,18 +103,18 @@ if (msg ) return msg; - if ( cloud ) + if (cloud) /* cleanup remains of previous call */ MRcleanCloud(); - mal_set_lock(mal_contextLock,"mapreduce"); + mal_set_lock(mal_contextLock, "mapreduce"); cloud = BATdescriptor(*ret); /* should succeed */ BBPkeepref(*ret); /* keep if during the session */ - mapnodes = (mapnode*) GDKzalloc(sizeof(mapnode) * (BATcount(cloud) +1)); - if ( mapnodes == 0){ + mapnodes = (mapnode*)GDKzalloc(sizeof(mapnode) * (BATcount(cloud) + 1)); + if (mapnodes == 0) { BBPreleaseref(*ret); - throw(MAL,"mapreduce.getCloud", MAL_MALLOC_FAIL); + throw(MAL, "mapreduce.getCloud", MAL_MALLOC_FAIL); } bi = bat_iterator(cloud); @@ -220,7 +128,7 @@ if (GDKnr_threads < (int) BATcount(cloud)) GDKnr_threads = (int) BATcount(cloud); - 
mal_unset_lock(mal_contextLock,"mapreduce"); + mal_unset_lock(mal_contextLock, "mapreduce"); BBPkeepref(*ret); /* give it to the called */ return MAL_SUCCEED; @@ -233,7 +141,7 @@ int bid; msg = MRgetCloud(&bid, &mrcluster); - if ( msg ){ + if (msg) { GDKfree(msg); /* bad programming */ return 0; } @@ -478,10 +386,10 @@ snprintf(nme, IDLENGTH, "%smap", getFunctionId(getInstrPtr(mb, 0))); /* zap */ - if (newMalBlkStmt(mc, mc->ssize) < 0){ + if (newMalBlkStmt(mc, mc->ssize) < 0) { return 0; } - if (newMalBlkStmt(mb, mb->ssize) < 0){ + if (newMalBlkStmt(mb, mb->ssize) < 0) { freeMalBlk(mc); return 0; } _______________________________________________ Checkin-list mailing list Checkin-list@monetdb.org http://mail.monetdb.org/mailman/listinfo/checkin-list