Author: spadkins
Date: Tue Sep 11 13:52:11 2007
New Revision: 9937

Modified:
   p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
   p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm

Log:
working implementation of C and X for cancel

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm   Tue Sep 11 
13:52:11 2007
@@ -247,8 +247,14 @@
     my ( $self, $kernel, $heap, $arg0 ) = @_[ OBJECT, KERNEL, HEAP, ARG0 ];
     my ($runtime_event_token) = @$arg0;
     $self->log({level=>2},"CN : poe_cancel_async_event : 
runtime_event_token=[$runtime_event_token]\n");
+    #$self->log({level=>2},"CN : poe_cancel_async_event : _state_poe : \n" . 
$self->_state_poe());
     my $async_event = $self->{running_async_event}{$runtime_event_token};
 
+    for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
+        my $event_token = $self->{pending_async_events}[$i][0]{event_token};
+        $self->log({level=>2},"CN : poe_cancel_async_event : z1 : 
pending_async_events : event_token=[$event_token]\n");
+    }
+
     ### Find if running
     for my $pid (keys %{$self->{running_async_event}}) {
         #$self->log({level=>2},"CN : poe_cancel_async_event : 
running_async_event : pid=[$pid]\n");
@@ -277,7 +283,7 @@
     ### Find if pending
     for (my $i = 0; $i < @{$self->{pending_async_events}}; $i++) {
         my $event_token = $self->{pending_async_events}[$i][0]{event_token};
-        $self->log({level=>2},"CN : poe_cancel_async_event : 
pending_async_events : event_token=[$event_token]\n");
+        $self->log({level=>2},"CN : poe_cancel_async_event : z2 : 
pending_async_events : event_token=[$event_token]\n");
         if ($runtime_event_token eq $event_token) {
             splice(@{$self->{pending_async_events}}, $i, 1);
         }

Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm        Tue Sep 11 
13:52:11 2007
@@ -21,6 +21,7 @@
 use POE;
 use POE::Component::Server::SimpleHTTP;
 use POE::Component::IKC::Server;
+use POE::API::Peek;
 use HTTP::Status qw/RC_OK/;
 use Socket qw(INADDR_ANY);
 use Data::Dumper;
@@ -329,12 +330,36 @@
 
     $state .= "\n";
 
+    $state .= $self->_state_poe();
+
     $state .= $self->SUPER::_state();
 
     &App::sub_exit($state) if ($App::trace);
     return($state);
 }
 
+sub _state_poe {
+    my $self = @_;
+    my $state = "";
+
+    ### POE state dumping
+    my $api = POE::API::Peek->new();
+    my @queue = $api->event_queue_dump();
+    $state .= "POE event_queue_dump\n";
+    my $first = 1;
+    my $poe_stuff = [qw(ID index priority event type source destination)];
+    for my $item (@queue) {
+        if ($first) {
+            $state .= sprintf("%7s %6s %20s %30s %15s %30s %30s\n", 
@$poe_stuff);
+            $first = 0;
+        }
+        $state .= sprintf("%7d %6d %20f %30s %15s %30s %30s\n", @[EMAIL 
PROTECTED]);
+    }
+    $state .= "\n";
+
+    return $state;
+}
+
 # TODO: Implement this as a fork() or a context-level message to a node to 
fork().
 #       i.e. messages such as "EVENT:" and "EVENT-OK:"
 #       Save the callback_event according to an event_token.
@@ -363,6 +388,8 @@
 sub dispatch_pending_async_events {
     &App::sub_entry if ($App::trace);
     my ($self, $max_events) = @_;
+    #$self->log({level=>2},"S: dispatch_pending_async_events : enter : 
max_events=[$max_events]\n") if $self->{options}{poe_trace};
+    #$self->log({level=>2},"S: dispatch_pending_async_events : 
pending_async_events : " . Dumper($self->{pending_async_events}) . "\n") if 
$self->{options}{poe_trace};
     $max_events ||= 9999;
     my $pending_async_events = $self->{pending_async_events};
     my ($async_event, $assigned, $event, $in_process);
@@ -371,9 +398,11 @@
     my $event_capacity_exists = 1;
     my $max_i = $#$pending_async_events;
     while ($i <= $max_i && $events_occurred < $max_events) {
+        #$self->log({level=>2},"S: dispatch_pending_async_events : 
i/max_i=[$i/$max_i] : pending_async_events=[$#$pending_async_events]\n") if 
$self->{options}{poe_trace};
         $async_event = $pending_async_events->[$i];
         $event = $async_event->[0];
         if ($event->{destination}) {
+            #$self->log({level=>2},"S: dispatch_pending_async_events : 
destination=[$event->{destination}]\n") if $self->{options}{poe_trace};
             $self->send_async_event_now(@$async_event);
             $events_occurred ++;
             splice(@$pending_async_events, $i, 1);  # remove 
$pending_async_events->[$i]
@@ -381,6 +410,7 @@
         }
         elsif ($event_capacity_exists) {
             $assigned = $self->assign_event_destination($event);
+            #$self->log({level=>2},"S: dispatch_pending_async_events : 
assigned=[$assigned]\n") if $self->{options}{poe_trace};
             if ($assigned) {
                 $self->send_async_event_now(@$async_event);
                 $events_occurred ++;
@@ -398,6 +428,7 @@
         }
     }
     &App::sub_exit($events_occurred) if ($App::trace);
+    #$self->log({level=>2},"S: dispatch_pending_async_events : exiting : 
events_occurred=[$events_occurred]\n") if $self->{options}{poe_trace};
     return($events_occurred);
 }
 
@@ -418,7 +449,7 @@
     &App::sub_entry if ($App::trace);
     my ($self, $event, $callback_event) = @_;
 
-    #$self->log({level=>2}, "Server: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n");
+    $self->log({level=>2}, "Server: send_async_event_now() 
$event->{name}.$event->{method} : $event->{destination}\n");
 
     if ($event->{destination} eq "in_process") {
         my $event_token = $self->send_async_event_in_process($event, 
$callback_event);
@@ -810,6 +841,7 @@
         $time_of_next_event = $self->get_current_events([EMAIL PROTECTED], 
$time);
         if ($#events > -1) {
             foreach my $event (@events) {
+                $self->log({level=>2},"POE: poe_alarm : yield(poe_run_event, 
$event)\n");
                 $kernel->yield("poe_run_event", $event);  # put on the POE run 
queue
                 $events_occurred++;
             }
@@ -910,7 +942,7 @@
     &App::sub_entry if ($App::trace);
     my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
     my ($sender, $event, $callback_event) = @$args;
-    $self->log({level=>2},"POE: poe_enqueue_async_event 
($event->{name}.$event->{method})\n") if $self->{options}{poe_trace};
+    $self->log({level=>2},"POE: poe_enqueue_async_event 
($event->{name}.$event->{method} token=$event->{event_token})\n") if 
$self->{options}{poe_trace};
 
     my $runtime_event_token = $self->send_async_event($event, { method => 
"async_event_finished", args => [ $sender, $event, $callback_event ], });
     $event->{event_token} = $runtime_event_token;

Reply via email to