Changeset: 608619e3fb60 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=608619e3fb60
Modified Files:
        MonetDB5/src/optimizer/opt_mapreduce.mx
Branch: default
Log Message:

cleanup


diffs (211 lines):

diff -r 0e39b0c870c8 -r 608619e3fb60 MonetDB5/src/optimizer/opt_mapreduce.mx
--- a/MonetDB5/src/optimizer/opt_mapreduce.mx   Sun May 23 11:37:43 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_mapreduce.mx   Tue May 25 10:34:09 2010 +0200
@@ -19,9 +19,12 @@
 @a F. Groffen, M. Kersten
 @- Map-Reduce
 The Map-Reduce infrastructure requires a little optimizer to turn
-an arbitrary query into a plan to be executed on the elements of the Cloud.
+an arbitrary query into a plan to be executed on the systems in the cloud.
 Each cloud consists of a series of named servers, managed by Merovingian
-with the pattern "*/Cloudname/node/*".
+with the pattern "*/cloudname/node/*".  The cloudname is detected from
+the schema in which an SQL table is stored.  Only schemas starting with
+"mr_" are considered to be mapreduce schemas on the query node.  The
+cloudname is the schema name without the leading "mr_" prefix.
 
 Determining the clould is an expensive operation and for the time being
 performed each time when a query is compiled.
@@ -30,92 +33,6 @@
 We simply assume that the complete query can be executed and that only the
 result sets should be assembled.
 
