Author: spadkins
Date: Fri Aug 11 16:22:48 2006
New Revision: 6786
Modified:
p5ee/trunk/App-Context/lib/App/Context/Server.pm
Log:
wait until cluster nodes check in before running async events
Modified: p5ee/trunk/App-Context/lib/App/Context/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Server.pm Fri Aug 11 16:22:48 2006
@@ -216,15 +216,19 @@
my ($extension, $obj, $method, $args, $extension_idx,
$extension_events_occurred);
my $last_extension_idx = -1;
my ($time, $time_of_next_event, $sleep_interval);
+ my $start_time = time();
my $total_events_occurred = 0;
my ($events_occurred);
my ($pid, $exitval, $sig);
my ($await_return_value, $server_close, $return_value);
while (!$quit) {
eval {
+ $time = time();
$events_occurred = 0;
- if ($#{$self->{pending_async_events}} > -1) {
- $events_occurred += $self->dispatch_pending_async_events();
+ # Don't start dispatching these requests until a brief wait after
starting.
+ # We want all of the nodes to get a chance to register themselves.
+ if (($time-$start_time >= 4) && $#{$self->{pending_async_events}}
> -1) {
+ $events_occurred += $self->dispatch_pending_async_events(1);
}
$events_occurred += $self->dispatch_finished_processes();
@@ -275,48 +279,53 @@
else {
$sleep_interval = $default_sleep_interval;
}
+ }
+ else {
+ $sleep_interval = 0;
+ }
- # TODO: if (sleep_interval == 0), use select() to see if
anyone is waiting, else ...
- $self->log({level=>4},"Listening on socket($listen_fd):
timeout($sleep_interval)\n");
- $accept_worthwhile = 1;
- # NOTE: to understand why I do this section of code, read the
3rd paragraph under the
- # accept() method of IO::Socket (i.e. "man IO::Socket") or
read it here.
- # http://perldoc.perl.org/IO/Socket.html
- if ($sleep_interval == 0) {
- if (select($listen_vec, undef, $listen_vec, 0) == 0) { #
nothing happening on the socket
- $accept_worthwhile = 0; # don't bother to call
accept() on it
- }
+ # TODO: if (sleep_interval == 0), use select() to see if anyone is
waiting, else ...
+ $self->log({level=>4},"Listening on socket($listen_fd):
timeout($sleep_interval)\n");
+ $accept_worthwhile = 1;
+ # NOTE: to understand why I do this section of code, read the 3rd
paragraph under the
+ # accept() method of IO::Socket (i.e. "man IO::Socket") or read it
here.
+ # http://perldoc.perl.org/IO/Socket.html
+ if ($sleep_interval == 0) {
+ if (select($listen_vec, undef, $listen_vec, 0) == 0) { #
nothing happening on the socket
+ $accept_worthwhile = 0; # don't bother to call accept()
on it
}
- # Here is the truth table for $await_return_value,
$server_close
- # $await_return_value $server_close = client
+ server
- # ------------------- -------------
---------------------- ---------------------
- # 0 0 write/close
read/close
- # 0 1 write/read/close
read/close
- # 1 0
write/read/write/close read/write/read/close
- # 1 1 write/read/close
read/write/close
- # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
- if ($accept_worthwhile) {
- $listen_socket->timeout($sleep_interval);
- #$SIG{CHLD} = sub { $self->log({level=>4},"Caught Signal:
@_\n"); };
- $SIG{CHLD} = sub { }; # the point is to interrupt the
accept() system call, not to do anything.
- $connection_socket = $listen_socket->accept();
- $SIG{CHLD} = "DEFAULT";
- if ($connection_socket) {
- $connection_fd = fileno($connection_socket);
- $msg = $connection_socket->getline();
- $self->log({level=>4},"Message on
socket($connection_fd) [$msg]\n");
- if ($msg) {
- $await_return_value = ($msg =~ s/^RV-//);
- $server_close = ($msg =~ s/^SC-//);
- $msg =~ s/[\015\012]+$//;
- if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
- $quit = 1;
- }
- elsif ($msg =~ /^GET/) {
- $await_return_value = 1;
- my $content = $self->state();
- my $content_length = length($content);
- $return_value = <<EOF;
+ }
+
+ # Here is the truth table for $await_return_value, $server_close
+ # $await_return_value $server_close = client +
server
+ # ------------------- ------------- ----------------------
---------------------
+ # 0 0 write/close
read/close
+ # 0 1 write/read/close
read/close
+ # 1 0 write/read/write/close
read/write/read/close
+ # 1 1 write/read/close
read/write/close
+ # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
+ if ($accept_worthwhile) {
+ $listen_socket->timeout($sleep_interval);
+ #$SIG{CHLD} = sub { $self->log({level=>4},"Caught Signal:
@_\n"); };
+ $SIG{CHLD} = sub { }; # the point is to interrupt the
accept() system call, not to do anything.
+ $connection_socket = $listen_socket->accept();
+ $SIG{CHLD} = "DEFAULT";
+ if ($connection_socket) {
+ $connection_fd = fileno($connection_socket);
+ $msg = $connection_socket->getline();
+ $self->log({level=>4},"Message on socket($connection_fd)
[$msg]\n");
+ if ($msg) {
+ $await_return_value = ($msg =~ s/^RV-//);
+ $server_close = ($msg =~ s/^SC-//);
+ $msg =~ s/[\015\012]+$//;
+ if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+ $quit = 1;
+ }
+ elsif ($msg =~ /^GET/) {
+ $await_return_value = 1;
+ my $content = $self->state();
+ my $content_length = length($content);
+ $return_value = <<EOF;
HTTP/1.1 200 OK
Content-type: text/plain
Content-length: $content_length
@@ -324,22 +333,21 @@
$content
EOF
- }
- else {
- $return_value = $self->process_msg($msg);
- $return_value .= "\n" if ($return_value !~
/\n$/);
- }
- if ($await_return_value) {
- $self->log({level=>4},"Returned on
socket($connection_fd) [$return_value]\n") if ($msg !~ /^GET/);
- $connection_socket->autoflush(1);
- $connection_socket->print($return_value);
- $connection_socket->getline() if
(!$server_close);
- }
- $connection_socket->close();
}
else {
- $connection_socket->close();
+ $return_value = $self->process_msg($msg);
+ $return_value .= "\n" if ($return_value !~ /\n$/);
+ }
+ if ($await_return_value) {
+ $self->log({level=>4},"Returned on
socket($connection_fd) [$return_value]\n") if ($msg !~ /^GET/);
+ $connection_socket->autoflush(1);
+ $connection_socket->print($return_value);
+ $connection_socket->getline() if (!$server_close);
}
+ $connection_socket->close();
+ }
+ else {
+ $connection_socket->close();
}
}
}
@@ -589,16 +597,16 @@
sub dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
- my ($self) = @_;
+ my ($self, $max_events) = @_;
+ $max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
my ($async_event, $assigned);
my $events_occurred = 0;
my $i = 0;
- while ($i <= $#$pending_async_events) {
+ while ($i <= $#$pending_async_events && $events_occurred < $max_events) {
$async_event = $pending_async_events->[$i];
$assigned = $self->assign_event_destination($async_event->[0]);
if ($assigned) {
- $async_event = shift(@$pending_async_events);
$self->send_async_event_now(@$async_event);
$events_occurred ++;
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]