Changeset: eb0a130ae7a2 for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/eb0a130ae7a2
Modified Files:
        monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:

Reworked allocation code in the dataflow.


diffs (truncated from 401 to 300 lines):

diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -80,20 +80,19 @@ typedef struct DATAFLOW {
        bool set_qry_ctx;
 } *DataFlow, DataFlowRec;
 
-static struct worker {
+struct worker {
        MT_Id id;
-       enum { IDLE, WAITING, RUNNING, FREE, EXITED } flag;
+       enum { WAITING, RUNNING, FREE, EXITED } flag;
        ATOMIC_PTR_TYPE cntxt;          /* client we do work for (NULL -> any) */
-       char *errbuf;                           /* GDKerrbuf so that we can allocate before fork */
        MT_Sema s;
-       int self;
-       int next;
-} workers[THREADS];
-/* heads of two mutually exclusive linked lists, both using the .next
+       struct worker *next;
+       char errbuf[GDKMAXERRLEN];      /* GDKerrbuf so that we can allocate before fork */
+};
+/* heads of three mutually exclusive linked lists, all using the .next
  * field in the worker struct */
-static int exited_workers = -1;        /* to be joined threads */
-static int idle_workers = -1;  /* idle workers (no thread associated) */
-static int free_workers = -1;  /* free workers (thread doing nothing) */
+static struct worker *workers;           /* "working" workers */
+static struct worker *exited_workers; /* to be joined threads (.flag==EXITED) */
+static struct worker *free_workers;    /* free workers (.flag==FREE) */
 static int free_count = 0;             /* number of free threads */
 static int free_max = 0;               /* max number of spare free threads */
 
@@ -101,23 +100,6 @@ static Queue *todo = 0;                    /* pending ins
 
 static ATOMIC_TYPE exiting = ATOMIC_VAR_INIT(0);
 static MT_Lock dataflowLock = MT_LOCK_INITIALIZER(dataflowLock);
-static void stopMALdataflow(void);
-
-void
-mal_dataflow_reset(void)
-{
-       stopMALdataflow();
-       memset((char *) workers, 0, sizeof(workers));
-       idle_workers = -1;
-       exited_workers = -1;
-       if (todo) {
-               MT_lock_destroy(&todo->l);
-               MT_sema_destroy(&todo->s);
-               GDKfree(todo);
-       }
-       todo = 0;                                       /* pending instructions */
-       ATOMIC_SET(&exiting, 0);
-}
 
 /*
  * Calculate the size of the dataflow dependency graph.
@@ -126,9 +108,8 @@ static int
 DFLOWgraphSize(MalBlkPtr mb, int start, int stop)
 {
        int cnt = 0;
-       int i;
 
-       for (i = start; i < stop; i++)
+       for (int i = start; i < stop; i++)
                cnt += getInstrPtr(mb, i)->argc;
        return cnt;
 }
@@ -267,10 +248,8 @@ DFLOWworker(void *T)
 #ifdef _MSC_VER
        srand((unsigned int) GDKusec());
 #endif
-       assert(t->errbuf != NULL);
-       t->errbuf[0] = 0;
        GDKsetbuf(t->errbuf);           /* where to leave errors */
-       t->errbuf = NULL;
+       snprintf(t->s.name, sizeof(t->s.name), "DFLOWsema%04zu", MT_getpid());
 
        for (;;) {
                DataFlow flow;
@@ -437,24 +416,32 @@ DFLOWworker(void *T)
                        break;
                }
                if (free_count >= free_max) {
+                       struct worker **tp = &workers;
+                       while (*tp && *tp != t)
+                               tp = &(*tp)->next;
+                       assert(*tp && *tp == t);
+                       *tp = t->next;
                        t->flag = EXITED;
                        t->next = exited_workers;
-                       exited_workers = t->self;
+                       exited_workers = t;
                        MT_lock_unset(&dataflowLock);
                        break;
                }
                free_count++;
