Changeset: 0166c4155d05 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=0166c4155d05
Added Files:
        MonetDB5/mal/mal_dataflow.c
Branch: tracer
Log Message:

Revert "Reverted to 71063:8ed9e57e46cc"

This reverts commit 9070164a3dc6bd5c977cb517384f32d86193858d.


diffs (truncated from 1006 to 300 lines):

diff --git a/MonetDB5/mal/mal_dataflow.c b/MonetDB5/mal/mal_dataflow.c
new file mode 100644
--- /dev/null
+++ b/MonetDB5/mal/mal_dataflow.c
@@ -0,0 +1,1001 @@
+/*
+ * This Source Code Form is subject to the terms of the Mozilla Public
+ * License, v. 2.0.  If a copy of the MPL was not distributed with this
+ * file, You can obtain one at http://mozilla.org/MPL/2.0/.
+ *
+ * Copyright 1997 - July 2008 CWI, August 2008 - 2019 MonetDB B.V.
+ */
+
+/*
+ * (author) M Kersten
+ * Out of order execution
+ * The alternative is to execute the instructions out of order
+ * using dataflow dependencies and as an independent process.
+ * Dataflow processing only works on a code
+ * sequence that does not include additional (implicit) flow of control
+ * statements and, ideally, consists of expensive BAT operations.
+ * The dataflow interpreter selects cheap instructions
+ * using a simple cost function based on the size of the BATs involved.
+ *
+ * The dataflow portion is identified as a guarded block,
+ * whose entry is controlled by the function language.dataflow();
+ * This way the function can inform the caller to skip the block
+ * when dataflow execution was performed.
+ *
+ * The flow graphs should be organized such that parallel threads can
+ * access them mostly without expensive locking.
+ */
+#include "monetdb_config.h"
+#include "mal_dataflow.h"
+#include "mal_private.h"
+#include "mal_runtime.h"
+#include "mal_resource.h"
+
+/* Per-instruction execution states (presumably the values kept in FLOWEVENT.state -- confirm) */
+#define DFLOWpending 0         /* runnable */
+#define DFLOWrunning 1         /* currently in progress */
+#define DFLOWwrapup  2         /* done! */
+#define DFLOWretry   3         /* reschedule */
+#define DFLOWskipped 4         /* due to errors */
+
+/* The per-instruction status of execution inside a dataflow block */
+typedef struct FLOWEVENT {
+       struct DATAFLOW *flow;/* execution context */
+       int pc;         /* pc in underlying malblock */
+       int blocks;     /* awaiting for variables (presumably a count of unresolved inputs -- confirm) */
+       sht state;      /* of execution; presumably one of the DFLOW* values -- confirm */
+       lng clk;        /* NOTE(review): presumably a clock/timestamp value -- confirm */
+       sht cost;       /* NOTE(review): presumably a cost estimate used for scheduling -- confirm */
+       lng hotclaim;   /* memory footprint of result variables */
+       lng argclaim;   /* memory footprint of arguments */
+       lng maxclaim;   /* memory footprint of the largest argument; could be used to indicate result size */
+} *FlowEvent, FlowEventRec;
+
+/*
+ * A growable array of FlowEvent pointers shared between threads:
+ * the lock l guards the array, the semaphore s counts entries for waiters.
+ */
+typedef struct queue {
+       int size;       /* allocated capacity of data (in entries) */
+       int last;       /* number of entries in use: data[0 .. last-1] */
+       int exitcount;  /* how many threads should exit */
+       FlowEvent *data;
+       MT_Lock l;      /* it's a shared resource, ie we need locks */
+       MT_Sema s;      /* threads wait on empty queues */
+} Queue;
+
+/*
+ * The dataflow dependency is administered in a graph list structure.
+ * For each instruction we keep the list of instructions that
+ * should be checked for eligibility once we are finished with it.
+ */
+typedef struct DATAFLOW {
+       Client cntxt;   /* for debugging and client resolution; q_dequeue matches on it */
+       MalBlkPtr mb;   /* carry the context */
+       MalStkPtr stk;  /* NOTE(review): presumably the runtime stack for mb -- confirm */
+       int start, stop;    /* guarded block under consideration*/
+       FlowEvent status;   /* status of each instruction */
+       ATOMIC_PTR_TYPE error;          /* error encountered */
+       int *nodes;         /* dependency graph nodes */
+       int *edges;         /* dependency graph */
+       MT_Lock flowlock;   /* lock to protect the above */
+       Queue *done;        /* instructions handled */
+} *DataFlow, DataFlowRec;
+
+/* Bookkeeping for the dataflow worker threads, one slot per possible worker (THREADS in total) */
+static struct worker {
+       MT_Id id;
+       enum {IDLE, RUNNING, JOINING, EXITED} flag;
+       ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any) */
+       MT_Sema s;
+} workers[THREADS];
+
+static Queue *todo = 0;        /* pending instructions */
+
+/* when set, q_dequeue returns NULL right after waking up; cleared by mal_dataflow_reset */
+static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
+static MT_Lock dataflowLock = MT_LOCK_INITIALIZER("dataflowLock");
+/* forward declaration: mal_dataflow_reset needs it before its definition */
+static void stopMALdataflow(void);
+
+/*
+ * Return the dataflow module to its initial state: call stopMALdataflow(),
+ * zero the worker table, release the shared queue of pending instructions
+ * (if any) and clear the exiting flag.
+ */
+void
+mal_dataflow_reset(void)
+{
+       stopMALdataflow();
+       memset(workers, 0, sizeof(workers));
+
+       if (todo != NULL) {
+               Queue *pending = todo;
+
+               GDKfree(pending->data);
+               MT_lock_destroy(&pending->l);
+               MT_sema_destroy(&pending->s);
+               GDKfree(pending);
+       }
+       todo = NULL;
+       ATOMIC_SET(&exiting, 0);
+}
+
+/*
+ * Calculate the size of the dataflow dependency graph: the total argument
+ * count (argc) over the instructions start .. stop-1 of mb.
+ */
+static int
+DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
+{
+       int total = 0;
+
+       for (int pc = start; pc < stop; pc++)
+               total += getInstrPtr(mb, pc)->argc;
+       return total;
+}
+
+/*
+ * The dataflow execution is confined to a barrier block.
+ * Within the block there are multiple flows, which, in principle,
+ * can be executed in parallel.
+ */
+
+/*
+ * Allocate an empty queue with room for sz entries; name labels its lock
+ * and semaphore. Returns NULL when memory cannot be allocated.
+ */
+static Queue*
+q_create(int sz, const char *name)
+{
+       Queue *q = (Queue*)GDKmalloc(sizeof(Queue));
+
+       if (q == NULL)
+               return NULL;
+       /* The queue grows by doubling its size, so the initial capacity must
+        * be positive: a zero (or negative) size could never grow and the
+        * first append would write out of bounds. */
+       if (sz < 2)
+               sz = 2;
+       *q = (Queue) {
+               .size = sz,
+       };
+       q->data = (FlowEvent*) GDKmalloc(sizeof(FlowEvent) * q->size);
+       if (q->data == NULL) {
+               GDKfree(q);
+               return NULL;
+       }
+
+       MT_lock_init(&q->l, name);
+       MT_sema_init(&q->s, 0, name);
+       return q;
+}
+
+/* Tear down a queue made by q_create: its buffer, semaphore, lock and the struct itself. */
+static void
+q_destroy(Queue *q)
+{
+       assert(q);
+       GDKfree(q->data);
+       MT_sema_destroy(&q->s);
+       MT_lock_destroy(&q->l);
+       GDKfree(q);
+}
+
+/* Keep a simple LIFO queue. It won't be a large one, so the shuffling done on
+ * requeue is affordable. */
+/* we might actually sort it for better scheduling behavior */
+/*
+ * Append d at the end of q, doubling the buffer when it is full.
+ * Callers are expected to hold q->l (as q_enqueue does).
+ * Returns 0 on success and -1 when the buffer could not be grown; q is then
+ * left unchanged (old buffer and size intact) and d is NOT stored.
+ * NOTE(review): assumes GDKrealloc, like realloc, keeps the old buffer on failure.
+ */
+static int
+q_enqueue_(Queue *q, FlowEvent d)
+{
+       assert(q);
+       assert(d);
+       if (q->last == q->size) {
+               /* grow through a temporary so a failed realloc neither leaks
+                * nor clobbers the current buffer; a zero size must still grow */
+               int newsize = q->size > 0 ? q->size << 1 : 2;
+               FlowEvent *data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * newsize);
+
+               assert(data);
+               if (data == NULL)
+                       return -1;
+               q->data = data;
+               q->size = newsize;
+       }
+       q->data[q->last++] = d;
+       return 0;
+}
+
+/*
+ * Thread-safe append: store d under q->l, then wake one waiter on q->s.
+ * The semaphore is raised only when d was actually stored, so no consumer is
+ * woken for an entry that does not exist. Returns the q_enqueue_ result.
+ */
+static int
+q_enqueue(Queue *q, FlowEvent d)
+{
+       int ret;
+
+       assert(q);
+       assert(d);
+       MT_lock_set(&q->l);
+       ret = q_enqueue_(q, d);
+       MT_lock_unset(&q->l);
+       if (ret == 0)
+               MT_sema_up(&q->s);
+       return ret;
+}
+
+/*
+ * A priority queue over the hot claims of memory may
+ * be more effective. It prioritizes those instructions
+ * that want to use a big recent result.
+ */
+
+#ifdef USE_MAL_ADMISSION
+/*
+ * Insert d at the front of q (index 0), growing the buffer when it is full.
+ * Callers are expected to hold q->l (as q_requeue does).
+ * On the q_dequeue path that pops from the end, a requeued event is thus
+ * served after the entries already queued.
+ * Returns 0 on success and -1 when the buffer could not be grown; q is then
+ * left unchanged and d is NOT stored.
+ * NOTE(review): assumes GDKrealloc, like realloc, keeps the old buffer on failure.
+ */
+static int
+q_requeue_(Queue *q, FlowEvent d)
+{
+       assert(q);
+       assert(d);
+       if (q->last == q->size) {
+               /* enlarge buffer through a temporary so a failed realloc neither
+                * leaks nor clobbers the current one; a zero size must still grow */
+               int newsize = q->size > 0 ? q->size << 1 : 2;
+               FlowEvent *data = (FlowEvent*) GDKrealloc(q->data, sizeof(FlowEvent) * newsize);
+
+               assert(data);
+               if (data == NULL)
+                       return -1;
+               q->data = data;
+               q->size = newsize;
+       }
+       /* make room at the front by shifting all entries one slot up */
+       memmove(q->data + 1, q->data, (size_t) q->last * sizeof(q->data[0]));
+       q->data[0] = d;
+       q->last++;
+       return 0;
+}
+
+/*
+ * Thread-safe front insertion: store d under q->l, then wake one waiter on
+ * q->s, but only when d was actually stored. Returns the q_requeue_ result.
+ */
+static int
+q_requeue(Queue *q, FlowEvent d)
+{
+       int ret;
+
+       assert(q);
+       assert(d);
+       MT_lock_set(&q->l);
+       ret = q_requeue_(q, d);
+       MT_lock_unset(&q->l);
+       if (ret == 0)
+               MT_sema_up(&q->s);
+       return ret;
+}
+#endif
+
+static FlowEvent
+q_dequeue(Queue *q, Client cntxt)
+{
+       FlowEvent r = NULL, s = NULL;
+       //int i;
+
+       assert(q);
+       MT_sema_down(&q->s);
+       if (ATOMIC_GET(&exiting))
+               return NULL;
+       MT_lock_set(&q->l);
+       if (cntxt) {
+               int i, minpc = -1;
+
+               for (i = q->last - 1; i >= 0; i--) {
+                       if (q->data[i]->flow->cntxt == cntxt) {
+                               if (q->last > 1024) {
+                                       /* for long "queues", just grab the 
first eligible
+                                        * entry we encounter */
+                                       minpc = i;
+                                       break;
+                               }
+                               /* for shorter "queues", find the oldest 
eligible entry */
+                               if (minpc < 0) {
+                                       minpc = i;
+                                       s = q->data[i];
+                               }
+                               r = q->data[i];
+                               if (s && r && s->pc > r->pc) {
+                                       minpc = i;
+                                       s = r;
+                               }
+                       }
+               }
+               if (minpc >= 0) {
+                       r = q->data[minpc];
+                       i = minpc;
+                       q->last--;
+                       memmove(q->data + i, q->data + i + 1, (q->last - i) * 
sizeof(q->data[0]));
+               }
+
+               MT_lock_unset(&q->l);
+               return r;
+       }
+       if (q->exitcount > 0) {
+               q->exitcount--;
+               MT_lock_unset(&q->l);
+               return NULL;
+       }
+       assert(q->last > 0);
+       if (q->last > 0) {
+               /* LIFO favors garbage collection */
+               r = q->data[--q->last];
+/*  Line coverage test shows it is an expensive loop that is hardly ever leads 
to adjustment
+               for(i= q->last-1; r &&  i>=0; i--){
+                       s= q->data[i];
+                       if( s && s->flow && s->flow->stk &&
+                           r && r->flow && r->flow->stk &&
+                           s->flow->stk->tag < r->flow->stk->tag){
+                               q->data[i]= r;
+                               r = s;
+                       }
+               }
+*/
+               q->data[q->last] = 0;
+       }
+       /* else: terminating */
+       /* try out random draw *
+       {
+               int i;
+               i = rand() % q->last;
+               r = q->data[i];
+               for (i++; i < q->last; i++)
+                       q->data[i - 1] = q->data[i];
+               q->last--; i
_______________________________________________
checkin-list mailing list
checkin-list@monetdb.org
https://www.monetdb.org/mailman/listinfo/checkin-list

Reply via email to