Changeset: 6bfbe5ed1bf6 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=6bfbe5ed1bf6
Added Files:
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_dataflow.h
        monetdb5/mal/mal_interpreter.c
        monetdb5/mal/mal_interpreter.h
Removed Files:
        monetdb5/mal/mal_interpreter.mx
Modified Files:
        monetdb5/mal/Makefile.ag
        monetdb5/mal/mal_profiler.c
        monetdb5/modules/mal/language.mx
        monetdb5/scheduler/run_octopus.c
        monetdb5/scheduler/srvpool.c
Branch: default
Log Message:

De-Mx the mal interpreter
Another step towards old-fashioned c-code files


diffs (truncated from 5615 to 300 lines):

diff --git a/monetdb5/mal/Makefile.ag b/monetdb5/mal/Makefile.ag
--- a/monetdb5/mal/Makefile.ag
+++ b/monetdb5/mal/Makefile.ag
@@ -40,7 +40,8 @@ lib_mal = {
                mal_import.c mal_import.h \
                mal_runtime.c mal_runtime.h \
                mal_instruction.c mal_instruction.h \
-               mal_interpreter.mx \
+               mal_interpreter.c mal_interpreter.h \
+               mal_dataflow.c mal_dataflow.h \
                mal_linker.c mal_linker.h \
                mal_listing.c mal_listing.h \
                mal_module.c mal_module.h \
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
new file mode 100644
--- /dev/null
+++ b/monetdb5/mal/mal_dataflow.c
@@ -0,0 +1,1238 @@
+/*
+ * The contents of this file are subject to the MonetDB Public License
+ * Version 1.1 (the "License"); you may not use this file except in
+ * compliance with the License. You may obtain a copy of the License at
+ * http://www.monetdb.org/Legal/MonetDBLicense
+ * 
+ * Software distributed under the License is distributed on an "AS IS"
+ * basis, WITHOUT WARRANTY OF ANY KIND, either express or implied. See the
+ * License for the specific language governing rights and limitations
+ * under the License.
+ * 
+ * The Original Code is the MonetDB Database System.
+ * 
+ * The Initial Developer of the Original Code is CWI.
+ * Portions created by CWI are Copyright (C) 1997-July 2008 CWI.
+ * Copyright August 2008-2012 MonetDB B.V.
+ * All Rights Reserved.
+*/
+
+/*
+ * Out of order execution
+ * The alternative is to execute the instructions out of order
+ * using dataflow dependencies and as an independent process.
+ * Dataflow processing only works on a code
+ * sequence that does not include additional (implicit) flow of control
+ * statements and, ideally, consist of expensive BAT operations.
+ * The dataflow interpreter selects cheap instructions
+ * using a simple costfunction based on the size of the BATs involved.
+ *
+ * The dataflow portion is identified as a guarded block,
+ * whose entry is controlled by the function language.dataflow();
+ * This way the function can inform the caller to skip the block
+ * when dataflow execution was performed.
+ *
+ * The flow graphs should be organized such that parallel threads can
+ * access it mostly without expensive locking.
+ */
+#include "mal_dataflow.h"
+#define DFLOWpending 0         /* runnable */
+#define DFLOWrunning 1         /* currently in progress */
+#define DFLOWwrapup  2         /* done! */
+#define DFLOWretry   3         /* reschedule */
+
+typedef struct queue {
+       int size;       /* size of queue */
+       int last;       /* last element in the queue */
+       void **data;
+       MT_Lock l;      /* its a shared resource, ie we need locks */
+       MT_Sema s;      /* threads wait on empty queues */
+} queue;
+
+
+/*
+ * The dataflow dependency is administered in a graph list structure.
+ * For each instruction we keep the list of instructions that
+ * should be checked for eligibility once we are finished with it.
+ */
+typedef struct {
+       MT_Id tid;
+       int id;
+       queue *todo;            /* pending actions for this client */
+       lng clk;
+       struct DataFlow *flow;
+} FlowTask;
+
+typedef struct FLOWSTATUS {
+       Client cntxt;           /* for debugging and client resolution */
+       MalBlkPtr mb;           /* carry the context */
+       MalStkPtr stk;
+       int pc;                 /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables */
+       sht state;              /* of execution */
+       sht cost;
+       lng hotclaim;   /* memory foot print of result variables */
+       lng argclaim;   /* memory foot print of arguments */
+       str error;
+} *FlowStatus, FlowStatusRec;
+
+typedef struct DataFlow {
+       int start, stop;        /* guarded block under consideration*/
+       FlowStatus status;              /* status of each instruction */
+       int *nodes;                     /* dependency graph nodes */
+       int *edges;                     /* dependency graph */
+       queue *done;            /* work finished */
+       queue *todo;            /* pending actions for this client */
+       int    nway;            /* number of workers */
+       FlowTask *worker;       /* worker threads for the client */
+       struct DataFlow *free;  /* free list */
+} *DataFlow, DataFlowRec;
+
+/* does not seem to have a major impact */
+lng memorypool = 0;      /* memory claimed by concurrent threads */
+int memoryclaims = 0;    /* number of threads active with expensive operations 
*/
+
+/*
+ * Calculate the size of the dataflow dependency graph.
+ */
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
+{
+       int cnt = 0;
+       int i;
+
+       for (i = start; i < stop; i++)
+               cnt += getInstrPtr(mb, i)->argc;
+       return cnt;
+}
+
+/*
+ * Running all eligible instructions in parallel creates
+ * resource contention. This means we should implement
+ * an admission control scheme where threads are temporarily
+ * postponed if the claim for memory exceeds a threshold
+ * In general such contentions will be hard to predict,
+ * because they depend on the algorithm, the input sizes,
+ * concurrent use of the same variables, and the output produced.
+ *
+ * The heuristic is based on calculating the storage footprint
+ * of the operands and assuming it preferrably should fit in memory.
+ * Ofcourse, there may be intermediate structures being
+ * used and the size of the result is not a priori known.
+ * For this, we use a high watermark on the amount of
+ * physical memory we pre-allocate for the claims.
+ *
+ * Instructions are eligible to be executed when the
+ * total footprint of all concurrent executions stays below
+ * the high-watermark or it is the single expensive
+ * instruction being started.
+ *
+ * When we run out of memory, the instruction is delayed.
+ * How long depends on the other instructions to free up
+ * resources. The current policy simple takes a local
+ * decision by delaying the instruction based on its
+ * past and the size of the memory pool size.
+ * The waiting penalty decreases with each step to ensure
+ * it will ultimately taken into execution, with possibly
+ * all resource contention effects.
+ *
+ * Another option would be to maintain a priority queue of
+ * suspended instructions.
+ */
+
+/*
+ * The memory claim is the estimate for the amount of memory hold.
+ * Views are consider cheap and ignored
+*/
+lng
+getMemoryClaim(MalBlkPtr mb, MalStkPtr stk, InstrPtr pci, int i, int flag)
+{
+       lng total = 0, vol = 0;
+       BAT *b;
+
+       (void)mb;
+       if (stk->stk[getArg(pci, i)].vtype == TYPE_bat) {
+               b = BATdescriptor(stk->stk[getArg(pci, i)].val.bval);
+               if (b == NULL)
+                       return 0;
+               if (flag && isVIEW(b)) {
+                       BBPunfix(b->batCacheid);
+                       return 0;
+               }
+               heapinfo(&b->H->heap); total += vol;
+               heapinfo(b->H->vheap); total += vol;
+               hashinfo(b->H->hash); total += vol;
+
+               heapinfo(&b->T->heap); total += vol;
+               heapinfo(b->T->vheap); total += vol;
+               hashinfo(b->T->hash); total += vol;
+               if ( b->T->hash == 0  || b->H->hash ==0)        /* assume one 
hash claim */
+                       total+= BATcount(b) * sizeof(lng);
+               total = total > (lng)(MEMORY_THRESHOLD * monet_memory) ? 
(lng)(MEMORY_THRESHOLD * monet_memory) : total;
+               BBPunfix(b->batCacheid);
+       }
+       return total;
+}
+
+/*
+ * The hotclaim indicates the amount of data recentely written.
+ * as a result of an operation. The argclaim is the sum over the hotclaims
+ * for all arguments.
+ * The argclaim provides a hint on how much we actually may need to execute
+ * The hotclaim is a hint how large the result would be.
+ */
+#ifdef USE_DFLOW_ADMISSION
+/* experiments on sf-100 on small machine showed no real improvement
+*/
+int
+DFLOWadmission(lng argclaim, lng hotclaim)
+{
+       /* optimistically set memory */
+       if (argclaim == 0)
+               return 0;
+
+       mal_set_lock(mal_contextLock, "DFLOWdelay");
+       if (memoryclaims < 0)
+               memoryclaims = 0;
+       if (memorypool <= 0 && memoryclaims == 0)
+               memorypool = (lng)(MEMORY_THRESHOLD * monet_memory);
+
+       if (argclaim > 0) {
+               if (memoryclaims == 0 || memorypool > argclaim + hotclaim) {
+                       memorypool -= (argclaim + hotclaim);
+                       memoryclaims++;
+                       PARDEBUG
+                       mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool 
" LLFMT "claims " LLFMT "," LLFMT "\n",
+                                                memoryclaims, THRgettid(), 
memorypool, argclaim, hotclaim);
+                       mal_unset_lock(mal_contextLock, "DFLOWdelay");
+                       return 0;
+               }
+               PARDEBUG
+               mnstr_printf(GDKstdout, "#Delayed due to lack of memory " LLFMT 
" requested " LLFMT " memoryclaims %d\n", memorypool, argclaim + hotclaim, 
memoryclaims);
+               mal_unset_lock(mal_contextLock, "DFLOWdelay");
+               return -1;
+       }
+       /* release memory claimed before */
+       memorypool += -argclaim - hotclaim;
+       memoryclaims--;
+       PARDEBUG
+       mnstr_printf(GDKstdout, "#DFLOWadmit %3d thread %d pool " LLFMT " 
claims " LLFMT "," LLFMT "\n",
+                                memoryclaims, THRgettid(), memorypool, 
argclaim, hotclaim);
+       mal_unset_lock(mal_contextLock, "DFLOWdelay");
+       return 0;
+}
+#endif
+
+/*
+ * The dataflow execution is confined to a barrier block.
+ * Within the block there are multiple flows, which, in principle,
+ * can be executed in parallel.
+ */
+
+static queue*
+q_create(int sz)
+{
+       queue *q = (queue*)GDKmalloc(sizeof(queue));
+
+       if (q == NULL)
+               return NULL;
+       q->size = ((sz << 1) >> 1); /* we want a multiple of 2 */
+       q->last = 0;
+       q->data = (void*)GDKmalloc(sizeof(void*) * q->size);
+       if (q->data == NULL) {
+               GDKfree(q);
+               return NULL;
+       }
+
+       MT_lock_init(&q->l, "q_create");
+       MT_sema_init(&q->s, 0, "q_create");
+       return q;
+}
+
+/*
+static void
+q_destroy(queue *q)
+{
+       GDKfree(q->data);
+       GDKfree(q);
+}
+*/
+
+/* keep a simple LIFO queue. It won't be a large one, so shuffles of requeue 
is possible */
+/* we might actually sort it for better scheduling behavior */
+static void
+q_enqueue_(queue *q, FlowStatus d)
+{
+       if (q->last == q->size) {
+               /* enlarge buffer */
+               q->size <<= 1;
+               q->data = GDKrealloc(q->data, sizeof(void*) * q->size);
+       }
+       q->data[q->last++] = (void*)d;
+}
+static void
+q_enqueue(queue *q, FlowStatus d)
+{
+       MT_lock_set(&q->l, "q_enqueue");
+       q_enqueue_(q, d);
+       MT_lock_unset(&q->l, "q_enqueue");
+       MT_sema_up(&q->s, "q_enqueue");
+}
+
+/*
_______________________________________________
Checkin-list mailing list
[email protected]
http://mail.monetdb.org/mailman/listinfo/checkin-list

Reply via email to