Author: spadkins
Date: Tue Dec  4 10:26:19 2007
New Revision: 10360

Modified:
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm

Log:
Clean up, plus fixes to make splitting happen constantly and as fast as 
possible, while still allowing shopping to be spun off to keep nodes busy.

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     
(original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm     Tue Dec 
 4 10:26:19 2007
@@ -76,8 +76,6 @@
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
 
-    #$self->log({level=>2}, "CC: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n") if 
$self->{options}{poe_trace};
-
     my $destination = $event->{destination};
     if (! defined $destination) {
         $self->log("ERROR: send_async_event_now() 
$event->{name}.$event->{method} : destination not assigned\n");
@@ -147,10 +145,7 @@
 sub _abort_running_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $runtime_event_token, $event, $callback_event) = @_;
-    #$self->log({level=>2}, "CC: _abort_running_async_event : 
runtime_event_token=[$runtime_event_token] : event=[$event] : 
callback_event=[$callback_event]\n");
-    #my $async_event = $self->{running_async_event}{$runtime_event_token};
     if ($runtime_event_token && $event && $callback_event) {
-        #my ($event, $callback_event) = @$async_event;
         if ($runtime_event_token =~ /^[0-9]+$/) {
             kill(9, $runtime_event_token);
         }
@@ -163,7 +158,6 @@
             my $remote_session_state = "poe_cancel_async_event";
 
             my $kernel = $self->{poe_kernel};
-            #$self->log({level=>2},"CC: _abort_running_async_event : calling 
remote cancel for $runtime_event_token\n");
             $kernel->post("IKC", "post", 
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
                 [ $runtime_event_token ]);
         }

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   Tue Dec  4 
10:26:19 2007
@@ -254,18 +254,10 @@
 
     for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
         my $event_token = $self->{pending_async_events}[$i][0]{event_token};
-        $self->log({level=>2},"CN : poe_cancel_async_event : z1 : 
pending_async_events : event_token=[$event_token]\n");
     }
 
     ### Find if running
     for my $pid (keys %{$self->{running_async_event}}) {
-        #$self->log({level=>2},"CN : poe_cancel_async_event : 
running_async_event : pid=[$pid]\n");
-        #my $ae = $self->{running_async_event}{$pid};
-        #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys 
%{$ae->[0]});
-        #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys 
%{$ae->[1]});
-        #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
-        #$self->log({level=>2},"CN : poe_cancel_async_event : ce=[$ce]\n");
-
         my $event_token = $self->{running_async_event}{$pid}[0]{event_token};
         if ($runtime_event_token eq $event_token) {
             $self->log({level=>2},"CN : poe_cancel_async_event : 
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
@@ -285,14 +277,9 @@
     ### Find if pending
     for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
         my $event_token = $self->{pending_async_events}[$i][0]{event_token};
-        $self->log({level=>2},"CN : poe_cancel_async_event : z2 : 
pending_async_events : event_token=[$event_token]\n");
         if ($runtime_event_token eq $event_token) {
             splice(@{$self->{pending_async_events}}, $i, 1);
         }
-        #my $ae = $self->{pending_async_events}{$foo};
-        #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys 
%{$ae->[0]});
-        #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys 
%{$ae->[1]});
-        #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
     }
      &App::sub_exit() if ($App::trace);
 }

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        Tue Dec  4 
10:26:19 2007
@@ -333,6 +333,7 @@
 
     $state .= "\n";
 
+    ### Only enable this in development, requires a library uncomment as well
     #$state .= $self->_state_poe();
 
     ### THIS DOESN'T WORK YET
@@ -350,6 +351,7 @@
 
     ### POE state dumping - Currently commented out because it doesn't gain us 
much
     ### in the way of visibility, and POE::API::Peek is a CPAN pain
+    ### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY
     #my $api = POE::API::Peek->new();
     #my @queue = $api->event_queue_dump();
     #$state .= "POE event_queue_dump\n";
@@ -448,7 +450,7 @@
 
     $max_events ||= 9999;
     my $pending_async_events = $self->{pending_async_events};
-    my ($async_event, $assigned, $event, $in_process, %unique_events);
+    my ($async_event, $assigned, $event, $in_process);
     my $events_occurred = 0;
     my $i = 0;
     my $event_capacity_exists = 1;
@@ -461,8 +463,6 @@
             $events_occurred ++;
             splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]
             $max_i--;
-
-            $unique_events{"$event->{name} $event->{method}"}++;
         }
         elsif ($event_capacity_exists) {
             $assigned = $self->assign_event_destination($event);
@@ -472,8 +472,6 @@
                 # keep $i the same
                 splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]
                 $max_i--;
