On Tue, 28 Jun 2022, adr wrote:
It's just to play with it, note that onnote should be just passed
once. I'll post another patch if things work ok.
Here it is. I'm calling sysfatal when malloc fails, to be consistent
with the rest of libthread, but I don't think that's a good approach.
When those functions fail to make an allocation they should return
an error so the program could take an action depending on the error.
For example, if there is not enough memory at the moment, the program
could wait until there are enough resources to spawn a new thread.
adr
--- /tmp/main.c
+++ /sys/src/libthread/main.c
@@ -28,6 +28,10 @@
_qlockinit(_threadrendezvous);
_sysfatal = _threadsysfatal;
__assert = _threadassert;
+ onnote = mallocz(PPCHUNK*sizeof(uintptr), 1);
+ if(!onnote)
+ sysfatal("Malloc of size %d failed: %r",
PPCHUNK*sizeof(uintptr));
+ onnotesize = PPCHUNK;
notify(_threadnote);
if(mainstacksize == 0)
mainstacksize = 8*1024;
--- /tmp/note.c
+++ /sys/src/libthread/note.c
@@ -5,7 +5,6 @@
int _threadnopasser;
-#define NFN 33
#define ERRLEN 48
typedef struct Note Note;
struct Note
@@ -17,62 +16,157 @@
static Note notes[128];
static Note *enotes = notes+nelem(notes);
-static int (*onnote[NFN])(void*, char*);
-static int onnotepid[NFN];
+Onnote **onnote;
+int onnotesize;
+static int (*onnoteall[NFN])(void*, char*);
static Lock onnotelock;
int
threadnotify(int (*f)(void*, char*), int in)
{
- int i, topid;
- int (*from)(void*, char*), (*to)(void*, char*);
+ int i, j, n;
- if(in){
- from = nil;
- to = f;
- topid = _threadgetproc()->pid;
- }else{
- from = f;
- to = nil;
- topid = 0;
- }
lock(&onnotelock);
- for(i=0; i<NFN; i++)
- if(onnote[i]==from){
- onnote[i] = to;
- onnotepid[i] = topid;
+
+ /* add note for all processes */
+ if(in < -1){
+ n = -1;
+ for(i=0; i<NFN; i++){
+ if(onnoteall[i] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ if(onnoteall[i]==nil && n==-1)
+ n = i;
+ }
+ if(n > -1)
+ onnoteall[n] = f;
+ unlock(&onnotelock);
+ return n>-1;
+ }
+
+ /* remove note for all processes */
+ if(in == -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ onnoteall[i] = nil;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for current process */
+ if(!in){
+ for(i=0; i<onnotesize; i++){
+ if(onnote[i]!=nil &&
onnote[i]->pid==_threadgetproc()->pid){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ onnote[i]->fn[j] = 0;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+ }
+ }
+ unlock(&onnotelock);
+ return 0;
+ }
+
+ /* add note for current process */
+ n = -1;
+ for(i=0; i<onnotesize; i++){
+ /* process has already a slot */
+ if(onnote[i] && onnote[i]->pid==_threadgetproc()->pid){
+ n = -1;
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ if(onnote[i]->fn[j]==nil && n==-1)
+ n = j;
+ }
+ if(n > -1)
+ onnote[i]->fn[n] = f;
+ unlock(&onnotelock);
+ return n>-1;
+ }
+ /* there is a free slot */
+ if(!onnote[i] && n==-1)
+ n = i;
+ }
+ /* there is no free slot */
+ if(n == -1){
+ onnotesize += PPCHUNK;
+ onnote = realloc(onnote, onnotesize*sizeof(uintptr));
+ if(!onnote){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r",
onnotesize*sizeof(uintptr));
+ }
+ memset(onnote+i+1, 0, PPCHUNK-1);
+ n = i;
+ }
+ onnote[n]=mallocz(sizeof(Onnote), 1);
+ if(!onnote[n]){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", sizeof(Onnote));
+ }
+ onnote[n]->pid = _threadgetproc()->pid;
+ onnote[n]->fn[0] = f;
+ unlock(&onnotelock);
+ return 1;
+}
+
+void
+threadcancelnotes(int pid)
+{
+ int i;
+
+ lock(&onnotelock);
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==pid){
+ free(onnote[i]);
+ onnote[i] = nil;
break;
}
unlock(&onnotelock);
- return i<NFN;
+ return;
}
static void
delayednotes(Proc *p, void *v)
{
- int i;
+ int i, j, all;
Note *n;
- char s[ERRMAX];
- int (*fn)(void*, char*);
+ int (*f)(void*, char*);
if(!p->pending)
return;
p->pending = 0;
+ all = j = 0;
for(n=notes; n<enotes; n++){
if(n->proc == p){
- strcpy(s, n->s);
- n->proc = nil;
- unlock(&n->inuse);
-
- for(i=0; i<NFN; i++){
- if(onnotepid[i]!=p->pid || (fn =
onnote[i])==nil)
- continue;
- if((*fn)(v, s))
- break;
+ for(i=0; i<NFN; i++)
+ if(f=onnoteall[i])
+ if((*f)(v, n->s)){
+ all = 1;
+ break;
+ }
+ if(!all){
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==p->pid){
+ for(j=0; j<NFN; j++)
+ if(f=onnote[i]->fn[j])
+ if((*f)(v,
n->s))
+ break;
+ break;
+ }
}
- if(i==NFN){
- _threaddebug(DBGNOTE, "Unhandled note %s, proc
%p", n->s, p);
+ if(!all && (i==onnotesize || j==NFN)){
+ _threaddebug(DBGNOTE, "Unhandled note %s, proc
%p\n", n->s, p);
if(v != nil)
noted(NDFLT);
else if(strncmp(n->s, "sys:", 4)==0)
@@ -79,6 +173,8 @@
abort();
threadexitsall(n->s);
}
+ n->proc = nil;
+ unlock(&n->inuse);
}
}
}
@@ -94,7 +190,7 @@
noted(NDFLT);
if(_threadexitsallstatus){
- _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'",
_threadexitsallstatus);
+ _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'\n",
_threadexitsallstatus);
_exits(_threadexitsallstatus);
}
--- /tmp/sched.c
+++ /sys/src/libthread/sched.c
@@ -157,6 +157,7 @@
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
+ threadcancelnotes(p->pid);
unlinkproc(p);
_schedexit(p); /* frees proc */
}
--- /tmp/thread.h
+++ /sys/include/thread.h
@@ -97,6 +97,7 @@
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
int threadnotify(int (*f)(void*, char*), int in);
+void threadcancelnotes(int pid);
int threadid(void);
int threadpid(int);
int threadsetgrp(int); /* set thread group, return old */
--- /tmp/threadimpl.h
+++ /sys/src/libthread/threadimpl.h
@@ -192,3 +192,15 @@
#define _threaddebug(flag, ...) if((_threaddebuglevel&(flag))==0){}else
_threadprint(__VA_ARGS__)
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
+
+#define PPCHUNK 100
+#define NFN 33
+typedef struct Onnote Onnote;
+struct Onnote
+{
+ int pid;
+ int (*fn[NFN])(void*, char*);
+};
+extern Onnote **onnote;
+extern int onnotesize;
+void _threadnote(void*, char*);
------------------------------------------
9fans: 9fans
Permalink:
https://9fans.topicbox.com/groups/9fans/Tfa6823048ad90a21-M0bb660408ef52d611ffd55ab
Delivery options: https://9fans.topicbox.com/groups/9fans/subscription
--- /tmp/main.c
+++ /sys/src/libthread/main.c
@@ -28,6 +28,10 @@
_qlockinit(_threadrendezvous);
_sysfatal = _threadsysfatal;
__assert = _threadassert;
+ onnote = mallocz(PPCHUNK*sizeof(uintptr), 1);
+ if(!onnote)
+ sysfatal("Malloc of size %d failed: %r", PPCHUNK*sizeof(uintptr));
+ onnotesize = PPCHUNK;
notify(_threadnote);
if(mainstacksize == 0)
mainstacksize = 8*1024;
--- /tmp/note.c
+++ /sys/src/libthread/note.c
@@ -5,7 +5,6 @@
int _threadnopasser;
-#define NFN 33
#define ERRLEN 48
typedef struct Note Note;
struct Note
@@ -17,62 +16,157 @@
static Note notes[128];
static Note *enotes = notes+nelem(notes);
-static int (*onnote[NFN])(void*, char*);
-static int onnotepid[NFN];
+Onnote **onnote;
+int onnotesize;
+static int (*onnoteall[NFN])(void*, char*);
static Lock onnotelock;
int
threadnotify(int (*f)(void*, char*), int in)
{
- int i, topid;
- int (*from)(void*, char*), (*to)(void*, char*);
+ int i, j, n;
- if(in){
- from = nil;
- to = f;
- topid = _threadgetproc()->pid;
- }else{
- from = f;
- to = nil;
- topid = 0;
- }
lock(&onnotelock);
- for(i=0; i<NFN; i++)
- if(onnote[i]==from){
- onnote[i] = to;
- onnotepid[i] = topid;
+
+ /* add note for all processes */
+ if(in < -1){
+ n = -1;
+ for(i=0; i<NFN; i++){
+ if(onnoteall[i] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ if(onnoteall[i]==nil && n==-1)
+ n = i;
+ }
+ if(n > -1)
+ onnoteall[n] = f;
+ unlock(&onnotelock);
+ return n>-1;
+ }
+
+ /* remove note for all processes */
+ if(in == -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ onnoteall[i] = nil;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for current process */
+ if(!in){
+ for(i=0; i<onnotesize; i++){
+ if(onnote[i]!=nil && onnote[i]->pid==_threadgetproc()->pid){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ onnote[i]->fn[j] = 0;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+ }
+ }
+ unlock(&onnotelock);
+ return 0;
+ }
+
+ /* add note for current process */
+ n = -1;
+ for(i=0; i<onnotesize; i++){
+ /* process has already a slot */
+ if(onnote[i] && onnote[i]->pid==_threadgetproc()->pid){
+ n = -1;
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ if(onnote[i]->fn[j]==nil && n==-1)
+ n = j;
+ }
+ if(n > -1)
+ onnote[i]->fn[n] = f;
+ unlock(&onnotelock);
+ return n>-1;
+ }
+ /* there is a free slot */
+ if(!onnote[i] && n==-1)
+ n = i;
+ }
+ /* there is no free slot */
+ if(n == -1){
+ onnotesize += PPCHUNK;
+ onnote = realloc(onnote, onnotesize*sizeof(uintptr));
+ if(!onnote){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", onnotesize*sizeof(uintptr));
+ }
+ memset(onnote+i+1, 0, PPCHUNK-1);
+ n = i;
+ }
+ onnote[n]=mallocz(sizeof(Onnote), 1);
+ if(!onnote[n]){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", sizeof(Onnote));
+ }
+ onnote[n]->pid = _threadgetproc()->pid;
+ onnote[n]->fn[0] = f;
+ unlock(&onnotelock);
+ return 1;
+}
+
+void
+threadcancelnotes(int pid)
+{
+ int i;
+
+ lock(&onnotelock);
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==pid){
+ free(onnote[i]);
+ onnote[i] = nil;
break;
}
unlock(&onnotelock);
- return i<NFN;
+ return;
}
static void
delayednotes(Proc *p, void *v)
{
- int i;
+ int i, j, all;
Note *n;
- char s[ERRMAX];
- int (*fn)(void*, char*);
+ int (*f)(void*, char*);
if(!p->pending)
return;
p->pending = 0;
+ all = j = 0;
for(n=notes; n<enotes; n++){
if(n->proc == p){
- strcpy(s, n->s);
- n->proc = nil;
- unlock(&n->inuse);
-
- for(i=0; i<NFN; i++){
- if(onnotepid[i]!=p->pid || (fn = onnote[i])==nil)
- continue;
- if((*fn)(v, s))
- break;
+ for(i=0; i<NFN; i++)
+ if(f=onnoteall[i])
+ if((*f)(v, n->s)){
+ all = 1;
+ break;
+ }
+ if(!all){
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==p->pid){
+ for(j=0; j<NFN; j++)
+ if(f=onnote[i]->fn[j])
+ if((*f)(v, n->s))
+ break;
+ break;
+ }
}
- if(i==NFN){
- _threaddebug(DBGNOTE, "Unhandled note %s, proc %p", n->s, p);
+ if(!all && (i==onnotesize || j==NFN)){
+ _threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p);
if(v != nil)
noted(NDFLT);
else if(strncmp(n->s, "sys:", 4)==0)
@@ -79,6 +173,8 @@
abort();
threadexitsall(n->s);
}
+ n->proc = nil;
+ unlock(&n->inuse);
}
}
}
@@ -94,7 +190,7 @@
noted(NDFLT);
if(_threadexitsallstatus){
- _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'", _threadexitsallstatus);
+ _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'\n", _threadexitsallstatus);
_exits(_threadexitsallstatus);
}
--- /tmp/sched.c
+++ /sys/src/libthread/sched.c
@@ -157,6 +157,7 @@
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
+ threadcancelnotes(p->pid);
unlinkproc(p);
_schedexit(p); /* frees proc */
}
--- /tmp/thread.h
+++ /sys/include/thread.h
@@ -97,6 +97,7 @@
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
int threadnotify(int (*f)(void*, char*), int in);
+void threadcancelnotes(int pid);
int threadid(void);
int threadpid(int);
int threadsetgrp(int); /* set thread group, return old */
--- /tmp/threadimpl.h
+++ /sys/src/libthread/threadimpl.h
@@ -192,3 +192,15 @@
#define _threaddebug(flag, ...) if((_threaddebuglevel&(flag))==0){}else _threadprint(__VA_ARGS__)
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
+
+#define PPCHUNK 100
+#define NFN 33
+typedef struct Onnote Onnote;
+struct Onnote
+{
+ int pid;
+ int (*fn[NFN])(void*, char*);
+};
+extern Onnote **onnote;
+extern int onnotesize;
+void _threadnote(void*, char*);