Changeset: d762ae2e83bd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d762ae2e83bd
Added Files:
        monetdb5/mal/mal_resource.c
        monetdb5/mal/mal_resource.h
Modified Files:
        clients/mapiclient/tomograph.c
        monetdb5/ChangeLog
        monetdb5/mal/Makefile.ag
        monetdb5/mal/mal.h
        monetdb5/mal/mal_client.c
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_instruction.c
        monetdb5/mal/mal_instruction.h
        monetdb5/mal/mal_interpreter.c
        monetdb5/mal/mal_profiler.c
        monetdb5/modules/mal/language.c
        monetdb5/scheduler/run_octopus.c
        monetdb5/scheduler/srvpool.c
        sql/backends/monet5/UDF/Tests/udf-fuse.stable.out
        sql/backends/monet5/UDF/Tests/udf-reverse.stable.out
        sql/test/BugTracker-2009/Tests/POWER_vs_prod.SF-2596114.stable.out
        
sql/test/BugTracker-2010/Tests/LIMIT_OFFSET_big-endian.Bug-2622.stable.out
        
sql/test/BugTracker-2010/Tests/group-by_ordered_column.Bug-2564.stable.out
        sql/test/BugTracker-2011/Tests/func_iter_vs_bulk.Bug-2826.stable.out
        sql/test/BugTracker/Tests/explain.SF-1739353.stable.out
        sql/test/BugTracker/Tests/jdbc_no_debug.SF-1739356.stable.out
Branch: lodrdf
Log Message:

Merge with default branch


diffs (truncated from 2310 to 300 lines):

diff --git a/clients/mapiclient/tomograph.c b/clients/mapiclient/tomograph.c
--- a/clients/mapiclient/tomograph.c
+++ b/clients/mapiclient/tomograph.c
@@ -56,6 +56,7 @@
 
 #define COUNTERSDEFAULT "ISTestmrw"
 