+               struct worker **tp = &workers;
+               while (*tp && *tp != t)
+                       tp = &(*tp)->next;
+               assert(*tp && *tp == t);
+               *tp = t->next;
                t->flag = FREE;
-               assert(free_workers != t->self);
                t->next = free_workers;
-               free_workers = t->self;
+               free_workers = t;
                MT_lock_unset(&dataflowLock);
                MT_sema_down(&t->s);
                if (GDKexiting() || ATOMIC_GET(&exiting))
                        break;
                assert(t->flag == WAITING);
        }
-       GDKfree(GDKerrbuf);
        GDKsetbuf(NULL);
 }
 
@@ -468,9 +455,8 @@ DFLOWworker(void *T)
 static int
 DFLOWinitialize(void)
 {
-       int i, limit;
+       int limit;
        int created = 0;
-       static bool first = true;
 
        MT_lock_set(&mal_contextLock);
        MT_lock_set(&dataflowLock);
@@ -488,44 +474,27 @@ DFLOWinitialize(void)
                MT_lock_unset(&mal_contextLock);
                return -1;
        }
-       assert(idle_workers == -1);
-       for (i = 0; i < THREADS; i++) {
-               char name[MT_NAME_LEN];
-               snprintf(name, sizeof(name), "DFLOWsema%d", i);
-               MT_sema_init(&workers[i].s, 0, name);
-               workers[i].flag = IDLE;
-               workers[i].self = i;
-               workers[i].id = 0;
-               workers[i].next = idle_workers;
-               idle_workers = i;
-               if (first)                              /* only initialize once */
-                       ATOMIC_PTR_INIT(&workers[i].cntxt, NULL);
-       }
-       first = false;
        limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
