Changeset: b69a43145c59 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b69a43145c59
Modified Files:
monetdb5/scheduler/srvpool.c
monetdb5/scheduler/srvpool.h
monetdb5/scheduler/srvpool.mal
Branch: default
Log Message:
Better handling of failing workers.
A worker that reports errors upon (re)start is removed from the
candidate list.
diffs (161 lines):
diff --git a/monetdb5/scheduler/srvpool.c b/monetdb5/scheduler/srvpool.c
--- a/monetdb5/scheduler/srvpool.c
+++ b/monetdb5/scheduler/srvpool.c
@@ -30,7 +30,7 @@
*
* @verbatim
* barrier parallel:= scheduler.srvpool();
- * (s1,...,sn) := srvpool.server("queryplan");
+ * (s1,...,sn) := srvpool.query("queryplan");
* a:= user.S0_1_stub(s1,arg...);
* ...
* b:= srvpool.S0_1_stub(sn,arg...);
@@ -83,7 +83,7 @@ static Server servers[MAXSITES]; /* regi
static int srvtop = 0;
static int srvbaseline= 0;
static str srvpattern = NULL;
-static int localExecution= FALSE;
+static int localExecution= TRUE;
/*
* The partition optimizer requires the number of peers.
@@ -125,6 +125,7 @@ SRVPOOLnewServer(str uri)
return srvtop-1;
}
+
static int
SRVPOOLgetServer(str uri)
{
@@ -139,7 +140,8 @@ SRVPOOLgetServer(str uri)
/* Clean function registry of non-active servers */
-static void SRVPOOLcleanup(int i)
+static void
+SRVPOOLcleanup(int i)
{
Registry r, q;
r = servers[i].nxt;
@@ -152,6 +154,28 @@ static void SRVPOOLcleanup(int i)
servers[i].nxt = NULL;
}
+
+/*
+ * Remove entry idx from the server pool.
+ * Releases its function registry (via SRVPOOLcleanup) and its uri/usr/
+ * pwd/conn strings, shifts every later entry down by one slot, clears the
+ * vacated last slot and decrements srvtop.
+ * NOTE(review): assumes 0 <= idx < srvtop. Because later entries move down,
+ * a caller walking servers[] must re-examine slot idx rather than advance.
+ */
+static void
+SRVPOOLdropServer(int idx)
+{
+	int j;
+	SRVPOOLcleanup(idx);
+	if ( servers[idx].uri ) GDKfree(servers[idx].uri);
+	servers[idx].uri = NULL;
+	if ( servers[idx].usr ) GDKfree(servers[idx].usr);
+	servers[idx].usr = NULL;
+	if ( servers[idx].pwd ) GDKfree(servers[idx].pwd);
+	servers[idx].pwd = NULL;
+	if ( servers[idx].conn ) GDKfree(servers[idx].conn);
+	servers[idx].conn = NULL;
+	/* close the gap: every slot takes its immediate successor */
+	for ( j = idx; j < srvtop - 1; j++)
+		servers[j] = servers[j + 1];
+	/* the last slot (j == srvtop-1) must not alias the entry moved down */
+	servers[j].uri = servers[j].usr = servers[j].pwd = servers[j].conn = NULL;
+	servers[j].nxt = NULL;
+	srvtop--;
+}
+
/* logically disconnect from all servers */
static str
SRVPOOLdisconnect(Client cntxt)
@@ -213,7 +237,16 @@ SRVPOOLconnect(str *c, str *uri)
return msg;
}
-/* Look up the servers available for processing , guarantee a minimum number
of servers */
+/*
+ * Switch between local and remote execution; implements the MAL command
+ * srvpool.local(b:bit). A non-zero argument selects local execution.
+ * The MAL argument is declared as bit, so it is read through a bit pointer;
+ * dereferencing the int* from the exported prototype would read past it.
+ * ret is unused (the command returns no value).
+ */
+str
+SRVPOOLlocal(int *ret, int *flag)
+{
+	(void) ret;
+	localExecution = *(bit *) flag != 0;
+	return MAL_SUCCEED;
+}
+
+/* Look up the servers available for processing */
static str
SRVPOOLdiscover(Client cntxt)
{
@@ -258,12 +291,16 @@ SRVPOOLdiscover(Client cntxt)
#ifdef DEBUG_RUN_SRVPOOL
mnstr_printf(cntxt->fdout,"#Worker site %d reports %s \n", j, msg);
#endif
- break;
+ /* Upon receiving an
initialization error, the entry should be ignored */
+ SRVPOOLdropServer(j);
+ GDKfree(msg);
+ msg = MAL_SUCCEED;
}
}
#ifdef DEBUG_RUN_SRVPOOL
- mnstr_printf(cntxt->fdout,"#Worker site %d
alias %s %s\n", j, (servers[j].conn?servers[j].conn:""), t);
+ else
+ mnstr_printf(cntxt->fdout,"#Worker site
%d alias %s %s\n", j, (servers[j].conn?servers[j].conn:""), t);
#endif
assert(srvtop <MAXSITES);
}
@@ -402,14 +439,16 @@ SRVPOOLscheduler(Client cntxt, MalBlkPtr
if ( localExecution) {
*res = -1; /* skip this one */
} else {
- SRVPOOLdiscover(cntxt);
+ /* only discover workers once */
+ if ( srvtop == 0)
+ SRVPOOLdiscover(cntxt);
*res = localExecution;
}
return MAL_SUCCEED;
}
str
-SRVPOOLserver(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+SRVPOOLquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
{
int i,j, fnd =0;
str plan = *(str*) getArgReference(stk,pci,pci->retc);
diff --git a/monetdb5/scheduler/srvpool.h b/monetdb5/scheduler/srvpool.h
--- a/monetdb5/scheduler/srvpool.h
+++ b/monetdb5/scheduler/srvpool.h
@@ -43,9 +43,10 @@
mpool_export str SRVPOOLscheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr p);
mpool_export str SRVPOOLexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr p);
mpool_export str SRVPOOLregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
-mpool_export str SRVPOOLserver(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
+mpool_export str SRVPOOLquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mpool_export str SRVPOOLreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
mpool_export str SRVPOOLconnect(str *c, str *dbname);
+mpool_export str SRVPOOLlocal(int *res, int *flag);
mpool_export str SRVsetServers(Client cntxt, MalBlkPtr mb, MalStkPtr stk,
InstrPtr pci);
#endif /* MAL_RUN_SRVPOOL */
diff --git a/monetdb5/scheduler/srvpool.mal b/monetdb5/scheduler/srvpool.mal
--- a/monetdb5/scheduler/srvpool.mal
+++ b/monetdb5/scheduler/srvpool.mal
@@ -32,8 +32,8 @@ pattern srvpool.reset(dbname:str):str
address SRVPOOLreset
comment "Clear the server cache";
-pattern srvpool.server(plan:str):str...
-address SRVPOOLserver
+pattern srvpool.query(plan:str):str...
+address SRVPOOLquery
comment "Allocate a list of servers for query execution";
pattern scheduler.pattern( pat:str)
@@ -43,3 +43,7 @@ comment "Activate the peers matching the
pattern scheduler.peers( N:int)
address SRVsetServers
comment "Simulate N peers";
+
+command srvpool.local(b:bit)
+address SRVPOOLlocal
+comment "Change location from remote to local execution";
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list