Hi,

To give DBD::Gofer a quick spin I made a prototype gearman transport. I was surprised how easy it was. Very neat!

Gearman is a job distribution system - an rfc framework of sorts. You run a number of "gearmand" processes (the job distribution centrals) and then an appropriate number of "worker processes".

        http://www.danga.com/gearman/
        http://code.sixapart.com/svn/gearman/trunk/
        http://code.sixapart.com/svn/gearman/trunk/api/perl/Gearman/


To try it,

1) start a gearmand process
2) start the worker process
perl -w -Ilib -MDBI::Gofer::Transport::gearman -e DBI::Gofer::Transport::gearman::run
3) change the $dsn in 85-gopher.t to
    dbi:Gofer:transport=gearman;url=127.0.0.1;dsn=...


  - ask


Index: lib/DBD/Gofer/Transport/gearman.pm
===================================================================
--- lib/DBD/Gofer/Transport/gearman.pm  (revision 0)
+++ lib/DBD/Gofer/Transport/gearman.pm  (revision 0)
@@ -0,0 +1,70 @@
+package DBD::Gofer::Transport::gearman;
+
+#   $Id:  $
+#
+#   Copyright (c) 2007, Ask Bjoern Hansen, Develooper LLC
+#
+#   You may distribute under the terms of either the GNU General Public
+#   License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+use lib '/Users/ask/src/gearman/api/perl/Gearman/lib';
+use Gearman::Client;
+
+use base qw(DBD::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8832 $ =~ /(\d+)/o);
+
+__PACKAGE__->mk_accessors(qw(
+    connection_info
+    response_info
+)); 
+
+sub transmit_request {
+    my ($self, $request) = @_;
+
+    my $info = eval { 
+
+        my $gearman = $self->{gearman} ||= do {
+            my $url = $self->go_url || croak "No job server(s) specified";
+            Gearman::Client->new( job_servers => [ split /,/, $url ] );
+        };
+
+        $self->connection_info
+            ($gearman->do_task('dbi_gofer_execute', 
+                               $self->freeze_data($request)
+                              )
+            );
+    };
+
+    if ($@) {
+        my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ }); 
+        $self->response_info($response);
+    }
+    else {
+        $self->response_info(undef);
+    }
+
+    return 1;
+}
+
+
+sub receive_response {
+    my $self = shift;
+
+    my $response = $self->response_info;
+    return $response if $response; # failed while starting
+
+    my $frozen_response = $self->connection_info || die;
+    $response = $self->thaw_data($$frozen_response);
+
+    return $response;
+}
+
+
+1;
+
+__END__
Index: lib/DBD/Gofer/Transport/stream.pm
===================================================================
--- lib/DBD/Gofer/Transport/stream.pm   (revision 9102)
+++ lib/DBD/Gofer/Transport/stream.pm   (working copy)
@@ -32,7 +32,7 @@
         if (not $connection || ($connection->{pid} && not kill 0, 
$connection->{pid})) {
             my $cmd = [qw(perl -MDBI::Gofer::Transport::stream -e 
run_stdio_hex)];
             #push @$cmd, "DBI_TRACE=2=/tmp/goferstream.log", "sh", "-c";
-            if (my $url = $self->go_url) {
+            if (my $url = $self->go_job_servers) {
                 die "Only 'ssh:[EMAIL PROTECTED]' style url supported by this 
transport"
                     unless $url =~ s/^ssh://;
                 my $ssh = $url;
Index: lib/DBI/Gofer/Transport/gearman.pm
===================================================================
--- lib/DBI/Gofer/Transport/gearman.pm  (revision 0)
+++ lib/DBI/Gofer/Transport/gearman.pm  (revision 0)
@@ -0,0 +1,37 @@
+package DBI::Gofer::Transport::gearman;
+
+use strict;
+use warnings;
+
+use DBI::Gofer::Execute;
+use Gearman::Worker;
+
+use base qw(DBI::Gofer::Transport::Base);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 1 $ =~ /(\d+)/o);
+
+my $transport = __PACKAGE__->new();
+
+sub run {
+    my $worker = Gearman::Worker->new;
+    $worker->job_servers('127.0.0.1');
+    $worker->register_function
+        ( 'dbi_gofer_execute' =>
+          sub {
+              my $job = shift;
+              my $executor = DBI::Gofer::Execute->new();
+              my $request  = $transport->thaw_data($job->arg);
+              my $response = $executor->execute_request( $request );
+              my $frozen_response = $transport->freeze_data($response);
+              return $frozen_response;
+          });
+    $worker->work while 1;
+}
+
+
+1;
+
+__END__
+
+TODO: 
+  make job servers configurable
Index: t/86gopher-gearman.t
===================================================================
--- t/86gopher-gearman.t        (revision 0)
+++ t/86gopher-gearman.t        (revision 0)
@@ -0,0 +1,72 @@
+#!perl -w                                         # -*- perl -*-
+# vim:sw=4:ts=8
+$|=1;
+
+use strict;
+use warnings;
+
+use Test::More 'no_plan';
+
+use DBI;
+
+use lib "/Users/timbo/dbi/trunk/lib";
+
+# so users can try others from the command line
+my $dbm = $ARGV[0] || "SDBM_File";
+
+# use DBD::Gofer directly.
+# when combined with DBI_AUTOPROXY this means we have DBD::Gofer => DBD::Gofer 
=> DBD::DBM!
+#
+my $dsn = 
"dbi:Gofer:transport=gearman;url=127.0.0.1;dsn=dbi:DBM:dbm_type=$dbm;lockfile=0";
+my $dbh = DBI->connect($dsn);
+ok $dbh, 'should connect';
+
+    # 0=SQL::Statement if avail, 1=DBI::SQL::Nano
+    # next line forces use of Nano rather than default behaviour
+    $ENV{DBI_SQL_NANO}=1;
+
+#my $dir = './test_output';
+#rmtree $dir;
+#mkpath $dir;
+
+my @sql = split /\s*;\n/, join '',<DATA>;
+
+for my $sql ( @sql ) {
+    $sql =~ s/;$//;  # in case no final \n on last line of __DATA__
+    my $null = '';
+    my $expected_results = {
+        1 => 'oranges',
+        2 => 'apples',
+        3 => $null,
+    };
+    if ($sql !~ /SELECT/) {
+        print " do $sql\n";
+        $dbh->do($sql) or die $dbh->errstr;
+        next;
+    }
+    print " run $sql\n";
+    my $sth = $dbh->prepare($sql) or die $dbh->errstr;
+    $sth->execute;
+    die $sth->errstr if $sth->err and $sql !~ /DROP/;
+    # Note that we can't rely on the order here, it's not portable,
+    # different DBMs (or versions) will return different orders.
+    while (my ($key, $value) = $sth->fetchrow_array) {
+        ok exists $expected_results->{$key};
+        is $value, $expected_results->{$key};
+    }
+    is $DBI::rows, keys %$expected_results;
+}
+$dbh->disconnect;
+
+1;
+__DATA__
+DROP TABLE IF EXISTS fruit;
+CREATE TABLE fruit (dKey INT, dVal VARCHAR(10));
+INSERT INTO  fruit VALUES (1,'oranges'   );
+INSERT INTO  fruit VALUES (2,'to_change' );
+INSERT INTO  fruit VALUES (3, NULL       );
+INSERT INTO  fruit VALUES (4,'to delete' );
+UPDATE fruit SET dVal='apples' WHERE dKey=2;
+DELETE FROM  fruit WHERE dVal='to delete';
+SELECT * FROM fruit;
+DROP TABLE fruit;

package DBI::Gofer::Transport::gearman;

use strict;
use warnings;

use DBI::Gofer::Execute;
use Gearman::Worker;

use base qw(DBI::Gofer::Transport::Base);

our $VERSION = sprintf("0.%06d", q$Revision: 1 $ =~ /(\d+)/o);

my $transport = __PACKAGE__->new();

sub run {
    my $worker = Gearman::Worker->new;
    $worker->job_servers('127.0.0.1');
    $worker->register_function
        ( 'dbi_gofer_execute' =>
          sub {
              my $job = shift;
              my $executor = DBI::Gofer::Execute->new();
              my $request  = $transport->thaw_data($job->arg);
              my $response = $executor->execute_request( $request );
              my $frozen_response = $transport->freeze_data($response);
              return $frozen_response;
          });
    $worker->work while 1;
}


1;

__END__

TODO:
  make job servers configurable


====

package DBD::Gofer::Transport::gearman;

#   $Id:  $
#
#   Copyright (c) 2007, Ask Bjoern Hansen, Develooper LLC
#
#   You may distribute under the terms of either the GNU General Public
# License or the Artistic License, as specified in the Perl README file.

use strict;
use warnings;

use Carp;
use lib '/Users/ask/src/gearman/api/perl/Gearman/lib';
use Gearman::Client;

use base qw(DBD::Gofer::Transport::Base);

our $VERSION = sprintf("0.%06d", q$Revision: 8832 $ =~ /(\d+)/o);

__PACKAGE__->mk_accessors(qw(
    connection_info
    response_info
));

sub transmit_request {
    my ($self, $request) = @_;

    my $info = eval {

        my $gearman = $self->{gearman} ||= do {
my $url = $self->go_url || croak "No job server(s) specified";
            Gearman::Client->new( job_servers => [ split /,/, $url ] );
        };

        $self->connection_info
            ($gearman->do_task('dbi_gofer_execute',
                               $self->freeze_data($request)
                              )
            );
    };

    if ($@) {
my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ });
        $self->response_info($response);
    }
    else {
        $self->response_info(undef);
    }

    return 1;
}


sub receive_response {
    my $self = shift;

    my $response = $self->response_info;
    return $response if $response; # failed while starting

    my $frozen_response = $self->connection_info || die;
    $response = $self->thaw_data($$frozen_response);

    return $response;
}


1;

__END__

--
http://develooper.com/ - http://askask.com/


Reply via email to