-
-                $unique_events{"$event->{name} $event->{method}"}++;
             }
             else {   # [undef] no servers are eligible for assignment
                 $event_capacity_exists = 0;   # there's no sense looking at 
the other pending async events
@@ -484,14 +482,8 @@
             $i++;   # look at the next one
         }
     }
-    my $sum_not_clear_pending_events = 0;
-    for my $key (keys %unique_events) {
-        if ($key ne "mvworkd _clear_pending_hotel_shop_requests") {
-            $sum_not_clear_pending_events += $unique_events{$key};
-        }
-    }
 
-    $self->log({level=>2},"S: dispatch_pending_async_events : exiting : 
events_occurred=[$events_occurred] 
events_not_clear_pending=[$sum_not_clear_pending_events] time=[" . 
sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if 
$self->{options}{poe_trace};
+    $self->log({level=>2},"S: dispatch_pending_async_events : exiting : 
events_occurred=[$events_occurred] time=[" . sprintf("%.4f", tv_interval($t0, 
[gettimeofday])) . "]\n") if $self->{options}{poe_trace};
     &App::sub_exit($events_occurred) if ($App::trace);
     return($events_occurred);
 }
@@ -621,8 +613,10 @@
         if ($callback_event) {
             $callback_event->{args} = [] if (! $callback_event->{args});
             my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid 
[sig=$sig]" : "";
-            push(@{$callback_event->{args}},
-                {event_token => $callback_event->{event_token}, returnval => 
$returnval, errnum => $exitval, errmsg => $errmsg});
+            push(@{$callback_event->{args}}, { event_token => 
$callback_event->{event_token},
+                                               returnval => $returnval,
+                                               errnum => $exitval,
+                                               errmsg => $errmsg });
             $self->send_event($callback_event);
         }
         elsif ($sig == 9) {  # killed without a chance to finish its work
@@ -690,8 +684,6 @@
     my ($async_event);
     my $aborted = 0;
 
-    #$self->log({level=>2}, "S: abort_async_event : event_token=[$event_token] 
: pending_async_events: ", Dumper($pending_async_events));
-
     # first look for it in the pending list
     for (my $i = 0; $i <= $#$pending_async_events; $i++) {
         $async_event = $pending_async_events->[$i];
@@ -715,12 +707,10 @@
 sub abort_running_async_event {
     &App::sub_entry if ($App::trace);
     my ($self, $runtime_event_token) = @_;
-    #$self->log({level=>2}, "S: abort_running_async_event : 
runtime_event_token=[$runtime_event_token]\n");
     my $running_async_event  = $self->{running_async_event};
     my $pending_async_events = $self->{pending_async_events};
     my $async_event = $running_async_event->{$runtime_event_token};
     if ($async_event) {
-        #$self->log({level=>2}, "S: abort_running_async_event : 
async_event=[$async_event]\n");
         $self->{num_async_events}--;
         delete $self->{running_async_event}{$runtime_event_token};
         $self->_abort_running_async_event($runtime_event_token, @$async_event);
@@ -850,7 +840,7 @@
 
 sub poe_yield {
     &App::sub_entry if ($App::trace);
-    my ($self, $kernel, $state, $max_count) = @_;
+    my ($self, $kernel, $state, $max_count, $calling_code) = @_;
 
     $max_count ||= 1;
     if (!defined($self->{poe_count}{$state})) {
@@ -860,10 +850,9 @@
         $self->{poe_count}{$state}++;
     }    
     if ($self->{poe_count}{$state} <= $max_count) {
-        $kernel->yield($state);
+        $kernel->yield($state, $calling_code);
     }
 
-    #$self->log({level=>2},"POE: poe_yield : poe_count : " . 
Dumper($self->{poe_count}) . "\n");
     &App::sub_exit() if ($App::trace);
     return;
 }
@@ -879,7 +868,6 @@
         $self->{poe_count}{$state} = 0;
     }
 
-    #$self->log({level=>2},"POE: poe_yield_acknowledged : poe_count : " . 
Dumper($self->{poe_count}) . "\n");
     &App::sub_exit() if ($App::trace);
     return;
 }
@@ -910,12 +898,10 @@
 sub poe_sigchld {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap, $pid, $status ) = @_[ OBJECT, KERNEL, HEAP, 
ARG1, ARG2 ];
-    #print STDERR "NOTICE: STATE (poe_sigchld) invoked with ($pid, $status) 
args\n";
     my $exitval = $status >> 8;
     my $sig     = $status & 255;
     $self->log({level=>2},"POE: poe_sigchld (Child $pid finished 
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
     $self->finish_pid($pid, $exitval, $sig);
-    $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
     &App::sub_exit() if ($App::trace);
 }
 
