cvsuser     04/01/25 06:12:55

  Modified:    include/parrot tsq.h
               src      events.c tsq.c
  Log:
  event-handling-19
  * first steps towards signal and IO events
  * the io_thread waits on select, which returns either when interrupted
    by an unblocked signal or when an IO handle becomes ready
  * installing signal handlers is still disabled
  
  Revision  Changes    Path
  1.10      +2 -1      parrot/include/parrot/tsq.h
  
  Index: tsq.h
  ===================================================================
  RCS file: /cvs/public/parrot/include/parrot/tsq.h,v
  retrieving revision 1.9
  retrieving revision 1.10
  diff -u -w -r1.9 -r1.10
  --- tsq.h     6 Jan 2004 15:58:57 -0000       1.9
  +++ tsq.h     25 Jan 2004 14:12:52 -0000      1.10
  @@ -1,7 +1,7 @@
   /* tsq.h
    *  Copyright: 2001-2003 The Perl Foundation.  All Rights Reserved.
    *  CVS Info
  - *     $Id: tsq.h,v 1.9 2004/01/06 15:58:57 leo Exp $
  + *     $Id: tsq.h,v 1.10 2004/01/25 14:12:52 leo Exp $
    *  Overview:
    *     Defines the thread-safe queue system
    *  Data Structure and Algorithms:
  @@ -45,6 +45,7 @@
   QUEUE_ENTRY *peek_entry(QUEUE *);
   QUEUE_ENTRY *wait_for_entry(QUEUE *);
   void push_entry(QUEUE *, QUEUE_ENTRY *);
  +void unshift_entry(QUEUE *, QUEUE_ENTRY *);
   void nosync_insert_entry(QUEUE *, QUEUE_ENTRY *);
   void insert_entry(QUEUE *, QUEUE_ENTRY *);
   void queue_lock(QUEUE *);
  
  
  
  1.22      +344 -47   parrot/src/events.c
  
  Index: events.c
  ===================================================================
  RCS file: /cvs/public/parrot/src/events.c,v
  retrieving revision 1.21
  retrieving revision 1.22
  diff -u -w -r1.21 -r1.22
  --- events.c  18 Jan 2004 09:52:23 -0000      1.21
  +++ events.c  25 Jan 2004 14:12:55 -0000      1.22
  @@ -1,7 +1,7 @@
   /* events.c
    *  Copyright: 2001-2003 The Perl Foundation.  All Rights Reserved.
    *  CVS Info
  - *     $Id: events.c,v 1.21 2004/01/18 09:52:23 leo Exp $
  + *     $Id: events.c,v 1.22 2004/01/25 14:12:55 leo Exp $
    *  Overview:
    *     Event handling stuff
    *  Data Structure and Algorithms:
  @@ -9,6 +9,8 @@
    *     When events are due, they are placed in per interpreter
    *     task_queues, where they are handled then by the check_event*
    *     opcodes.
  + *     IO events and signals are caught in the io_thread, which in turn
  + *     dispatches them to one or all interpreters.
    *  History:
    *  Notes:
    *  References:
  @@ -18,93 +20,203 @@
   #include "parrot/parrot.h"
   #include <assert.h>
   
  -/* forward def */
  +/*
  + * event debugging stuff
  + */
  +#define EVENT_DEBUG 0
  +/*
  + * not yet - need to sort out platform code and fix exceptions first
  + */
  +#define INSTALL_EVENT_HANDLER 0
  +
  +#if EVENT_DEBUG
  +#  define edebug(x) puts(x)
  +#else
  +#  define edebug(x)
  +#endif
  +
  +
  +/* forward defs */
   static void* event_thread(void *data);
  +static void* io_thread(void *data);
   static void* do_event(Parrot_Interp, parrot_event*, void*);
  +static void stop_io_thread(void);
  +static void schedule_signal_event(int signum);
  +void Parrot_schedule_broadcast_qentry(QUEUE_ENTRY* entry);
   
   /*
  - * we have exactly one event_queue
  + * we have exactly one global event_queue
  + * TODO task prio handling
    */
   static QUEUE* event_queue;
   #define TASK_PRIO 10
   
  +/*
  + * user accessible signals like SIGINT
  + */
  +#ifndef SIGINT
  +#  define SIGINT -4711 /* placeholder so the SIGINT uses below still compile */
  +#endif
  +static sig_atomic_t sig_int;
  +
  +/*
  + * a pipe is used to send messages to the IO thread
  + */
  +static int pipe_fds[2];
  +#define PIPE_READ_FD  pipe_fds[0]
  +#define PIPE_WRITE_FD pipe_fds[1]
  +/*
  + * each msg is 3 ints in size
  + * buf[0] = command
  + * buf[1] = fd or better a parrot_event ptr or such
  + * buf[2] = NL
  + */
  +#define MSG_SIZE (3*sizeof(int))
  +
  +
  +/*************************
  + * signal handling code
  + */
  +
  +/*
  + * Program Error Signals
  + */
  +static void
  +fatal_sig_handler(int signum)
  +{
  +    do_exception(0, -signum);
  +}
  +
  +/*
  + * other signals like SIGINT
  + */
   static void
   sig_handler(int signum)
   {
       switch (signum) {
  -#ifdef SIGFPE
  -        /* we can't handle SIGFPE - just give up, i.e. longjmp */
  -        case SIGFPE:
  -            do_exception(0, -signum);
  +        case SIGINT:
  +            sig_int = 1;
               break;
  -#endif
  -        default:
  +    }
  +}
  +
  +#if PARROT_HAS_SIGACTION
               /*
  -             * TODO convert signal to event
  -             * which isn't simple: the only thread and signal safe
  -             * things we may do are:
  -             * - set a sigatomic_t global or
  -             * - sem_post a semaphore
  + * signal handlers are shared by all threads, but signal block masks
  + * are per-thread, so we install one handler, block that signal, and
  + * unblock it only in the thread that shall receive it
                */
  +static void
  +Parrot_sigaction(int sig, void (*handler)(int))
  +{
  +    struct sigaction action;
  +    sigset_t block_mask;
   
  -            break;
  +    /* install handler */
  +    action.sa_handler = handler;
  +    sigemptyset(&action.sa_mask);
  +    action.sa_flags = 0;
  +    sigaction(sig, &action, NULL);
  +
  +    /* block that signal */
  +    sigemptyset(&block_mask);
  +    sigaddset(&block_mask, sig);
  +    sigprocmask(SIG_BLOCK, &block_mask, NULL);
       }
  +
  +/*
  + * unblock a signal
  + */
  +static void
  +Parrot_unblock_signal(int sig)
  +{
  +    sigset_t block_mask;
  +
  +    sigemptyset(&block_mask);
  +    sigaddset(&block_mask, sig);
  +    sigprocmask(SIG_UNBLOCK, &block_mask, NULL);
   }
   
  +#else
  +static void
  +Parrot_sigaction(int sig, void (*handler)(int))
  +{
  +}
  +
  +static void
  +Parrot_unblock_signal(int sig)
  +{
  +}
  +#endif
  +
   void
   Parrot_init_signals(void)
   {
  -    /* quick hack to test signals and exceptions
  -     * s. t/op/hacks_1.pasm
  -     */
  -#ifdef SIGFPE
       /*
        * SIGFPE is architecture specific - some signal an error
        * some don't, so we have to use direct checks if we are dividing
        * by zero
        */
  -#endif
  -#ifdef SIGINT
  -    /* Parrot_set_sighandler(SIGINT, sig_handler); */
  -#endif
  +    Parrot_sigaction(SIGINT, sig_handler);
   }
   
  +/***********************
  + * initialization code
  + */
  +
   /*
    * init event system for first interpreter
    */
   static void
   init_events_first(Parrot_Interp interpreter)
   {
  -    Parrot_thread    the_thread;
  +    Parrot_thread    ev_handle, io_handle;
  +
  +    /*
  +     * be sure all init is done only once
  +     * we could use pthread_once for that too
  +     */
  +    if (event_queue)
  +        PANIC("event queue already exists - missing parent_interp?");
       /*
        * we need a global mutex to protect the interpreter array
        */
  -
       MUTEX_INIT(interpreter_array_mutex);
       /*
  -     * init event queue - be sure its done only once
  -     * we could use pthread_once for queue_init
  +     * create event queue
        */
  -    assert(!event_queue);
       event_queue = queue_init(TASK_PRIO);
       /*
  -     * we start an event_handler thread
  +     * we use a message pipe to send IO related stuff to the
  +     * IO thread
        */
  -    THREAD_CREATE_DETACHED(the_thread, event_thread, event_queue);
  +    if (pipe(pipe_fds))
  +        internal_exception(1, "Couldn't create message pipe");
       /*
  -     * now set some sig handlers
  +     * now set some sig handlers before any thread is started, so
  +     * that all threads inherit the signal block mask
        */
  +#if INSTALL_EVENT_HANDLER
       Parrot_init_signals();
  +#endif
  +    /*
  +     * we start an event_handler thread
  +     */
  +    THREAD_CREATE_DETACHED(ev_handle, event_thread, event_queue);
  +    /*
  +     * and a signal and IO handler thread
  +     */
  +    THREAD_CREATE_DETACHED(io_handle, io_thread, event_queue);
   }
   
   /*
  - * init events for all interpreters
  + * init code for all interpreters
    */
   static void
   init_events_all(Parrot_Interp interpreter)
   {
       /*
  -     * create event queue
  +     * create per interpreter task queue
        */
       interpreter->task_queue = queue_init(0);
   }
  @@ -121,8 +233,12 @@
       init_events_all(interpreter);
   }
   
  +/**************************
  + * event handler functions
  + */
  +
   /*
  - * create queue entry and insert event into task queue
  + * create queue entry and insert event into event queue
    */
   void
   Parrot_schedule_event(Parrot_Interp interpreter, parrot_event* ev)
  @@ -137,6 +253,10 @@
               entry->type = QUEUE_ENTRY_TYPE_TIMED_EVENT;
               insert_entry(event_queue, entry);
               break;
  +        case EVENT_TYPE_SIGNAL:
  +            entry->type = QUEUE_ENTRY_TYPE_EVENT;
  +            unshift_entry(event_queue, entry);
  +            break;
           default:
               entry->type = QUEUE_ENTRY_TYPE_EVENT;
               push_entry(event_queue, entry);
  @@ -145,6 +265,22 @@
   }
   
   /*
  + * create and schedule a signal event
  + */
  +static void
  +schedule_signal_event(int signum)
  +{
  +    parrot_event* ev = mem_sys_allocate(sizeof(parrot_event));
  +    ev->type = EVENT_TYPE_SIGNAL;
  +    ev->u.signal = signum;
  +    /*
  +     * or do directly Parrot_schedule_broadcast_qentry()
  +     */
  +    Parrot_schedule_event(NULL, ev);
  +}
  +
  +
  +/*
    * create a new timer event due at diff from now, repeated at
    * interval running the passed sub
    */
  @@ -203,6 +339,7 @@
   
   /*
    * schedule event-loop terminate event
  + * this shuts down the event thread
    */
   void
   Parrot_kill_event_loop(void)
  @@ -231,6 +368,155 @@
   }
   
   /*
  + * broadcast an event
  + */
  +void
  +Parrot_schedule_broadcast_qentry(QUEUE_ENTRY* entry)
  +{
  +    Parrot_Interp interpreter;
  +    parrot_event* event;
  +
  +    event = entry->data;
  +    switch (event->type) {
  +        case EVENT_TYPE_SIGNAL:
  +            edebug(("broadcast signal"));
  +            /*
  +             * we don't have special signal handlers in usercode yet
  +             * e.g.:
  +             * install handler like exception handler *and*
  +             * set an interpreter flag, that a handler exists
  +             * we then could examine that flag (after LOCKing it)
  +             * and dispatch the exception to all interpreters that
  +             * handle it
  +             * Finally, we send the first (main) interpreter that signal
  +             *
  +             * or just send to all?
  +             *
  +             * TODO put first interpreter into interp. array immediately
  +             *      not only when threads are started
  +             */
  +            mem_sys_free(entry);
  +            mem_sys_free(event);
  +            break;
  +        default:
  +            mem_sys_free(entry);
  +            mem_sys_free(event);
  +            internal_exception(1, "Unknown event to broadcast");
  +            break;
  +    }
  +}
  +
  +/************************
  + * IO thread handling
  + */
  +
  +/*
  + * the io thread uses select/poll to handle IO events and
  + * signals
  + *
  + * It waits on input from the message pipe to insert file
  + * descriptors in the wait sets
  + */
  +static void*
  +io_thread(void *data)
  +{
  +    QUEUE* event_q = (QUEUE*) data;
  +    fd_set rfds, wfds;
  +    int n_highest;
  +    int running = 1;
  +
  +    FD_ZERO(&rfds);
  +    FD_ZERO(&wfds);
  +    /*
  +     * Watch the reader end of the pipe for messages
  +     */
  +    FD_SET(PIPE_READ_FD, &rfds);
  +    n_highest = PIPE_READ_FD + 1;
  +    /*
  +     * all signals that we shall handle here have to be unblocked
  +     * in this and only in this thread
  +     */
  +    Parrot_unblock_signal(SIGINT);
  +    while (running) {
  +        int retval = select(n_highest, &rfds, &wfds, NULL, NULL);
  +        switch (retval) {
  +            case -1:
  +                if (errno == EINTR) {
  +                    edebug(("select EINTR"));
  +                    if (sig_int) {
  +                        edebug(("int arrived"));
  +                        sig_int = 0;
  +                        /*
  +                         * signal the event thread
  +                         */
  +                        schedule_signal_event(SIGINT);
  +                    }
  +
  +                }
  +                break;
  +            default:
  +                if (retval > 0) {
  +                    edebug(("IO ready"));
  +                    if (FD_ISSET(PIPE_READ_FD, &rfds)) {
  +                        int buf[3];
  +                        /*
  +                         * a command arrived
  +                         */
  +                        edebug(("msg arrived"));
  +                        if (read(PIPE_READ_FD, buf, MSG_SIZE) != MSG_SIZE)
  +                            internal_exception(1,
  +                                    "read error from msg pipe");
  +                        switch (buf[0]) {
  +                            case 'e':
  +                                running = 0;
  +                                break;
  +                            /* TODO */
  +                            case 'r':
  +                                /* insert fd in buf[1] into rfds */
  +                            case 'w':
  +                                /* insert fd in buf[1] into wfds */
  +                            case 'R':
  +                                /* delete fd in buf[1] from rfds */
  +                            case 'W':
  +                                /* delete fd in buf[1] from wfds */
  +                                break;
  +                            default:
  +                                internal_exception(1,
  +                                        "unhandled msg in pipe");
  +                                break;
  +                        }
  +
  +                    }
  +                    /* TODO check fds */
  +                    break;
  +                }
  +        }
  +    }
  +    edebug(("IO thread terminated"));
  +    close(PIPE_READ_FD);
  +    close(PIPE_WRITE_FD);
  +    return NULL;
  +}
  +
  +static void
  +stop_io_thread(void)
  +{
  +    int buf[3];
  +    /*
  +     * tell IO thread to stop
  +     */
  +    buf[0] = 'e';
  +    buf[1] = -1;
  +    buf[2] = '\n';
  +    if (write(PIPE_WRITE_FD, buf, MSG_SIZE) != MSG_SIZE)
  +        internal_exception(1, "msg pipe write failed");
  +}
  +
  +/***********************************
  + * event handler thread functions
  + */
  +
  +/*
    * duplicate timed entry and add interval to abstime
    */
   static QUEUE_ENTRY*
  @@ -305,6 +591,7 @@
           else if (event->type == EVENT_TYPE_EVENT_TERMINATE) {
               mem_sys_free(entry);
               mem_sys_free(event);
  +
               return 0;
           }
           /*
  @@ -314,11 +601,7 @@
               Parrot_schedule_interp_qentry(event->interp, entry);
           }
           else {
  -            /*
  -             * TODO broadcast or deliver to first interp
  -             */
  -            mem_sys_free(entry);
  -            mem_sys_free(event);
  +            Parrot_schedule_broadcast_qentry(entry);
           }
       } /* while events */
       return 1;
  @@ -355,8 +638,8 @@
               event = (parrot_event* )entry->data;
               when = event->u.timer_event.abs_time;
               abs_time.tv_sec = (time_t) when;
  -            abs_time.tv_nsec = (when - abs_time.tv_sec) * (1000L*1000L*1000L);
  -
  +            abs_time.tv_nsec = (when - abs_time.tv_sec) *
  +                (1000L*1000L*1000L);
               queue_timedwait(event_q, &abs_time);
           }
           else {
  @@ -367,18 +650,24 @@
           }
           /*
            * one or more entries arrived - we hold the mutex again
  -         * so we have to use the nonsyc_pop_entry to pop off event entries
  +         * so we have to use the nonsync_pop_entry to pop off event entries
            */
           running = process_events(event_q);
       } /* event loop */
       /*
        * the main interpreter is dying
  +     * TODO empty the queue
        */
       UNLOCK(event_q->queue_mutex);
       queue_destroy(event_q);
  +    stop_io_thread();
  +    edebug(("event thread stopped"));
       return NULL;
   }
   
  +/***********************
  + * sleep handling
  + */
   static void*
   wait_for_wakeup(Parrot_Interp interpreter, void *next)
   {
  @@ -398,7 +687,7 @@
   }
   
   /*
  - * goto sleep
  + * goto sleep - called from the sleep opcode
    */
   void*
   Parrot_sleep_on_event(Parrot_Interp interpreter, FLOATVAL t, void* next)
  @@ -421,6 +710,11 @@
       return next;
   }
   
  +
  +/**************************************
  + * event handling code for run-loops
  + */
  +
   /*
    * explicitely sync called by the check_event opcode from run loops
    */
  @@ -432,12 +726,15 @@
       return next;
   }
   
  +/*
  + * run user code or such
  + */
   static void*
   do_event(Parrot_Interp interpreter, parrot_event* event, void *next)
   {
       switch (event->type) {
           case EVENT_TYPE_TERMINATE:
  -            next = NULL;
  +            next = NULL;        /* this will terminate the run loop */
               break;
           case EVENT_TYPE_TIMER:
               /* run ops, save registers */
  
  
  
  1.14      +21 -2     parrot/src/tsq.c
  
  Index: tsq.c
  ===================================================================
  RCS file: /cvs/public/parrot/src/tsq.c,v
  retrieving revision 1.13
  retrieving revision 1.14
  diff -u -w -r1.13 -r1.14
  --- tsq.c     6 Jan 2004 15:58:59 -0000       1.13
  +++ tsq.c     25 Jan 2004 14:12:55 -0000      1.14
  @@ -1,7 +1,7 @@
   /* tsq.c
    *  Copyright: 2001-2003 The Perl Foundation.  All Rights Reserved.
    *  CVS Info
  - *     $Id: tsq.c,v 1.13 2004/01/06 15:58:59 leo Exp $
  + *     $Id: tsq.c,v 1.14 2004/01/25 14:12:55 leo Exp $
    *  Overview:
    *     Thread-safe queues
    *  Data Structure and Algorithms:
  @@ -13,7 +13,7 @@
   #include "parrot/parrot.h"
   #include <assert.h>
   
  -/* A synchronized entry popper */
  +/* A synchronized entry popper - actually shifts from the front */
   QUEUE_ENTRY *
   pop_entry(QUEUE *queue) {
       QUEUE_ENTRY *returnval;
  @@ -78,6 +78,25 @@
           queue->tail = entry;
       }
       queue_signal(queue);        /* assumes only one waiter */
  +    queue_unlock(queue);
  +}
  +
  +void
  +unshift_entry(QUEUE *queue, QUEUE_ENTRY *entry) {
  +    QUEUE_ENTRY *cur;
  +
  +    queue_lock(queue);
  +    cur = queue->head;
  +    if (!cur) {
  +        /* empty just set head */
  +        queue->head = entry;
  +        queue->tail = entry;
  +    }
  +    else {
  +        queue->head = entry;
  +        entry->next = cur;
  +    }
  +    queue_signal(queue);
       queue_unlock(queue);
   }
   
  
  
  

Reply via email to