Changeset: 11fbb5587d03 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=11fbb5587d03
Modified Files:
MonetDB5/src/mal/Tests/tst450.stable.out
MonetDB5/src/modules/mal/Tests/inspect05.stable.out
MonetDB5/src/optimizer/opt_mapreduce.mx
Branch: default
Log Message:
Minor additions to mapreduce
Keep the BAT with nodes for inspection
Cleanup the structure properly, instead of the non-existing last one.
Simple protection against concurrency.
diffs (229 lines):
diff -r 617161373486 -r 11fbb5587d03 MonetDB5/src/mal/Tests/tst450.stable.out
--- a/MonetDB5/src/mal/Tests/tst450.stable.out Fri May 21 13:51:19 2010 +0200
+++ b/MonetDB5/src/mal/Tests/tst450.stable.out Fri May 21 15:56:09 2010 +0200
@@ -119,6 +119,7 @@
[ "mal", nil ]
[ "manual", nil ]
[ "mapi", nil ]
+[ "mapreduce", nil ]
[ "master", nil ]
[ "mat", nil ]
[ "mbr", nil ]
diff -r 617161373486 -r 11fbb5587d03
MonetDB5/src/modules/mal/Tests/inspect05.stable.out
--- a/MonetDB5/src/modules/mal/Tests/inspect05.stable.out Fri May 21
13:51:19 2010 +0200
+++ b/MonetDB5/src/modules/mal/Tests/inspect05.stable.out Fri May 21
15:56:09 2010 +0200
@@ -1009,6 +1009,7 @@
[ "new", "command", "cluster",
"(b:bat[:oid,:bte],bits:int,offset:int)
(psum:bat[:oid,:wrd],map:bat[:oid,:wrd]) ",
"CLS_create_bte;" ]
[ "split", "pattern", "cluster",
"(clustered:bat[:oid,:any_1],psum:bat[:oid,:wrd]):bat[:oid,:any_1]... ",
"CLS_split;" ]
[ "table", "pattern", "cluster",
"(b:bat[:oid,:any]...):bat[:oid,:oid] ",
"CLUSTER_table;"
]
+[ "getCloud", "command", "mapreduce",
"(nme:str):bat[:oid,:str] ",
"MRgetCloud;"
]
[ "multiplex", "pattern", "mal",
"(a:any...):any ",
"OPTremapMultiplex;"
]
[ "argRecord", "pattern", "sql",
"(a:any...):str ",
"SQLargRecord;"
]
[ "argRecord", "pattern", "sql", "():str ",
"SQLargRecord;"
]
diff -r 617161373486 -r 11fbb5587d03 MonetDB5/src/optimizer/opt_mapreduce.mx
--- a/MonetDB5/src/optimizer/opt_mapreduce.mx Fri May 21 13:51:19 2010 +0200
+++ b/MonetDB5/src/optimizer/opt_mapreduce.mx Fri May 21 15:56:09 2010 +0200
@@ -20,6 +20,11 @@
@- Map-Reduce
The Map-Reduce infrastructure requires a little optimizer to turn
an arbitrary query into a plan to be executed on the elements of the Cloud.
+Each cloud consists of a series of named servers, managed by Merovingian
+with the pattern "*/Cloudname/node/*".
+
+Determining the cloud is an expensive operation and, for the time being, is
+performed each time a query is compiled.
In the first implementation we don't optimize the plan against the mapping
scheme.
We simply assume that the complete query can be executed and that only the
@@ -108,14 +113,20 @@
return (Y201,Y202,Y203);
end exec;
@end verbatim
+...@{
The code can be considered a refinement of the Octopus.
-...@{
+The mal primitives are meant for debugging.
@mal
pattern optimizer.mapreduce():str
address OPTmapreduce;
pattern optimizer.mapreduce(mod:str, fcn:str):str
address OPTmapreduce
comment "Modify the plan to exploit parallel processing on multiple systems
using map-reduce";
+
+module mapreduce;
+command getCloud(nme:str):bat[:oid,:str]
+address MRgetCloud
+comment "Localize the elements of a named cloud";
@h
#ifndef _OPT_MAPREDUCE_
#define _OPT_MAPREDUCE_
@@ -123,6 +134,7 @@
#include "opt_support.h"
opt_export str MRexec(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci);
+opt_export str MRgetCloud(int *ret, str *nme);
@c
#include "mal_config.h"
#include "opt_mapreduce.h"
@@ -134,6 +146,7 @@
the number of nodes participating in the cloud setting.
It calls the map-reduce executor to produce a result
possible with the aid of a replica.
+The mapnode is an auxiliary structure.
@c
typedef struct _mapnode {
@@ -142,68 +155,92 @@
str pass;
} mapnode;
-#define MAXNODES 256
-static mapnode mapnodes[MAXNODES];
+static mapnode *mapnodes;
+static BAT *cloud;
+
+static void
+MRcleanCloud()
+{
+ int i;
+
+ mal_set_lock(mal_contextLock,"mapreduce");
+ for (i = 0; mapnodes[i].uri; i++) {
+ if (mapnodes[i].uri != NULL)
+ GDKfree(mapnodes[i].uri);
+ if (mapnodes[i].user != NULL)
+ GDKfree(mapnodes[i].user);
+ if (mapnodes[i].pass != NULL)
+ GDKfree(mapnodes[i].pass);
+ mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0;
+ }
+ if ( cloud)
+ BBPreleaseref(cloud->batCacheid);
+ cloud = 0;
+ mal_unset_lock(mal_contextLock,"mapreduce");
+}
+
+str
+MRgetCloud(int *ret, str *mrcluster)
+{
+ str msg;
+ BUN p, q;
+ BATiter bi;
+ char nodes[BUFSIZ];
+ char *n = nodes;
+ int mapcount = 0;
+
+ snprintf(nodes, sizeof(nodes), "*/%s/node/*", *mrcluster);
+
+ msg = RMTresolve(ret, &n);
+ if (msg )
+ return msg;
+
+ if ( cloud )
+ /* cleanup remains of previous call */
+ MRcleanCloud();
+
+ mal_set_lock(mal_contextLock,"mapreduce");
+ cloud = BATdescriptor(*ret); /* should succeed */
+ BBPkeepref(*ret); /* keep if during the session */
+
+ mapnodes = (mapnode*) GDKzalloc(sizeof(mapnode) * (BATcount(cloud) +1));
+ if ( mapnodes == 0){
+ BBPreleaseref(*ret);
+ throw(MAL,"mapreduce.getCloud", MAL_MALLOC_FAIL);
+ }
+
+ bi = bat_iterator(cloud);
+ BATloop(cloud, p, q) {
+ str t = (str)BUNtail(bi, p);
+ mapnodes[mapcount].uri = GDKstrdup(t);
+ mapnodes[mapcount].user = GDKstrdup("monetdb");
+ mapnodes[mapcount].pass = GDKstrdup("monetdb");
+ mapcount++;
+ }
+ if (GDKnr_threads < (int) BATcount(cloud))
+ GDKnr_threads = (int) BATcount(cloud);
+
+ mal_unset_lock(mal_contextLock,"mapreduce");
+ BBPkeepref(*ret); /* give it to the called */
+
+ return MAL_SUCCEED;
+}
static int
MRcloudSize(str mrcluster)
{
str msg;
- bat bid = 0;
- BAT *b;
- BUN p, q;
- BATiter bi;
- int mapcount = 0;
- char nodes[1024];
- char *n = nodes;
+ int bid;
- snprintf(nodes, sizeof(nodes), "*/%s/node/*", mrcluster);
-
- msg = RMTresolve(&bid, &n);
- if (msg != MAL_SUCCEED) {
- if (msg != M5OutOfMemory)
- GDKfree(msg);
- return(0);
+ msg = MRgetCloud(&bid, &mrcluster);
+ if ( msg ){
+ GDKfree(msg); /* bad programming */
+ return 0;
}
-
- b = BATdescriptor(bid);
- if (b == NULL)
- return(0);
-
- bi = bat_iterator(b);
- BATloop(b, p, q) {
- str t = (str)BUNtail(bi, p);
-
- if (mapcount == MAXNODES)
- break;
-
- mapnodes[mapcount].uri = GDKstrdup(t);
- mapnodes[mapcount].user = GDKstrdup("monetdb");
- mapnodes[mapcount].pass = GDKstrdup("monetdb");
- mapcount++;
- }
- BBPreleaseref(bid);
-
- if (GDKnr_threads < mapcount)
- GDKnr_threads = mapcount;
-
- return(mapcount);
+ return (int) BATcount(cloud);
}
-static void
-MRcleanCloud(int mapcount)
-{
- int i;
- for (i = 0; i < mapcount; i++) {
- if (mapnodes[mapcount].uri != NULL)
- GDKfree(mapnodes[mapcount].uri);
- if (mapnodes[mapcount].user != NULL)
- GDKfree(mapnodes[mapcount].user);
- if (mapnodes[mapcount].pass != NULL)
- GDKfree(mapnodes[mapcount].pass);
- }
-}
typedef struct _mapcol {
int val1;
@@ -331,7 +368,7 @@
printFunction(cntxt->fdout, mb, 0, LIST_MAL_STMT);
#endif
- MRcleanCloud(n);
+ MRcleanCloud();
}
enum copymode { cNONE, FREE, STICK, SINGLE, SINGLE_DUP, DUP, LEAVE };
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list