-[OUTOFDATE]
-
-Consider part of the query plan for 'select * from tables'
-...@verbatim
-function user.s0_0{autoCommit=true}():void;
-    _23:bat[:oid,:sht]  := sql.bind("sys","_tables","type",1);
-    _24:bat[:oid,:oid]  := sql.bind_dbat("sys","_tables",1);
-    _25 := bat.reverse(_24);
-...
-    _96:bat[:oid,:bte]  := bat.new(nil:oid,nil:bte);
-    _98 := bat.append(_96,_95,true);
-    _96:bat[:oid,:bte]  := nil:BAT;
-    _99 := bat.append(_98,_93,true);
-    _100 := sql.resultSet(8,1,_33);
-    sql.rsColumn(_100,".tables","id","int",32,0,_33);
-    sql.rsColumn(_100,".tables","name","varchar",1024,0,_44);
-    sql.rsColumn(_100,".tables","schema_id","int",32,0,_54);
-    sql.rsColumn(_100,".tables","query","varchar",2048,0,_64);
-    sql.rsColumn(_100,".tables","type","smallint",16,0,_70);
-    sql.rsColumn(_100,".tables","system","boolean",1,0,_81);
-    sql.rsColumn(_100,".tables","commit_action","smallint",16,0,_91);
-    sql.rsColumn(_100,".tables","temporary","tinyint",8,0,_99);
-    _121 := io.stdout();
-    sql.exportResult(_121,_100);
-end s0_0;
-...@end verbatim
-This plan is turned into two routines. One to be executed
-on the individual nodes and one to assemble the results.
-...@verbatim
-function user.s0_0mp() 
(s0_0:void,X61:bat[:oid,:int],X85:bat[:oid,:str],X109:bat[:oid,:int],X134:bat[:oid,:str],X142:bat[:oid,:sht],X168:bat[:oid,:bit],X191:bat[:oid,:sht],X201:bat[:oid,:bte]);
-    _23:bat[:oid,:sht]  := sql.bind("sys","_tables","type",1);
-    _24:bat[:oid,:oid]  := sql.bind_dbat("sys","_tables",1);
-    _25 := bat.reverse(_24);
-...
-    _96:bat[:oid,:bte]  := bat.new(nil:oid,nil:bte);
-    _98 := bat.append(_96,_95,true);
-    _96:bat[:oid,:bte]  := nil:BAT;
-    _99 := bat.append(_98,_93,true);
-    return (s0_0,X61,X85,X109,X134,X142,X168,X191,X201);
-end s0_0mp;
-function user.s0_0():void;
-    s0_0 := nil:void;
-    X61 := nil:bat[:oid,:int];
-    X85 := nil:bat[:oid,:str];
-    X109 := nil:bat[:oid,:int];
-    X134 := nil:bat[:oid,:str];
-    X142 := nil:bat[:oid,:sht];
-    X168 := nil:bat[:oid,:bit];
-    X191 := nil:bat[:oid,:sht];
-    X201 := nil:bat[:oid,:bte];
-    (_253,_254,_255,_256,_257,_258,_259,_260,_261) := 
mapreduce.exec(0,"user","s0_0mp");
-    (_263,_264,_265,_266,_267,_268,_269,_270,_271) := 
mapreduce.exec(1,"user","s0_0mp");
-    (_273,_274,_275,_276,_277,_278,_279,_280,_281) := 
mapreduce.exec(2,"user","s0_0mp");
-    X61 := mat.pack(_254,_264,_274);
-    X85 := mat.pack(_255,_265,_275);
-    X109 := mat.pack(_256,_266,_276);
-    X134 := mat.pack(_257,_267,_277);
-    X142 := mat.pack(_258,_268,_278);
-    X168 := mat.pack(_259,_269,_279);
-    X191 := mat.pack(_260,_270,_280);
-    X201 := mat.pack(_261,_271,_281);
-exit _250;
-    X202 := sql.resultSet(8,1,X61);
-    sql.rsColumn(X202,".tables","id","int",32,0,X61);
-    sql.rsColumn(X202,".tables","name","varchar",1024,0,X85);
-    sql.rsColumn(X202,".tables","schema_id","int",32,0,X109);
-    sql.rsColumn(X202,".tables","query","varchar",2048,0,X134);
-    sql.rsColumn(X202,".tables","type","smallint",16,0,X142);
-    sql.rsColumn(X202,".tables","system","boolean",1,0,X168);
-    sql.rsColumn(X202,".tables","commit_action","smallint",16,0,X191);
-    sql.rsColumn(X202,".tables","temporary","tinyint",8,0,X201);
-    X232 := io.stdout();
-    sql.exportResult(X232,X202);
-end s0_0;
-function mapreduce.exec_3(conn:str, mod:str, fcn:str):any_1...;
-       remote.register(conn, mod, fcn);
-       (X201,X202,X203) := remote.exec(conn, mod, fcn);
-       Y201 := remote.get(conn, X201);
-       Y202 := remote.get(conn, X202);
-       Y203 := remote.get(conn, X203);
-       return (Y201,Y202,Y203);
-end exec;
-...@end verbatim
-...@{
-The code can be considered a refinement of the Octopus.
-The mal primitives are meant for debugging.
 @mal
 pattern optimizer.mapreduce():str
 address OPTmapreduce;
@@ -133,7 +50,6 @@
 #include "opt_prelude.h"
 #include "opt_support.h"
 
-opt_export str MRexec(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
 opt_export str MRgetCloud(int *ret, str *nme);
 @c
 #include "mal_config.h"
@@ -141,14 +57,6 @@
 #include "mal_interpreter.h"
 #include "remote.h"
 
-...@-
-The work distribution assumes that we know at compile time
-the number of nodes participating in the cloud setting.
-It calls the map-reduce executor to produce a result
-possible with the aid of a replica.
-The mapnode is an auxilary structure.
-...@c
-
 typedef struct _mapnode {
        str uri;
        str user;
@@ -163,7 +71,7 @@
 {
        int i;
 
-       mal_set_lock(mal_contextLock,"mapreduce");
+       mal_set_lock(mal_contextLock, "mapreduce");
        for (i = 0; mapnodes[i].uri; i++) {
                if (mapnodes[i].uri != NULL)
                        GDKfree(mapnodes[i].uri);
@@ -173,10 +81,10 @@
                        GDKfree(mapnodes[i].pass);
                mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0;
        }
-       if ( cloud)
+       if (cloud)
                BBPreleaseref(cloud->batCacheid);
        cloud = 0;
-       mal_unset_lock(mal_contextLock,"mapreduce");
+       mal_unset_lock(mal_contextLock, "mapreduce");
 }
 
 str
@@ -195,18 +103,18 @@
        if (msg ) 
                return msg;
 
-       if ( cloud )
+       if (cloud)
                /* cleanup remains of previous call */
                MRcleanCloud();
        
-       mal_set_lock(mal_contextLock,"mapreduce");
+       mal_set_lock(mal_contextLock, "mapreduce");
        cloud = BATdescriptor(*ret); /* should succeed */
        BBPkeepref(*ret);       /* keep if during the session */
 
-       mapnodes = (mapnode*) GDKzalloc(sizeof(mapnode) * (BATcount(cloud) +1));
-       if ( mapnodes == 0){
+       mapnodes = (mapnode*)GDKzalloc(sizeof(mapnode) * (BATcount(cloud) + 1));
+       if (mapnodes == 0) {
                BBPreleaseref(*ret);
-               throw(MAL,"mapreduce.getCloud", MAL_MALLOC_FAIL);
+               throw(MAL, "mapreduce.getCloud", MAL_MALLOC_FAIL);
        }
 
        bi = bat_iterator(cloud);
@@ -220,7 +128,7 @@
        if (GDKnr_threads < (int) BATcount(cloud))
                GDKnr_threads = (int) BATcount(cloud);
 
-       mal_unset_lock(mal_contextLock,"mapreduce");
+       mal_unset_lock(mal_contextLock, "mapreduce");
        BBPkeepref(*ret);       /* give it to the called */
 
        return MAL_SUCCEED;
@@ -233,7 +141,7 @@
        int bid;
 
        msg = MRgetCloud(&bid, &mrcluster);
-       if ( msg ){
+       if (msg) {
                GDKfree(msg); /* bad programming */
                return 0;
        }
@@ -478,10 +386,10 @@
        snprintf(nme, IDLENGTH, "%smap", getFunctionId(getInstrPtr(mb, 0)));
 
        /* zap */
-       if (newMalBlkStmt(mc, mc->ssize) < 0){
+       if (newMalBlkStmt(mc, mc->ssize) < 0) {
                return 0;
        }
-       if (newMalBlkStmt(mb, mb->ssize) < 0){
+       if (newMalBlkStmt(mb, mb->ssize) < 0) {
                freeMalBlk(mc);
                return 0;
        }
_______________________________________________
Checkin-list mailing list
Checkin-list@monetdb.org
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to