Changeset: 2fc4fdde979a for MonetDB URL: https://dev.monetdb.org/hg/MonetDB/rev/2fc4fdde979a Modified Files: clients/Tests/exports.stable.out gdk/gdk.h gdk/gdk_bat.c gdk/gdk_bbp.c gdk/gdk_group.c gdk/gdk_join.c gdk/gdk_system.c gdk/gdk_system_private.h gdk/gdk_unique.c gdk/gdk_utils.c monetdb5/mal/mal_dataflow.c monetdb5/mal/mal_profiler.c monetdb5/mal/mal_runtime.c monetdb5/modules/mal/mal_mapi.c monetdb5/modules/mal/tablet.c sql/backends/monet5/UDF/capi/capi.c sql/backends/monet5/sql_scenario.c Branch: default Log Message:
Removed THRcreate, THRgettid, use MT_thread_create, MT_getpid instead. Only one way to create threads now, only one internal structure to represent threads. diffs (truncated from 700 to 300 lines): diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out --- a/clients/Tests/exports.stable.out +++ b/clients/Tests/exports.stable.out @@ -405,8 +405,6 @@ BUN SORTfndlast(BAT *b, const void *v); gdk_return STRMPcreate(BAT *b, BAT *s); void STRMPdestroy(BAT *b); BAT *STRMPfilter(BAT *b, BAT *s, const char *q, const bool keep_nils); -MT_Id THRcreate(void (*f)(void *), void *arg, enum MT_thr_detach d, const char *name); -int THRgettid(void); bool THRhighwater(void); gdk_return TMsubcommit(BAT *bl) __attribute__((__warn_unused_result__)); gdk_return TMsubcommit_list(bat *restrict subcommit, BUN *restrict sizes, int cnt, lng logno, lng transid) __attribute__((__warn_unused_result__)); diff --git a/gdk/gdk.h b/gdk/gdk.h --- a/gdk/gdk.h +++ b/gdk/gdk.h @@ -1987,9 +1987,6 @@ VALptr(const ValRecord *v) typedef struct threadStruct *Thread; -gdk_export int THRgettid(void); -gdk_export MT_Id THRcreate(void (*f) (void *), void *arg, enum MT_thr_detach d, const char *name); - gdk_export stream *GDKstdout; gdk_export stream *GDKstdin; diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c --- a/gdk/gdk_bat.c +++ b/gdk/gdk_bat.c @@ -2901,8 +2901,8 @@ BATassertProps(BAT *b) TRC_WARNING(BAT_, "Cannot allocate hash table\n"); goto abort_check; } - if (snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs->heaplink.filename) || - snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs->heapbckt.filename)) { + if (snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) || + snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename)) { /* cannot happen, see comment in gdk.h * about sizes near definition of * BBPINIT */ diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c --- a/gdk/gdk_bbp.c +++ b/gdk/gdk_bbp.c @@ -1846,7 +1846,10 @@ BBPinit(void) } } - manager = THRcreate(BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager"); + if (MT_create_thread(&manager, BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager") < 0) { + TRC_CRITICAL(GDK, "Could not start BBPmanager thread."); + return GDK_FAIL; + } return GDK_SUCCEED; bailout: diff --git a/gdk/gdk_group.c b/gdk/gdk_group.c --- a/gdk/gdk_group.c +++ b/gdk/gdk_group.c @@ -1179,8 +1179,8 @@ BATgroup_internal(BAT **groups, BAT **ex } hs->heapbckt.parentid = b->batCacheid; hs->heaplink.parentid = b->batCacheid; - if (snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs->heaplink.filename) || - snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs->heapbckt.filename) || + if (snprintf(hs->heaplink.filename, sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heaplink.filename) || + snprintf(hs->heapbckt.filename, sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs->heapbckt.filename) || HASHnew(hs, bi.type, BATcount(b), nbucket, BUN_NONE, false) != GDK_SUCCEED) { GDKfree(hs); hs = NULL; diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c --- a/gdk/gdk_join.c +++ b/gdk/gdk_join.c @@ -2720,7 +2720,7 @@ hashjoin(BAT **r1p, BAT **r2p, BAT *l, B r->thash ? " ignoring existing hash" : "", swapped ? " (swapped)" : ""); if (snprintf(ext, sizeof(ext), "thshjn%x", - (unsigned) THRgettid()) >= (int) sizeof(ext)) + (unsigned) MT_getpid()) >= (int) sizeof(ext)) goto bailout; if ((hsh = BAThash_impl(r, rci, ext)) == NULL) { goto bailout; @@ -3199,8 +3199,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1, mask = (BUN) 1 << 16; if ((hs.heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 || (hs.heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, hashheap)) < 0 || - snprintf(hs.heaplink.filename, sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs.heaplink.filename) || - snprintf(hs.heapbckt.filename, sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) THRgettid()) >= (int) sizeof(hs.heapbckt.filename) || + snprintf(hs.heaplink.filename, sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs.heaplink.filename) || + snprintf(hs.heapbckt.filename, sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) MT_getpid()) >= (int) sizeof(hs.heapbckt.filename) || HASHnew(&hs, bi.type, ci.ncand, mask, BUN_NONE, false) != GDK_SUCCEED) { GDKerror("cannot allocate hash table\n"); HEAPfree(&hs.heaplink, true); diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c --- a/gdk/gdk_system.c +++ b/gdk/gdk_system.c @@ -203,6 +203,7 @@ static struct mtthread { pthread_t hdl; #else HANDLE hdl; + DWORD wtid; #endif MT_Id tid; uintptr_t sp; @@ -216,7 +217,6 @@ struct mtthread mainthread = { }; #ifdef HAVE_PTHREAD_H static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER; -static MT_Id MT_thread_id = 1; static pthread_key_t threadkey; #define thread_lock() pthread_mutex_lock(&posthread_lock) #define thread_unlock() pthread_mutex_unlock(&posthread_lock) @@ -261,6 +261,36 @@ THRhighwater(void) return false; } +static uint32_t allocated[THREADS / 32]; +static MT_Lock alloclock = MT_LOCK_INITIALIZER(alloclock); + +static MT_Id +alloc_thread(void) +{ + MT_Id mtid = 0; + MT_lock_set(&alloclock); + for (int i = 0; i < THREADS / 32; i++) { + if (allocated[i] != ~UINT32_C(0)) { + int x = candmask_lobit(~allocated[i]); + allocated[i] |= UINT32_C(1) << x; + mtid = (MT_Id) (i * 32 + x + 1); + break; + } + } + MT_lock_unset(&alloclock); + return mtid; +} + +static void +dealloc_thread(MT_Id mtid) +{ + assert(mtid > 0 && mtid <= THREADS); + mtid--; + MT_lock_set(&alloclock); + allocated[mtid / 32] &= ~(UINT32_C(1) << (mtid % 32)); + MT_lock_unset(&alloclock); +} + void dump_threads(void) { @@ -268,8 +298,9 @@ dump_threads(void) thread_lock(); for (struct mtthread *t = mtthreads; t; t = t->next) { int pos = snprintf(buf, sizeof(buf), - "%s, waiting for %s, working on %.200s", + "%s, tid %zu, waiting for %s, working on %.200s", t->threadname, + t->tid, t->lockwait ? t->lockwait->name : t->semawait ? t->semawait->name : t->condwait ? t->condwait->name : @@ -297,6 +328,7 @@ static void rm_mtthread(struct mtthread *t) { struct mtthread **pt; + MT_Id mtid = t->tid; assert(t != &mainthread); thread_lock(); @@ -307,6 +339,7 @@ rm_mtthread(struct mtthread *t) ATOMIC_DESTROY(&t->exited); free(t); thread_unlock(); + dealloc_thread(mtid); } bool @@ -317,13 +350,11 @@ MT_thread_init(void) #ifdef HAVE_PTHREAD_H int ret; - assert(MT_thread_id == 1); if ((ret = pthread_key_create(&threadkey, NULL)) != 0) { GDKsyserr(ret, "Creating specific key for thread failed"); return false; } mainthread.hdl = pthread_self(); - mainthread.tid = 1; if ((ret = thread_setself(&mainthread)) != 0) { GDKsyserr(ret, "Setting specific value failed"); return false; @@ -334,7 +365,7 @@ MT_thread_init(void) GDKwinerror("Creating thread-local slot for thread failed"); return false; } - mainthread.tid = (MT_Id) GetCurrentThreadId(); + mainthread.wtid = GetCurrentThreadId(); if (thread_setself(&mainthread) == 0) { GDKwinerror("Setting thread-local value failed"); TlsFree(threadslot); @@ -343,6 +374,8 @@ MT_thread_init(void) } InitializeCriticalSection(&winthread_cs); #endif + allocated[0] = 1; + mainthread.tid = 1; mainthread.next = NULL; mtthreads = &mainthread; thread_initialized = true; @@ -351,6 +384,8 @@ MT_thread_init(void) bool MT_thread_register(void) { + MT_Id mtid; + assert(thread_initialized); if (!thread_initialized) return false; @@ -370,25 +405,23 @@ MT_thread_register(void) if (self == NULL) return false; + if ((mtid = alloc_thread()) == 0) { + TRC_DEBUG(IO_, "Too many threads\n"); + GDKerror("too many threads\n"); + return false; + } thread_lock(); *self = (struct mtthread) { .detached = false, #ifdef HAVE_PTHREAD_H - .tid = ++MT_thread_id, .hdl = pthread_self(), #else - .tid = (MT_Id) GetCurrentThreadId(), + .wtid = GetCurrentThreadId(), #endif .refs = 1, + .tid = mtid, }; snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", self->tid); - Thread t = THRnew(self->tid); - if (t == NULL) { - free(self); - thread_unlock(); - return false; - } - self->data = t; ATOMIC_INIT(&self->exited, 0); thread_setself(self); self->next = mtthreads; @@ -406,7 +439,6 @@ MT_thread_deregister(void) return; if (--self->refs == 0) { - THRdel(self->data); rm_mtthread(self); thread_setself(NULL); } @@ -563,9 +595,11 @@ MT_thread_add_mylock(MT_Lock *lock) void MT_thread_del_mylock(MT_Lock *lock) { + struct mtthread *self; if (!thread_initialized) - return; - struct mtthread *self = thread_self(); + self = &mainthread; + else + self = thread_self(); if (self) { if (self->mylocks == lock) { @@ -746,6 +780,7 @@ int MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach d, const char *threadname) { struct mtthread *self; + MT_Id mtid; assert(thread_initialized); join_threads(); @@ -757,6 +792,11 @@ MT_create_thread(MT_Id *t, void (*f) (vo TRC_CRITICAL(GDK, "Thread's name is too large\n"); return -1; } + if ((mtid = alloc_thread()) == 0) { + TRC_DEBUG(IO_, "Too many threads\n"); + GDKerror("too many threads\n"); + return -1; + } #ifdef HAVE_PTHREAD_H pthread_attr_t attr; @@ -786,6 +826,7 @@ MT_create_thread(MT_Id *t, void (*f) (vo .waiting = false, .detached = (d == MT_THR_DETACHED), .refs = 1, + .tid = mtid, }; ATOMIC_INIT(&self->exited, 0); strcpy_len(self->threadname, threadname, sizeof(self->threadname)); @@ -797,7 +838,6 @@ MT_create_thread(MT_Id *t, void (*f) (vo TRC_DEBUG(THRD, "Create thread \"%s\"\n", threadname); _______________________________________________ checkin-list mailing list -- checkin-list@monetdb.org To unsubscribe send an email to checkin-list-le...@monetdb.org