+#define TOMOGRAPHPATTERN "tomograph start 2012"
 /* #define _DEBUG_TOMOGRAPH_*/
 
 static struct {
@@ -131,6 +132,7 @@ static int beat= 50;
 static Mapi dbh = NULL;
 static MapiHdl hdl = NULL;
 static int batch = 1; /* number of queries to combine in one run */
+static int startup= 0; /* count openStream calls first */
 static long maxio=0;
 static int cpus = 0;
 
@@ -215,6 +217,8 @@ static void activateBeat(void){
        char *id ="activateBeat";
        snprintf(buf, BUFSIZ, "profiler.activate(\"ping%d\");\n",beat);
        doQ(buf);
+       snprintf(buf, BUFSIZ, "io.print(\"%s\");\n",TOMOGRAPHPATTERN);
+       doQ(buf);
        return;
 stop_disconnect:
        mapi_disconnect(dbh);
@@ -1006,7 +1010,7 @@ static void update(int state, int thread
                return;
        }
 
-       if (state == 1 && fcn && (strncmp(fcn,"function",8) == 0 || 
strncmp(fcn,"profiler.tomograph",18) == 0 )){
+       if (state == 1 && fcn && (strncmp(fcn,"function",8) == 0 || 
strncmp(fcn,"profiler.tomograph",18) == 0)  && startup > 1 ){
                deactivateBeat();
                createTomogram();
                totalclkticks= 0; /* number of clock ticks reported */
@@ -1150,6 +1154,12 @@ static void parser(char *row){
        c = strchr(c+1, (int)',');
        c++;
        fcn = c;
+       if (fcn && strstr(fcn, TOMOGRAPHPATTERN) ){
+               startup++;      // start counting 
+               if (debug)
+                       printf("Found start marker\n");
+               if ( startup == 2 ) batch++;
+       }
        stmt = strdup(fcn);
        c = strstr(c+1, ":=");
        if ( c ){
diff --git a/monetdb5/ChangeLog b/monetdb5/ChangeLog
--- a/monetdb5/ChangeLog
+++ b/monetdb5/ChangeLog
@@ -1,3 +1,9 @@
 # ChangeLog file for MonetDB5
 # This file is updated with Maddlog
 
+* Wed Oct  3 2012 Martin Kersten <[email protected]>
+- The scheduler of mserver5 was changed to use a fixed set of workers to
+  perform the work for all connected clients.  Previously, each client
+  connection had its own set of workers, easily causing resource problems
+  upon multiple connections to the server.
+
diff --git a/monetdb5/mal/Makefile.ag b/monetdb5/mal/Makefile.ag
--- a/monetdb5/mal/Makefile.ag
+++ b/monetdb5/mal/Makefile.ag
@@ -40,6 +40,7 @@ lib_mal = {
                mal_import.c mal_import.h \
                mal_runtime.c mal_runtime.h \
                mal_instruction.c mal_instruction.h \
+               mal_resource.c mal_resource.h \
                mal_interpreter.c mal_interpreter.h \
                mal_dataflow.c mal_dataflow.h \
                mal_linker.c mal_linker.h \
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -46,7 +46,6 @@
  */
 #define MAXSCRIPT 64
 #define MEMORY_THRESHOLD  0.8
-#define DELAYUNIT 100 /* ms delay in parallel processing decissions */
 
 mal_export char     monet_cwd[PATHLENGTH];
 mal_export size_t      monet_memory;
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -76,7 +76,6 @@ void
 MCinit(void)
 {
        char *max_clients = GDKgetenv("max_clients");
-       int threads = GDKnr_threads;
        int maxclients = 0;
 
        if (max_clients != NULL)
@@ -88,8 +87,7 @@ MCinit(void)
 
        MAL_MAXCLIENTS =
                /* console */ 1 +
-               /* client connections */ maxclients +
-               /* workers per client */ (maxclients * threads);
+               /* client connections */ maxclients;
        mal_clients = GDKzalloc(sizeof(ClientRec) * MAL_MAXCLIENTS);
 }
 
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -36,63 +36,53 @@
  * access it mostly without expensive locking.
  */
 #include "mal_dataflow.h"
+
 #define DFLOWpending 0         /* runnable */
 #define DFLOWrunning 1         /* currently in progress */
 #define DFLOWwrapup  2         /* done! */
 #define DFLOWretry   3         /* reschedule */
+#define DFLOWskipped 4         /* due to errors */
+
+/* The per instruction status of execution */
+typedef struct FLOWEVENT {
+       struct DATAFLOW *flow;/* execution context */
+       int pc;         /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables */
+       sht state;      /* of execution */
+       lng clk;
+       sht cost;
+       lng hotclaim;   /* memory foot print of result variables */
+       lng argclaim;   /* memory foot print of arguments */
+} *FlowEvent, FlowEventRec;
 
 typedef struct queue {
        int size;       /* size of queue */
        int last;       /* last element in the queue */
-       void **data;
+       FlowEvent *data;
        MT_Lock l;      /* its a shared resource, ie we need locks */
        MT_Sema s;      /* threads wait on empty queues */
 } queue;
 
-
 /*
  * The dataflow dependency is administered in a graph list structure.
  * For each instruction we keep the list of instructions that
  * should be checked for eligibility once we are finished with it.
  */
-typedef struct {
-       MT_Id tid;
-       int id;
-       queue *todo;            /* pending actions for this client */
-       lng clk;
-       struct DataFlow *flow;
-} FlowTask;
-
-typedef struct FLOWSTATUS {
+typedef struct DATAFLOW {
        Client cntxt;   /* for debugging and client resolution */
        MalBlkPtr mb;   /* carry the context */
        MalStkPtr stk;
-       int pc;         /* pc in underlying malblock */
-       int blocks;     /* awaiting for variables */
-       sht state;      /* of execution */
-       sht cost;
-       lng hotclaim;   /* memory foot print of result variables */
-       lng argclaim;   /* memory foot print of arguments */
-       str error;
-} *FlowStatus, FlowStatusRec;
-
-typedef struct DataFlow {
-    int start, stop;    /* guarded block under consideration*/
-    FlowStatus status;  /* status of each instruction */
-    int *nodes;         /* dependency graph nodes */
-    int *edges;         /* dependency graph */
-    queue *done;        /* work finished */
-    queue *todo;        /* pending actions for this client */
-    int    nway;        /* number of workers */
-    FlowTask *worker;   /* worker threads for the client */
-    struct DataFlow *free; /* free list */
-    int terminate;      /* set if we need to terminate */
-    MT_Lock termlock;   /* lock to protect the above */
+       int start, stop;    /* guarded block under consideration*/
+       FlowEvent status;   /* status of each instruction */
+       str error;          /* error encountered */
+       int *nodes;         /* dependency graph nodes */
+       int *edges;         /* dependency graph */
+       MT_Lock flowlock;   /* lock to protect the above */
+       queue *done;        /* instructions handled */
 } *DataFlow, DataFlowRec;
 
-/* does not seem to have a major impact */
-lng memorypool = 0;      /* memory claimed by concurrent threads */
-int memoryclaims = 0;    /* number of threads active with expensive operations 
*/
+static MT_Id workers[THREADS];
+static queue *todo = 0;        /* pending instructions */
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -109,122 +99,6 @@ DFLOWgraphSize(MalBlkPtr mb, int start, 
 }
 
 /*
- * Running all eligible instructions in parallel creates
- * resource contention. This means we should implement
- * an admission control scheme where threads are temporarily
- * postponed if the claim for memory exceeds a threshold
- * In general such contentions will be hard to predict,
- * because they depend on the algorithm, the input sizes,
- * concurrent use of the same variables, and the output produced.
- *
- * The heuristic is based on calculating the storage footprint
- * of the operands and assuming it preferrably should fit in memory.
- * Ofcourse, there may be intermediate structures being
- * used and the size of the result is not a priori known.
- * For this, we use a high watermark on the amount of
- * physical memory we pre-allocate for the claims.
- *
- * Instructions are eligible to be executed when the
- * total footprint of all concurrent executions stays below
- * the high-watermark or it is the single expensive
- * instruction being started.
- *
- * When we run out of memory, the instruction is delayed.
- * How long depends on the other instructions to free up
- * resources. The current policy simple takes a local
- * decision by delaying the instruction based on its
- * past and the size of the memory pool size.
- * The waiting penalty decreases with each step to ensure
- * it will ultimately taken into execution, with possibly
- * all resource contention effects.
- *
- * Another option would be to maintain a priority queue of
- * suspended instructions.
- */
-
-/*
- * The memory claim is the estimate for the amount of memory hold.
- * Views are consider cheap and ignored
- */
-lng
-getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag)
-{
-       lng total = 0, vol = 0;
-       BAT *b;
-
-       (void)mb;
-       if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
-               b = BATdescriptor(stk->stk[getArg(pci, i)].val.bval);
-               if (b == NULL)
-                       return 0;
-               if (flag && isVIEW(b)) {
-                       BBPunfix(b->batCacheid);
-                       return 0;
-               }
-               heapinfo(&b->H->heap); total += vol;
-               heapinfo(b->H->vheap); total += vol;
-               hashinfo(b->H->hash); total += vol;
-
-               heapinfo(&b->T->heap); total += vol;
-               heapinfo(b->T->vheap); total += vol;
-               hashinfo(b->T->hash); total += vol;
-               if ( b->T->hash == 0  || b->H->hash ==0)        /* assume one 
hash claim */
-                       total+= BATcount(b) * sizeof(lng);
-               total = total > (lng)(MEMORY_THRESHOLD * monet_memory) ? 
(lng)(MEMORY_THRESHOLD * monet_memory) : total;
-               BBPunfix(b->batCacheid);
-       }
-       return total;
-}
-
-/*
- * The hotclaim indicates the amount of data recentely written.
- * as a result of an operation. The argclaim is the sum over the hotclaims
- * for all arguments.
- * The argclaim provides a hint on how much we actually may need to execute
- * The hotclaim is a hint how large the result would be.
- */
-#ifdef USE_DFLOW_ADMISSION
-/* experiments on sf-100 on small machine showed no real improvement */
-int
-DFLOWadmission(lng argclaim, lng hotclaim)
-{
-       /* optimistically set memory */
-       if (argclaim == 0)
-               return 0;
-
-       MT_lock_set(&mal_contextLock, "DFLOWdelay");
-       if (memoryclaims < 0)
-               memoryclaims = 0;
-       if (memorypool <= 0 && memoryclaims == 0)
-               memorypool = (lng)(MEMORY_THRESHOLD * monet_memory);
-
-       if (argclaim > 0) {
-               if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
-                       memorypool -= (argclaim + hotclaim);
-                       memoryclaims++;
-                       PARDEBUG
-                       mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool 
" LLFMT "claims " LLFMT "," LLFMT "\n",
-                                                memoryclaims, THRgettid(), 
memorypool, argclaim, hotclaim);
-                       MT_lock_unset(&mal_contextLock, "DFLOWdelay");
-                       return 0;
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to