Hi,
To give DBD::Gofer a quick spin I made a prototype gearman
transport. I was surprised how easy it was. Very neat!
Gearman is a job distribution system - an rfc framework of sorts.
You run a number of "gearmand" processes (the job distribution
centrals) and then an appropriate number of "worker processes".
http://www.danga.com/gearman/
http://code.sixapart.com/svn/gearman/trunk/
http://code.sixapart.com/svn/gearman/trunk/api/perl/Gearman/
To try it,
1) start a gearmand process
2) start the worker process
perl -w -Ilib -MDBI::Gofer::Transport::gearman -e
DBI::Gofer::Transport::gearman::run
3) change the $dsn in 85-gopher.t to
dbi:Gofer:transport=gearman;url=127.0.0.1;dsn=...
- ask
Index: lib/DBD/Gofer/Transport/gearman.pm
===================================================================
--- lib/DBD/Gofer/Transport/gearman.pm (revision 0)
+++ lib/DBD/Gofer/Transport/gearman.pm (revision 0)
@@ -0,0 +1,70 @@
+package DBD::Gofer::Transport::gearman;
+
+# $Id: $
+#
+# Copyright (c) 2007, Ask Bjoern Hansen, Develooper LLC
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+use lib '/Users/ask/src/gearman/api/perl/Gearman/lib';
+use Gearman::Client;
+
+use base qw(DBD::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8832 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+ connection_info
+ response_info
+));
+
+sub transmit_request {
+ my ($self, $request) = @_;
+
+ my $info = eval {
+
+ my $gearman = $self->{gearman} ||= do {
+ my $url = $self->go_url || croak "No job server(s) specified";
+ Gearman::Client->new( job_servers => [ split /,/, $url ] );
+ };
+
+ $self->connection_info
+ ($gearman->do_task('dbi_gofer_execute',
+ $self->freeze_data($request)
+ )
+ );
+ };
+
+ if ($@) {
+ my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ });
+ $self->response_info($response);
+ }
+ else {
+ $self->response_info(undef);
+ }
+
+ return 1;
+}
+
+
+sub receive_response {
+ my $self = shift;
+
+ my $response = $self->response_info;
+ return $response if $response; # failed while starting
+
+ my $frozen_response = $self->connection_info || die;
+ $response = $self->thaw_data($$frozen_response);
+
+ return $response;
+}
+
+
+1;
+
+__END__
Index: lib/DBD/Gofer/Transport/stream.pm
===================================================================
--- lib/DBD/Gofer/Transport/stream.pm (revision 9102)
+++ lib/DBD/Gofer/Transport/stream.pm (working copy)
@@ -32,7 +32,7 @@
if (not $connection || ($connection->{pid} && not kill 0,
$connection->{pid})) {
my $cmd = [qw(perl -MDBI::Gofer::Transport::stream -e
run_stdio_hex)];
#push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c";
- if (my $url = $self->go_url) {
+ if (my $url = $self->go_job_servers) {
die "Only 'ssh:[EMAIL PROTECTED]' style url supported by this
transport"
unless $url =~ s/^ssh://;
my $ssh = $url;
Index: lib/DBI/Gofer/Transport/gearman.pm
===================================================================
--- lib/DBI/Gofer/Transport/gearman.pm (revision 0)
+++ lib/DBI/Gofer/Transport/gearman.pm (revision 0)
@@ -0,0 +1,37 @@
+package DBI::Gofer::Transport::gearman;
+
+use strict;
+use warnings;
+
+use DBI::Gofer::Execute;
+use Gearman::Worker;
+
+use base qw(DBI::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 1 $ =~ /(\d+)/o);
+
+my $transport = __PACKAGE__->new();
+
+sub run {
+ my $worker = Gearman::Worker->new;
+ $worker->job_servers('127.0.0.1');
+ $worker->register_function
+ ( 'dbi_gofer_execute' =>
+ sub {
+ my $job = shift;
+ my $executor = DBI::Gofer::Execute->new();
+ my $request = $transport->thaw_data($job->arg);
+ my $response = $executor->execute_request( $request );
+ my $frozen_response = $transport->freeze_data($response);
+ return $frozen_response;
+ });
+ $worker->work while 1;
+}
+
+
+1;
+
+__END__
+
+TODO:
+ make job servers configurable
Index: t/86gopher-gearman.t
===================================================================
--- t/86gopher-gearman.t (revision 0)
+++ t/86gopher-gearman.t (revision 0)
@@ -0,0 +1,72 @@
+#!perl -w # -*- perl -*-
+# vim:sw=4:ts=8
+$|=1;
+
+use strict;
+use warnings;
+
+use Test::More 'no_plan';
+
+use DBI;
+
+use lib "/Users/timbo/dbi/trunk/lib";
+
+# so users can try others from the command line
+my $dbm = $ARGV[0] || "SDBM_File";
+
+# use DBD::Gofer directly.
+# when combined with DBI_AUTOPROXY this means we have DBD::Gofer => DBD::Gofer
=> DBD::DBM!
+#
+my $dsn =
"dbi:Gofer:transport=gearman;url=127.0.0.1;dsn=dbi:DBM:dbm_type=$dbm;lockfile=0";
+my $dbh = DBI->connect($dsn);
+ok $dbh, 'should connect';
+
+ # 0=SQL::Statement if avail, 1=DBI::SQL::Nano
+ # next line forces use of Nano rather than default behaviour
+ $ENV{DBI_SQL_NANO}=1;
+
+#my $dir = './test_output';
+#rmtree $dir;
+#mkpath $dir;
+
+my @sql = split /\s*;\n/, join '',<DATA>;
+
+for my $sql ( @sql ) {
+ $sql =~ s/;$//; # in case no final \n on last line of __DATA__
+ my $null = '';
+ my $expected_results = {
+ 1 => 'oranges',
+ 2 => 'apples',
+ 3 => $null,
+ };
+ if ($sql !~ /SELECT/) {
+ print " do $sql\n";
+ $dbh->do($sql) or die $dbh->errstr;
+ next;
+ }
+ print " run $sql\n";
+ my $sth = $dbh->prepare($sql) or die $dbh->errstr;
+ $sth->execute;
+ die $sth->errstr if $sth->err and $sql !~ /DROP/;
+ # Note that we can't rely on the order here, it's not portable,
+ # different DBMs (or versions) will return different orders.
+ while (my ($key, $value) = $sth->fetchrow_array) {
+ ok exists $expected_results->{$key};
+ is $value, $expected_results->{$key};
+ }
+ is $DBI::rows, keys %$expected_results;
+}
+$dbh->disconnect;
+
+1;
+__DATA__
+DROP TABLE IF EXISTS fruit;
+CREATE TABLE fruit (dKey INT, dVal VARCHAR(10));
+INSERT INTO fruit VALUES (1,'oranges' );
+INSERT INTO fruit VALUES (2,'to_change' );
+INSERT INTO fruit VALUES (3, NULL );
+INSERT INTO fruit VALUES (4,'to delete' );
+UPDATE fruit SET dVal='apples' WHERE dKey=2;
+DELETE FROM fruit WHERE dVal='to delete';
+SELECT * FROM fruit;
+DROP TABLE fruit;
package DBI::Gofer::Transport::gearman;
use strict;
use warnings;
use DBI::Gofer::Execute;
use Gearman::Worker;
use base qw(DBI::Gofer::Transport::Base);
our $VERSION = sprintf("0.%06d", q$Revision: 1 $ =~ /(\d+)/o);
my $transport = __PACKAGE__->new();
sub run {
my $worker = Gearman::Worker->new;
$worker->job_servers('127.0.0.1');
$worker->register_function
( 'dbi_gofer_execute' =>
sub {
my $job = shift;
my $executor = DBI::Gofer::Execute->new();
my $request = $transport->thaw_data($job->arg);
my $response = $executor->execute_request( $request );
my $frozen_response = $transport->freeze_data($response);
return $frozen_response;
});
$worker->work while 1;
}
1;
__END__
TODO:
make job servers configurable
====
package DBD::Gofer::Transport::gearman;
# $Id: $
#
# Copyright (c) 2007, Ask Bjoern Hansen, Develooper LLC
#
# You may distribute under the terms of either the GNU General Public
# License or the Artistic License, as specified in the Perl README
file.
use strict;
use warnings;
use Carp;
use lib '/Users/ask/src/gearman/api/perl/Gearman/lib';
use Gearman::Client;
use base qw(DBD::Gofer::Transport::Base);
our $VERSION = sprintf("0.%06d", q$Revision: 8832 $ =~ /(\d+)/o);
__PACKAGE__->mk_accessors(qw(
connection_info
response_info
));
sub transmit_request {
my ($self, $request) = @_;
my $info = eval {
my $gearman = $self->{gearman} ||= do {
my $url = $self->go_url || croak "No job server(s)
specified";
Gearman::Client->new( job_servers => [ split /,/, $url ] );
};
$self->connection_info
($gearman->do_task('dbi_gofer_execute',
$self->freeze_data($request)
)
);
};
if ($@) {
my $response = DBI::Gofer::Response->new({ err => 1, errstr
=> $@ });
$self->response_info($response);
}
else {
$self->response_info(undef);
}
return 1;
}
sub receive_response {
my $self = shift;
my $response = $self->response_info;
return $response if $response; # failed while starting
my $frozen_response = $self->connection_info || die;
$response = $self->thaw_data($$frozen_response);
return $response;
}
1;
__END__
--
http://develooper.com/ - http://askask.com/