@@ -930,12 +916,15 @@
             if ($main_service && $main_service->can("format_async_event")) {
                 my ($event, $callback_event) = @$async_event;
                 my $str = $main_service->format_async_event($event, 
$callback_event);
-                #$self->log({level=>2},"POE: poe_alarm : pending_async_events 
: $str\n");
+                $self->log({level=>2},"POE: poe_alarm : pending_async_events : 
$str\n");
             }
         }
     }
+    
+    ### This is mostly for the node, which needs this to spawn queued execute 
subrequest events
+    ### without it, subrequests get acquired by the node never spawns children 
to shop it
     $kernel->yield("poe_dispatch_pending_async_events");
-    $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
+    
     my $time = time();
     my (@events);
     my $events_occurred = 0;
@@ -944,7 +933,7 @@
         $time_of_next_event = $self->get_current_events(\@events, 
$time);
         if ($#events > -1) {
             foreach my $event (@events) {
-                #$self->log({level=>2},"POE: poe_alarm : yield(poe_run_event, 
$event)\n");
+                $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event, 
$event)\n") if $self->{options}{poe_trace};
                 $kernel->yield("poe_run_event", $event);  # put on the POE run 
queue
                 $events_occurred++;
             }
@@ -992,14 +981,11 @@
 
 sub poe_dispatch_pending_async_events {
     &App::sub_entry if ($App::trace);
-    my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
+    my ( $self, $kernel, $heap, $arg ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
     $self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if 
$self->{options}{poe_trace};
 
     $self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
     my $events_occurred = $self->dispatch_pending_async_events();
-    ### These are currently causing our serious slowness during lots of 
splitting, fix the dogpile problem!
-    #$kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred 
> 0);
-    $self->poe_yield($kernel, "poe_dispatch_pending_async_events") if 
($events_occurred > 0);
 
     &App::sub_exit() if ($App::trace);
 }
@@ -1009,7 +995,6 @@
     my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
     $self->log({level=>2},"POE: poe_event_loop_extension\n") if 
$self->{options}{poe_trace};
     my $event_loop_extensions = $self->{event_loop_extensions};
-    #$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #=" 
. ($#$event_loop_extensions+1) . ").\n") if $self->{options}{poe_trace};
     my $async_event_added = 0;
     if ($event_loop_extensions && $#$event_loop_extensions > -1) {
         my ($extension, $obj, $method, $args, $event_executed);
@@ -1018,19 +1003,12 @@
             ($obj, $method, $args) = @$extension;
             $event_executed = $obj->$method(@$args);  # execute extension
             $async_event_added = 1 if ($event_executed);
-            #if ($event_executed) {
-            #    $self->log({level=>2},"Event Loop extension: 
${obj}->${method}(@$args) = $event_executed\n") if $self->{options}{poe_trace};
-            #}
         }
     }
-    if ($async_event_added) {
-        #$kernel->yield("poe_dispatch_pending_async_events");
-        $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
-        $kernel->yield("poe_event_loop_extension");
-    }
-    else {
-        $kernel->delay_set("poe_event_loop_extension", 1);
-    }
+    $self->poe_yield($kernel, "poe_dispatch_pending_async_events", undef, 
"poe_event_loop_extension");
+    ### TODO: Do we want to constrain this if there is nothing to do (to 
prevent spinning unnecessary cycles)?
+    $kernel->yield("poe_event_loop_extension");
+
     &App::sub_exit() if ($App::trace);
 }
 
@@ -1113,16 +1091,10 @@
 
         if ($callback_event) {
             $callback_event->{args} = $callback_args;
-            if ($runtime_event_token && $callback_event && $callback_args && 
$callback_args->[0]) {
-                $self->log({level=>2},"NOTE: poe_remote_async_event_finished 
calling send_event: runtime_event_token[$runtime_event_token] 
callback_event[$callback_event->{method}] 
subrequest_id[$callback_args->[0]{subrequest_id}]\n");
-            }
-            else {
-                $self->log({level=>2},"NOTE: poe_remote_async_event_finished 
calling send_event: cannot print extra vars\n");
-            }
             $self->send_event($callback_event);
         }
         else {
-            $self->log({level=>2},"NOTE: poe_remote_async_event_finished 
called without callback_event : runtime_event_token[$runtime_event_token]\n");
+            $self->log({level=>2},"Server: WARNING : 
poe_remote_async_event_finished called without callback_event : 
runtime_event_token[$runtime_event_token]\n");
         }
     }
     else {
@@ -1135,7 +1107,6 @@
 sub poe_server_state {
     &App::sub_entry if ($App::trace);
     my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
-    #$self->log({level=>2},"POE: poe_server_state\n") if 
$self->{options}{poe_trace};
     my $server_state = $self->state();
     &App::sub_exit($server_state) if ($App::trace);
     return $server_state;

Reply via email to