cvsuser 04/01/25 06:12:55
Modified: include/parrot tsq.h
src events.c tsq.c
Log:
event-handling-19
* first steps towards signal and IO events
* the io_thread waits on select, which gets either interrupted
by unblocked signals or by an IO handle ready condition
* installing signal handlers is still disabled
Revision Changes Path
1.10 +2 -1 parrot/include/parrot/tsq.h
Index: tsq.h
===================================================================
RCS file: /cvs/public/parrot/include/parrot/tsq.h,v
retrieving revision 1.9
retrieving revision 1.10
diff -u -w -r1.9 -r1.10
--- tsq.h 6 Jan 2004 15:58:57 -0000 1.9
+++ tsq.h 25 Jan 2004 14:12:52 -0000 1.10
@@ -1,7 +1,7 @@
/* tsq.h
* Copyright: 2001-2003 The Perl Foundation. All Rights Reserved.
* CVS Info
- * $Id: tsq.h,v 1.9 2004/01/06 15:58:57 leo Exp $
+ * $Id: tsq.h,v 1.10 2004/01/25 14:12:52 leo Exp $
* Overview:
* Defines the thread-safe queue system
* Data Structure and Algorithms:
@@ -45,6 +45,7 @@
QUEUE_ENTRY *peek_entry(QUEUE *);
QUEUE_ENTRY *wait_for_entry(QUEUE *);
void push_entry(QUEUE *, QUEUE_ENTRY *);
+void unshift_entry(QUEUE *, QUEUE_ENTRY *);
void nosync_insert_entry(QUEUE *, QUEUE_ENTRY *);
void insert_entry(QUEUE *, QUEUE_ENTRY *);
void queue_lock(QUEUE *);
1.22 +344 -47 parrot/src/events.c
Index: events.c
===================================================================
RCS file: /cvs/public/parrot/src/events.c,v
retrieving revision 1.21
retrieving revision 1.22
diff -u -w -r1.21 -r1.22
--- events.c 18 Jan 2004 09:52:23 -0000 1.21
+++ events.c 25 Jan 2004 14:12:55 -0000 1.22
@@ -1,7 +1,7 @@
/* events.c
* Copyright: 2001-2003 The Perl Foundation. All Rights Reserved.
* CVS Info
- * $Id: events.c,v 1.21 2004/01/18 09:52:23 leo Exp $
+ * $Id: events.c,v 1.22 2004/01/25 14:12:55 leo Exp $
* Overview:
* Event handling stuff
* Data Structure and Algorithms:
@@ -9,6 +9,8 @@
* When events are due, they are placed in per interpreter
* task_queues, where they are handled then by the check_event*
* opcodes.
+ * IO events and signals are caught in the io_thread, which in turn
+ * dispatches these to one or all interpreters.
* History:
* Notes:
* References:
@@ -18,93 +20,203 @@
#include "parrot/parrot.h"
#include <assert.h>
-/* forward def */
+/*
+ * event debugging stuff
+ */
+#define EVENT_DEBUG 0
+/*
+ * not yet - need to sort out platform code and fix exceptions first
+ */
+#define INSTALL_EVENT_HANDLER 0
+
+#if EVENT_DEBUG
+# define edebug(x) puts(x)
+#else
+# define edebug(x)
+#endif
+
+
+/* forward defs */
static void* event_thread(void *data);
+static void* io_thread(void *data);
static void* do_event(Parrot_Interp, parrot_event*, void*);
+static void stop_io_thread(void);
+static void schedule_signal_event(int signum);
+void Parrot_schedule_broadcast_qentry(QUEUE_ENTRY* entry);
/*
- * we have exactly one event_queue
+ * we have exactly one global event_queue
+ * TODO task prio handling
*/
static QUEUE* event_queue;
#define TASK_PRIO 10
+/*
+ * user accessible signals like SIGINT
+ *
+ * SIGINT is used unconditionally further down in this file, so when the
+ * platform headers don't provide it, define a stand-in value to keep the
+ * file compiling.
+ */
+#ifndef SIGINT
+#  define SIGINT -4711
+#endif
+/*
+ * set to 1 by sig_handler() when SIGINT arrives, tested and cleared
+ * again in io_thread(); volatile because it is modified asynchronously
+ * from within a signal handler
+ */
+static volatile sig_atomic_t sig_int;
+
+/*
+ * a pipe is used to send messages to the IO thread
+ */
+static int pipe_fds[2];
+#define PIPE_READ_FD pipe_fds[0]
+#define PIPE_WRITE_FD pipe_fds[1]
+/*
+ * messages have the size of 3 ints
+ * buf[0] = command
+ * buf[1] = fd or better a parrot_event ptr or such
+ * buf[2] = NL
+ */
+#define MSG_SIZE (3*sizeof(int))
+
+
+/*************************
+ * signal handling code
+ */
+
+/*
+ * Program Error Signals
+ *
+ * Handler for program error signals: passes the negated signal number
+ * to do_exception(), with 0 as its first argument.
+ *
+ * NOTE(review): nothing visible in this change installs this handler -
+ * Parrot_init_signals() only registers sig_handler for SIGINT.
+ * NOTE(review): the code this replaces described do_exception() as a
+ * longjmp out of the handler - confirm that is still the intent.
+ */
+static void
+fatal_sig_handler(int signum)
+{
+ do_exception(0, -signum);
+}
+
+/*
+ * other signals like SIGINT
+ */
static void
sig_handler(int signum)
{
switch (signum) {
-#ifdef SIGFPE
- /* we can't handle SIGFPE - just give up, i.e. longjmp */
- case SIGFPE:
- do_exception(0, -signum);
+ case SIGINT:
+ sig_int = 1;
break;
-#endif
- default:
+ }
+}
+
+#if PARROT_HAS_SIGACTION
/*
- * TODO convert signal to event
- * which isn't simple: the only thread and signal safe
- * things we may do are:
- * - set a sigatomic_t global or
- * - sem_post a semaphore
+ * signal handlers are common to all threads, signal block masks
+ * are specific, so we install one handler then block that signal
+ * and unblock it in the thread, that shall receive that signal
*/
+static void
+Parrot_sigaction(int sig, void (*handler)(int))
+{
+ struct sigaction action;
+ sigset_t block_mask;
- break;
+ /* install handler */
+ action.sa_handler = handler;
+ sigemptyset(&action.sa_mask);
+ action.sa_flags = 0;
+ sigaction(sig, &action, NULL);
+
+ /* block that signal */
+ sigemptyset(&block_mask);
+ sigaddset(&block_mask, sig);
+ sigprocmask(SIG_BLOCK, &block_mask, NULL);
}
+
+/*
+ * Remove the single signal sig from the set of blocked signals by
+ * passing a one-element signal set to sigprocmask(SIG_UNBLOCK).
+ * The result of sigprocmask() is not checked.
+ */
+static void
+Parrot_unblock_signal(int sig)
+{
+    sigset_t to_unblock;
+
+    /* build a set that contains nothing but sig ... */
+    sigemptyset(&to_unblock);
+    sigaddset(&to_unblock, sig);
+    /* ... and take exactly that set out of the block mask */
+    sigprocmask(SIG_UNBLOCK, &to_unblock, NULL);
}
+#else
+static void
+Parrot_sigaction(int sig, void (*handler)(int))
+{
+    /*
+     * PARROT_HAS_SIGACTION is not set: installing a handler is a no-op.
+     * The casts only mark the parameters as intentionally unused.
+     */
+    (void)sig;
+    (void)handler;
+}
+
+static void
+Parrot_unblock_signal(int sig)
+{
+    /*
+     * PARROT_HAS_SIGACTION is not set: no signal was blocked by
+     * Parrot_sigaction(), so there is nothing to unblock.
+     * The cast only marks the parameter as intentionally unused.
+     */
+    (void)sig;
+}
+#endif
+
void
Parrot_init_signals(void)
{
- /* quick hack to test signals and exceptions
- * s. t/op/hacks_1.pasm
- */
-#ifdef SIGFPE
/*
* SIGFPE is architecture specific - some signal an error
* some don't, so we have to use direct checks if we are dividing
* by zero
*/
-#endif
-#ifdef SIGINT
- /* Parrot_set_sighandler(SIGINT, sig_handler); */
-#endif
+ Parrot_sigaction(SIGINT, sig_handler);
}
+/***********************
+ * initialization code
+ */
+
/*
* init event system for first interpreter
*/
static void
init_events_first(Parrot_Interp interpreter)
{
- Parrot_thread the_thread;
+ Parrot_thread ev_handle, io_handle;
+
+ /*
+ * be sure all init is done only once
+ * we could use pthread_once for that too
+ */
+ if (event_queue)
+ PANIC("event queue already exists - missing parent_interp?");
/*
* we need a global mutex to protect the interpreter array
*/
-
MUTEX_INIT(interpreter_array_mutex);
/*
- * init event queue - be sure its done only once
- * we could use pthread_once for queue_init
+ * create event queue
*/
- assert(!event_queue);
event_queue = queue_init(TASK_PRIO);
/*
- * we start an event_handler thread
+ * we use a message pipe to send IO related stuff to the
+ * IO thread
*/
- THREAD_CREATE_DETACHED(the_thread, event_thread, event_queue);
+ if (pipe(pipe_fds))
+ internal_exception(1, "Couldn't create message pipe");
/*
- * now set some sig handlers
+ * now set some sig handlers before any thread is started, so
+ * that all threads inherit the signal block mask
*/
+#if INSTALL_EVENT_HANDLER
Parrot_init_signals();
+#endif
+ /*
+ * we start an event_handler thread
+ */
+ THREAD_CREATE_DETACHED(ev_handle, event_thread, event_queue);
+ /*
+ * and a signal and IO handler thread
+ */
+ THREAD_CREATE_DETACHED(io_handle, io_thread, event_queue);
}
/*
- * init events for all interpreters
+ * init code for all interpreters
*/
static void
init_events_all(Parrot_Interp interpreter)
{
/*
- * create event queue
+ * create per interpreter task queue
*/
interpreter->task_queue = queue_init(0);
}
@@ -121,8 +233,12 @@
init_events_all(interpreter);
}
+/**************************
+ * event handler functions
+ */
+
/*
- * create queue entry and insert event into task queue
+ * create queue entry and insert event into event queue
*/
void
Parrot_schedule_event(Parrot_Interp interpreter, parrot_event* ev)
@@ -137,6 +253,10 @@
entry->type = QUEUE_ENTRY_TYPE_TIMED_EVENT;
insert_entry(event_queue, entry);
break;
+ case EVENT_TYPE_SIGNAL:
+ entry->type = QUEUE_ENTRY_TYPE_EVENT;
+ unshift_entry(event_queue, entry);
+ break;
default:
entry->type = QUEUE_ENTRY_TYPE_EVENT;
push_entry(event_queue, entry);
@@ -145,6 +265,22 @@
}
/*
+ * Allocate a parrot_event of type EVENT_TYPE_SIGNAL that carries signum
+ * and hand it to Parrot_schedule_event() with a NULL interpreter.
+ */
+static void
+schedule_signal_event(int signum)
+{
+    parrot_event* sig_event;
+
+    sig_event = mem_sys_allocate(sizeof(parrot_event));
+    sig_event->u.signal = signum;
+    sig_event->type = EVENT_TYPE_SIGNAL;
+    /*
+     * alternatively this could call Parrot_schedule_broadcast_qentry()
+     * directly
+     */
+    Parrot_schedule_event(NULL, sig_event);
+}
+
+
+/*
* create a new timer event due at diff from now, repeated at
* interval running the passed sub
*/
@@ -203,6 +339,7 @@
/*
* schedule event-loop terminate event
+ * this shuts down the event thread
*/
void
Parrot_kill_event_loop(void)
@@ -231,6 +368,155 @@
}
/*
+ * broadcast an event
+ *
+ * entry is a queue entry whose data member points to a parrot_event.
+ * Both the entry and the event are freed here in every case.
+ * Signal events are currently dropped after that (see the TODO below);
+ * any other event type raises an internal exception once the memory
+ * has been released.
+ */
+void
+Parrot_schedule_broadcast_qentry(QUEUE_ENTRY* entry)
+{
+    parrot_event* event = entry->data;
+    /* remember the kind of event, it is needed after event is freed */
+    int is_signal = (event->type == EVENT_TYPE_SIGNAL);
+
+    if (is_signal) {
+        edebug(("broadcast signal"));
+        /*
+         * we don't have special signal handlers in usercode yet
+         * e.g.:
+         * install handler like exception handler *and*
+         * set a interpreter flag, that a handler exists
+         * we then could examine that flag (after LOCKing it)
+         * and dispatch the exception to all interpreters that
+         * handle it
+         * Finally, we send the first (main) interpreter that signal
+         *
+         * or just send to all?
+         *
+         * TODO put first interpreter into interp. array immediately
+         * not only when threads are started
+         */
+    }
+    /* nothing keeps a reference: release the entry and its event */
+    mem_sys_free(entry);
+    mem_sys_free(event);
+    if (!is_signal)
+        internal_exception(1, "Unknown event to broadcast");
+}
+
+/************************
+ * IO thread handling
+ */
+
+/*
+ * the io thread uses select/poll to handle IO events and
+ * signals
+ *
+ * It waits on input from the message pipe to insert file
+ * descriptors in the wait sets
+ *
+ * data is the queue pointer given at thread creation; it is not used
+ * yet. The thread unblocks SIGINT for itself, turns an interrupted
+ * select() with sig_int set into a signal event, and runs until an 'e'
+ * message arrives on the pipe. It then closes both pipe ends and
+ * returns NULL.
+ *
+ * NOTE(review): sig_int is only examined after select() failed with
+ * EINTR; a SIGINT delivered while the thread is not inside select()
+ * is presumably noticed only at the next interrupted select().
+ */
+static void*
+io_thread(void *data)
+{
+    QUEUE* event_q = (QUEUE*) data;
+    fd_set act_rfds, act_wfds;
+    int n_highest;
+    int running = 1;
+
+    /*
+     * act_rfds/act_wfds are the sets of descriptors we are interested
+     * in; they are never passed to select() directly
+     */
+    FD_ZERO(&act_rfds);
+    FD_ZERO(&act_wfds);
+    /*
+     * Watch the reader end of the pipe for messages
+     */
+    FD_SET(PIPE_READ_FD, &act_rfds);
+    n_highest = PIPE_READ_FD + 1;
+    /*
+     * all signals that we shall handle here have to be unblocked
+     * in this and only in this thread
+     */
+    Parrot_unblock_signal(SIGINT);
+    while (running) {
+        fd_set rfds, wfds;
+        int retval;
+
+        /*
+         * select() overwrites the sets it is given with the ready
+         * descriptors (and their content can't be relied on after an
+         * error like EINTR), so pass fresh copies on each iteration
+         */
+        rfds = act_rfds;
+        wfds = act_wfds;
+        retval = select(n_highest, &rfds, &wfds, NULL, NULL);
+        if (retval == -1) {
+            if (errno == EINTR) {
+                edebug(("select EINTR"));
+                if (sig_int) {
+                    edebug(("int arrived"));
+                    sig_int = 0;
+                    /*
+                     * signal the event thread
+                     */
+                    schedule_signal_event(SIGINT);
+                }
+            }
+            /* any other select() failure is ignored - just retry */
+            continue;
+        }
+        if (retval <= 0)
+            continue;
+        edebug(("IO ready"));
+        if (FD_ISSET(PIPE_READ_FD, &rfds)) {
+            int buf[3];
+            /*
+             * a command arrived
+             */
+            edebug(("msg arrived"));
+            if (read(PIPE_READ_FD, buf, MSG_SIZE) != MSG_SIZE)
+                internal_exception(1,
+                        "read error from msg pipe");
+            switch (buf[0]) {
+                case 'e':
+                    running = 0;
+                    break;
+                    /* TODO */
+                case 'r':
+                    /* insert fd in buf[1] into rfds */
+                case 'w':
+                    /* insert fd in buf[1] into wfds */
+                case 'R':
+                    /* delete fd in buf[1] from rfds */
+                case 'W':
+                    /* delete fd in buf[1] from wfds */
+                    break;
+                default:
+                    internal_exception(1,
+                            "unhandled msg in pipe");
+                    break;
+            }
+        }
+        /* TODO check fds */
+    }
+    edebug(("IO thread terminated"));
+    close(PIPE_READ_FD);
+    close(PIPE_WRITE_FD);
+    return NULL;
+}
+
+/*
+ * Tell the IO thread to stop: writes one message with the command 'e'
+ * to the write end of the message pipe and raises an internal
+ * exception if the message wasn't written completely.
+ */
+static void
+stop_io_thread(void)
+{
+    /* command 'e', -1 in the fd slot, newline as last element */
+    int msg[3] = { 'e', -1, '\n' };
+
+    if (write(PIPE_WRITE_FD, msg, MSG_SIZE) != MSG_SIZE)
+        internal_exception(1, "msg pipe write failed");
+}
+
+/***********************************
+ * event handler thread functions
+ */
+
+/*
* duplicate timed entry and add interval to abstime
*/
static QUEUE_ENTRY*
@@ -305,6 +591,7 @@
else if (event->type == EVENT_TYPE_EVENT_TERMINATE) {
mem_sys_free(entry);
mem_sys_free(event);
+
return 0;
}
/*
@@ -314,11 +601,7 @@
Parrot_schedule_interp_qentry(event->interp, entry);
}
else {
- /*
- * TODO broadcast or deliver to first interp
- */
- mem_sys_free(entry);
- mem_sys_free(event);
+ Parrot_schedule_broadcast_qentry(entry);
}
} /* while events */
return 1;
@@ -355,8 +638,8 @@
event = (parrot_event* )entry->data;
when = event->u.timer_event.abs_time;
abs_time.tv_sec = (time_t) when;
- abs_time.tv_nsec = (when - abs_time.tv_sec) * (1000L*1000L*1000L);
-
+ abs_time.tv_nsec = (when - abs_time.tv_sec) *
+ (1000L*1000L*1000L);
queue_timedwait(event_q, &abs_time);
}
else {
@@ -367,18 +650,24 @@
}
/*
* one or more entries arrived - we hold the mutex again
- * so we have to use the nonsyc_pop_entry to pop off event entries
+ * so we have to use the nonsync_pop_entry to pop off event entries
*/
running = process_events(event_q);
} /* event loop */
/*
* the main interpreter is dying
+ * TODO empty the queue
*/
UNLOCK(event_q->queue_mutex);
queue_destroy(event_q);
+ stop_io_thread();
+ edebug(("event thread stopped"));
return NULL;
}
+/***********************
+ * sleep handling
+ */
static void*
wait_for_wakeup(Parrot_Interp interpreter, void *next)
{
@@ -398,7 +687,7 @@
}
/*
- * goto sleep
+ * goto sleep - called from the sleep opcode
*/
void*
Parrot_sleep_on_event(Parrot_Interp interpreter, FLOATVAL t, void* next)
@@ -421,6 +710,11 @@
return next;
}
+
+/**************************************
+ * event handling code for run-loops
+ */
+
/*
* explicitely sync called by the check_event opcode from run loops
*/
@@ -432,12 +726,15 @@
return next;
}
+/*
+ * run user code or such
+ */
static void*
do_event(Parrot_Interp interpreter, parrot_event* event, void *next)
{
switch (event->type) {
case EVENT_TYPE_TERMINATE:
- next = NULL;
+ next = NULL; /* this will terminate the run loop */
break;
case EVENT_TYPE_TIMER:
/* run ops, save registers */
1.14 +21 -2 parrot/src/tsq.c
Index: tsq.c
===================================================================
RCS file: /cvs/public/parrot/src/tsq.c,v
retrieving revision 1.13
retrieving revision 1.14
diff -u -w -r1.13 -r1.14
--- tsq.c 6 Jan 2004 15:58:59 -0000 1.13
+++ tsq.c 25 Jan 2004 14:12:55 -0000 1.14
@@ -1,7 +1,7 @@
/* tsq.c
* Copyright: 2001-2003 The Perl Foundation. All Rights Reserved.
* CVS Info
- * $Id: tsq.c,v 1.13 2004/01/06 15:58:59 leo Exp $
+ * $Id: tsq.c,v 1.14 2004/01/25 14:12:55 leo Exp $
* Overview:
* Thread-safe queues
* Data Structure and Algorithms:
@@ -13,7 +13,7 @@
#include "parrot/parrot.h"
#include <assert.h>
-/* A synchronized entry popper */
+/* A synchronized entry popper - actually a shift from the front */
QUEUE_ENTRY *
pop_entry(QUEUE *queue) {
QUEUE_ENTRY *returnval;
@@ -78,6 +78,25 @@
queue->tail = entry;
}
queue_signal(queue); /* assumes only one waiter */
+ queue_unlock(queue);
+}
+
+/*
+ * Put entry at the front of queue, so that it is the next one taken
+ * off the head. The queue is locked for the insertion and signalled
+ * before it is unlocked again.
+ */
+void
+unshift_entry(QUEUE *queue, QUEUE_ENTRY *entry) {
+    QUEUE_ENTRY *old_head;
+
+    queue_lock(queue);
+    old_head = queue->head;
+    /*
+     * always link to the former head: that is NULL for an empty queue,
+     * so the list is terminated properly even if the caller didn't
+     * clear entry->next
+     */
+    entry->next = old_head;
+    queue->head = entry;
+    if (!old_head) {
+        /* queue was empty - the new entry is the tail too */
+        queue->tail = entry;
+    }
+    queue_signal(queue);
queue_unlock(queue);
}