Author: spadkins
Date: Mon Dec 3 08:21:16 2007
New Revision: 10349
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
fixing the pile up during splitting
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Mon Dec 3
08:21:16 2007
@@ -17,6 +17,7 @@
use Sys::Hostname;
use Date::Format;
use Date::Parse;
+use Time::HiRes qw(gettimeofday tv_interval);
use POE;
use POE::Component::Server::SimpleHTTP;
@@ -440,35 +441,39 @@
sub dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ($self, $max_events) = @_;
- #$self->log({level=>2},"S: dispatch_pending_async_events : enter :
max_events=[$max_events]\n") if $self->{options}{poe_trace};
+ my $t0 = [gettimeofday];
+ #$self->log({level=>2},"S: dispatch_pending_async_events : poe_state : \n"
. $self->_state_poe() . "\n");
+ $self->log({level=>2},"S: dispatch_pending_async_events : enter :
max_events=[$max_events]\n") if $self->{options}{poe_trace};
#$self->log({level=>2},"S: dispatch_pending_async_events :
pending_async_events : " . Dumper($self->{pending_async_events}) . "\n") if
$self->{options}{poe_trace};
+
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
- my ($async_event, $assigned, $event, $in_process);
+ my ($async_event, $assigned, $event, $in_process, %unique_events);
my $events_occurred = 0;
my $i = 0;
my $event_capacity_exists = 1;
my $max_i = $#$pending_async_events;
while ($i <= $max_i && $events_occurred < $max_events) {
- #$self->log({level=>2},"S: dispatch_pending_async_events :
i/max_i=[$i/$max_i] : pending_async_events=[$#$pending_async_events]\n") if
$self->{options}{poe_trace};
$async_event = $pending_async_events->[$i];
$event = $async_event->[0];
if ($event->{destination}) {
- #$self->log({level=>2},"S: dispatch_pending_async_events :
destination=[$event->{destination}]\n") if $self->{options}{poe_trace};
$self->send_async_event_now(@$async_event);
$events_occurred ++;
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]
$max_i--;
+
+ $unique_events{"$event->{name} $event->{method}"}++;
}
elsif ($event_capacity_exists) {
$assigned = $self->assign_event_destination($event);
- #$self->log({level=>2},"S: dispatch_pending_async_events :
assigned=[$assigned]\n") if $self->{options}{poe_trace};
if ($assigned) {
$self->send_async_event_now(@$async_event);
$events_occurred ++;
# keep $i the same
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]
$max_i--;
+
+ $unique_events{"$event->{name} $event->{method}"}++;
}
else { # [undef] no servers are eligible for assignment
$event_capacity_exists = 0; # there's no sense looking at
the other pending async events
@@ -479,8 +484,15 @@
$i++; # look at the next one
}
}
+ my $sum_not_clear_pending_events = 0;
+ for my $key (keys %unique_events) {
+ if ($key ne "mvworkd _clear_pending_hotel_shop_requests") {
+ $sum_not_clear_pending_events += $unique_events{$key};
+ }
+ }
+
+ $self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred]
events_not_clear_pending=[$sum_not_clear_pending_events] time=[" .
sprintf("%.4f", tv_interval($t0, [gettimeofday])) . "]\n") if
$self->{options}{poe_trace};
&App::sub_exit($events_occurred) if ($App::trace);
- #$self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred]\n") if $self->{options}{poe_trace};
return($events_occurred);
}
@@ -836,6 +848,42 @@
return;
}
+sub poe_yield {
+ &App::sub_entry if ($App::trace);
+ my ($self, $kernel, $state, $max_count) = @_;
+
+ $max_count ||= 1;
+ if (!defined($self->{poe_count}{$state})) {
+ $self->{poe_count}{$state} = 1;
+ }
+ else {
+ $self->{poe_count}{$state}++;
+ }
+ if ($self->{poe_count}{$state} <= $max_count) {
+ $kernel->yield($state);
+ }
+
+ #$self->log({level=>2},"POE: poe_yield : poe_count : " .
Dumper($self->{poe_count}) . "\n");
+ &App::sub_exit() if ($App::trace);
+ return;
+}
+
+sub poe_yield_acknowledged {
+ &App::sub_entry if ($App::trace);
+ my ($self, $state) = @_;
+
+ if ($self->{poe_count}{$state}) {
+ $self->{poe_count}{$state}--;
+ }
+ else {
+ $self->{poe_count}{$state} = 0;
+ }
+
+ #$self->log({level=>2},"POE: poe_yield_acknowledged : poe_count : " .
Dumper($self->{poe_count}) . "\n");
+ &App::sub_exit() if ($App::trace);
+ return;
+}
+
sub poe_sigterm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $signame ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
@@ -867,13 +915,14 @@
my $sig = $status & 255;
$self->log({level=>2},"POE: poe_sigchld (Child $pid finished
[exitval=$exitval,sig=$sig])\n") if $self->{options}{poe_trace};
$self->finish_pid($pid, $exitval, $sig);
- $kernel->yield("poe_dispatch_pending_async_events");
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
&App::sub_exit() if ($App::trace);
}
sub poe_alarm {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
+ #$self->log({level=>2},"S: poe_alarm : poe_state : \n" .
$self->_state_poe() . "\n");
$self->log({level=>2},"POE: poe_alarm (Dispatching pending events and
queueing scheduled events)\n") if $self->{options}{poe_trace};
my $main_service = $self->{main_service};
if ($self->{options}{poe_trace}) {
@@ -881,11 +930,12 @@
if ($main_service && $main_service->can("format_async_event")) {
my ($event, $callback_event) = @$async_event;
my $str = $main_service->format_async_event($event,
$callback_event);
- $self->log({level=>2},"POE: poe_alarm : pending_async_events :
$str\n");
+ #$self->log({level=>2},"POE: poe_alarm : pending_async_events
: $str\n");
}
}
}
$kernel->yield("poe_dispatch_pending_async_events");
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
my $time = time();
my (@events);
my $events_occurred = 0;
@@ -894,7 +944,7 @@
$time_of_next_event = $self->get_current_events(\@events,
$time);
if ($#events > -1) {
foreach my $event (@events) {
- $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event,
$event)\n");
+ #$self->log({level=>2},"POE: poe_alarm : yield(poe_run_event,
$event)\n");
$kernel->yield("poe_run_event", $event); # put on the POE run
queue
$events_occurred++;
}
@@ -944,8 +994,13 @@
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
$self->log({level=>2},"POE: poe_dispatch_pending_async_events\n") if
$self->{options}{poe_trace};
+
+ $self->poe_yield_acknowledged("poe_dispatch_pending_async_events");
my $events_occurred = $self->dispatch_pending_async_events();
- $kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred >
0);
+ ### These are currently causing our serious slowness during lots of
splitting, fix the dogpile problem!
+ #$kernel->yield("poe_dispatch_pending_async_events") if ($events_occurred
> 0);
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events") if
($events_occurred > 0);
+
&App::sub_exit() if ($App::trace);
}
@@ -969,7 +1024,8 @@
}
}
if ($async_event_added) {
- $kernel->yield("poe_dispatch_pending_async_events");
+ #$kernel->yield("poe_dispatch_pending_async_events");
+ $self->poe_yield($kernel, "poe_dispatch_pending_async_events");
$kernel->yield("poe_event_loop_extension");
}
else {