Author: spadkins
Date: Tue Dec 4 10:26:19 2007
New Revision: 10360
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
Clean up, plus fixes to make splitting happen constantly and as fast as
possible, while still allowing shopping to be spun off to keep nodes busy.
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Tue Dec 4 10:26:19 2007
@@ -76,8 +76,6 @@
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- #$self->log({level=>2}, "CC: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n") if
$self->{options}{poe_trace};
-
my $destination = $event->{destination};
if (! defined $destination) {
$self->log("ERROR: send_async_event_now()
$event->{name}.$event->{method} : destination not assigned\n");
@@ -147,10 +145,7 @@
sub _abort_running_async_event {
&App::sub_entry if ($App::trace);
my ($self, $runtime_event_token, $event, $callback_event) = @_;
- #$self->log({level=>2}, "CC: _abort_running_async_event :
runtime_event_token=[$runtime_event_token] : event=[$event] :
callback_event=[$callback_event]\n");
- #my $async_event = $self->{running_async_event}{$runtime_event_token};
if ($runtime_event_token && $event && $callback_event) {
- #my ($event, $callback_event) = @$async_event;
if ($runtime_event_token =~ /^[0-9]+$/) {
kill(9, $runtime_event_token);
}
@@ -163,7 +158,6 @@
my $remote_session_state = "poe_cancel_async_event";
my $kernel = $self->{poe_kernel};
- #$self->log({level=>2},"CC: _abort_running_async_event : calling
remote cancel for $runtime_event_token\n");
$kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
[ $runtime_event_token ]);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Tue Dec 4 10:26:19 2007
@@ -254,18 +254,10 @@
for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
my $event_token = $self->{pending_async_events}[$i][0]{event_token};
- $self->log({level=>2},"CN : poe_cancel_async_event : z1 :
pending_async_events : event_token=[$event_token]\n");
}
### Find if running
for my $pid (keys %{$self->{running_async_event}}) {
- #$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : pid=[$pid]\n");
- #my $ae = $self->{running_async_event}{$pid};
- #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
- #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
- #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
- #$self->log({level=>2},"CN : poe_cancel_async_event : ce=[$ce]\n");
-
my $event_token = $self->{running_async_event}{$pid}[0]{event_token};
if ($runtime_event_token eq $event_token) {
$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : found event_token=[$event_token] pid=[$pid]\n");
@@ -285,14 +277,9 @@
### Find if pending
for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
my $event_token = $self->{pending_async_events}[$i][0]{event_token};
- $self->log({level=>2},"CN : poe_cancel_async_event : z2 :
pending_async_events : event_token=[$event_token]\n");
if ($runtime_event_token eq $event_token) {
splice(@{$self->{pending_async_events}}, $i, 1);
}
- #my $ae = $self->{pending_async_events}{$foo};
- #my $e = join(", ", map {sprintf("$_ = [$ae->[0]{$_}]")} keys
%{$ae->[0]});
- #my $ce = join(", ", map {sprintf("$_ = [$ae->[1]{$_}]")} keys
%{$ae->[1]});
- #$self->log({level=>2},"CN : poe_cancel_async_event : e=[$e]\n");
}
&App::sub_exit() if ($App::trace);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Tue Dec 4 10:26:19 2007
@@ -333,6 +333,7 @@
$state .= "\n";
+ ### Only enable this in development; it requires uncommenting a library as well
#$state .= $self->_state_poe();
### THIS DOESN'T WORK YET
@@ -350,6 +351,7 @@
### POE state dumping - Currently commented out because it doesn't gain us much
### in the way of visibility, and POE::API::Peek is a CPAN pain
+ ### UNCOMMENT THIS IF YOU NEED IT, DEPENDS ON A PAINFUL LIBRARY
#my $api = POE::API::Peek->new();
#my @queue = $api->event_queue_dump();
#$state .= "POE event_queue_dump\n";
@@ -448,7 +450,7 @@
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
- my ($async_event, $assigned, $event, $in_process, %unique_events);
+ my ($async_event, $assigned, $event, $in_process);
my $events_occurred = 0;
my $i = 0;
my $event_capacity_exists = 1;
@@ -461,8 +463,6 @@
$events_occurred ++;
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]
$max_i--;
-
- $unique_events{"$event->{name} $event->{method}"}++;
}
elsif ($event_capacity_exists) {
$assigned = $self->assign_event_destination($event);
@@ -472,8 +472,6 @@
# keep $i the same
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]
$max_i--;
-
- $unique_events{"$event->{name} $event->{method}"}++;
}
else { # [undef] no servers are eligible for assignment
$event_capacity_exists = 0; # there's no sense looking at
the other pending async events
@@ -484,14 +482,8 @@
$i++; # look at the next one
}
}
- my $sum_not_clear_pending_events = 0;
- for my $key (keys %unique_events) {
- if ($key ne "mvworkd _clear_pending_hotel_shop_requests") {
- $sum_not_clear_pending_events += $unique_events{$key};
- }
- }
- $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred]
events_not_clear_pending=[$sum_not_clear_pending_events] time=[" .
sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if
$self->{options}{poe_trace};
+ $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred] time=[" . sprintf("%.4f", tv_interval($t0,
[gettimeofday])) . "]\n") if $self->{options}{poe_trace};
&App::sub_exit($events_occurred) if ($App::trace);
return($events_occurred);
}
@@ -621,8 +613,10 @@
if ($callback_event) {
$callback_event->{args} = [] if (! $callback_event->{args});
my $errmsg = ($exitval || $sig) ? "Exit $exitval on $pid
[sig=$sig]" : "";
- push(@{$callback_event->{args}},
- {event_token => $callback_event->{event_token}, returnval =>
$returnval, errnum => $exitval, errmsg => $errmsg});
+ push(@{$callback_event->{args}}, { event_token =>
$callback_event->{event_token},
+ returnval => $returnval,
+ errnum => $exitval,
+ errmsg => $errmsg });
$self->send_event($callback_event);
}
elsif ($sig == 9) { # killed without a chance to finish its work
@@ -690,8 +684,6 @@
my ($async_event);
my $aborted = 0;
- #$self->log({level=>2}, "S: abort_async_event : event_token=[$event_token]
: pending_async_events: ", Dumper($pending_async_events));
-
# first look for it in the pending list
for (my $i = 0; $i <= $#$pending_async_events; $i++) {
$async_event = $pending_async_events->[$i];
@@ -715,12 +707,10 @@
sub abort_running_async_event {
&App::sub_entry if ($App::trace);
my ($self, $runtime_event_token) = @_;
- #$self->log({level=>2}, "S: abort_running_async_event :
runtime_event_token=[$runtime_event_token]\n");
my $running_async_event = $self->{running_async_event};
my $pending_async_events = $self->{pending_async_events};
my $async_event = $running_async_event->{$runtime_event_token};
if ($async_event) {
- #$self->log({level=>2}, "S: abort_running_async_event :
async_event=[$async_event]\n");
$self->{num_async_events}--;
delete $self->{running_async_event}{$runtime_event_token};
$self->_abort_running_async_event($runtime_event_token, @$async_event);
@@ -850,7 +840,7 @@
sub poe_yield {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $state, $max_count) = @_;
+ my ($self, $kernel, $state, $max_count, $calling_code) = @_;
$max_count ||= 1;
if (!defined($self->{poe_count}{$state})) {
@@ -860,10 +850,9 @@
$self->{poe_count}{$state}++;
}
if ($self->{poe_count}{$state} <= $max_count) {
- $kernel->yield($state);
+ $kernel->yield($state, $calling_code);
}
- #$self->log({level=>2},"POE: poe_yield : poe_count : " .
Dumper($self->{poe_count}) . "\n");
&App::sub_exit() if ($App::trace);
return;
}
@@ -879,7 +868,6 @@
$self->{poe_count}{$state} = 0;
}
- #$self->log({level=>2},"POE: poe_yield_acknowledged : poe_count : " .
Dumper($self->{poe_count}) . "\n");
&App::sub_exit() if ($App::trace);
return;
}
@@ -910,12 +898,10 @@
sub poe_sigchld {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $pid, $status ) = @_[ OBJECT, KERNEL, HEAP,
ARG1, ARG2 ];
- #print STDERR "NOTICE: STATE (poe_sigchld) invoked with ($pid, $status)
args\n";
my $exitval = $status >> 8;
my $sig = $status & 255;
$self->log({level=>2},"POE: poe_sigchld (Child $pid finished
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
$self->finish_pid($pid, $exitval, $sig);
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
&App::sub_exit() if ($App::trace);
}
@@ -930,12 +916,15 @@
if ($main_service && $main_service->can("format_async_event")) {
my ($event, $callback_event) = @$async_event;
my $str = $main_service->format_async_event($event,
$callback_event);
- #$self->log({level=>2},"POE: poe_alarm : pending_async_events
: $str\n");
+ $self->log({level=>2},"POE: poe_alarm : pending_async_events :
$str\n");
}
}
}
+
+ ### This is mostly for the node, which needs this to spawn queued execute subrequest events.
+ ### Without it, subrequests get acquired by the node, but it never spawns children to shop them.
$kernel->yield("poe_dispatch_pending_async_events");
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
+
my $time = time();
my (@events);
my $events_occurred = 0;
@@ -944,7 +933,7 @@
$time_of_next_event = $self->get_current_events(\@events, $time);
if ($#events > -1) {
foreach my $event (@events) {
- #$self->log({level=>2},"POE: poe_alarm : yield(poe_run_event,
$event)\n");
+ $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event,
$event)\n") if $self->{options}{poe_trace};
$kernel->yield("poe_run_event", $event); # put on the POE run
queue
$events_occurred++;
}
@@ -992,14 +981,11 @@
sub poe_dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
- my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
+ my ( $self, $kernel, $heap, $arg ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
$self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if
$self->{options}{poe_trace};
$self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
my $events_occurred = $self->dispatch_pending_async_events();
- ### These are currently causing our serious slowness during lots of
splitting, fix the dogpile problem!
- #$kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred
> 0);
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events") if
($events_occurred > 0);
&App::sub_exit() if ($App::trace);
}
@@ -1009,7 +995,6 @@
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
$self->log({level=>2},"POE: poe_event_loop_extension\n") if
$self->{options}{poe_trace};
my $event_loop_extensions = $self->{event_loop_extensions};
- #$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #="
. ($#$event_loop_extensions+1) . ").\n") if $self->{options}{poe_trace};
my $async_event_added = 0;
if ($event_loop_extensions && $#$event_loop_extensions > -1) {
my ($extension, $obj, $method, $args, $event_executed);
@@ -1018,19 +1003,12 @@
($obj, $method, $args) = @$extension;
$event_executed = $obj->$method(@$args); # execute extension
$async_event_added = 1 if ($event_executed);
- #if ($event_executed) {
- # $self->log({level=>2},"Event Loop extension:
${obj}->${method}(@$args) = $event_executed\n") if $self->{options}{poe_trace};
- #}
}
}
- if ($async_event_added) {
- #$kernel->yield("poe_dispatch_pending_async_events");
- $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
- $kernel->yield("poe_event_loop_extension");
- }
- else {
- $kernel->delay_set("poe_event_loop_extension", 1);
- }
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events", undef,
"poe_event_loop_extension");
+ ### TODO: Do we want to constrain this if there is nothing to do (to prevent spinning unnecessary cycles)?
+ $kernel->yield("poe_event_loop_extension");
+
&App::sub_exit() if ($App::trace);
}
@@ -1113,16 +1091,10 @@
if ($callback_event) {
$callback_event->{args} = $callback_args;
- if ($runtime_event_token && $callback_event && $callback_args &&
$callback_args->[0]) {
- $self->log({level=>2},"NOTE: poe_remote_async_event_finished
calling send_event: runtime_event_token[$runtime_event_token]
callback_event[$callback_event->{method}]
subrequest_id[$callback_args->[0]{subrequest_id}]\n");
- }
- else {
- $self->log({level=>2},"NOTE: poe_remote_async_event_finished
calling send_event: cannot print extra vars\n");
- }
$self->send_event($callback_event);
}
else {
- $self->log({level=>2},"NOTE: poe_remote_async_event_finished
called without callback_event : runtime_event_token[$runtime_event_token]\n");
+ $self->log({level=>2},"Server: WARNING :
poe_remote_async_event_finished called without callback_event :
runtime_event_token[$runtime_event_token]\n");
}
}
else {
@@ -1135,7 +1107,6 @@
sub poe_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
- #$self->log({level=>2},"POE: poe_server_state\n") if
$self->{options}{poe_trace};
my $server_state = $self->state();
&App::sub_exit($server_state) if ($App::trace);
return $server_state;