Changeset: b3cb4b442795 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b3cb4b442795
Removed Files:
monetdb5/optimizer/opt_mapreduce.c
monetdb5/optimizer/opt_mapreduce.h
Modified Files:
monetdb5/optimizer/Makefile.ag
monetdb5/optimizer/opt_support.c
monetdb5/optimizer/opt_support.h
monetdb5/optimizer/opt_wrapper.c
monetdb5/optimizer/optimizer.mal
Branch: default
Log Message:
Drop experimental mapreduce optimizer
It is based on old-fashioned MAL code structures and will
be superseded by the merge/remote tables approach.
diffs (truncated from 1093 to 300 lines):
diff --git a/monetdb5/optimizer/Makefile.ag b/monetdb5/optimizer/Makefile.ag
--- a/monetdb5/optimizer/Makefile.ag
+++ b/monetdb5/optimizer/Makefile.ag
@@ -48,7 +48,6 @@ lib_optimizer = {
opt_inline.c opt_inline.h \
opt_joinpath.c opt_joinpath.h \
opt_macro.c opt_macro.h \
- opt_mapreduce.c opt_mapreduce.h \
opt_matpack.c opt_matpack.h \
opt_json.c opt_json.h \
opt_mergetable.c opt_mergetable.h \
diff --git a/monetdb5/optimizer/opt_mapreduce.c b/monetdb5/optimizer/opt_mapreduce.c
deleted file mode 100644
--- a/monetdb5/optimizer/opt_mapreduce.c
+++ /dev/null
@@ -1,971 +0,0 @@
-/*
- * The contents of this file are subject to the MonetDB Public License
- * Version 1.1 (the "License"); you may not use this file except in
- * compliance with the License. You may obtain a copy of the License at
- * http://www.monetdb.org/Legal/MonetDBLicense
- *
- * Software distributed under the License is distributed on an "AS IS"
- * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
- * License for the specific language governing rights and limitations
- * under the License.
- *
- * The Original Code is the MonetDB Database System.
- *
- * The Initial Developer of the Original Code is CWI.
- * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
- * Copyright August 2008-2015 MonetDB B.V.
- * All Rights Reserved.
- */
-
-/*
-@a F. Groffen, M. Kersten
-@- Map-Reduce
-The Map-Reduce infrastructure requires a little optimizer to turn
-an arbitrary query into a plan to be executed on the systems in the cloud.
-Each cloud consists of a series of named servers, managed by Merovingian
-with the pattern "cloudname/node//". The cloudname is detected from
-the schema in which an SQL table is stored. Only schemas starting with
-"mr_" are considered to be mapreduce schemas on the query node. The
-cloudname is the schema name without the leading "mr_" prefix.
-#
-Determining the clould is an expensive operation and for the time being
-performed each time when a query is compiled.
-#
-In the first implementation we don't optimize the plan against the mapping scheme.
-We simply assume that the complete query can be executed and that only the
-result sets should be assembled.
-*/
-#include "monetdb_config.h"
-#include "opt_mapreduce.h"
-#include "mal_interpreter.h"
-#include "remote.h"
-
-typedef struct _mapnode {
- str uri;
- str user;
- str pass;
-} mapnode;
-
-static mapnode *mapnodes;
-
-static void
-MRcleanCloud(void)
-{
- int i;
-
- MT_lock_set(&mal_contextLock, "mapreduce");
- for (i = 0; mapnodes[i].uri; i++) {
- if (mapnodes[i].uri != NULL)
- GDKfree(mapnodes[i].uri);
- if (mapnodes[i].user != NULL)
- GDKfree(mapnodes[i].user);
- if (mapnodes[i].pass != NULL)
- GDKfree(mapnodes[i].pass);
- mapnodes[i].uri = mapnodes[i].user = mapnodes[i].pass = 0;
- }
- MT_lock_unset(&mal_contextLock, "mapreduce");
-}
-
-str
-MRgetCloud(bat *ret, str *mrcluster)
-{
- str msg;
- BAT *cloud;
- BUN p, q;
- BATiter bi;
- char nodes[BUFSIZ];
- char *n = nodes;
- int mapcount = 0;
-
- snprintf(nodes, sizeof(nodes), "*/%s/node/*", *mrcluster);
-
- if ((msg = RMTresolve(ret, &n)) != MAL_SUCCEED)
- return msg;
-
- MT_lock_set(&mal_contextLock, "mapreduce");
- cloud = BATdescriptor(*ret); /* should succeed */
- if (cloud == NULL)
- throw(MAL, "mapreduce.getCloud", RUNTIME_OBJECT_MISSING);
-
- mapnodes = (mapnode*)GDKzalloc(sizeof(mapnode) * (BATcount(cloud) + 1));
- if (mapnodes == NULL) {
- BBPunfix(*ret);
- throw(MAL, "mapreduce.getCloud", MAL_MALLOC_FAIL);
- }
-
- bi = bat_iterator(cloud);
- BATloop(cloud, p, q) {
- str t = (str)BUNtail(bi, p);
- mapnodes[mapcount].uri = GDKstrdup(t);
- mapnodes[mapcount].user = GDKstrdup("monetdb");
- mapnodes[mapcount].pass = GDKstrdup("monetdb");
- mapcount++;
- }
-
- BBPkeepref(*ret); /* we're done, keep for caller */
- cloud = NULL;
- MT_lock_unset(&mal_contextLock, "mapreduce");
-
- return MAL_SUCCEED;
-}
-
-static int
-MRcloudSize(str mrcluster)
-{
- str msg;
- int bid;
- BAT *cloud;
- int cnt;
-
- msg = MRgetCloud(&bid, &mrcluster);
- if (msg) {
- GDKfree(msg); /* bad programming */
- return 0;
- }
- cloud = BATdescriptor(bid);
- if (cloud == NULL)
- return 0;
- cnt = (int)BATcount(cloud);
- BBPunfix(bid); /* we're done with it */
- return(cnt);
-}
-
-
-enum poper { pBAT = 1, SUM, MAX, MIN, SORT, SORTDESC, LIMIT };
-
-typedef struct _mapcol {
- int mapid; /* var in map plan that is in its signature
- and return */
- int reduceid; /* original column var in reduce program we
- eventually need to replace */
- int type; /* type of the map plan var */
- int mapbat; /* the var that is a BAT containing all values
- returned from map nodes (function), can only be
- used *after* MRdistributework */
- enum poper postop;/* the operation that needs to be performed on
- mapbat to turn it into reduceid */
- struct _mapcol *next;
-} mapcol;
-
-static void
-MRdistributework(
- Client cntxt,
- MalBlkPtr reduce,
- mapcol *col,
- InstrPtr sig,
- str mrcluster)
-{
- InstrPtr o, p = NULL, *packs;
- int i, n, j, q, v, retc;
- int *gets, *w;
- mapcol *lcol;
- (void)cntxt;
-
- n = MRcloudSize(mrcluster);
-
- assert(n);
- assert(col);
-
- retc = 0;
- for (lcol = col; lcol != NULL; lcol = lcol->next)
- retc++;
-
- assert(retc);
-
- packs = (InstrPtr *)GDKmalloc(retc * sizeof(InstrPtr));
- gets = (int *)GDKmalloc(n * retc * sizeof(int));
- w = (int *)GDKmalloc(retc * sizeof(int));
-
- for (lcol = col, j = 0; lcol != NULL; lcol = lcol->next, j++) {
- /* define and create the container bat for all results from the
- * map nodes */
- packs[j] = p = newFcnCall(reduce, batRef, newRef);
- if (isaBatType(lcol->type)) {
- p = pushType(reduce, p, getHeadType(lcol->type));
- p = pushType(reduce, p, getColumnType(lcol->type));
- setArgType(reduce, p, 0, lcol->type);
- } else {
- p = pushNil(reduce, p, TYPE_void);
- p = pushType(reduce, p, lcol->type);
- setArgType(reduce, p, 0, newBatType(TYPE_void,
lcol->type));
- }
- lcol->mapbat = getArg(p, 0);
-
- /* we need to declare the variables that we will use with put,
- * exec and get */
- for (i = 0; i < n; i++) {
- if (isaBatType(lcol->type)) {
- p = newFcnCall(reduce, batRef, newRef);
- p = pushType(reduce, p,
getHeadType(lcol->type));
- p = pushType(reduce, p,
getColumnType(lcol->type));
- } else {
- p = newAssignment(reduce);
- p = pushNil(reduce, p, lcol->type);
- }
- setArgType(reduce, p, 0, lcol->type);
- gets[(i * retc) + j] = getArg(p, 0);
- }
- }
-
- for (i = 0; i < n; i++) {
- /* q := remote.connect("uri", "user", "pass"); */
- p = newStmt(reduce, remoteRef, connectRef);
- p = pushStr(reduce, p, mapnodes[i].uri);
- p = pushStr(reduce, p, mapnodes[i].user);
- p = pushStr(reduce, p, mapnodes[i].pass);
- p = pushStr(reduce, p, "msql");
- q = getArg(p, 0);
-
- /* remote.register(q, "mod", "fcn"); */
- p = newStmt(reduce, remoteRef, putName("register", 8));
- p = pushArgument(reduce, p, q);
- p = pushStr(reduce, p, getModuleId(sig));
- p = pushStr(reduce, p, getFunctionId(sig));
-
- /* (x1, x2, ..., xn) := remote.exec(q, "mod", "fcn"); */
- p = newInstruction(reduce, ASSIGNsymbol);
- setModuleId(p, remoteRef);
- setFunctionId(p, execRef);
- p = pushArgument(reduce, p, q);
- p = pushStr(reduce, p, getModuleId(sig));
- p = pushStr(reduce, p, getFunctionId(sig));
- for (j = 0; j < retc; j++) {
- /* x1 := remote.put(q, :type) */
- o = newFcnCall(reduce, remoteRef, putRef);
- o = pushArgument(reduce, o, q);
- o = pushArgument(reduce, o, gets[(i * retc) + j]);
- v = getArg(o, 0);
- p = pushReturn(reduce, p, v);
- w[j] = v;
- }
- for (j = sig->retc; j < sig->argc; j++) {
- /* x1 := remote.put(q, A0); */
- o = newStmt(reduce, remoteRef, putRef);
- o = pushArgument(reduce, o, q);
- o = pushArgument(reduce, o, getArg(sig, j));
- p = pushArgument(reduce, p, getArg(o, 0));
- }
- pushInstruction(reduce, p);
-
- /* y1 := remote.get(q, x1); */
- for (j = 0; j < retc; j++) {
- p = newFcnCall(reduce, remoteRef, getRef);
- p = pushArgument(reduce, p, q);
- p = pushArgument(reduce, p, w[j]);
- getArg(p, 0) = gets[(i * retc) + j];
- }
-
- /* remote.disconnect(q); */
- p = newStmt(reduce, remoteRef, disconnectRef);
- p = pushArgument(reduce, p, q);
- }
-
- /* delayed bat.inserts for easily creating a deterministic flow */
- for (lcol = col, j = 0; lcol != NULL; lcol = lcol->next, j++) {
- q = lcol->mapbat;
- /* p := bat.insert(b, y1) */
- for (i = 0; i < n; i++) {
- p = newStmt(reduce, batRef, insertRef);
- p = pushArgument(reduce, p, q);
- if (!isaBatType(lcol->type))
- p = pushNil(reduce, p, TYPE_void);
- p = pushArgument(reduce, p, gets[(i * retc) + j]);
- q = getArg(p, 0);
- }
-
- lcol->mapbat = getArg(p, 0);
-
- /* We must deliver here the variables (reduceid) that the rest
- * of the reduce plan uses in such a way that they can deal with
- * it. Since this code is ran at latest (after the possible
- * optimisations are known) the optimisation code cannot know
- * what vars come out of here (in particular mapbat), so it must
- * be able to rely on what it knows (reduceid). */
- switch (lcol->postop) {
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list