Changeset: 07fb8a796c70 for MonetDB
URL: http://dev.monetdb.org/hg/MonetDB?cmd=changeset;node=07fb8a796c70
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: Jan2014
Log Message:
Improvements to dataflow thread and lock handling.
diffs (131 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -61,7 +61,6 @@ typedef struct queue {
int size; /* size of queue */
int last; /* last element in the queue */
int exitcount; /* how many threads should exit */
- int exitedcount; /* how many threads have exited
*/
FlowEvent *data;
MT_Lock l; /* it's a shared resource, ie we need locks */
MT_Sema s; /* threads wait on empty queues */
@@ -87,7 +86,7 @@ typedef struct DATAFLOW {
static struct worker {
MT_Id id;
- enum {IDLE, RUNNING, EXITED} flag;
+ enum {IDLE, RUNNING, JOINING, EXITED} flag;
Client cntxt; /* client we do work for (NULL
-> any) */
MT_Sema s;
} workers[THREADS];
@@ -134,7 +133,6 @@ q_create(int sz, const char *name)
return NULL;
}
q->exitcount = 0;
- q->exitedcount = 0;
(void) name; /* in case MT_LOCK_TRACE is not enabled in gdk_system.h */
MT_lock_init(&q->l, name);
@@ -244,9 +242,6 @@ q_dequeue(Queue *q, Client cntxt)
if (q->exitcount > 0) {
q->exitcount--;
MT_lock_unset(&q->l, "q_dequeue");
- MT_lock_set(&dataflowLock, "q_dequeue");
- q->exitedcount++;
- MT_lock_unset(&dataflowLock, "q_dequeue");
return NULL;
}
assert(q->last > 0);
@@ -458,6 +453,11 @@ DFLOWinitialize(void)
for (i = 0; i < THREADS; i++)
MT_sema_init(&workers[i].s, 0, "DFLOWinitialize");
limit = GDKnr_threads ? GDKnr_threads - 1 : 0;
+#ifdef NEED_MT_LOCK_INIT
+ ATOMIC_INIT(exitingLock, "exitingLock");
+ MT_lock_init(&dataflowLock, "dataflowLock");
+#endif
+ MT_lock_set(&dataflowLock, "DFLOWinitialize");
for (i = 0; i < limit; i++) {
workers[i].flag = RUNNING;
workers[i].cntxt = NULL;
@@ -466,6 +466,7 @@ DFLOWinitialize(void)
else
created++;
}
+ MT_lock_unset(&dataflowLock, "DFLOWinitialize");
if (created == 0) {
/* no threads created */
q_destroy(todo);
@@ -473,10 +474,6 @@ DFLOWinitialize(void)
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return -1;
}
-#ifdef NEED_MT_LOCK_INIT
- ATOMIC_INIT(exitingLock, "exitingLock");
- MT_lock_init(&dataflowLock, "dataflowLock");
-#endif
MT_lock_unset(&mal_contextLock, "DFLOWinitialize");
return 0;
}
@@ -766,20 +763,22 @@ runMALdataflow(Client cntxt, MalBlkPtr m
* until all work is done */
MT_lock_set(&dataflowLock, "runMALdataflow");
/* join with already exited threads */
- while (todo->exitedcount > 0) {
- for (i = 0; i < THREADS; i++) {
- if (workers[i].flag == EXITED) {
- todo->exitedcount--;
- workers[i].flag = IDLE;
- workers[i].cntxt = NULL;
- MT_lock_unset(&dataflowLock, "runMALdataflow");
- MT_join_thread(workers[i].id);
- MT_lock_set(&dataflowLock, "runMALdataflow");
- break;
+ {
+ int joined;
+ do {
+ joined = 0;
+ for (i = 0; i < THREADS; i++) {
+ if (workers[i].flag == EXITED) {
+ workers[i].flag = JOINING;
+ workers[i].cntxt = NULL;
+ joined = 1;
+ MT_lock_unset(&dataflowLock,
"runMALdataflow");
+ MT_join_thread(workers[i].id);
+ MT_lock_set(&dataflowLock,
"runMALdataflow");
+ workers[i].flag = IDLE;
+ }
}
- }
- if (i == THREADS)
- break;
+ } while (joined);
}
for (i = 0; i < THREADS; i++) {
if (workers[i].flag == IDLE) {
@@ -802,13 +801,14 @@ runMALdataflow(Client cntxt, MalBlkPtr m
/* not doing a recursive call: create specific
worker */
workers[i].cntxt = cntxt;
}
+ workers[i].flag = RUNNING;
if (MT_create_thread(&workers[i].id, DFLOWworker, (void
*) &workers[i], MT_THR_JOINABLE) < 0) {
/* cannot start new thread, run serially */
*ret = TRUE;
+ workers[i].flag = IDLE;
MT_lock_unset(&dataflowLock, "runMALdataflow");
return MAL_SUCCEED;
}
- workers[i].flag = RUNNING;
break;
}
}
@@ -899,7 +899,8 @@ stopMALdataflow(void)
MT_sema_up(&todo->s, "stopMALdataflow");
MT_lock_set(&dataflowLock, "stopMALdataflow");
for (i = 0; i < THREADS; i++) {
- if (workers[i].flag != IDLE) {
+ if (workers[i].flag != IDLE && workers[i].flag !=
JOINING) {
+ workers[i].flag = JOINING;
MT_lock_unset(&dataflowLock, "stopMALdataflow");
MT_join_thread(workers[i].id);
MT_lock_set(&dataflowLock, "stopMALdataflow");
_______________________________________________
checkin-list mailing list
[email protected]
https://www.monetdb.org/mailman/listinfo/checkin-list