Changeset: 11fbb5587d03 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=11fbb5587d03
Modified Files:
        MonetDB5/src/mal/Tests/tst450.stable.out
        MonetDB5/src/modules/mal/Tests/inspect05.stable.out
        MonetDB5/src/optimizer/opt_mapreduce.mx
Branch: default
Log Message:

Minor additions to mapreduce
Keep the BAT with nodes for inspection
Clean up the structure properly, instead of the non-existent last one.
Simple protection against concurrency.


diffs (229 lines):

diff -r 617161373486 -r 11fbb5587d03 MonetDB5/src/mal/Tests/tst450.stable.out
--- a/MonetDB5/src/mal/Tests/tst450.stable.out  Fri May 21 13:51:19 2010 +0200
+++ b/MonetDB5/src/mal/Tests/tst450.stable.out  Fri May 21 15:56:09 2010 +0200
@@ -119,6 +119,7 @@
 [ "mal",         nil     ]
 [ "manual",      nil     ]
 [ "mapi",        nil     ]
+[ "mapreduce",   nil     ]
 [ "master",              nil     ]
 [ "mat",         nil     ]
 [ "mbr",         nil     ]
diff -r 617161373486 -r 11fbb5587d03 MonetDB5/src/modules/mal/Tests/inspect05.stable.out
--- a/MonetDB5/src/modules/mal/Tests/inspect05.stable.out       Fri May 21 13:51:19 2010 +0200
+++ b/MonetDB5/src/modules/mal/Tests/inspect05.stable.out       Fri May 21 15:56:09 2010 +0200
@@ -1009,6 +1009,7 @@
 [ "new",                         "command",      "cluster",              
"(b:bat[:oid,:bte],bits:int,offset:int) 
(psum:bat[:oid,:wrd],map:bat[:oid,:wrd]) ",                                     
        "CLS_create_bte;"                               ]
 [ "split",                       "pattern",      "cluster",      
"(clustered:bat[:oid,:any_1],psum:bat[:oid,:wrd]):bat[:oid,:any_1]... ",        
                                                                                
                        "CLS_split;"                                    ]
 [ "table",                               "pattern",      "cluster",      
"(b:bat[:oid,:any]...):bat[:oid,:oid] ",                                        
                                        "CLUSTER_table;"                        
                ]
+[ "getCloud",                    "command",      "mapreduce",    
"(nme:str):bat[:oid,:str] ",                                                    
                                                        "MRgetCloud;"           
                                ]
 [ "multiplex",                   "pattern",      "mal",          
"(a:any...):any ",                                                              
                                                        "OPTremapMultiplex;"    
                                ]
 [ "argRecord",                   "pattern",      "sql",          
"(a:any...):str ",                                                              
                                                        "SQLargRecord;"         
                                ]
 [ "argRecord",                   "pattern",      "sql",          "():str ",    
                                                                                
                                          "SQLargRecord;"                       
                  ]
diff -r 617161373486 -r 11fbb5587d03 MonetDB5/src/optimizer/opt_mapreduce.mx
--- a/MonetDB5/src/optimizer/opt_mapreduce.mx   Fri May 21 13:51:19 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_mapreduce.mx   Fri May 21 15:56:09 2010 +0200
@@ -20,6 +20,11 @@
 @- Map-Reduce
 The Map-Reduce infrastructure requires a little optimizer to turn
 an arbitrary query into a plan to be executed on the elements of the Cloud.
+Each cloud consists of a series of named servers, managed by Merovingian
+with the pattern "*/Cloudname/node/*".
+
+Determining the cloud is an expensive operation and, for the time being,
+it is performed each time a query is compiled.
 
 In the first implementation we don't optimize the plan against the mapping scheme.
 We simply assume that the complete query can be executed and that only the
@@ -108,14 +113,20 @@
        return (Y201,Y202,Y203);
 end exec;
 @end verbatim
