Changeset: 2fc4fdde979a for MonetDB
URL: https://dev.monetdb.org/hg/MonetDB/rev/2fc4fdde979a
Modified Files:
clients/Tests/exports.stable.out
gdk/gdk.h
gdk/gdk_bat.c
gdk/gdk_bbp.c
gdk/gdk_group.c
gdk/gdk_join.c
gdk/gdk_system.c
gdk/gdk_system_private.h
gdk/gdk_unique.c
gdk/gdk_utils.c
monetdb5/mal/mal_dataflow.c
monetdb5/mal/mal_profiler.c
monetdb5/mal/mal_runtime.c
monetdb5/modules/mal/mal_mapi.c
monetdb5/modules/mal/tablet.c
sql/backends/monet5/UDF/capi/capi.c
sql/backends/monet5/sql_scenario.c
Branch: default
Log Message:
Removed THRcreate, THRgettid, use MT_thread_create, MT_getpid instead.
Only one way to create threads now, only one internal structure to
represent threads.
diffs (truncated from 700 to 300 lines):
diff --git a/clients/Tests/exports.stable.out b/clients/Tests/exports.stable.out
--- a/clients/Tests/exports.stable.out
+++ b/clients/Tests/exports.stable.out
@@ -405,8 +405,6 @@ BUN SORTfndlast(BAT *b, const void *v);
gdk_return STRMPcreate(BAT *b, BAT *s);
void STRMPdestroy(BAT *b);
BAT *STRMPfilter(BAT *b, BAT *s, const char *q, const bool keep_nils);
-MT_Id THRcreate(void (*f)(void *), void *arg, enum MT_thr_detach d, const char
*name);
-int THRgettid(void);
bool THRhighwater(void);
gdk_return TMsubcommit(BAT *bl) __attribute__((__warn_unused_result__));
gdk_return TMsubcommit_list(bat *restrict subcommit, BUN *restrict sizes, int
cnt, lng logno, lng transid) __attribute__((__warn_unused_result__));
diff --git a/gdk/gdk.h b/gdk/gdk.h
--- a/gdk/gdk.h
+++ b/gdk/gdk.h
@@ -1987,9 +1987,6 @@ VALptr(const ValRecord *v)
typedef struct threadStruct *Thread;
-gdk_export int THRgettid(void);
-gdk_export MT_Id THRcreate(void (*f) (void *), void *arg, enum MT_thr_detach
d, const char *name);
-
gdk_export stream *GDKstdout;
gdk_export stream *GDKstdin;
diff --git a/gdk/gdk_bat.c b/gdk/gdk_bat.c
--- a/gdk/gdk_bat.c
+++ b/gdk/gdk_bat.c
@@ -2901,8 +2901,8 @@ BATassertProps(BAT *b)
TRC_WARNING(BAT_, "Cannot allocate hash
table\n");
goto abort_check;
}
- if (snprintf(hs->heaplink.filename,
sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs->heaplink.filename) ||
- snprintf(hs->heapbckt.filename,
sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs->heapbckt.filename)) {
+ if (snprintf(hs->heaplink.filename,
sizeof(hs->heaplink.filename), "%s.thshprpl%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs->heaplink.filename) ||
+ snprintf(hs->heapbckt.filename,
sizeof(hs->heapbckt.filename), "%s.thshprpb%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs->heapbckt.filename)) {
/* cannot happen, see comment in gdk.h
* about sizes near definition of
* BBPINIT */
diff --git a/gdk/gdk_bbp.c b/gdk/gdk_bbp.c
--- a/gdk/gdk_bbp.c
+++ b/gdk/gdk_bbp.c
@@ -1846,7 +1846,10 @@ BBPinit(void)
}
}
- manager = THRcreate(BBPmanager, NULL, MT_THR_DETACHED, "BBPmanager");
+ if (MT_create_thread(&manager, BBPmanager, NULL, MT_THR_DETACHED,
"BBPmanager") < 0) {
+ TRC_CRITICAL(GDK, "Could not start BBPmanager thread.");
+ return GDK_FAIL;
+ }
return GDK_SUCCEED;
bailout:
diff --git a/gdk/gdk_group.c b/gdk/gdk_group.c
--- a/gdk/gdk_group.c
+++ b/gdk/gdk_group.c
@@ -1179,8 +1179,8 @@ BATgroup_internal(BAT **groups, BAT **ex
}
hs->heapbckt.parentid = b->batCacheid;
hs->heaplink.parentid = b->batCacheid;
- if (snprintf(hs->heaplink.filename,
sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs->heaplink.filename) ||
- snprintf(hs->heapbckt.filename,
sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs->heapbckt.filename) ||
+ if (snprintf(hs->heaplink.filename,
sizeof(hs->heaplink.filename), "%s.thshgrpl%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs->heaplink.filename) ||
+ snprintf(hs->heapbckt.filename,
sizeof(hs->heapbckt.filename), "%s.thshgrpb%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs->heapbckt.filename) ||
HASHnew(hs, bi.type, BATcount(b), nbucket, BUN_NONE, false)
!= GDK_SUCCEED) {
GDKfree(hs);
hs = NULL;
diff --git a/gdk/gdk_join.c b/gdk/gdk_join.c
--- a/gdk/gdk_join.c
+++ b/gdk/gdk_join.c
@@ -2720,7 +2720,7 @@ hashjoin(BAT **r1p, BAT **r2p, BAT *l, B
r->thash ? " ignoring existing hash" : "",
swapped ? " (swapped)" : "");
if (snprintf(ext, sizeof(ext), "thshjn%x",
- (unsigned) THRgettid()) >= (int) sizeof(ext))
+ (unsigned) MT_getpid()) >= (int) sizeof(ext))
goto bailout;
if ((hsh = BAThash_impl(r, rci, ext)) == NULL) {
goto bailout;
@@ -3199,8 +3199,8 @@ count_unique(BAT *b, BAT *s, BUN *cnt1,
mask = (BUN) 1 << 16;
if ((hs.heaplink.farmid = BBPselectfarm(TRANSIENT, bi.type,
hashheap)) < 0 ||
(hs.heapbckt.farmid = BBPselectfarm(TRANSIENT, bi.type,
hashheap)) < 0 ||
- snprintf(hs.heaplink.filename,
sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs.heaplink.filename) ||
- snprintf(hs.heapbckt.filename,
sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) THRgettid()) >=
(int) sizeof(hs.heapbckt.filename) ||
+ snprintf(hs.heaplink.filename,
sizeof(hs.heaplink.filename), "%s.thshjnl%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs.heaplink.filename) ||
+ snprintf(hs.heapbckt.filename,
sizeof(hs.heapbckt.filename), "%s.thshjnb%x", nme, (unsigned) MT_getpid()) >=
(int) sizeof(hs.heapbckt.filename) ||
HASHnew(&hs, bi.type, ci.ncand, mask, BUN_NONE, false) !=
GDK_SUCCEED) {
GDKerror("cannot allocate hash table\n");
HEAPfree(&hs.heaplink, true);
diff --git a/gdk/gdk_system.c b/gdk/gdk_system.c
--- a/gdk/gdk_system.c
+++ b/gdk/gdk_system.c
@@ -203,6 +203,7 @@ static struct mtthread {
pthread_t hdl;
#else
HANDLE hdl;
+ DWORD wtid;
#endif
MT_Id tid;
uintptr_t sp;
@@ -216,7 +217,6 @@ struct mtthread mainthread = {
};
#ifdef HAVE_PTHREAD_H
static pthread_mutex_t posthread_lock = PTHREAD_MUTEX_INITIALIZER;
-static MT_Id MT_thread_id = 1;
static pthread_key_t threadkey;
#define thread_lock() pthread_mutex_lock(&posthread_lock)
#define thread_unlock() pthread_mutex_unlock(&posthread_lock)
@@ -261,6 +261,36 @@ THRhighwater(void)
return false;
}
+static uint32_t allocated[THREADS / 32];
+static MT_Lock alloclock = MT_LOCK_INITIALIZER(alloclock);
+
+static MT_Id
+alloc_thread(void)
+{
+ MT_Id mtid = 0;
+ MT_lock_set(&alloclock);
+ for (int i = 0; i < THREADS / 32; i++) {
+ if (allocated[i] != ~UINT32_C(0)) {
+ int x = candmask_lobit(~allocated[i]);
+ allocated[i] |= UINT32_C(1) << x;
+ mtid = (MT_Id) (i * 32 + x + 1);
+ break;
+ }
+ }
+ MT_lock_unset(&alloclock);
+ return mtid;
+}
+
+static void
+dealloc_thread(MT_Id mtid)
+{
+ assert(mtid > 0 && mtid <= THREADS);
+ mtid--;
+ MT_lock_set(&alloclock);
+ allocated[mtid / 32] &= ~(UINT32_C(1) << (mtid % 32));
+ MT_lock_unset(&alloclock);
+}
+
void
dump_threads(void)
{
@@ -268,8 +298,9 @@ dump_threads(void)
thread_lock();
for (struct mtthread *t = mtthreads; t; t = t->next) {
int pos = snprintf(buf, sizeof(buf),
- "%s, waiting for %s, working on %.200s",
+ "%s, tid %zu, waiting for %s, working on
%.200s",
t->threadname,
+ t->tid,
t->lockwait ? t->lockwait->name :
t->semawait ? t->semawait->name :
t->condwait ? t->condwait->name :
@@ -297,6 +328,7 @@ static void
rm_mtthread(struct mtthread *t)
{
struct mtthread **pt;
+ MT_Id mtid = t->tid;
assert(t != &mainthread);
thread_lock();
@@ -307,6 +339,7 @@ rm_mtthread(struct mtthread *t)
ATOMIC_DESTROY(&t->exited);
free(t);
thread_unlock();
+ dealloc_thread(mtid);
}
bool
@@ -317,13 +350,11 @@ MT_thread_init(void)
#ifdef HAVE_PTHREAD_H
int ret;
- assert(MT_thread_id == 1);
if ((ret = pthread_key_create(&threadkey, NULL)) != 0) {
GDKsyserr(ret, "Creating specific key for thread failed");
return false;
}
mainthread.hdl = pthread_self();
- mainthread.tid = 1;
if ((ret = thread_setself(&mainthread)) != 0) {
GDKsyserr(ret, "Setting specific value failed");
return false;
@@ -334,7 +365,7 @@ MT_thread_init(void)
GDKwinerror("Creating thread-local slot for thread failed");
return false;
}
- mainthread.tid = (MT_Id) GetCurrentThreadId();
+ mainthread.wtid = GetCurrentThreadId();
if (thread_setself(&mainthread) == 0) {
GDKwinerror("Setting thread-local value failed");
TlsFree(threadslot);
@@ -343,6 +374,8 @@ MT_thread_init(void)
}
InitializeCriticalSection(&winthread_cs);
#endif
+ allocated[0] = 1;
+ mainthread.tid = 1;
mainthread.next = NULL;
mtthreads = &mainthread;
thread_initialized = true;
@@ -351,6 +384,8 @@ MT_thread_init(void)
bool
MT_thread_register(void)
{
+ MT_Id mtid;
+
assert(thread_initialized);
if (!thread_initialized)
return false;
@@ -370,25 +405,23 @@ MT_thread_register(void)
if (self == NULL)
return false;
+ if ((mtid = alloc_thread()) == 0) {
+ TRC_DEBUG(IO_, "Too many threads\n");
+ GDKerror("too many threads\n");
+ return false;
+ }
thread_lock();
*self = (struct mtthread) {
.detached = false,
#ifdef HAVE_PTHREAD_H
- .tid = ++MT_thread_id,
.hdl = pthread_self(),
#else
- .tid = (MT_Id) GetCurrentThreadId(),
+ .wtid = GetCurrentThreadId(),
#endif
.refs = 1,
+ .tid = mtid,
};
snprintf(self->threadname, sizeof(self->threadname), "foreign %zu",
self->tid);
- Thread t = THRnew(self->tid);
- if (t == NULL) {
- free(self);
- thread_unlock();
- return false;
- }
- self->data = t;
ATOMIC_INIT(&self->exited, 0);
thread_setself(self);
self->next = mtthreads;
@@ -406,7 +439,6 @@ MT_thread_deregister(void)
return;
if (--self->refs == 0) {
- THRdel(self->data);
rm_mtthread(self);
thread_setself(NULL);
}
@@ -563,9 +595,11 @@ MT_thread_add_mylock(MT_Lock *lock)
void
MT_thread_del_mylock(MT_Lock *lock)
{
+ struct mtthread *self;
if (!thread_initialized)
- return;
- struct mtthread *self = thread_self();
+ self = &mainthread;
+ else
+ self = thread_self();
if (self) {
if (self->mylocks == lock) {
@@ -746,6 +780,7 @@ int
MT_create_thread(MT_Id *t, void (*f) (void *), void *arg, enum MT_thr_detach
d, const char *threadname)
{
struct mtthread *self;
+ MT_Id mtid;
assert(thread_initialized);
join_threads();
@@ -757,6 +792,11 @@ MT_create_thread(MT_Id *t, void (*f) (vo
TRC_CRITICAL(GDK, "Thread's name is too large\n");
return -1;
}
+ if ((mtid = alloc_thread()) == 0) {
+ TRC_DEBUG(IO_, "Too many threads\n");
+ GDKerror("too many threads\n");
+ return -1;
+ }
#ifdef HAVE_PTHREAD_H
pthread_attr_t attr;
@@ -786,6 +826,7 @@ MT_create_thread(MT_Id *t, void (*f) (vo
.waiting = false,
.detached = (d == MT_THR_DETACHED),
.refs = 1,
+ .tid = mtid,
};
ATOMIC_INIT(&self->exited, 0);
strcpy_len(self->threadname, threadname, sizeof(self->threadname));
@@ -797,7 +838,6 @@ MT_create_thread(MT_Id *t, void (*f) (vo
TRC_DEBUG(THRD, "Create thread \"%s\"\n", threadname);
_______________________________________________
checkin-list mailing list -- [email protected]
To unsubscribe send an email to [email protected]