Changeset: 3e625a4c333d for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/3e625a4c333d
Modified Files:
monetdb5/mal/mal_dataflow.c
Branch: default
Log Message:
Better cleanup of dataflow threads.
diffs (92 lines):
diff --git a/monetdb5/mal/mal_dataflow.c b/monetdb5/mal/mal_dataflow.c
--- a/monetdb5/mal/mal_dataflow.c
+++ b/monetdb5/mal/mal_dataflow.c
@@ -82,7 +82,7 @@ typedef struct DATAFLOW {
struct worker {
MT_Id id;
- enum { WAITING, RUNNING, FREE, EXITED } flag;
+ enum { WAITING, RUNNING, FREE, EXITED, FINISHING } flag;
ATOMIC_PTR_TYPE cntxt; /* client we do work for (NULL -> any)
*/
MT_Sema s;
struct worker *next;
@@ -245,6 +245,7 @@ static void
DFLOWworker(void *T)
{
struct worker *t = (struct worker *) T;
+ bool locked = false;
#ifdef _MSC_VER
srand((unsigned int) GDKusec());
#endif
@@ -411,20 +412,8 @@ DFLOWworker(void *T)
}
}
MT_lock_set(&dataflowLock);
- if (GDKexiting() || ATOMIC_GET(&exiting)) {
- MT_lock_unset(&dataflowLock);
- break;
- }
- if (free_count >= free_max) {
- struct worker **tp = &workers;
- while (*tp && *tp != t)
- tp = &(*tp)->next;
- assert(*tp && *tp == t);
- *tp = t->next;
- t->flag = EXITED;
- t->next = exited_workers;
- exited_workers = t;
- MT_lock_unset(&dataflowLock);
+ if (GDKexiting() || ATOMIC_GET(&exiting) || free_count >=
free_max) {
+ locked = true;
break;
}
free_count++;
@@ -442,6 +431,19 @@ DFLOWworker(void *T)
break;
assert(t->flag == WAITING);
}
+ if (!locked)
+ MT_lock_set(&dataflowLock);
+ if (t->flag != FINISHING) {
+ struct worker **tp = t->flag == FREE ? &free_workers : &workers;
+ while (*tp && *tp != t)
+ tp = &(*tp)->next;
+ assert(*tp && *tp == t);
+ *tp = t->next;
+ t->flag = EXITED;
+ t->next = exited_workers;
+ exited_workers = t;
+ }
+ MT_lock_unset(&dataflowLock);
GDKsetbuf(NULL);
}
@@ -747,6 +749,7 @@ DFLOWscheduler(DataFlow flow, struct wor
static inline void
finish_worker(struct worker *t)
{
+ t->flag = FINISHING;
MT_lock_unset(&dataflowLock);
MT_join_thread(t->id);
MT_sema_destroy(&t->s);
@@ -944,6 +947,8 @@ stopMALdataflow(void)
}
while (free_workers) {
struct worker *t = free_workers;
+ assert(free_count > 0);
+ free_count--;
free_workers = free_workers->next;
MT_sema_up(&t->s);
finish_worker(t);
@@ -953,6 +958,11 @@ stopMALdataflow(void)
workers = workers->next;
finish_worker(t);
}
+ while (exited_workers) {
+ struct worker *t = exited_workers;
+ exited_workers = exited_workers->next;
+ finish_worker(t);
+ }
MT_lock_unset(&dataflowLock);
}
}
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]