+...@{
 The code can be considered a refinement of the Octopus.
-...@{
+The mal primitives are meant for debugging.
 @mal
 pattern optimizer.mapreduce():str
 address OPTmapreduce;
 pattern optimizer.mapreduce(mod:str, fcn:str):str
 address OPTmapreduce
 comment "Modify the plan to exploit parallel processing on multiple systems 
using map-reduce";
+
+module mapreduce;
+command getCloud(nme:str):bat[:oid,:str]
+address MRgetCloud
+comment "Localize the elements of a named cloud";
 @h
 #ifndef _OPT_MAPREDUCE_
 #define _OPT_MAPREDUCE_
@@ -123,6 +134,7 @@
 #include "opt_support.h"
 
 opt_export str MRexec(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str MRgetCloud(int *ret, str *nme);
 @c
 #include "mal_config.h"
 #include "opt_mapreduce.h"
@@ -134,6 +146,7 @@
 the number of nodes participating in the cloud setting.
 It calls the map-reduce executor to produce a result
 possible with the aid of a replica.
+The mapnode is an auxiliary structure.
 @c
 
 typedef struct _mapnode {
@@ -142,68 +155,92 @@
        str pass;
 } mapnode;
 
-#define MAXNODES 256
-static mapnode mapnodes[MAXNODES];
+static mapnode *mapnodes;
+static BAT *cloud;
+
+static void
+MRcleanCloud()
+{
+       int i;
+
+       mal_set_lock(mal_contextLock,"mapreduce");
+       for (i = 0; mapnodes[i].uri; i++) {
+               if (mapnodes[i].uri != NULL)
+                       GDKfree(mapnodes[i].uri);
+               if (mapnodes[i].user != NULL)
+                       GDKfree(mapnodes[i].user);
+               if (mapnodes[i].pass != NULL)
+                       GDKfree(mapnodes[i].pass);
+               mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0;
+       }
+       if ( cloud)
+               BBPreleaseref(cloud->batCacheid);
+       cloud = 0;
+       mal_unset_lock(mal_contextLock,"mapreduce");
+}
+
+str
+MRgetCloud(int *ret, str *mrcluster)
+{
+       str msg;
+       BUN p, q;
+       BATiter bi;
+       char nodes[BUFSIZ];
+       char *n = nodes;
+       int mapcount = 0;
+
+       snprintf(nodes, sizeof(nodes), "*/%s/node/*", *mrcluster);
+       
+       msg = RMTresolve(ret, &n);
+       if (msg ) 
+               return msg;
+
+       if ( cloud )
+               /* cleanup remains of previous call */
+               MRcleanCloud();
+       
+       mal_set_lock(mal_contextLock,"mapreduce");
+       cloud = BATdescriptor(*ret); /* should succeed */
+       BBPkeepref(*ret);       /* keep if during the session */
+
+       mapnodes = (mapnode*) GDKzalloc(sizeof(mapnode) * (BATcount(cloud) +1));
+       if ( mapnodes == 0){
+               BBPreleaseref(*ret);
+               throw(MAL,"mapreduce.getCloud", MAL_MALLOC_FAIL);
+       }
+
+       bi = bat_iterator(cloud);
+       BATloop(cloud, p, q) {
+               str t = (str)BUNtail(bi, p);
+               mapnodes[mapcount].uri = GDKstrdup(t);
+               mapnodes[mapcount].user = GDKstrdup("monetdb");
+               mapnodes[mapcount].pass = GDKstrdup("monetdb");
+               mapcount++;
+       }
+       if (GDKnr_threads < (int) BATcount(cloud))
+               GDKnr_threads = (int) BATcount(cloud);
+
+       mal_unset_lock(mal_contextLock,"mapreduce");
+       BBPkeepref(*ret);       /* give it to the called */
+
+       return MAL_SUCCEED;
+}
 
 static int
 MRcloudSize(str mrcluster)
 {
        str msg;
-       bat bid = 0;
-       BAT *b;
-       BUN p, q;
-       BATiter bi;
-       int mapcount = 0;
-       char nodes[1024];
-       char *n = nodes;
+       int bid;
 
-       snprintf(nodes, sizeof(nodes), "*/%s/node/*", mrcluster);
-       
-       msg = RMTresolve(&bid, &n);
-       if (msg != MAL_SUCCEED) {
-               if (msg != M5OutOfMemory)
-                       GDKfree(msg);
-               return(0);
+       msg = MRgetCloud(&bid, &mrcluster);
+       if ( msg ){
+               GDKfree(msg); /* bad programming */
+               return 0;
        }
-
-       b = BATdescriptor(bid);
-       if (b == NULL)
-               return(0);
-
-       bi = bat_iterator(b);
-       BATloop(b, p, q) {
-               str t = (str)BUNtail(bi, p);
-
-               if (mapcount == MAXNODES)
-                       break;
-
-               mapnodes[mapcount].uri = GDKstrdup(t);
-               mapnodes[mapcount].user = GDKstrdup("monetdb");
-               mapnodes[mapcount].pass = GDKstrdup("monetdb");
-               mapcount++;
-       }
-       BBPreleaseref(bid);
-
-       if (GDKnr_threads < mapcount)
-               GDKnr_threads = mapcount;
-
-       return(mapcount);
+       return (int) BATcount(cloud);
 }
 
-static void
-MRcleanCloud(int mapcount)
-{
-       int i;
 
-       for (i = 0; i < mapcount; i++) {
-               if (mapnodes[mapcount].uri != NULL)
-                       GDKfree(mapnodes[mapcount].uri);
-               if (mapnodes[mapcount].user != NULL)
-                       GDKfree(mapnodes[mapcount].user);
-               if (mapnodes[mapcount].pass != NULL)
-                       GDKfree(mapnodes[mapcount].pass);
-       }
-}
 
 typedef struct _mapcol {
        int val1;
@@ -331,7 +368,7 @@
        printFunction(cntxt->fdout, mb, 0, LIST_MAL_STMT);
 #endif
 
-       MRcleanCloud(n);
+       MRcleanCloud();
 }
 
 enum copymode { cNONE, FREE, STICK, SINGLE, SINGLE_DUP, DUP, LEAVE };
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to