Changeset: 2fc4fdde979a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2fc4fdde979a
Modified Files:
        clients/Tests/exports.stable.out
        gdk/gdk.h
        gdk/gdk_bat.c
        gdk/gdk_bbp.c
        gdk/gdk_group.c
        gdk/gdk_join.c
        gdk/gdk_system.c
        gdk/gdk_system_private.h
        gdk/gdk_unique.c
        gdk/gdk_utils.c
        monetdb5/mal/mal_dataflow.c
        monetdb5/mal/mal_profiler.c
        monetdb5/mal/mal_runtime.c
        monetdb5/modules/mal/mal_mapi.c
        monetdb5/modules/mal/tablet.c
        sql/backends/monet5/UDF/capi/capi.c
        sql/backends/monet5/sql_scenario.c
Branch: default
Log Message:

Removed THRcreate, THRgettid, use MT_thread_create, MT_getpid instead.
Only one way to create threads now, only one internal structure to
represent threads.


diffs (truncated from 700 to 300 lines):

diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -405,8 +405,6 @@ BUN SORTfndlast(BAT *b, const void *v);
 gdk_return STRMPcreate(BAT *b, BAT *s);
 void STRMPdestroy(BAT *b);
 BAT *STRMPfilter(BAT *b, BAT *s, const char *q, const bool keep_nils);
-MT_Id THRcreate(void (*f)(void *), void *arg, enum MT_thr_detach d, const char 
*name);
-int THRgettid(void);
 bool THRhighwater(void);
 gdk_return TMsubcommit(BAT *bl) __attribute__((__warn_unused_result__));
 gdk_return TMsubcommit_list(bat *restrict subcommit, BUN *restrict sizes, int 
cnt, lng logno, lng transid) __attribute__((__warn_unused_result__));
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -1987,9 +1987,6 @@ VALptr(const ValRecord *v)
 typedef struct threadStruct *Thread;
 
 
-gdk_export int THRgettid(void);
-gdk_export MT_Id THRcreate(void (*f) (void *), void *arg, enum MT_thr_detach 
d, const char *name);
-
 gdk_export stream *GDKstdout;
 gdk_export stream *GDKstdin;
 
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -2901,8 +2901,8 @@ BATassertProps(BAT *b)
                                TRC_WARNING(BAT_, "Cannot allocate hash 
table\n");
                                goto abort_check;
                        }
-                       if (snprintf(hs->heaplink.filename, 
sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs->heaplink.filename) ||
-                           snprintf(hs->heapbckt.filename, 
sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs->heapbckt.filename)) {
+                       if (snprintf(hs->heaplink.filename, 
sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs->heaplink.filename) ||
+                           snprintf(hs->heapbckt.filename, 
sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs->heapbckt.filename)) {
                                /* cannot happen, see comment in gdk.h
                                 * about sizes near definition of
                                 * BBPINIT */
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1846,7 +1846,10 @@ BBPinit(void)
                }
        }
 
-       manager = THRcreate(BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager");
+       if (MT_create_thread(&manager, BBPmanager, NULL, MT_THR_DETACHED, 
"BBPmanager") < 0) {
+               TRC_CRITICAL(GDK, "Could not start BBPmanager thread.");
+               return GDK_FAIL;
+       }
        return GDK_SUCCEED;
 
   bailout:
diff --git a/gdk/gdk_group.c b/gdk/gdk_group.c
--- a/gdk/gdk_group.c
+++ b/gdk/gdk_group.c
@@ -1179,8 +1179,8 @@ BATgroup_internal(BAT **groups, BAT **ex
                }
                hs->heapbckt.parentid = b->batCacheid;
                hs->heaplink.parentid = b->batCacheid;
-               if (snprintf(hs->heaplink.filename, 
sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs->heaplink.filename) ||
-                   snprintf(hs->heapbckt.filename, 
sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs->heapbckt.filename) ||
+               if (snprintf(hs->heaplink.filename, 
sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs->heaplink.filename) ||
+                   snprintf(hs->heapbckt.filename, 
sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs->heapbckt.filename) ||
                    HASHnew(hs, bi.type, BATcount(b), nbucket, BUN_NONE, false) 
!= GDK_SUCCEED) {
                        GDKfree(hs);
                        hs = NULL;
diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -2720,7 +2720,7 @@ hashjoin(BAT **r1p, BAT **r2p, BAT *l, B
                          r->thash ? " ignoring existing hash" : "",
                          swapped ? " (swapped)" : "");
                if (snprintf(ext, sizeof(ext), "thshjn%x",
-                            (unsigned) THRgettid()) >= (int) sizeof(ext))
+                            (unsigned) MT_getpid()) >= (int) sizeof(ext))
                        goto bailout;
                if ((hsh = BAThash_impl(r, rci, ext)) == NULL) {
                        goto bailout;
@@ -3199,8 +3199,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1, 
                        mask = (BUN) 1 << 16;
                if ((hs.heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type, 
hashheap)) < 0 ||
                    (hs.heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type, 
hashheap)) < 0 ||
-                   snprintf(hs.heaplink.filename, 
sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs.heaplink.filename) ||
-                   snprintf(hs.heapbckt.filename, 
sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) THRgettid()) >= 
(int) sizeof(hs.heapbckt.filename) ||
+                   snprintf(hs.heaplink.filename, 
sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs.heaplink.filename) ||
+                   snprintf(hs.heapbckt.filename, 
sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) MT_getpid()) >= 
(int) sizeof(hs.heapbckt.filename) ||
                    HASHnew(&hs, bi.type, ci.ncand, mask, BUN_NONE, false) != 
