Changeset: b69a43145c59 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=b69a43145c59
Modified Files:
        monetdb5/scheduler/srvpool.c
        monetdb5/scheduler/srvpool.h
        monetdb5/scheduler/srvpool.mal
Branch: default
Log Message:

Better handling of failing workers.
A worker that reports errors upon (re)start is removed from the
candidate list.


diffs (161 lines):

diff --git a/monetdb5/scheduler/srvpool.c b/monetdb5/scheduler/srvpool.c
--- a/monetdb5/scheduler/srvpool.c
+++ b/monetdb5/scheduler/srvpool.c
@@ -30,7 +30,7 @@
  *
  * @verbatim
  * barrier parallel:= scheduler.srvpool();
- * (s1,...,sn) :=  srvpool.server("queryplan");
+ * (s1,...,sn) :=  srvpool.query("queryplan");
  * a:= user.S0_1_stub(s1,arg...);
  * ...
  * b:= srvpool.S0_1_stub(sn,arg...);
@@ -83,7 +83,7 @@ static Server servers[MAXSITES];      /* regi
 static int srvtop = 0;
 static int srvbaseline= 0;
 static str srvpattern = NULL;
-static int localExecution= FALSE;
+static int localExecution= TRUE;
 
 /* 
  * The partition optimizer requires the number of peers.
@@ -125,6 +125,7 @@ SRVPOOLnewServer(str uri)
        return srvtop-1;
 }
 
+
 static int
 SRVPOOLgetServer(str uri)
 {
@@ -139,7 +140,8 @@ SRVPOOLgetServer(str uri)
 
 /* Clean function registry of non-active servers */
 
-static void SRVPOOLcleanup(int i)
+static void
+SRVPOOLcleanup(int i)
 {
        Registry r, q;
        r = servers[i].nxt;
@@ -152,6 +154,28 @@ static void SRVPOOLcleanup(int i)
        servers[i].nxt = NULL;
 }
 
+
+/* remove server idx from the pool: free its strings, shift the tail down one slot, shrink srvtop */
+static void
+SRVPOOLdropServer(int idx)
+{
+       int j;
+       SRVPOOLcleanup(idx);
+       if ( servers[idx].uri ) GDKfree(servers[idx].uri);
+       servers[idx].uri = NULL;
+       if ( servers[idx].usr ) GDKfree(servers[idx].usr);
+       servers[idx].usr = NULL;
+       if ( servers[idx].pwd ) GDKfree(servers[idx].pwd);
+       servers[idx].pwd = NULL;
+       if ( servers[idx].conn ) GDKfree(servers[idx].conn);
+       servers[idx].conn = NULL;
+       for ( j = idx; j <srvtop -1; j++)   /* move each successor down, not always idx+1 */
+               servers[j]= servers[j + 1];
+       servers[j].uri = servers[j].usr = servers[j].pwd = servers[j].conn = NULL;
+       servers[j].nxt = NULL;
+       srvtop --;
+}
+
 /* logically disconnect from all servers */
 static str
 SRVPOOLdisconnect(Client cntxt)
@@ -213,7 +237,16 @@ SRVPOOLconnect(str *c, str *uri)
        return msg;
 }
 
-/* Look up the servers available for processing , guarantee a minimum number 
of servers */
+/* switch local/remote execution; flag points at a one-byte MAL bit, so read exactly one byte */
+str
+SRVPOOLlocal(int *ret, int *flag)
+{
+       (void) ret;
+       localExecution= *(const signed char *) flag != 0;
+       return MAL_SUCCEED;
+}
+
+/* Look up the servers available for processing */
 static str
 SRVPOOLdiscover(Client cntxt)
 {
@@ -258,12 +291,16 @@ SRVPOOLdiscover(Client cntxt)
 #ifdef DEBUG_RUN_SRVPOOL
                                                
mnstr_printf(cntxt->fdout,"#Worker site %d reports %s \n", j, msg);
 #endif
-                                               break;
+                                               /* Upon receiving an 
initialization error, the entry should be ignored */
+                                               SRVPOOLdropServer(j);
+                                               GDKfree(msg);
+                                               msg = MAL_SUCCEED;
                                        }
                                }
 
 #ifdef DEBUG_RUN_SRVPOOL
-                               mnstr_printf(cntxt->fdout,"#Worker site %d 
alias %s %s\n", j, (servers[j].conn?servers[j].conn:""), t);
+                               else 
+                                       mnstr_printf(cntxt->fdout,"#Worker site 
%d alias %s %s\n", j, (servers[j].conn?servers[j].conn:""), t);
 #endif
                                assert(srvtop <MAXSITES);
                        }
@@ -402,14 +439,16 @@ SRVPOOLscheduler(Client cntxt, MalBlkPtr
        if ( localExecution) {
                *res = -1; /* skip this one */
        } else {
-               SRVPOOLdiscover(cntxt);
+               /* only discover workers once */
+               if ( srvtop == 0)
+                       SRVPOOLdiscover(cntxt);
                *res = localExecution;
        }
        return MAL_SUCCEED;
 }
 
 str
-SRVPOOLserver(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
+SRVPOOLquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, InstrPtr pci)
 {
        int i,j, fnd =0;
        str plan = *(str*) getArgReference(stk,pci,pci->retc);
diff --git a/monetdb5/scheduler/srvpool.h b/monetdb5/scheduler/srvpool.h
--- a/monetdb5/scheduler/srvpool.h
+++ b/monetdb5/scheduler/srvpool.h
@@ -43,9 +43,10 @@
 mpool_export str SRVPOOLscheduler(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr p);
 mpool_export str SRVPOOLexec(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr p);
 mpool_export str SRVPOOLregister(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
-mpool_export str SRVPOOLserver(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
+mpool_export str SRVPOOLquery(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mpool_export str SRVPOOLreset(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 mpool_export str SRVPOOLconnect(str *c, str *dbname);
+mpool_export str SRVPOOLlocal(int *res, int *flag);
 mpool_export str SRVsetServers(Client cntxt, MalBlkPtr mb, MalStkPtr stk, 
InstrPtr pci);
 #endif /* MAL_RUN_SRVPOOL */
 
diff --git a/monetdb5/scheduler/srvpool.mal b/monetdb5/scheduler/srvpool.mal
--- a/monetdb5/scheduler/srvpool.mal
+++ b/monetdb5/scheduler/srvpool.mal
@@ -32,8 +32,8 @@ pattern srvpool.reset(dbname:str):str
 address SRVPOOLreset
 comment "Clear the server cache";
 
-pattern srvpool.server(plan:str):str...
-address SRVPOOLserver
+pattern srvpool.query(plan:str):str...
+address SRVPOOLquery
 comment "Allocate a list of servers for query execution";
 
 pattern scheduler.pattern( pat:str)
@@ -43,3 +43,7 @@ comment "Activate the peers matching the
 pattern scheduler.peers( N:int)
 address SRVsetServers
 comment "Simulate N peers";
+
+command srvpool.local(b:bit)
+address SRVPOOLlocal
+comment "Switch between local (b=true) and remote (b=false) execution";
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to