Andrey, if you want to use different note handlers per process (with a big
number of processes) using libthread, this may be helpful.
The idea is this:
One array of handlers applies to all processes and can be changed by any process.
When a note is received by a process, this array takes priority.
In addition, there is an array of pointers to structures of the type
struct Onnote
{
int pid;
int (*fn[NFN])(void*, char*);
};
initially of size PPCHUNK (I set it to 100; experiment with that),
but it can grow if necessary (but not shrink; I think that would
be overkill).
These structures are allocated the first time a process registers a
handler and freed when the process exits (or by calling
threadcancelnotes(); note that this function can free other processes'
function handlers, so it might be better to add some restrictions).
The use of "in" in threadnotify(int (*f)(void*, char*), int in) is:
in > 0 : set the handler for the calling process.
in == 0 : clear the handler for the calling process.
in == -1 : clear the handler for all processes (except those that have
registered it already for themselves).
in < -1 : set the handler for all processes.
There is no use of threadnotify with "in < 0" in /sys/src, so nothing is broken.
As you are using 9front and they are serving their sources over
9p, here is a diff to their sources. I haven't compiled it in
9front, though. Note that if you want to compile the system with
these changes, you have to remove the copy of note.c at
/sys/src/cmd/execnet (it seems that note.c was added afterwards, as
I thought).
I haven't tested it much; this has been more of a time-killing
pastime.
adr
--- /tmp/main.c
+++ /sys/src/libthread/main.c
@@ -28,6 +28,10 @@
_qlockinit(_threadrendezvous);
_sysfatal = _threadsysfatal;
__assert = _threadassert;
+ onnote = mallocz(PPCHUNK*sizeof(uintptr), 1);
+ if(!onnote)
+ sysfatal("Malloc of size %d failed: %r",
PPCHUNK*sizeof(uintptr));
+ onnotesize = PPCHUNK;
notify(_threadnote);
if(mainstacksize == 0)
mainstacksize = 8*1024;
--- /tmp/note.c
+++ /sys/src/libthread/note.c
@@ -5,9 +5,9 @@
int _threadnopasser;
-#define NFN 33
#define ERRLEN 48
typedef struct Note Note;
+
struct Note
{
Lock inuse;
@@ -17,62 +17,155 @@
static Note notes[128];
static Note *enotes = notes+nelem(notes);
-static int (*onnote[NFN])(void*, char*);
-static int onnotepid[NFN];
+Onnote **onnote;
+int onnotesize;
+static int (*onnoteall[NFN])(void*, char*);
static Lock onnotelock;
int
threadnotify(int (*f)(void*, char*), int in)
{
- int i, topid;
- int (*from)(void*, char*), (*to)(void*, char*);
+ int i, j;
- if(in){
- from = nil;
- to = f;
- topid = _threadgetproc()->pid;
- }else{
- from = f;
- to = nil;
- topid = 0;
- }
lock(&onnotelock);
- for(i=0; i<NFN; i++)
- if(onnote[i]==from){
- onnote[i] = to;
- onnotepid[i] = topid;
+
+ /* add note for all processes */
+ if(in < -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == nil){
+ onnoteall[i] = f;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for all processes */
+ if(in == -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ onnoteall[i] = nil;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for current process */
+ if(!in){
+ for(i=0; i<onnotesize; i++){
+ if(onnote[i]!=nil &&
onnote[i]->pid==_threadgetproc()->pid){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ onnote[i]->fn[j] = 0;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+ }
+ }
+ unlock(&onnotelock);
+ return i<onnotesize;
+ }
+
+ /* add note for current process */
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==_threadgetproc()->pid)
+ break;
+
+ /* process has already a slot */
+ if(i < onnotesize){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == nil){
+ onnote[i]->fn[j] = f;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+
+ }
+
+ for(i=0; i<onnotesize; i++)
+ if(!onnote[i])
+ break;
+
+ /* there is no free slot */
+ if(i == onnotesize){
+ onnotesize += PPCHUNK;
+ onnote = realloc(onnote, onnotesize*sizeof(uintptr));
+ if(!onnote){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r",
onnotesize*sizeof(uintptr));
+ }
+ memset(onnote+i+1, 0, PPCHUNK-1);
+ }
+
+ onnote[i]=mallocz(sizeof(Onnote), 1);
+ if(!onnote[i]){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", sizeof(Onnote));
+ }
+ onnote[i]->pid = _threadgetproc()->pid;
+ onnote[i]->fn[0] = f;
+ unlock(&onnotelock);
+ return 1;
+}
+
+void
+threadcancelnotes(int pid)
+{
+ int i;
+
+ lock(&onnotelock);
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==pid){
+ free(onnote[i]);
+ onnote[i] = nil;
break;
}
unlock(&onnotelock);
- return i<NFN;
+ return;
}
static void
delayednotes(Proc *p, void *v)
{
- int i;
+ int i, j, all;
Note *n;
- char s[ERRMAX];
- int (*fn)(void*, char*);
+ int (*f)(void*, char*);
if(!p->pending)
return;
p->pending = 0;
+ all = j = 0;
for(n=notes; n<enotes; n++){
if(n->proc == p){
- strcpy(s, n->s);
- n->proc = nil;
- unlock(&n->inuse);
-
- for(i=0; i<NFN; i++){
- if(onnotepid[i]!=p->pid || (fn =
onnote[i])==nil)
- continue;
- if((*fn)(v, s))
- break;
+ for(i=0; i<NFN; i++)
+ if(f=onnoteall[i])
+ if((*f)(v, n->s)){
+ all = 1;
+ break;
+ }
+ if(!all){
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==p->pid){
+ for(j=0; j<NFN; j++)
+ if(f=onnote[i]->fn[j])
+ if((*f)(v,
n->s))
+ break;
+ break;
+ }
}
- if(i==NFN){
- _threaddebug(DBGNOTE, "Unhandled note %s, proc
%p", n->s, p);
+ if(!all && (i==onnotesize || j==NFN)){
+ _threaddebug(DBGNOTE, "Unhandled note %s, proc
%p\n", n->s, p);
if(v != nil)
noted(NDFLT);
else if(strncmp(n->s, "sys:", 4)==0)
@@ -79,6 +172,8 @@
abort();
threadexitsall(n->s);
}
+ n->proc = nil;
+ unlock(&n->inuse);
}
}
}
@@ -94,7 +189,7 @@
noted(NDFLT);
if(_threadexitsallstatus){
- _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'",
_threadexitsallstatus);
+ _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'\n",
_threadexitsallstatus);
_exits(_threadexitsallstatus);
}
--- /tmp/sched.c
+++ /sys/src/libthread/sched.c
@@ -157,6 +157,7 @@
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
+ threadcancelnotes(p->pid);
unlinkproc(p);
_schedexit(p); /* frees proc */
}
--- /tmp/thread.h
+++ /sys/include/thread.h
@@ -97,6 +97,7 @@
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
int threadnotify(int (*f)(void*, char*), int in);
+void threadcancelnotes(int pid);
int threadid(void);
int threadpid(int);
int threadsetgrp(int); /* set thread group, return old */
--- /tmp/threadimpl.h
+++ /sys/src/libthread/threadimpl.h
@@ -192,3 +192,15 @@
#define _threaddebug(flag, ...) if((_threaddebuglevel&(flag))==0){}else
_threadprint(__VA_ARGS__)
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
+
+#define PPCHUNK 100
+#define NFN 33
+typedef struct Onnote Onnote;
+struct Onnote
+{
+ int pid;
+ int (*fn[NFN])(void*, char*);
+};
+extern Onnote **onnote;
+extern int onnotesize;
+void _threadnote(void*, char*);
------------------------------------------
9fans: 9fans
Permalink:
https://9fans.topicbox.com/groups/9fans/Tfa6823048ad90a21-M401cfa47db4a93cc273cac83
Delivery options: https://9fans.topicbox.com/groups/9fans/subscription
--- /tmp/main.c
+++ /sys/src/libthread/main.c
@@ -28,6 +28,10 @@
_qlockinit(_threadrendezvous);
_sysfatal = _threadsysfatal;
__assert = _threadassert;
+ onnote = mallocz(PPCHUNK*sizeof(uintptr), 1);
+ if(!onnote)
+ sysfatal("Malloc of size %d failed: %r", PPCHUNK*sizeof(uintptr));
+ onnotesize = PPCHUNK;
notify(_threadnote);
if(mainstacksize == 0)
mainstacksize = 8*1024;
--- /tmp/note.c
+++ /sys/src/libthread/note.c
@@ -5,9 +5,9 @@
int _threadnopasser;
-#define NFN 33
#define ERRLEN 48
typedef struct Note Note;
+
struct Note
{
Lock inuse;
@@ -17,62 +17,155 @@
static Note notes[128];
static Note *enotes = notes+nelem(notes);
-static int (*onnote[NFN])(void*, char*);
-static int onnotepid[NFN];
+Onnote **onnote;
+int onnotesize;
+static int (*onnoteall[NFN])(void*, char*);
static Lock onnotelock;
int
threadnotify(int (*f)(void*, char*), int in)
{
- int i, topid;
- int (*from)(void*, char*), (*to)(void*, char*);
+ int i, j;
- if(in){
- from = nil;
- to = f;
- topid = _threadgetproc()->pid;
- }else{
- from = f;
- to = nil;
- topid = 0;
- }
lock(&onnotelock);
- for(i=0; i<NFN; i++)
- if(onnote[i]==from){
- onnote[i] = to;
- onnotepid[i] = topid;
+
+ /* add note for all processes */
+ if(in < -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ unlock(&onnotelock);
+ return 1;
+ }
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == nil){
+ onnoteall[i] = f;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for all processes */
+ if(in == -1){
+ for(i=0; i<NFN; i++)
+ if(onnoteall[i] == f){
+ onnoteall[i] = nil;
+ break;
+ }
+ unlock(&onnotelock);
+ return i<NFN;
+ }
+
+ /* remove note for current process */
+ if(!in){
+ for(i=0; i<onnotesize; i++){
+ if(onnote[i]!=nil && onnote[i]->pid==_threadgetproc()->pid){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == f){
+ onnote[i]->fn[j] = 0;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+ }
+ }
+ unlock(&onnotelock);
+ return i<onnotesize;
+ }
+
+ /* add note for current process */
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==_threadgetproc()->pid)
+ break;
+
+ /* process has already a slot */
+ if(i < onnotesize){
+ for(j=0; j<NFN; j++){
+ if(onnote[i]->fn[j] == nil){
+ onnote[i]->fn[j] = f;
+ break;
+ }
+ }
+ unlock(&onnotelock);
+ return j<NFN;
+
+ }
+
+ for(i=0; i<onnotesize; i++)
+ if(!onnote[i])
+ break;
+
+ /* there is no free slot */
+ if(i == onnotesize){
+ onnotesize += PPCHUNK;
+ onnote = realloc(onnote, onnotesize*sizeof(uintptr));
+ if(!onnote){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", onnotesize*sizeof(uintptr));
+ }
+ memset(onnote+i+1, 0, PPCHUNK-1);
+ }
+
+ onnote[i]=mallocz(sizeof(Onnote), 1);
+ if(!onnote[i]){
+ unlock(&onnotelock);
+ sysfatal("Malloc of size %d failed: %r", sizeof(Onnote));
+ }
+ onnote[i]->pid = _threadgetproc()->pid;
+ onnote[i]->fn[0] = f;
+ unlock(&onnotelock);
+ return 1;
+}
+
+void
+threadcancelnotes(int pid)
+{
+ int i;
+
+ lock(&onnotelock);
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==pid){
+ free(onnote[i]);
+ onnote[i] = nil;
break;
}
unlock(&onnotelock);
- return i<NFN;
+ return;
}
static void
delayednotes(Proc *p, void *v)
{
- int i;
+ int i, j, all;
Note *n;
- char s[ERRMAX];
- int (*fn)(void*, char*);
+ int (*f)(void*, char*);
if(!p->pending)
return;
p->pending = 0;
+ all = j = 0;
for(n=notes; n<enotes; n++){
if(n->proc == p){
- strcpy(s, n->s);
- n->proc = nil;
- unlock(&n->inuse);
-
- for(i=0; i<NFN; i++){
- if(onnotepid[i]!=p->pid || (fn = onnote[i])==nil)
- continue;
- if((*fn)(v, s))
- break;
+ for(i=0; i<NFN; i++)
+ if(f=onnoteall[i])
+ if((*f)(v, n->s)){
+ all = 1;
+ break;
+ }
+ if(!all){
+ for(i=0; i<onnotesize; i++)
+ if(onnote[i] && onnote[i]->pid==p->pid){
+ for(j=0; j<NFN; j++)
+ if(f=onnote[i]->fn[j])
+ if((*f)(v, n->s))
+ break;
+ break;
+ }
}
- if(i==NFN){
- _threaddebug(DBGNOTE, "Unhandled note %s, proc %p", n->s, p);
+ if(!all && (i==onnotesize || j==NFN)){
+ _threaddebug(DBGNOTE, "Unhandled note %s, proc %p\n", n->s, p);
if(v != nil)
noted(NDFLT);
else if(strncmp(n->s, "sys:", 4)==0)
@@ -79,6 +172,8 @@
abort();
threadexitsall(n->s);
}
+ n->proc = nil;
+ unlock(&n->inuse);
}
}
}
@@ -94,7 +189,7 @@
noted(NDFLT);
if(_threadexitsallstatus){
- _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'", _threadexitsallstatus);
+ _threaddebug(DBGNOTE, "Threadexitsallstatus = '%s'\n", _threadexitsallstatus);
_exits(_threadexitsallstatus);
}
--- /tmp/sched.c
+++ /sys/src/libthread/sched.c
@@ -157,6 +157,7 @@
t = runthread(p);
if(t == nil){
_threaddebug(DBGSCHED, "all threads gone; exiting");
+ threadcancelnotes(p->pid);
unlinkproc(p);
_schedexit(p); /* frees proc */
}
--- /tmp/thread.h
+++ /sys/include/thread.h
@@ -97,6 +97,7 @@
void threadkillgrp(int); /* kill threads in group */
void threadmain(int argc, char *argv[]);
int threadnotify(int (*f)(void*, char*), int in);
+void threadcancelnotes(int pid);
int threadid(void);
int threadpid(int);
int threadsetgrp(int); /* set thread group, return old */
--- /tmp/threadimpl.h
+++ /sys/src/libthread/threadimpl.h
@@ -192,3 +192,15 @@
#define _threaddebug(flag, ...) if((_threaddebuglevel&(flag))==0){}else _threadprint(__VA_ARGS__)
#define ioproc_arg(io, type) (va_arg((io)->arg, type))
+
+#define PPCHUNK 100
+#define NFN 33
+typedef struct Onnote Onnote;
+struct Onnote
+{
+ int pid;
+ int (*fn[NFN])(void*, char*);
+};
+extern Onnote **onnote;
+extern int onnotesize;
+void _threadnote(void*, char*);