Author: spadkins
Date: Tue Aug 7 12:11:58 2007
New Revision: 9827
Modified:
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
Log:
first working version of POE cluster context
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterController.pm Tue Aug 7 12:11:58 2007
@@ -44,9 +44,10 @@
$self->{max_async_events} = 0; # start with 0 because there are no nodes up
push(@{$self->{poe_states}},
- "poe_remote_async_event_queued", "poe_set_node_status",
"poe_run_event",
- "poe_register_node", "poe_set_node_up", "poe_set_node_down");
- push(@{$self->{poe_ikc_published_states}}, "poe_set_node_status");
+ "poe_receive_node_status",
+ "poe_run_event");
+ push(@{$self->{poe_ikc_published_states}},
+ "poe_receive_node_status");
$self->_init_poe($options);
@@ -83,21 +84,22 @@
my $event_token = $self->send_async_event_in_process($event,
$callback_event);
}
elsif ($destination =~ /^([^:]+):([0-9]+)$/) {
+ my $controller = "$self->{host}:$self->{port}";
my $node_host = $1;
my $node_port = $2;
my $args = $event->{args};
my $remote_server_name = "poe_${node_host}_${node_port}";
my $remote_session_alias = $self->{poe_session_name}; # remote is
same as local
- my $remote_session_state = "poe_send_async_event";
- my $local_callback_state = "poe_remote_async_event_queued";
+ my $remote_session_state = "poe_enqueue_async_event";
+ my $local_callback_state = "poe_enqueue_async_event_finished";
$self->{num_async_events}++;
$self->{node}{$destination}{num_async_events}++;
my $kernel = $self->{poe_kernel};
$kernel->post("IKC", "call",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
- [ $event, $callback_event ], "poe:$local_callback_state" );
+ [ $controller, $event, $callback_event ],
"poe:$local_callback_state" );
}
else {
$self->SUPER::send_async_event_now($event, $callback_event);
@@ -109,8 +111,9 @@
&App::sub_entry if ($App::trace);
my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
$self->log({level=>2},"POE: ikc_register ($session_name)\n");
- if ($session_name =~ /^ikc_/) {
- # do something
+ if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
+ my $node = "$1:$2";
+ $self->set_node_up($node);
}
my ($retval);
&App::sub_exit($retval) if ($App::trace);
@@ -119,8 +122,12 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
- $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id)\n");
+ my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
+ $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
+ if ($session_name =~ /^ikc_([^_]+)_(\d+)$/) {
+ my $node = "$1:$2";
+ $self->set_node_down($node);
+ }
&App::sub_exit() if ($App::trace);
}
@@ -132,14 +139,6 @@
return;
}
-sub poe_remote_async_event_queued {
- &App::sub_entry if ($App::trace);
- my ($self, $kernel, $runtime_event_token, $async_event) = @_[OBJECT,
KERNEL, ARG0, ARG1];
- $self->log({level=>2},"POE: poe_remote_async_event_queued
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token\n");
- $self->{running_async_event}{$runtime_event_token} = $async_event;
- &App::sub_exit() if ($App::trace);
-}
-
# $runtime_event_tokens take the following forms:
# $runtime_event_token = $pid; -- App::Context::Server::send_async_event_now() and ::finish_pid()
# $runtime_event_token = "$host-$port-$serial"; -- i.e. a plain event token on the node
@@ -207,13 +206,14 @@
return($assigned);
}
-sub poe_set_node_status {
+sub poe_receive_node_status {
&App::sub_entry if ($App::trace);
my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
- $self->log("POE: poe_set_node_status args=(@$args)\n");
- my ($retval);
- &App::sub_exit($retval) if ($App::trace);
- return($retval);
+ my ($node, $sys_info) = @$args;
+ $self->log("POE: poe_receive_node_status ($node) - " .
+ "load=$sys_info->{load},
memfree=$sys_info->{memfree}/$sys_info->{memtotal}
swapfree=$sys_info->{swapfree}/$sys_info->{swaptotal}\n");
+ $self->set_node_up($node, $sys_info);
+ &App::sub_exit() if ($App::trace);
}
sub poe_run_event {
@@ -277,49 +277,11 @@
return($state);
}
-sub poe_register_node {
- &App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
- $self->log({level=>2},"POE: poe_register_node ($remote_kernel_id)\n");
-
- my ($node);
- if ($remote_kernel_id =~ m!poe_([^_]+)_([0-9]+)!) {
- $node = "$1:$2";
- }
- else {
- $self->log("ERROR: poe_register_node: unparseable remote_kernel_id
[$remote_kernel_id]\n");
- }
-
- if (!$self->{node}{$node}{up}) {
- my $remote_server_name = "poe_${node}";
- $remote_server_name =~ s/:/_/;
- $kernel->post("IKC", "monitor", "poe://$remote_server_name",
- {register => "poe_set_node_up",
- unregister => "poe_set_node_down",
- shutdown => "poe_set_node_down",
- data => $node});
- }
-
- &App::sub_exit() if ($App::trace);
-}
-
-sub poe_set_node_up {
+sub set_node_up {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0,
ARG3];
- $self->log({level=>2},"POE: poe_set_node_up ($remote_kernel_id;
node=$node)\n");
- my ($retval, $values);
+ my ($self, $node, $sys_info) = @_;
+ my ($retval);
if (!$self->{node}{$node}{up}) {
- if ($node =~ /^([^:]+:\d+):(.*)/) {
- $node = $1;
- $values = $2;
- if ($values) {
- foreach my $value (split(/,/, $values)) {
- if ($value =~ /^([^=]+)=(.*)/) {
- $self->{node}{$node}{$1} = $2;
- }
- }
- }
- }
$self->{node}{$node}{datetime} = time2str("%Y-%m-%d %H:%M:%S", time());
if ($self->{node}{$node}{up}) {
$retval = "ok";
@@ -330,14 +292,18 @@
$retval = "new";
}
}
+ if ($sys_info) {
+ foreach my $sys_var (keys %$sys_info) {
+ $self->{node}{$node}{$sys_var} = $sys_info->{$sys_var};
+ }
+ }
&App::sub_exit($retval) if ($App::trace);
return($retval);
}
-sub poe_set_node_down {
+sub set_node_down {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0,
ARG3];
- $self->log({level=>2},"POE: poe_set_node_down ($remote_kernel_id;
node=$node)\n");
+ my ($self, $node) = @_;
my $runtime_event_token_prefix = $node;
$runtime_event_token_prefix =~ s/:/-/;
$self->reset_running_async_events($runtime_event_token_prefix);
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/ClusterNode.pm Tue Aug 7 12:11:58 2007
@@ -61,7 +61,7 @@
&App::sub_entry if ($App::trace);
my ($self, $options) = @_;
- my $ikc_name = "ikc_$self->{host}_$self->{port}";
+ my $ikc_name = "poe_$self->{host}_$self->{port}";
### Set up a server
POE::Component::IKC::Responder->spawn();
POE::Component::IKC::Client->spawn(
@@ -74,7 +74,7 @@
my $session_name = $self->{poe_session_name};
POE::Component::Server::SimpleHTTP->new(
- 'ALIAS' => $self->{poe_kernel_httpd_name},
+ 'ALIAS' => $self->{poe_kernel_http_name},
'ADDRESS' => INADDR_ANY,
'PORT' => $self->{options}{http_port},
'HANDLERS' => [
@@ -140,9 +140,10 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id, $node) = @_[OBJECT, KERNEL, ARG0,
ARG3];
- $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id;
node=$node)\n");
+ my ($self, $kernel, $remote_kernel_id, $session_name, $node) = @_[OBJECT,
KERNEL, ARG0, ARG1, ARG3];
+ $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id;
session_name=$session_name; node=$node)\n");
$self->{controller_up} = 0;
+ $kernel->yield("poe_shutdown");
&App::sub_exit() if ($App::trace);
}
@@ -161,7 +162,7 @@
my $node_heartbeat = $self->{options}{node_heartbeat} || 60;
$self->schedule_event(
method => "send_node_status",
- time => time(), # immediately ...
+ time => time()+5, # immediately ...
interval => $node_heartbeat, # and every X seconds hereafter
);
&App::sub_exit() if ($App::trace);
@@ -184,13 +185,21 @@
my $remote_server_name = "poe_${controller_host}_${controller_port}";
my $remote_session_alias = $self->{poe_session_name}; # remote is same as
local
- my $remote_session_state = "poe_set_node_status";
+ my $remote_session_state = "poe_receive_node_status";
my $sys_info = $self->get_sys_info();
+ my $memfree = $sys_info->{memfree} + $sys_info->{buffers} +
$sys_info->{cached};
+ my $s_info = {
+ load => $sys_info->{load},
+ memfree => $memfree,
+ memtotal => $sys_info->{memtotal},
+ swapfree => $sys_info->{swapfree},
+ swaptotal => $sys_info->{swaptotal},
+ };
if ($self->{controller_up}) {
my $kernel = $self->{poe_kernel};
$kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
- [ $sys_info ]);
+ [ "$node_host:$node_port", $s_info ]);
}
&App::sub_exit() if ($App::trace);
Modified: p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm
==============================================================================
--- p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm (original)
+++ p5ee/trunk/App-Context/lib/App/Context/POE/Server.pm Tue Aug 7 12:11:58 2007
@@ -44,7 +44,7 @@
$self->{port} = $options->{port};
$options->{http_port} ||= $options->{port}+1;
$self->{poe_kernel_name} = "poe_$self->{host}_$self->{port}";
- $self->{poe_kernel_httpd_name} = $self->{poe_kernel_name} . "_httpd";
+ $self->{poe_kernel_http_name} = $self->{poe_kernel_name} . "_httpd";
$self->{poe_session_name} = "poe_session";
$self->{poe_kernel} = $poe_kernel;
@@ -65,8 +65,13 @@
ikc_register ikc_unregister ikc_shutdown
poe_run_event poe_event_loop_extension
poe_dispatch_pending_async_events
poe_server_state poe_http_server_state poe_http_test_run
+ poe_enqueue_async_event poe_enqueue_async_event_finished
poe_remote_async_event_finished
+ )];
+ $self->{poe_ikc_published_states} = [qw(
+ poe_server_state
+ poe_enqueue_async_event
+ poe_remote_async_event_finished
)];
- $self->{poe_ikc_published_states} = ["poe_server_state"];
### Does nothing by default, used by ClusterController, maybe other
subclasses?
$self->_init2a($options);
@@ -116,7 +121,7 @@
my $session_name = $self->{poe_session_name};
POE::Component::Server::SimpleHTTP->new(
- 'ALIAS' => $self->{poe_kernel_httpd_name},
+ 'ALIAS' => $self->{poe_kernel_http_name},
'ADDRESS' => INADDR_ANY,
'PORT' => $self->{options}{http_port},
'HANDLERS' => [
@@ -729,7 +734,7 @@
sub _stop {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap, $state, $args ) = @_[ OBJECT, KERNEL, HEAP,
ARG0, ARG1 ];
- $self->log({level=>2},"POE: _start\n");
+ $self->log({level=>2},"POE: _stop\n");
#sleep(1); # take a second to let child processes to die (perhaps not
necessary, perhaps necessary when using POE::Wheel::Run)
&App::sub_exit() if ($App::trace);
}
@@ -745,8 +750,8 @@
sub ikc_unregister {
&App::sub_entry if ($App::trace);
- my ($self, $kernel, $remote_kernel_id) = @_[OBJECT, KERNEL, ARG0];
- $self->log({level=>2},"POE: ikc_unregister ($remote_kernel_id)\n");
+ my ($self, $kernel, $session_name) = @_[OBJECT, KERNEL, ARG1];
+ $self->log({level=>2},"POE: ikc_unregister ($session_name)\n");
&App::sub_exit() if ($App::trace);
}
@@ -834,6 +839,9 @@
# get rid of external ref count
$kernel->refcount_decrement( $session, $self->{poe_session_name} );
+ $kernel->post( $self->{poe_kernel_http_name}, 'SHUTDOWN');
+ $kernel->post('IKC', 'shutdown');
+
# propagate the message to children
$kernel->post( $heap->{child_session}, 'poe_shutdown' );
&App::sub_exit() if ($App::trace);
@@ -855,18 +863,26 @@
$self->log({level=>2},"POE: poe_event_loop_extension\n");
my $event_loop_extensions = $self->{event_loop_extensions};
#$self->log({level=>2},"Event Loop extension ($event_loop_extensions: #="
. ($#$event_loop_extensions+1) . ").\n");
+ my $async_event_added = 0;
if ($event_loop_extensions && $#$event_loop_extensions > -1) {
my ($extension, $obj, $method, $args, $event_executed);
for (my $i = 0; $i <= $#$event_loop_extensions; $i++) {
$extension = $event_loop_extensions->[$i];
($obj, $method, $args) = @$extension;
$event_executed = $obj->$method(@$args); # execute extension
+ $async_event_added = 1 if ($event_executed);
#if ($event_executed) {
# $self->log({level=>2},"Event Loop extension:
${obj}->${method}(@$args) = $event_executed\n");
#}
}
}
- $kernel->delay_set("poe_event_loop_extension", 1);
+ if ($async_event_added) {
+ $kernel->yield("poe_dispatch_pending_async_events");
+ $kernel->yield("poe_event_loop_extension");
+ }
+ else {
+ $kernel->delay_set("poe_event_loop_extension", 1);
+ }
&App::sub_exit() if ($App::trace);
}
@@ -888,6 +904,77 @@
&App::sub_exit() if ($App::trace);
}
+# State on Node
+sub poe_enqueue_async_event {
+ &App::sub_entry if ($App::trace);
+ my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
+ my ($sender, $event, $callback_event) = @$args;
+ $self->log({level=>2},"POE: poe_enqueue_async_event
($event->{name}.$event->{method})\n");
+
+ my $runtime_event_token = $self->send_async_event($event, { method =>
"async_event_finished", args => [ $sender, $event, $callback_event ], });
+ $event->{event_token} = $runtime_event_token;
+
+ &App::sub_exit([$runtime_event_token, [$event, $callback_event]]) if
($App::trace);
+ return([$runtime_event_token, [$event, $callback_event]]);
+}
+
+# State on Controller
+sub poe_enqueue_async_event_finished {
+ &App::sub_entry if ($App::trace);
+ my ($self, $kernel, $return_values) = @_[OBJECT, KERNEL, ARG0];
+ my ($runtime_event_token, $async_event) = @$return_values;
+ $self->log({level=>2},"POE: poe_enqueue_async_event_finished
($async_event->[0]{name}.$async_event->[0]{method} => $runtime_event_token)\n");
+ $self->{running_async_event}{$runtime_event_token} = $async_event;
+ &App::sub_exit() if ($App::trace);
+}
+
+# Method on Node
+sub async_event_finished {
+ &App::sub_entry if ($App::trace);
+ my ($self, $sender, $event, $callback_event) = @_;
+
+ my $runtime_event_token = $event->{event_token};
+ my $remote_server_name = "poe_${sender}";
+ $remote_server_name =~ s/:/_/;
+ my $remote_session_alias = $self->{poe_session_name}; # remote is same as
local
+ my $remote_session_state = "poe_remote_async_event_finished";
+
+ my $kernel = $self->{poe_kernel};
+ $kernel->post("IKC", "post",
"poe://$remote_server_name/$remote_session_alias/$remote_session_state",
+ [ $runtime_event_token, $callback_event->{args} ]);
+
+ &App::sub_exit() if ($App::trace);
+}
+
+# State on Controller
+sub poe_remote_async_event_finished {
+ &App::sub_entry if ($App::trace);
+ my ($self, $kernel, $args) = @_[OBJECT, KERNEL, ARG0];
+ my ($runtime_event_token, $callback_args) = @$args;
+
+ my $async_event = $self->{running_async_event}{$runtime_event_token};
+
+ if ($async_event) {
+ my ($event, $callback_event) = @$async_event;
+ $self->log({level=>2},"POE: poe_remote_async_event_finished
($event->{name}.$event->{method} => $runtime_event_token)\n");
+ delete $self->{running_async_event}{$runtime_event_token};
+
+ my $destination = $event->{destination} || "local";
+ $self->{num_async_events}--;
+ $self->{node}{$destination}{num_async_events}--;
+
+ if ($callback_event) {
+ $callback_event->{args} = $callback_args;
+ $self->send_event($callback_event);
+ }
+ }
+ else {
+ $self->log({level=>2},"POE: poe_remote_async_event_finished
($runtime_event_token)\n");
+ }
+
+ &App::sub_exit() if ($App::trace);
+}
+
sub poe_server_state {
&App::sub_entry if ($App::trace);
my ( $self, $kernel, $heap ) = @_[ OBJECT, KERNEL, HEAP ];
@@ -909,7 +996,7 @@
$response->content($server_state);
# Signal that the request was handled okay.
- $kernel->post( $self->{poe_kernel_httpd_name}, 'DONE', $response );
+ $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
&App::sub_exit(RC_OK) if ($App::trace);
return RC_OK;
}
@@ -932,7 +1019,7 @@
$response->content("SessionObject(mvworkd).sleep(30)");
# Signal that the request was handled okay.
- $kernel->post( $self->{poe_kernel_httpd_name}, 'DONE', $response );
+ $kernel->post( $self->{poe_kernel_http_name}, 'DONE', $response );
&App::sub_exit(RC_OK) if ($App::trace);
return RC_OK;
}