---
Review notes (ignored by git-am):
 * pager_inhibit_workers: initialise err to 0; it was returned
   uninitialised whenever inhibiting succeeded.
 * inhibit_disk_pager, inhibit_fat_pager: resume the file pager if the
   disk pager cannot be inhibited, so that the file pager is not left
   inhibited on error.
 * pager_start_workers: free REQUESTS if allocating its queue fails.
 * Line breaks and indentation were restored from a whitespace-collapsed
   copy of this patch.  Context whitespace is reconstructed, so
   `git apply --ignore-whitespace' may be needed.  The post-image blob
   ids on the index lines of ext2fs/pager.c, fatfs/pager.c and
   libpager/demuxer.c predate the changes above.

 ext2fs/ext2fs.c          |  18 +++++++
 ext2fs/ext2fs.h          |   6 +++
 ext2fs/pager.c           |  40 +++++++++++++-
 fatfs/fatfs.h            |   2 +
 fatfs/pager.c            |  40 +++++++++++++-
 libdiskfs/disk-pager.c   |   3 +-
 libdiskfs/diskfs-pager.h |   1 +
 libpager/demuxer.c       | 124 +++++++++++++++++++++++++++++++++++++++++++----
 libpager/pager.h         |  23 ++++++++-
 9 files changed, 242 insertions(+), 15 deletions(-)

diff --git a/ext2fs/ext2fs.c b/ext2fs/ext2fs.c
index d0fdfe7..200c210 100644
--- a/ext2fs/ext2fs.c
+++ b/ext2fs/ext2fs.c
@@ -207,10 +207,28 @@ main (int argc, char **argv)
 error_t
 diskfs_reload_global_state ()
 {
+  error_t err;
+
   pokel_flush (&global_pokel);
   pager_flush (diskfs_disk_pager, 1);
+
+  /* Paging must specifically be inhibited; if not, a paging request
+     could be handled while sblock is still NULL.
+     While some RPCs are inhibited when this function is called by
+     libdiskfs, paging RPCs are still enabled.  Even if we were to
+     inhibit paging RPCs, libpager has its own pool of workers to handle
+     requests asynchronously, which libports is unaware of, so requests
+     can be handled even after the relevant RPCs are disabled.  This is
+     all dealt with by {inhibit,resume}_disk_pager.  */
+  err = inhibit_disk_pager ();
+  if (err)
+    return err;
+
   sblock = NULL;
   get_hypermetadata ();
   map_hypermetadata ();
+
+  resume_disk_pager ();
+
   return 0;
 }
diff --git a/ext2fs/ext2fs.h b/ext2fs/ext2fs.h
index 96d8e9d..d5f400e 100644
--- a/ext2fs/ext2fs.h
+++ b/ext2fs/ext2fs.h
@@ -201,6 +201,12 @@ struct user_pager_info
 /* Set up the disk pager.  */
 void create_disk_pager (void);
 
+/* Inhibit the disk pager.  */
+error_t inhibit_disk_pager (void);
+
+/* Resume the disk pager.  */
+void resume_disk_pager (void);
+
 /* Call this when we should turn off caching so that unused memory object
    ports get freed.  */
 void drop_pager_softrefs (struct node *node);
diff --git a/ext2fs/pager.c b/ext2fs/pager.c
index b56c923..f41107f 100644
--- a/ext2fs/pager.c
+++ b/ext2fs/pager.c
@@ -34,6 +34,10 @@ struct port_bucket *disk_pager_bucket;
 /* A ports bucket to hold file pager ports.  */
 struct port_bucket *file_pager_bucket;
 
+/* Stores a reference to the requests instance used by the file pager so its
+   worker threads can be inhibited and resumed.  */
+struct requests *file_pager_requests;
+
 pthread_spinlock_t node_to_page_lock = PTHREAD_SPINLOCK_INITIALIZER;
 
 
@@ -1217,11 +1221,45 @@ create_disk_pager (void)
   file_pager_bucket = ports_create_bucket ();
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (file_pager_bucket);
+  err = pager_start_workers (&file_pager_requests, file_pager_bucket);
   if (err)
     ext2_panic ("can't create libpager worker threads: %s", strerror (err));
 }
 
+error_t
+inhibit_disk_pager (void)
+{
+  error_t err;
+
+  /* Inhibiting RPCs is not sufficient, nor is it in fact necessary.
+     Since libpager has its own pool of workers, requests can still be
+     handled after RPCs have been inhibited, so pager_inhibit_workers
+     must be used.  In fact, RPCs will not be inhibited; the requests
+     will just queue up inside libpager, and will be handled once the
+     workers are resumed.
+     The file pager can rely on the disk pager, so inhibit the file
+     pager first.  */
+
+  err = pager_inhibit_workers (file_pager_requests);
+  if (err)
+    return err;
+
+  err = pager_inhibit_workers (diskfs_disk_pager_requests);
+  /* Do not leave only the file pager inhibited if the disk pager could
+     not be inhibited.  */
+  if (err)
+    pager_resume_workers (file_pager_requests);
+
+  return err;
+}
+
+void
+resume_disk_pager (void)
+{
+  pager_resume_workers (diskfs_disk_pager_requests);
+  pager_resume_workers (file_pager_requests);
+}
+
 /* Call this to create a FILE_DATA pager and return a send right.
    NODE must be locked.  */
 mach_port_t
diff --git a/fatfs/fatfs.h b/fatfs/fatfs.h
index 3c3d836..54c3426 100644
--- a/fatfs/fatfs.h
+++ b/fatfs/fatfs.h
@@ -121,6 +121,8 @@ extern struct dirrect dr_root_node;
 void drop_pager_softrefs (struct node *);
 void allow_pager_softrefs (struct node *);
 void create_fat_pager (void);
+error_t inhibit_fat_pager (void);
+void resume_fat_pager (void);
 
 void flush_node_pager (struct node *node);
 
diff --git a/fatfs/pager.c b/fatfs/pager.c
index 10d1fc9..3d860d1 100644
--- a/fatfs/pager.c
+++ b/fatfs/pager.c
@@ -29,6 +29,10 @@ struct port_bucket *disk_pager_bucket;
 /* A ports bucket to hold file pager ports.  */
 struct port_bucket *file_pager_bucket;
 
+/* Stores a reference to the requests instance used by the file pager so its
+   worker threads can be inhibited and resumed.  */
+struct requests *file_pager_requests;
+
 /* Mapped image of the FAT.  */
 void *fat_image;
 
@@ -776,11 +780,45 @@ create_fat_pager (void)
   file_pager_bucket = ports_create_bucket ();
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (file_pager_bucket);
+  err = pager_start_workers (&file_pager_requests, file_pager_bucket);
   if (err)
     error (2, err, "can't create libpager worker threads");
 }
 
+error_t
+inhibit_fat_pager (void)
+{
+  error_t err;
+
+  /* Inhibiting RPCs is not sufficient, nor is it in fact necessary.
+     Since libpager has its own pool of workers, requests can still be
+     handled after RPCs have been inhibited, so pager_inhibit_workers
+     must be used.  In fact, RPCs will not be inhibited; the requests
+     will just queue up inside libpager, and will be handled once the
+     workers are resumed.
+     The file pager can rely on the disk pager, so inhibit the file
+     pager first.  */
+
+  err = pager_inhibit_workers (file_pager_requests);
+  if (err)
+    return err;
+
+  err = pager_inhibit_workers (diskfs_disk_pager_requests);
+  /* Do not leave only the file pager inhibited if the disk pager could
+     not be inhibited.  */
+  if (err)
+    pager_resume_workers (file_pager_requests);
+
+  return err;
+}
+
+void
+resume_fat_pager (void)
+{
+  pager_resume_workers (diskfs_disk_pager_requests);
+  pager_resume_workers (file_pager_requests);
+}
+
 /* Call this to create a FILE_DATA pager and return a send right.
    NODE must be locked.  */
 mach_port_t
diff --git a/libdiskfs/disk-pager.c b/libdiskfs/disk-pager.c
index 008aa2d..2cb33d4 100644
--- a/libdiskfs/disk-pager.c
+++ b/libdiskfs/disk-pager.c
@@ -24,6 +24,7 @@
 __thread struct disk_image_user *diskfs_exception_diu;
 
 struct pager *diskfs_disk_pager;
+struct requests *diskfs_disk_pager_requests;
 
 static void fault_handler (int sig, long int sigcode, struct sigcontext *scp);
 static struct hurd_signal_preemptor preemptor =
@@ -43,7 +44,7 @@ diskfs_start_disk_pager (struct user_pager_info *upi,
   mach_port_t disk_pager_port;
 
   /* Start libpagers worker threads.  */
-  err = pager_start_workers (pager_bucket);
+  err = pager_start_workers (&diskfs_disk_pager_requests, pager_bucket);
   if (err)
     error (2, err, "creating pager worker threads failed");
 
diff --git a/libdiskfs/diskfs-pager.h b/libdiskfs/diskfs-pager.h
index a253069..db99f9ff 100644
--- a/libdiskfs/diskfs-pager.h
+++ b/libdiskfs/diskfs-pager.h
@@ -40,6 +40,7 @@ extern void diskfs_start_disk_pager (struct user_pager_info *info,
                                      size_t size, void **image);
 
 extern struct pager *diskfs_disk_pager;
+extern struct requests *diskfs_disk_pager_requests;
 
 struct disk_image_user
 {
diff --git a/libpager/demuxer.c b/libpager/demuxer.c
index 4dd3cd8..524705c 100644
--- a/libpager/demuxer.c
+++ b/libpager/demuxer.c
@@ -71,9 +71,16 @@ struct worker
 struct requests
 {
   struct port_bucket *bucket;
-  struct queue queue;
+  /* Normally, both queues are the same.  However, when the workers are
+     inhibited, a new queue_in is created, but queue_out is left as the
+     old value, so the workers drain queue_out but do not receive new
+     requests.  */
+  struct queue *queue_in;   /* the queue to add to */
+  struct queue *queue_out;  /* the queue to take from */
   int asleep;
   pthread_cond_t wakeup;
+  pthread_cond_t inhibit_wakeup;
+  pthread_cond_t resume_wakeup;
   pthread_mutex_t lock;
   struct worker workers[WORKER_COUNT];
 };
@@ -108,10 +115,10 @@ pager_demuxer (struct requests *requests,
 
   pthread_mutex_lock (&requests->lock);
 
-  queue_enqueue (&requests->queue, &r->item);
+  queue_enqueue (requests->queue_in, &r->item);
 
-  /* Awake worker.  */
-  if (requests->asleep > 0)
+  /* Awake worker, but only if not inhibited.  */
+  if (requests->asleep > 0 && requests->queue_in == requests->queue_out)
     pthread_cond_signal (&requests->wakeup);
 
   pthread_mutex_unlock (&requests->lock);
@@ -186,9 +193,11 @@ worker_func (void *arg)
 
     get_request_locked:
       /* ... get a request from the global queue instead.  */
-      while ((r = queue_dequeue (&requests->queue)) == NULL)
+      while ((r = queue_dequeue (requests->queue_out)) == NULL)
         {
           requests->asleep += 1;
+          if (requests->asleep == WORKER_COUNT)
+            pthread_cond_broadcast (&requests->inhibit_wakeup);
           pthread_cond_wait (&requests->wakeup, &requests->lock);
           requests->asleep -= 1;
         }
@@ -298,27 +307,50 @@ service_paging_requests (void *arg)
 
 /* Start the worker threads libpager uses to service requests.  */
 error_t
-pager_start_workers (struct port_bucket *pager_bucket)
+pager_start_workers (struct requests **out_requests,
+                     struct port_bucket *pager_bucket)
 {
   error_t err;
   int i;
   pthread_t t;
   struct requests *requests;
 
+  if (out_requests == NULL)
+    /* Return rather than using goto done, since that would dereference
+       out_requests.  */
+    return EINVAL;
+
   requests = malloc (sizeof *requests);
   if (requests == NULL)
-    return ENOMEM;
+    {
+      err = ENOMEM;
+      goto done;
+    }
 
   requests->bucket = pager_bucket;
   requests->asleep = 0;
-  queue_init (&requests->queue);
+
+  requests->queue_in = malloc (sizeof *requests->queue_in);
+  if (requests->queue_in == NULL)
+    {
+      /* No thread refers to REQUESTS yet, so it can be released.  */
+      free (requests);
+      err = ENOMEM;
+      goto done;
+    }
+  queue_init (requests->queue_in);
+  /* Until the workers are inhibited, both queues are the same.  */
+  requests->queue_out = requests->queue_in;
+
   pthread_cond_init (&requests->wakeup, NULL);
+  pthread_cond_init (&requests->inhibit_wakeup, NULL);
+  pthread_cond_init (&requests->resume_wakeup, NULL);
   pthread_mutex_init (&requests->lock, NULL);
 
   /* Make a thread to service paging requests.  */
   err = pthread_create (&t, NULL, service_paging_requests, requests);
   if (err)
-    return err;
+    goto done;
   pthread_detach (t);
 
   for (i = 0; i < WORKER_COUNT; i++)
@@ -329,9 +361,81 @@ pager_start_workers (struct port_bucket *pager_bucket)
 
       err = pthread_create (&t, NULL, &worker_func, &requests->workers[i]);
       if (err)
-        return err;
+        goto done;
       pthread_detach (t);
     }
 
+done:
+  if (err)
+    *out_requests = NULL;
+  else
+    *out_requests = requests;
+
   return err;
 }
+
+error_t
+pager_inhibit_workers (struct requests *requests)
+{
+  error_t err = 0;
+
+  pthread_mutex_lock (&requests->lock);
+
+  /* Check the workers are not already inhibited nor in the process of
+     being inhibited, and only create a new queue if necessary;
+     otherwise the queued requests would be discarded, and queue_out
+     would be leaked.  */
+  if (requests->queue_out == requests->queue_in)
+    {
+      /* Any new paging requests will go into a new queue.  */
+      struct queue *new_queue = malloc (sizeof *new_queue);
+      if (new_queue == NULL)
+        {
+          err = ENOMEM;
+          goto done_locked;
+        }
+      queue_init (new_queue);
+      requests->queue_in = new_queue;
+    }
+
+  /* Wait until all the workers are asleep, as then the old queue and
+     all individual worker queues have been drained.  */
+  while (requests->asleep < WORKER_COUNT)
+    pthread_cond_wait (&requests->inhibit_wakeup, &requests->lock);
+
+  pthread_cond_broadcast (&requests->resume_wakeup);
+
+done_locked:
+  pthread_mutex_unlock (&requests->lock);
+  return err;
+}
+
+void
+pager_resume_workers (struct requests *requests)
+{
+  pthread_mutex_lock (&requests->lock);
+
+  /* Check the workers are inhibited/being inhibited.  */
+  if (requests->queue_out != requests->queue_in)
+    {
+      /* If the inhibiting has not yet finished (the old queue has not
+         drained), wait for it to do so.  */
+      while (requests->asleep < WORKER_COUNT)
+        pthread_cond_wait (&requests->resume_wakeup, &requests->lock);
+
+      /* Another resume may have run in the meantime, in which case the
+         old queue has already been freed, so queue_out should not be
+         freed and updated to be queue_in.  */
+      if (requests->queue_out != requests->queue_in)
+        {
+          /* The queue has been drained and will no longer be used.  */
+          free (requests->queue_out);
+          requests->queue_out = requests->queue_in;
+          /* We need to wake up all workers, as there could be multiple
+             requests in the new queue.  */
+          pthread_cond_broadcast (&requests->wakeup);
+        }
+    }
+
+  pthread_mutex_unlock (&requests->lock);
+}
diff --git a/libpager/pager.h b/libpager/pager.h
index fe34238..8a99c30 100644
--- a/libpager/pager.h
+++ b/libpager/pager.h
@@ -25,8 +25,27 @@
    scope.  */
 struct user_pager_info;
 
-/* Start the worker threads libpager uses to service requests.  */
-error_t pager_start_workers (struct port_bucket *pager_bucket);
+struct requests;
+
+/* Start the worker threads libpager uses to service requests.  If no
+   error is returned, *requests will be a valid pointer, else it will be
+   set to NULL.  Returns EINVAL if requests is NULL.  */
+error_t
+pager_start_workers (struct requests **requests,
+                     struct port_bucket *pager_bucket);
+
+/* Inhibits the worker threads libpager uses to service requests,
+   blocking until all requests sent before this function is called have
+   finished.  Note that RPCs will not be inhibited, so new requests will
+   queue up, but will not be handled until the workers are resumed.  If
+   RPCs should be inhibited as well, call ports_inhibit_bucket_rpcs with
+   the bucket used to create the workers before calling this.  */
+error_t
+pager_inhibit_workers (struct requests *requests);
+
+/* Resumes the worker threads libpager uses to service requests.  */
+void
+pager_resume_workers (struct requests *requests);
 
 /* Create a new pager.  The pager will have a port created for it
    (using libports, in BUCKET) and will be immediately ready
-- 
2.4.6