-       if (limit > THREADS)
-               limit = THREADS;
        while (limit > 0) {
                limit--;
-               i = idle_workers;
-               workers[i].errbuf = GDKmalloc(GDKMAXERRLEN);
-               if (workers[i].errbuf == NULL) {
-                       TRC_CRITICAL(MAL_SERVER, "cannot allocate error buffer for worker");
+               struct worker *t = GDKmalloc(sizeof(*t));
+               if (t == NULL) {
+                       TRC_CRITICAL(MAL_SERVER, "cannot allocate structure for worker");
                        continue;
                }
-               idle_workers = workers[i].next;
-               workers[i].flag = RUNNING;
-               ATOMIC_PTR_SET(&workers[i].cntxt, NULL);
-               char name[MT_NAME_LEN];
-               snprintf(name, sizeof(name), "DFLOWworker%d", i);
-               if (MT_create_thread(&workers[i].id, DFLOWworker, (void *)&workers[i],
-                                                        MT_THR_JOINABLE, name) < 0) {
-                       GDKfree(workers[i].errbuf);
-                       workers[i].errbuf = NULL;
-                       workers[i].flag = IDLE;
-                       workers[i].next = idle_workers;
-                       idle_workers = i;
+               *t = (struct worker) {
+                       .flag = RUNNING,
+               };
+               ATOMIC_PTR_INIT(&t->cntxt, NULL);
+               MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
+               if (MT_create_thread(&t->id, DFLOWworker, t,
+                                                        MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
+                       ATOMIC_PTR_DESTROY(&t->cntxt);
+                       MT_sema_destroy(&t->s);
+                       GDKfree(t);
                } else {
+                       t->next = workers;
+                       workers = t;
                        created++;
                }
        }
@@ -773,6 +742,19 @@ DFLOWscheduler(DataFlow flow, struct wor
        return ret;
 }
 
+/* called and returns with dataflowLock locked, temporarily unlocks
+ * join the thread associated with the worker and destroy the structure */
+static inline void
+finish_worker(struct worker *t)
+{
+       MT_lock_unset(&dataflowLock);
+       MT_join_thread(t->id);
+       MT_sema_destroy(&t->s);
+       ATOMIC_PTR_DESTROY(&t->cntxt);
+       GDKfree(t);
+       MT_lock_set(&dataflowLock);
+}
+
 /* We create a pool of GDKnr_threads-1 generic workers, that is,
  * workers that will take on jobs from any clients.  In addition, we
  * create a single specific worker per client (i.e. each time we enter
@@ -795,7 +777,7 @@ runMALdataflow(Client cntxt, MalBlkPtr m
        str msg = MAL_SUCCEED;
        int size;
        bit *ret;
-       int i;
+       struct worker *t;
 
        if (stk == NULL)
                throw(MAL, "dataflow", "runMALdataflow(): Called with stk == NULL");
@@ -819,81 +801,51 @@ runMALdataflow(Client cntxt, MalBlkPtr m
         * until all work is done */
        MT_lock_set(&dataflowLock);
        /* join with already exited threads */
-       while ((i = exited_workers) >= 0) {
-               assert(workers[i].flag == EXITED);
-               exited_workers = workers[i].next;
-               workers[i].flag = IDLE;
-               MT_lock_unset(&dataflowLock);
-               MT_join_thread(workers[i].id);
-               MT_lock_set(&dataflowLock);
-               workers[i].id = 0;
-               workers[i].next = idle_workers;
-               idle_workers = i;
+       while (exited_workers != NULL) {
+               assert(exited_workers->flag == EXITED);
+               struct worker *t = exited_workers;
+               exited_workers = exited_workers->next;
+               finish_worker(t);
        }
        assert(cntxt != NULL);
-       if ((i = free_workers) >= 0) {
-               assert(workers[i].flag == FREE);
+       if (free_workers != NULL) {
+               t = free_workers;
+               assert(t->flag == FREE);
                assert(free_count > 0);
                free_count--;
-               free_workers = workers[i].next;
-               workers[i].flag = WAITING;
-               ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
-               if (stk->calldepth > 1) {
-                       MT_Id pid = MT_getpid();
-
-                       /* doing a recursive call: copy specificity from
-                        * current worker to new worker */
-                       for (int j = 0; j < THREADS; j++) {
-                               if (workers[j].id == pid && workers[j].flag == RUNNING) {
-                                       ATOMIC_PTR_SET(&workers[i].cntxt,
-                                                                  ATOMIC_PTR_GET(&workers[j].cntxt));
-                                       break;
-                               }
+               free_workers = t->next;
+               t->next = workers;
+               workers = t;
+               t->flag = WAITING;
+               ATOMIC_PTR_SET(&t->cntxt, cntxt);
+               MT_sema_up(&t->s);
+       } else {
+               t = GDKmalloc(sizeof(*t));
+               if (t != NULL) {
+                       *t = (struct worker) {
+                               .flag = WAITING,
+                       };
+                       ATOMIC_PTR_INIT(&t->cntxt, cntxt);
+                       MT_sema_init(&t->s, 0, "DFLOWsema"); /* placeholder name */
+                       if (MT_create_thread(&t->id, DFLOWworker, t,
+                                                                MT_THR_JOINABLE, "DFLOWworkerXXXX") < 0) {
+                               ATOMIC_PTR_DESTROY(&t->cntxt);
+                               MT_sema_destroy(&t->s);
+                               GDKfree(t);
+                               t = NULL;
+                       } else {
+                               t->next = workers;
+                               workers = t;
                        }
                }
-               MT_sema_up(&workers[i].s);
-       } else if ((i = idle_workers) >= 0) {
-               assert(workers[i].flag == IDLE);
-               ATOMIC_PTR_SET(&workers[i].cntxt, cntxt);
-               /* only create specific worker if we are not doing a
-                * recursive call */
-               if (stk->calldepth > 1) {
-                       MT_Id pid = MT_getpid();
-
-                       /* doing a recursive call: copy specificity from
-                        * current worker to new worker */
-                       for (int j = 0; j < THREADS; j++) {
-                               if (workers[j].flag == RUNNING && workers[j].id == pid) {
-                                       ATOMIC_PTR_SET(&workers[i].cntxt,
-                                                                  ATOMIC_PTR_GET(&workers[j].cntxt));
-                                       break;
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to