Author: spadkins
Date: Fri Aug 11 16:22:48 2006
New Revision: 6786

Modified:
   p5ee/trunk/App-Context/lib/App/Context/Server.pm

Log:
wait until cluster nodes check in before running async events

Modified: p5ee/trunk/App-Context/lib/App/Context/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/Server.pm    (original)
+++ p5ee/trunk/App-Context/lib/App/Context/Server.pm    Fri Aug 11 16:22:48 2006
@@ -216,15 +216,19 @@
     my ($extension, $obj, $method, $args, $extension_idx, 
$extension_events_occurred);
     my $last_extension_idx = -1;
     my ($time, $time_of_next_event, $sleep_interval);
+    my $start_time = time();
     my $total_events_occurred = 0;
     my ($events_occurred);
     my ($pid, $exitval, $sig);
     my ($await_return_value, $server_close, $return_value);
     while (!$quit) {
         eval {
+            $time = time();
             $events_occurred = 0;
-            if ($#{$self->{pending_async_events}} > -1) {
-                $events_occurred += $self->dispatch_pending_async_events();
+           # Don't start dispatching these requests until a brief wait after 
starting.
+           # We want all of the nodes to get a chance to register themselves.
+            if (($time-$start_time >= 4) && $#{$self->{pending_async_events}} 
> -1) {
+                $events_occurred += $self->dispatch_pending_async_events(1);
             }
             $events_occurred += $self->dispatch_finished_processes();
 
@@ -275,48 +279,53 @@
                 else {
                     $sleep_interval = $default_sleep_interval;
                 }
+            }
+            else {
+                $sleep_interval = 0;
+            }
 
-                # TODO: if (sleep_interval == 0), use select() to see if 
anyone is waiting, else ...
-                $self->log({level=>4},"Listening on socket($listen_fd): 
timeout($sleep_interval)\n");
-                $accept_worthwhile = 1;
-                # NOTE: to understand why I do this section of code, read the 
3rd paragraph under the
-                # accept() method of IO::Socket (i.e. "man IO::Socket") or 
read it here.
-                # http://perldoc.perl.org/IO/Socket.html
-                if ($sleep_interval == 0) {
-                    if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # 
nothing happening on the socket
-                        $accept_worthwhile = 0;  # don't bother to call 
accept() on it
-                    }
+            # TODO: if (sleep_interval == 0), use select() to see if anyone is 
waiting, else ...
+            $self->log({level=>4},"Listening on socket($listen_fd): 
timeout($sleep_interval)\n");
+            $accept_worthwhile = 1;
+            # NOTE: to understand why I do this section of code, read the 3rd 
paragraph under the
+            # accept() method of IO::Socket (i.e. "man IO::Socket") or read it 
here.
+            # http://perldoc.perl.org/IO/Socket.html
+            if ($sleep_interval == 0) {
+                if (select($listen_vec, undef, $listen_vec, 0) == 0) {  # 
nothing happening on the socket
+                    $accept_worthwhile = 0;  # don't bother to call accept() 
on it
                 }
-                # Here is the truth table for $await_return_value, 
$server_close
-                #   $await_return_value  $server_close =         client        
 +        server     
-                #   -------------------  -------------   
----------------------   ---------------------
-                #             0                0              write/close      
        read/close
-                #             0                1            write/read/close   
        read/close
-                #             1                0         
write/read/write/close   read/write/read/close
-                #             1                1            write/read/close   
      read/write/close
-                # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
-                if ($accept_worthwhile) {
-                    $listen_socket->timeout($sleep_interval);
-                    #$SIG{CHLD}  = sub { $self->log({level=>4},"Caught Signal: 
@_\n"); };
-                    $SIG{CHLD}  = sub { };  # the point is to interrupt the 
accept() system call, not to do anything.
-                    $connection_socket = $listen_socket->accept();
-                    $SIG{CHLD}  = "DEFAULT";
-                    if ($connection_socket) {
-                        $connection_fd = fileno($connection_socket);
-                        $msg = $connection_socket->getline();
-                        $self->log({level=>4},"Message on 
socket($connection_fd) [$msg]\n");
-                        if ($msg) {
-                            $await_return_value = ($msg =~ s/^RV-//);
-                            $server_close       = ($msg =~ s/^SC-//);
-                            $msg =~ s/[\015\012]+$//;
-                            if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
-                                $quit = 1;
-                            }
-                            elsif ($msg =~ /^GET/) {
-                                $await_return_value = 1;
-                                my $content = $self->state();
-                                my $content_length = length($content);
-                                $return_value = <<EOF;
+            }
+
+            # Here is the truth table for $await_return_value, $server_close
+            #   $await_return_value  $server_close =         client         +  
      server     
+            #   -------------------  -------------   ----------------------   
---------------------
+            #             0                0              write/close          
    read/close
+            #             0                1            write/read/close       
    read/close
+            #             1                0         write/read/write/close   
read/write/read/close
+            #             1                1            write/read/close       
  read/write/close
+            # See: http://hea-www.harvard.edu/~fine/Tech/addrinuse.html
+            if ($accept_worthwhile) {
+                $listen_socket->timeout($sleep_interval);
+                #$SIG{CHLD}  = sub { $self->log({level=>4},"Caught Signal: 
@_\n"); };
+                $SIG{CHLD}  = sub { };  # the point is to interrupt the 
accept() system call, not to do anything.
+                $connection_socket = $listen_socket->accept();
+                $SIG{CHLD}  = "DEFAULT";
+                if ($connection_socket) {
+                    $connection_fd = fileno($connection_socket);
+                    $msg = $connection_socket->getline();
+                    $self->log({level=>4},"Message on socket($connection_fd) 
[$msg]\n");
+                    if ($msg) {
+                        $await_return_value = ($msg =~ s/^RV-//);
+                        $server_close       = ($msg =~ s/^SC-//);
+                        $msg =~ s/[\015\012]+$//;
+                        if ($msg =~ /^EXIT/i || $msg =~ /^QUIT/i) {
+                            $quit = 1;
+                        }
+                        elsif ($msg =~ /^GET/) {
+                            $await_return_value = 1;
+                            my $content = $self->state();
+                            my $content_length = length($content);
+                            $return_value = <<EOF;
 HTTP/1.1 200 OK
 Content-type: text/plain
 Content-length: $content_length
@@ -324,22 +333,21 @@
 
 $content
 EOF
-                            }
-                            else {
-                                $return_value = $self->process_msg($msg);
-                                $return_value .= "\n" if ($return_value !~ 
/\n$/);
-                            }
-                            if ($await_return_value) {
-                                $self->log({level=>4},"Returned on 
socket($connection_fd) [$return_value]\n") if ($msg !~ /^GET/);
-                                $connection_socket->autoflush(1);
-                                $connection_socket->print($return_value);
-                                $connection_socket->getline() if 
(!$server_close);
-                            }
-                            $connection_socket->close();
                         }
                         else {
-                            $connection_socket->close();
+                            $return_value = $self->process_msg($msg);
+                            $return_value .= "\n" if ($return_value !~ /\n$/);
+                        }
+                        if ($await_return_value) {
+                            $self->log({level=>4},"Returned on 
socket($connection_fd) [$return_value]\n") if ($msg !~ /^GET/);
+                            $connection_socket->autoflush(1);
+                            $connection_socket->print($return_value);
+                            $connection_socket->getline() if (!$server_close);
                         }
+                        $connection_socket->close();
+                    }
+                    else {
+                        $connection_socket->close();
                     }
                 }
             }
@@ -589,16 +597,16 @@
 
 sub dispatch_pending_async_events {
     &App::sub_entry if ($App::trace);
-    my ($self) = @_;
+    my ($self, $max_events) = @_;
+    $max_events ||= 9999;
     my $pending_async_events = $self->{pending_async_events};
     my ($async_event, $assigned);
     my $events_occurred = 0;
     my $i = 0;
-    while ($i <= $#$pending_async_events) {
+    while ($i <= $#$pending_async_events && $events_occurred < $max_events) {
         $async_event = $pending_async_events->[$i];
         $assigned = $self->assign_event_destination($async_event->[0]);
         if ($assigned) {
-            $async_event = shift(@$pending_async_events);
             $self->send_async_event_now(@$async_event);
             $events_occurred ++;
             splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]

Reply via email to