PROTON-471: Example for Messenger->Work in Perl.
Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/9186cb6b Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/9186cb6b Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/9186cb6b Branch: refs/heads/master Commit: 9186cb6bd3f3b7a1b17d9fe9c1fd28eb83073322 Parents: 3b20007 Author: Darryl L. Pierce <[email protected]> Authored: Fri Dec 12 17:15:58 2014 -0500 Committer: Darryl L. Pierce <[email protected]> Committed: Thu Dec 18 08:16:45 2014 -0500 ---------------------------------------------------------------------- examples/messenger/perl/async.pm | 120 +++++++++++++++++++++++++++++ examples/messenger/perl/recv_async.pl | 84 ++++++++++++++++++++ examples/messenger/perl/send_async.pl | 97 +++++++++++++++++++++++ 3 files changed, 301 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/async.pm ---------------------------------------------------------------------- diff --git a/examples/messenger/perl/async.pm b/examples/messenger/perl/async.pm new file mode 100644 index 0000000..5cd350b --- /dev/null +++ b/examples/messenger/perl/async.pm @@ -0,0 +1,120 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use qpid_proton; + +package async::CallbackAdapter; + +sub new { + my ($class) = @_; + my ($self) = {}; + + my $messenger = $_[1]; + + $self->{_messenger} = $messenger; + $messenger->set_blocking(0); + $messenger->set_incoming_window(1024); + $messenger->set_outgoing_window(1024); + + my $message = qpid::proton::Message->new(); + $self->{_message} = $message; + $self->{_incoming} = $message; + $self->{_tracked} = {}; + + bless $self, $class; + return $self; +} + +sub run { + my ($self) = @_; + + $self->{_running} = 1; + + my $messenger = $self->{_messenger}; + + $messenger->start(); + $self->on_start(); + + do { + $messenger->work; + $self->process_outgoing; + $self->process_incoming; + } while($self->{_running}); + + $messenger->stop(); + + while(!$messenger->stopped()) { + $messenger->work; + $self->process_outgoing; + $self->process_incoming; + } + + $self->on_stop(); +} + +sub stop { + my ($self) = @_; + + $self->{_running} = 0; +} + +sub process_outgoing { + my ($self) = @_; + my $tracked = $self->{_tracked}; + + foreach $key (keys %{$tracked}) { + my $on_status = $tracked->{$key}; + if (defined($on_status)) { + if (!($on_status eq qpid::proton::Tracker::PENDING)) { + $self->$on_status($status); + $self->{_messenger}->settle($t); + # delete the settled item + undef $tracked->{$key}; + } + } + } +} + +sub process_incoming { + my ($self) = @_; + my $messenger = $self->{_messenger}; + + while ($messenger->incoming > 0) { + my $message = $self->{_message}; + my $t = $messenger->get($message); + + $self->on_receive($message); + $messenger->accept($t); + } +} + +sub send { + my ($self) = @_; + my $messenger = $self->{_messenger}; + my $tracked = $self->{_tracked}; + my $message = $_[1]; + my $on_status = $_[2] || undef; + + my $tracker = $messenger->put($message); + + $tracked->{$tracker} = $on_status if (defined($on_status)); +} + + +1; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/recv_async.pl ---------------------------------------------------------------------- diff --git a/examples/messenger/perl/recv_async.pl b/examples/messenger/perl/recv_async.pl new file mode 100755 index 0000000..9a2195a --- /dev/null +++ b/examples/messenger/perl/recv_async.pl @@ -0,0 +1,84 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use qpid_proton; +use async; + +package async::Receiver; + +@ISA = (async::CallbackAdapter); + +sub on_start { + my ($self) = @_; + my $args = $_[1] || ("amqp://~0.0.0.0"); + my $messenger = $self->{_messenger}; + + foreach $arg ($args) { + $messenger->subscribe($arg); + } + + $messenger->receive(); +} + +sub on_receive { + my ($self) = @_; + my $msg = $_[1]; + my $message = $self->{_message}; + my $text = ""; + + if (defined($msg->get_body)) { + $text = $msg->get_body; + if ($text eq "die") { + $self->stop; + } + } else { + $text = $message->get_subject; + } + + $text = "" if (!defined($text)); + + print "Received: $text\n"; + + if ($msg->get_reply_to) { + print "Sending reply to: " . $msg->get_reply_to . "\n"; + $message->clear; + $message->set_address($msg->get_reply_to()); + $message->set_body("Reply for ", $msg->get_body); + $self->send($message); + } +} + +sub on_status { + my ($self) = @_; + my $messenger = $self->{_messenger}; + my $status = $_[1]; + + print "Status: ", $status, "\n"; +} + +sub on_stop { + print "Stopped.\n" +} + +package main; + +our $messenger = new qpid::proton::Messenger(); +our $app = new async::Receiver($messenger); + +$app->run(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/9186cb6b/examples/messenger/perl/send_async.pl ---------------------------------------------------------------------- diff --git a/examples/messenger/perl/send_async.pl b/examples/messenger/perl/send_async.pl new file mode 100644 index 0000000..2f9408a --- /dev/null +++ b/examples/messenger/perl/send_async.pl @@ -0,0 +1,97 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +use Getopt::Std; +use qpid_proton; +use async; + +$Getopt::Std::STANDARD_HELP_VERSION = 1; + +sub VERSION_MESSAGE() {} + +sub HELP_MESSAGE() { + print "Usage: send_async.pl [OPTIONS] <msg_0> <msg_1> ...\n"; + print "Options:\n"; + print "\t-a - the message address (def. amqp://0.0.0.0)\n"; + print "\t-r - the reply-to address: //<domain>[/<name>]\n"; + print "\t msg_# - a text string to send\n"; +} + +my %optons = (); +getopts("a:r:", \%options) or usage(); + +our $address = $options{a} || "amqp://0.0.0.0"; +our $replyto = $options{r} || "~/#"; + +package async::Sender; + +@ISA = (async::CallbackAdapter); + +sub on_start { + my ($self) = @_; + my $message = $self->{_message}; + my $messenger = $self->{_messenger}; + my $args = $_[1] || ("Hello world!"); + + print "Started\n"; + + $message->clear; + $message->set_address("amqp://0.0.0.0"); + $message->set_reply_to($replyto) if (defined($replyto)); + + foreach $arg ($args) { + $message->set_body($arg); + if ($replyto) { + $message->set_reply_to($replyto); + } + $self->send($message, "on_status"); + } + + $messenger->receive() if (defined($replyto)); +} + +sub on_status { + my ($self) = @_; + my $messenger = $self->{_messenger}; + my $status = $_[1] || ""; + + print "Status: ", $status, "\n"; +} + +sub on_receive { + my ($self) = @_; + my $message = $_[1]; + my $text = $message->get_body || "[empty]"; + + print "Received: " . $text . "\n"; + + $self->stop(); +} + +sub on_stop { + print "Stopped\n"; +} + + +package main; + +our $msgr = new qpid::proton::Messenger(); +our $app = async::Sender->new($msgr); + +$app->run; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
