Changeset: d762ae2e83bd for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=d762ae2e83bd
Added Files:
monetdb5/mal/mal_resource.c
monetdb5/mal/mal_resource.h
Modified Files:
clients/mapiclient/tomograph.c
monetdb5/ChangeLog
monetdb5/mal/Makefile.ag
monetdb5/mal/mal.h
monetdb5/mal/mal_client.c
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_dataflow.h
monetdb5/mal/mal_instruction.c
monetdb5/mal/mal_instruction.h
monetdb5/mal/mal_interpreter.c
monetdb5/mal/mal_profiler.c
monetdb5/modules/mal/language.c
monetdb5/scheduler/run_octopus.c
monetdb5/scheduler/srvpool.c
sql/backends/monet5/UDF/Tests/udf-fuse.stable.out
sql/backends/monet5/UDF/Tests/udf-reverse.stable.out
sql/test/BugTracker-2009/Tests/POWER_vs_prod.SF-2596114.stable.out
sql/test/BugTracker-2010/Tests/LIMIT_OFFSET_big-endian.Bug-2622.stable.out
sql/test/BugTracker-2010/Tests/group-by_ordered_column.Bug-2564.stable.out
sql/test/BugTracker-2011/Tests/func_iter_vs_bulk.Bug-2826.stable.out
sql/test/BugTracker/Tests/explain.SF-1739353.stable.out
sql/test/BugTracker/Tests/jdbc_no_debug.SF-1739356.stable.out
Branch: lodrdf
Log Message:
Merge with default branch
diffs (truncated from 2310 to 300 lines):
diff --git a/clients/mapiclient/tomograph.c b/clients/mapiclient/tomograph.c
--- a/clients/mapiclient/tomograph.c
+++ b/clients/mapiclient/tomograph.c
@@ -56,6 +56,7 @@
#define COUNTERSDEFAULT "ISTestmrw"
+#define TOMOGRAPHPATTERN "tomograph start 2012"
/* #define _DEBUG_TOMOGRAPH_*/
static struct {
@@ -131,6 +132,7 @@ static int beat= 50;
static Mapi dbh = NULL;
static MapiHdl hdl = NULL;
static int batch = 1; /* number of queries to combine in one run */
+static int startup= 0; /* count openStream calls first */
static long maxio=0;
static int cpus = 0;
@@ -215,6 +217,8 @@ static void activateBeat(void){
char *id ="activateBeat";
snprintf(buf, BUFSIZ, "profiler.activate(\"ping%d\");\n",beat);
doQ(buf);
+ snprintf(buf, BUFSIZ, "io.print(\"%s\");\n",TOMOGRAPHPATTERN);
+ doQ(buf);
return;
stop_disconnect:
mapi_disconnect(dbh);
@@ -1006,7 +1010,7 @@ static void update(int state, int thread
return;
}
- if (state == 1 && fcn && (strncmp(fcn,"function",8) == 0 ||
strncmp(fcn,"profiler.tomograph",18) == 0 )){
+ if (state == 1 && fcn && (strncmp(fcn,"function",8) == 0 ||
strncmp(fcn,"profiler.tomograph",18) == 0) && startup > 1 ){
deactivateBeat();
createTomogram();
totalclkticks= 0; /* number of clock ticks reported */
@@ -1150,6 +1154,12 @@ static void parser(char *row){
c = strchr(c+1, (int)',');
c++;
fcn = c;
+ if (fcn && strstr(fcn, TOMOGRAPHPATTERN) ){
+ startup++; // start counting
+ if (debug)
+ printf("Found start marker\n");
+ if ( startup == 2 ) batch++;
+ }
stmt = strdup(fcn);
c = strstr(c+1, ":=");
if ( c ){
diff --git a/monetdb5/ChangeLog b/monetdb5/ChangeLog
--- a/monetdb5/ChangeLog
+++ b/monetdb5/ChangeLog
@@ -1,3 +1,9 @@
# ChangeLog file for MonetDB5
# This file is updated with Maddlog
+* Wed Oct 3 2012 Martin Kersten <[email protected]>
+- The scheduler of mserver5 was changed to use a fixed set of workers to
+ perform the work for all connected clients. Previously, each client
+ connection had its own set of workers, easily causing resource problems
+ upon multiple connections to the server.
+
diff --git a/monetdb5/mal/Makefile.ag b/monetdb5/mal/Makefile.ag
--- a/monetdb5/mal/Makefile.ag
+++ b/monetdb5/mal/Makefile.ag
@@ -40,6 +40,7 @@ lib_mal = {
mal_import.c mal_import.h \
mal_runtime.c mal_runtime.h \
mal_instruction.c mal_instruction.h \
+ mal_resource.c mal_resource.h \
mal_interpreter.c mal_interpreter.h \
mal_dataflow.c mal_dataflow.h \
mal_linker.c mal_linker.h \
diff --git a/monetdb5/mal/mal.h b/monetdb5/mal/mal.h
--- a/monetdb5/mal/mal.h
+++ b/monetdb5/mal/mal.h
@@ -46,7 +46,6 @@
*/
#define MAXSCRIPT 64
#define MEMORY_THRESHOLD 0.8
-#define DELAYUNIT 100 /* ms delay in parallel processing decissions */
mal_export char monet_cwd[PATHLENGTH];
mal_export size_t monet_memory;
diff --git a/monetdb5/mal/mal_client.c b/monetdb5/mal/mal_client.c
--- a/monetdb5/mal/mal_client.c
+++ b/monetdb5/mal/mal_client.c
@@ -76,7 +76,6 @@ void
MCinit(void)
{
char *max_clients = GDKgetenv("max_clients");
- int threads = GDKnr_threads;
int maxclients = 0;
if (max_clients != NULL)
@@ -88,8 +87,7 @@ MCinit(void)
MAL_MAXCLIENTS =
/* console */ 1 +
- /* client connections */ maxclients +
- /* workers per client */ (maxclients * threads);
+ /* client connections */ maxclients;
mal_clients = GDKzalloc(sizeof(ClientRec) * MAL_MAXCLIENTS);
}
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -36,63 +36,53 @@
* access it mostly without expensive locking.
*/
#include "mal_dataflow.h"
+
#define DFLOWpending 0 /* runnable */
#define DFLOWrunning 1 /* currently in progress */
#define DFLOWwrapup 2 /* done! */
#define DFLOWretry 3 /* reschedule */
+#define DFLOWskipped 4 /* due to errors */
+
+/* The per instruction status of execution */
+typedef struct FLOWEVENT {
+ struct DATAFLOW *flow;/* execution context */
+ int pc; /* pc in underlying malblock */
+ int blocks; /* awaiting for variables */
+ sht state; /* of execution */
+ lng clk;
+ sht cost;
+ lng hotclaim; /* memory foot print of result variables */
+ lng argclaim; /* memory foot print of arguments */
+} *FlowEvent, FlowEventRec;
typedef struct queue {
int size; /* size of queue */
int last; /* last element in the queue */
- void **data;
+ FlowEvent *data;
MT_Lock l; /* it's a shared resource, i.e. we need locks */
MT_Sema s; /* threads wait on empty queues */
} queue;
-
/*
* The dataflow dependency is administered in a graph list structure.
* For each instruction we keep the list of instructions that
* should be checked for eligibility once we are finished with it.
*/
-typedef struct {
- MT_Id tid;
- int id;
- queue *todo; /* pending actions for this client */
- lng clk;
- struct DataFlow *flow;
-} FlowTask;
-
-typedef struct FLOWSTATUS {
+typedef struct DATAFLOW {
Client cntxt; /* for debugging and client resolution */
MalBlkPtr mb; /* carry the context */
MalStkPtr stk;
- int pc; /* pc in underlying malblock */
- int blocks; /* awaiting for variables */
- sht state; /* of execution */
- sht cost;
- lng hotclaim; /* memory foot print of result variables */
- lng argclaim; /* memory foot print of arguments */
- str error;
-} *FlowStatus, FlowStatusRec;
-
-typedef struct DataFlow {
- int start, stop; /* guarded block under consideration*/
- FlowStatus status; /* status of each instruction */
- int *nodes; /* dependency graph nodes */
- int *edges; /* dependency graph */
- queue *done; /* work finished */
- queue *todo; /* pending actions for this client */
- int nway; /* number of workers */
- FlowTask *worker; /* worker threads for the client */
- struct DataFlow *free; /* free list */
- int terminate; /* set if we need to terminate */
- MT_Lock termlock; /* lock to protect the above */
+ int start, stop; /* guarded block under consideration*/
+ FlowEvent status; /* status of each instruction */
+ str error; /* error encountered */
+ int *nodes; /* dependency graph nodes */
+ int *edges; /* dependency graph */
+ MT_Lock flowlock; /* lock to protect the above */
+ queue *done; /* instructions handled */
} *DataFlow, DataFlowRec;
-/* does not seem to have a major impact */
-lng memorypool = 0; /* memory claimed by concurrent threads */
-int memoryclaims = 0; /* number of threads active with expensive operations
*/
+static MT_Id workers[THREADS];
+static queue *todo = 0; /* pending instructions */
/*
* Calculate the size of the dataflow dependency graph.
@@ -109,122 +99,6 @@ DFLOWgraphSize(MalBlkPtr mb, int start,
}
/*
- * Running all eligible instructions in parallel creates
- * resource contention. This means we should implement
- * an admission control scheme where threads are temporarily
- * postponed if the claim for memory exceeds a threshold
- * In general such contentions will be hard to predict,
- * because they depend on the algorithm, the input sizes,
- * concurrent use of the same variables, and the output produced.
- *
- * The heuristic is based on calculating the storage footprint
- * of the operands and assuming it preferably should fit in memory.
- * Of course, there may be intermediate structures being
- * used and the size of the result is not a priori known.
- * For this, we use a high watermark on the amount of
- * physical memory we pre-allocate for the claims.
- *
- * Instructions are eligible to be executed when the
- * total footprint of all concurrent executions stays below
- * the high-watermark or it is the single expensive
- * instruction being started.
- *
- * When we run out of memory, the instruction is delayed.
- * How long depends on the other instructions to free up
- * resources. The current policy simply takes a local
- * decision by delaying the instruction based on its
- * past and the size of the memory pool size.
- * The waiting penalty decreases with each step to ensure
- * it will ultimately be taken into execution, with possibly
- * all resource contention effects.
- *
- * Another option would be to maintain a priority queue of
- * suspended instructions.
- */
-
-/*
- * The memory claim is the estimate for the amount of memory hold.
- * Views are considered cheap and ignored
- */
-lng
-getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag)
-{
- lng total = 0, vol = 0;
- BAT *b;
-
- (void)mb;
- if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
- b = BATdescriptor(stk->stk[getArg(pci, i)].val.bval);
- if (b == NULL)
- return 0;
- if (flag && isVIEW(b)) {
- BBPunfix(b->batCacheid);
- return 0;
- }
- heapinfo(&b->H->heap); total += vol;
- heapinfo(b->H->vheap); total += vol;
- hashinfo(b->H->hash); total += vol;
-
- heapinfo(&b->T->heap); total += vol;
- heapinfo(b->T->vheap); total += vol;
- hashinfo(b->T->hash); total += vol;
- if ( b->T->hash == 0 || b->H->hash ==0) /* assume one
hash claim */
- total+= BATcount(b) * sizeof(lng);
- total = total > (lng)(MEMORY_THRESHOLD * monet_memory) ?
(lng)(MEMORY_THRESHOLD * monet_memory) : total;
- BBPunfix(b->batCacheid);
- }
- return total;
-}
-
-/*
- * The hotclaim indicates the amount of data recently written.
- * as a result of an operation. The argclaim is the sum over the hotclaims
- * for all arguments.
- * The argclaim provides a hint on how much we actually may need to execute
- * The hotclaim is a hint how large the result would be.
- */
-#ifdef USE_DFLOW_ADMISSION
-/* experiments on sf-100 on small machine showed no real improvement */
-int
-DFLOWadmission(lng argclaim, lng hotclaim)
-{
- /* optimistically set memory */
- if (argclaim == 0)
- return 0;
-
- MT_lock_set(&mal_contextLock, "DFLOWdelay");
- if (memoryclaims < 0)
- memoryclaims = 0;
- if (memorypool <= 0 && memoryclaims == 0)
- memorypool = (lng)(MEMORY_THRESHOLD * monet_memory);
-
- if (argclaim > 0) {
- if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
- memorypool -= (argclaim + hotclaim);
- memoryclaims++;
- PARDEBUG
- mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool
" LLFMT "claims " LLFMT "," LLFMT "\n",
- memoryclaims, THRgettid(),
memorypool, argclaim, hotclaim);
- MT_lock_unset(&mal_contextLock, "DFLOWdelay");
- return 0;
_______________________________________________
checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list