Author: spadkins
Date: Tue Sep 11 13:52:11 2007
New Revision: 9937
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
working implementation of C and X for cancel
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Tue Sep 11
13:52:11 2007
@@ -247,8 +247,14 @@
my ( $self, $kernel, $heap, $arg0 ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
my ($runtime_event_token) = @$arg0;
$self->log({level=>2},"CN : poe_cancel_async_event :
runtime_event_token=[$runtime_event_token]\n");
+ #$self->log({level=>2},"CN : poe_cancel_async_event : _state_poe : \n" .
$self->_state_poe());
my $async_event = $self->{running_async_event}{$runtime_event_token};
+ for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
+ my $event_token = $self->{pending_async_events}[$i][0]{event_token};
+ $self->log({level=>2},"CN : poe_cancel_async_event : z1 :
pending_async_events : event_token=[$event_token]\n");
+ }
+
### Find if running
for my $pid (keys %{$self->{running_async_event}}) {
#$self->log({level=>2},"CN : poe_cancel_async_event :
running_async_event : pid=[$pid]\n");
@@ -277,7 +283,7 @@
### Find if pending
for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
my $event_token = $self->{pending_async_events}[$i][0]{event_token};
- $self->log({level=>2},"CN : poe_cancel_async_event :
pending_async_events : event_token=[$event_token]\n");
+ $self->log({level=>2},"CN : poe_cancel_async_event : z2 :
pending_async_events : event_token=[$event_token]\n");
if ($runtime_event_token eq $event_token) {
splice(@{$self->{pending_async_events}}, $i, 1);
}
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Tue Sep 11
13:52:11 2007
@@ -21,6 +21,7 @@
use POE;
use POE::Component::Server::SimpleHTTP;
use POE::Component::IKC::Server;
+use POE::API::Peek;
use HTTP::Status qw/RC_OK/;
use Socket qw(INADDR_ANY);
use Data::Dumper;
@@ -329,12 +330,36 @@
$state .= "\n";
+ $state .= $self->_state_poe();
+
$state .= $self->SUPER::_state();
&App::sub_exit($state) if ($App::trace);
return($state);
}
+# _state_poe()
+#
+# Build a plain-text table describing the events currently sitting in the
+# POE kernel's event queue, as reported by POE::API::Peek.
+#
+# Returns a string made of:
+#   * the heading line "POE event_queue_dump"
+#   * a column-header row followed by one row per queued event
+#     (both omitted when the queue is empty)
+#   * a trailing blank line
+sub _state_poe {
+    # List assignment is required here: "my $self = @_" would evaluate @_ in
+    # scalar context and store the argument count instead of the object.
+    my ($self) = @_;
+    my $state = "";
+
+    ### POE state dumping
+    my $api = POE::API::Peek->new();
+    my @queue = $api->event_queue_dump();
+    $state .= "POE event_queue_dump\n";
+
+    # Column names; used both as header labels and as the keys read from
+    # each queue item.
+    my $poe_stuff = [qw(ID index priority event type source destination)];
+
+    # Only print the header row when there is at least one event to show.
+    if (@queue) {
+        $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n", @$poe_stuff);
+    }
+    for my $item (@queue) {
+        # NOTE(review): the mail archive obscured this argument as
+        # "[EMAIL PROTECTED]"; it is reconstructed as a hash slice of the
+        # queue item over the column names -- confirm against the repository.
+        $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n", @{$item}{@$poe_stuff});
+    }
+    $state .= "\n";
+
+    return $state;
+}
+
# TODO: Implement this as a fork() or a context-level message to a node to fork().
# i.e. messages such as "EVENT:" and "EVENT-OK:"
# Save the callback_event according to an event_token.
@@ -363,6 +388,8 @@
sub dispatch_pending_async_events {
&App::sub_entry if ($App::trace);
my ($self, $max_events) = @_;
+ #$self->log({level=>2},"S: dispatch_pending_async_events : enter :
max_events=[$max_events]\n") if $self->{options}{poe_trace};
+ #$self->log({level=>2},"S: dispatch_pending_async_events :
pending_async_events : " . Dumper($self->{pending_async_events}) . "\n") if
$self->{options}{poe_trace};
$max_events ||= 9999;
my $pending_async_events = $self->{pending_async_events};
my ($async_event, $assigned, $event, $in_process);
@@ -371,9 +398,11 @@
my $event_capacity_exists = 1;
my $max_i = $#$pending_async_events;
while ($i <= $max_i && $events_occurred < $max_events) {
+ #$self->log({level=>2},"S: dispatch_pending_async_events :
i/max_i=[$i/$max_i] : pending_async_events=[$#$pending_async_events]\n") if
$self->{options}{poe_trace};
$async_event = $pending_async_events->[$i];
$event = $async_event->[0];
if ($event->{destination}) {
+ #$self->log({level=>2},"S: dispatch_pending_async_events :
destination=[$event->{destination}]\n") if $self->{options}{poe_trace};
$self->send_async_event_now(@$async_event);
$events_occurred ++;
splice(@$pending_async_events, $i, 1); # remove
$pending_async_events->[$i]
@@ -381,6 +410,7 @@
}
elsif ($event_capacity_exists) {
$assigned = $self->assign_event_destination($event);
+ #$self->log({level=>2},"S: dispatch_pending_async_events :
assigned=[$assigned]\n") if $self->{options}{poe_trace};
if ($assigned) {
$self->send_async_event_now(@$async_event);
$events_occurred ++;
@@ -398,6 +428,7 @@
}
}
&App::sub_exit($events_occurred) if ($App::trace);
+ #$self->log({level=>2},"S: dispatch_pending_async_events : exiting :
events_occurred=[$events_occurred]\n") if $self->{options}{poe_trace};
return($events_occurred);
}
@@ -418,7 +449,7 @@
&App::sub_entry if ($App::trace);
my ($self, $event, $callback_event) = @_;
- #$self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
+ $self->log({level=>2}, "Server: send_async_event_now()
$event->{name}.$event->{method} : $event->{destination}\n");
if ($event->{destination} eq "in_process") {
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
@@ -810,6 +841,7 @@
$time_of_next_event = $self->get_current_events(\@events, $time);
if ($#events > -1) {
foreach my $event (@events) {
+ $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event,
$event)\n");
$kernel->yield("poe_run_event", $event); # put on the POE run queue
$events_occurred++;
}
@@ -910,7 +942,7 @@
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
my ($sender, $event, $callback_event) = @$args;
- $self->log({level=>2},"POE: poe_enqueue_async_event
($event->{name}.$event->{method})\n") if $self->{options}{poe_trace};
+ $self->log({level=>2},"POE: poe_enqueue_async_event
($event->{name}.$event->{method} token=$event->{event_token})\n") if
$self->{options}{poe_trace};
my $runtime_event_token = $self->send_async_event($event, { method =>
"async_event_finished", args => [ $sender, $event, $callback_event ], });
$event->{event_token} = $runtime_event_token;