GDK_SUCCEED) {
                        GDKerror("cannot allocate hash table\n");
                        HEAPfree(&hs.heaplink, true);
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -203,6 +203,7 @@ static struct mtthread {
        pthread_t hdl;
 #else
        HANDLE hdl;
+       DWORD wtid;
 #endif
        MT_Id tid;
        uintptr_t sp;
@@ -216,7 +217,6 @@ struct mtthread mainthread = {
 };
 #ifdef HAVE_PTHREAD_H
 static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
-static MT_Id MT_thread_id = 1;
 static pthread_key_t threadkey;
 #define thread_lock()          pthread_mutex_lock(&posthread_lock)
 #define thread_unlock()                pthread_mutex_unlock(&posthread_lock)
@@ -261,6 +261,36 @@ THRhighwater(void)
        return false;
 }
 
+static uint32_t allocated[THREADS / 32];
+static MT_Lock alloclock = MT_LOCK_INITIALIZER(alloclock);
+
+static MT_Id
+alloc_thread(void)
+{
+       MT_Id mtid = 0;
+       MT_lock_set(&alloclock);
+       for (int i = 0; i < THREADS / 32; i++) {
+               if (allocated[i] != ~UINT32_C(0)) {
+                       int x = candmask_lobit(~allocated[i]);
+                       allocated[i] |= UINT32_C(1) << x;
+                       mtid = (MT_Id) (i * 32 + x + 1);
+                       break;
+               }
+       }
+       MT_lock_unset(&alloclock);
+       return mtid;
+}
+
+static void
+dealloc_thread(MT_Id mtid)
+{
+       assert(mtid > 0 && mtid <= THREADS);
+       mtid--;
+       MT_lock_set(&alloclock);
+       allocated[mtid / 32] &= ~(UINT32_C(1) << (mtid % 32));
+       MT_lock_unset(&alloclock);
+}
+
 void
 dump_threads(void)
 {
@@ -268,8 +298,9 @@ dump_threads(void)
        thread_lock();
        for (struct mtthread *t = mtthreads; t; t = t->next) {
                int pos = snprintf(buf, sizeof(buf),
-                                  "%s, waiting for %s, working on %.200s",
+                                  "%s, tid %zu, waiting for %s, working on 
%.200s",
                                   t->threadname,
+                                  t->tid,
                                   t->lockwait ? t->lockwait->name :
                                   t->semawait ? t->semawait->name :
                                   t->condwait ? t->condwait->name :
@@ -297,6 +328,7 @@ static void
 rm_mtthread(struct mtthread *t)
 {
        struct mtthread **pt;
+       MT_Id mtid = t->tid;
 
        assert(t != &mainthread);
        thread_lock();
@@ -307,6 +339,7 @@ rm_mtthread(struct mtthread *t)
        ATOMIC_DESTROY(&t->exited);
        free(t);
        thread_unlock();
+       dealloc_thread(mtid);
 }
 
 bool
@@ -317,13 +350,11 @@ MT_thread_init(void)
 #ifdef HAVE_PTHREAD_H
        int ret;
 
-       assert(MT_thread_id == 1);
        if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
                GDKsyserr(ret, "Creating specific key for thread failed");
                return false;
        }
        mainthread.hdl = pthread_self();
-       mainthread.tid = 1;
        if ((ret = thread_setself(&mainthread)) != 0) {
                GDKsyserr(ret, "Setting specific value failed");
                return false;
@@ -334,7 +365,7 @@ MT_thread_init(void)
                GDKwinerror("Creating thread-local slot for thread failed");
                return false;
        }
-       mainthread.tid = (MT_Id) GetCurrentThreadId();
+       mainthread.wtid = GetCurrentThreadId();
        if (thread_setself(&mainthread) == 0) {
                GDKwinerror("Setting thread-local value failed");
                TlsFree(threadslot);
@@ -343,6 +374,8 @@ MT_thread_init(void)
        }
        InitializeCriticalSection(&winthread_cs);
 #endif
+       allocated[0] = 1;
+       mainthread.tid = 1;
        mainthread.next = NULL;
        mtthreads = &mainthread;
        thread_initialized = true;
@@ -351,6 +384,8 @@ MT_thread_init(void)
 bool
 MT_thread_register(void)
 {
+       MT_Id mtid;
+
        assert(thread_initialized);
        if (!thread_initialized)
                return false;
@@ -370,25 +405,23 @@ MT_thread_register(void)
        if (self == NULL)
                return false;
 
+       if ((mtid = alloc_thread()) == 0) {
+               TRC_DEBUG(IO_, "Too many threads\n");
+               GDKerror("too many threads\n");
+               return false;
+       }
        thread_lock();
        *self = (struct mtthread) {
                .detached = false,
 #ifdef HAVE_PTHREAD_H
-               .tid = ++MT_thread_id,
                .hdl = pthread_self(),
 #else
-               .tid = (MT_Id) GetCurrentThreadId(),
+               .wtid = GetCurrentThreadId(),
 #endif
                .refs = 1,
+               .tid = mtid,
        };
        snprintf(self->threadname, sizeof(self->threadname), "foreign %zu", 
self->tid);
-       Thread t = THRnew(self->tid);
-       if (t == NULL) {
-               free(self);
-               thread_unlock();
-               return false;
-       }
-       self->data = t;
        ATOMIC_INIT(&self->exited, 0);
        thread_setself(self);
        self->next = mtthreads;
@@ -406,7 +439,6 @@ MT_thread_deregister(void)
                return;
 
        if (--self->refs == 0) {
-               THRdel(self->data);
                rm_mtthread(self);
                thread_setself(NULL);
        }
@@ -563,9 +595,11 @@ MT_thread_add_mylock(MT_Lock *lock)
 void
 MT_thread_del_mylock(MT_Lock *lock)
 {
+       struct mtthread *self;
        if (!thread_initialized)
-               return;
-       struct mtthread *self = thread_self();
+               self = &mainthread;
+       else
+               self = thread_self();
 
        if (self) {
                if (self->mylocks == lock) {
@@ -746,6 +780,7 @@ int
 MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach 
d, const char *threadname)
 {
        struct mtthread *self;
+       MT_Id mtid;
 
        assert(thread_initialized);
        join_threads();
@@ -757,6 +792,11 @@ MT_create_thread(MT_Id *t, void (*f) (vo
                TRC_CRITICAL(GDK, "Thread's name is too large\n");
                return -1;
        }
+       if ((mtid = alloc_thread()) == 0) {
+               TRC_DEBUG(IO_, "Too many threads\n");
+               GDKerror("too many threads\n");
+               return -1;
+       }
 
 #ifdef HAVE_PTHREAD_H
        pthread_attr_t attr;
@@ -786,6 +826,7 @@ MT_create_thread(MT_Id *t, void (*f) (vo
                .waiting = false,
                .detached = (d == MT_THR_DETACHED),
                .refs = 1,
+               .tid = mtid,
        };
        ATOMIC_INIT(&self->exited, 0);
        strcpy_len(self->threadname, threadname, sizeof(self->threadname));
@@ -797,7 +838,6 @@ MT_create_thread(MT_Id *t, void (*f) (vo
        TRC_DEBUG(THRD, "Create thread \"%s\"\n", threadname);
_______________________________________________
checkin-list mailing list -- checkin-list@monetdb.org
To unsubscribe send an email to checkin-list-le...@monetdb.org

Reply via email to