Author: timbo
Date: Wed Jan 31 08:11:43 2007
New Revision: 8773

Added:
   dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
      - copied, changed from r8748, /dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm
   dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm
   dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm
      - copied, changed from r8748, /dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm
   dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm
Removed:
   dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm
   dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm
Modified:
   dbi/trunk/lib/DBD/Gofer.pm
   dbi/trunk/lib/DBD/Gofer/Transport/Base.pm
   dbi/trunk/lib/DBI/Gofer/Transport/Base.pm
   dbi/trunk/t/   (props changed)

Log:
Renamed Gofer transport pipe to pipeone
Added new pipestream transport


Modified: dbi/trunk/lib/DBD/Gofer.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer.pm  (original)
+++ dbi/trunk/lib/DBD/Gofer.pm  Wed Jan 31 08:11:43 2007
@@ -489,9 +489,9 @@
 The C<dsn=$dsn> part I<must> be the last element of the dsn because everything
 after C<dsn=> is assumed to be the DSN that the remote DBI should use.
 
-The C<...> represents attributes that influence the operation of the driver or
-transport. These are described below or in the documentation of the transport
-module being used.
+The C<...> represents attributes that influence the operation of the Gofer
+driver or transport. These are described below or in the documentation of the
+transport module being used.
 
 =head1 DESCRIPTION
 
@@ -499,11 +499,11 @@
 usually in a seperate process, often on a separate machine.
 
 It is very similar to DBD::Proxy. The major difference is that DBD::Gofer
-assumes no state is maintained on the remote end. What does that mean?
-It means that every request contains all the information needed to create the
-required state. (So, for example, every request includes the DSN to connect 
to.)
-Each request can be sent to any available server. The server executes
-the request and returns a single response that includes all the data.
+assumes no state is maintained on the remote end. That means every request
+contains all the information needed to create the required state. (So, for
+example, every request includes the DSN to connect to.) Each request can be
+sent to any available server. The server executes the request and returns a
+single response that includes all the data.
 
 This is very similar to the way http works as a stateless protocol for the web.
 Each request from your web browser can be handled by a different web server 
process.

Modified: dbi/trunk/lib/DBD/Gofer/Transport/Base.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/Base.pm   (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/Base.pm   Wed Jan 31 08:11:43 2007
@@ -14,6 +14,6 @@
 
 our $VERSION = sprintf("0.%06d", q$Revision$ =~ /(\d+)/o);
 
-sub _init_debug { $ENV{DBD_GOFER_TRACE} || 0 }
+sub _init_trace { $ENV{DBD_GOFER_TRACE} || 0 }
 
 1;

Copied: dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm (from r8748, 
/dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm)
==============================================================================
--- /dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm  (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm        Wed Jan 31 08:11:43 2007
@@ -1,4 +1,4 @@
-package DBD::Gofer::Transport::pipe;
+package DBD::Gofer::Transport::pipeone;
 
 #   $Id$
 #
@@ -18,46 +18,63 @@
 our $VERSION = sprintf("0.%06d", q$Revision$ =~ /(\d+)/o);
 
 __PACKAGE__->mk_accessors(qw(
+    connection_info
     response_info
 )); 
 
 
+sub start_pipe_command {
+    my ($self, $cmd) = @_;
+
+    # ensure subprocess will use the same modules as us
+    local $ENV{PERL5LIB} = join ":", @INC;
+
+    # limit various forms of insanity, for now
+    local $ENV{DBI_TRACE};
+    local $ENV{DBI_AUTOPROXY};
+    local $ENV{DBI_PROFILE};
+
+    my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
+    my $pid = open3($wfh, $rfh, $efh, $cmd)
+        or die "error starting $cmd: $!\n";
+    warn "Started pid $pid: $cmd\n" if $self->trace;
+
+    return {
+        cmd=>$cmd,
+        pid=>$pid,
+        wfh=>$wfh, rfh=>$rfh, efh=>$efh,
+    };
+}
+
+
 sub transmit_request {
     my ($self, $request) = @_;
 
     my $info = eval { 
         my $frozen_request = $self->freeze_data($request);
 
-        local $ENV{DBI_TRACE};
-        local $ENV{DBI_AUTOPROXY};
-        local $ENV{DBI_PROFILE};
-        local $ENV{PERL5LIB} = join ":", @INC;
-        my $cmd = "perl -MDBI::Gofer::Transport::pipe -e run_one_stdio";
-
-        my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
-        my $pid = open3($wfh, $rfh, $efh, $cmd)
-            or die "error starting subprocess: $!\n";
+        my $cmd = "perl -MDBI::Gofer::Transport::pipeone -e run_one_stdio";
+        my $info = $self->start_pipe_command($cmd);
 
+        my $wfh = delete $info->{wfh};
         # send frozen request
         print $wfh $frozen_request;
         # indicate that there's no more
         close $wfh
-            or die "error writing to subprocess: $!\n";
+            or die "error writing to $cmd: $!\n";
 
-        # so far so good. return the state info
-        { pid=>$pid, rfh=>$rfh, efh=>$efh };
+        $info; # so far so good. return the state info
     };
     if ($@) {
-    warn $@;
-        $info = {};
-        $info->{response} = DBI::Gofer::Response->new({
-            err    => 1,
-            errstr => $@,
-        }); 
+        my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ }); 
+        $self->response_info($response);
+    }
+    else {
+        $self->response_info(undef);
     }
 
     # record what we need to get a response, ready for receive_response()
-    $self->response_info( $info );
+    $self->connection_info( $info );
 
     return 1;
 }
@@ -66,11 +83,12 @@
 sub receive_response {
     my $self = shift;
 
-    my $info = $self->response_info || die;
-    my ($response, $pid, $rfh, $efh) = @{$info}{qw(response pid rfh efh)};
-
+    my $response = $self->response_info;
     return $response if $response; # failed while starting
 
+    my $info = $self->connection_info || die;
+    my ($pid, $rfh, $efh) = @{$info}{qw(pid rfh efh)};
+
     waitpid $info->{pid}, 0
         or warn "waitpid: $!"; # XXX do something more useful?
 
@@ -80,13 +98,12 @@
     if (not $frozen_response) { # no output on stdout at all
         return DBI::Gofer::Response->new({
             err    => 1,
-            errstr => "pipe command failed: $stderr_msg",
+            errstr => "pipeone command failed: $stderr_msg",
         }); 
     }
     warn "STDERR message: $stderr_msg" if $stderr_msg; # XXX do something more 
useful
-    #warn DBI::neat($frozen_response);
 
-    # XXX may be corrupt
+    # XXX need to be able to detect and deal with corruption
     $response = $self->thaw_data($frozen_response);
 
     return $response;

Added: dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm
==============================================================================
--- (empty file)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm     Wed Jan 31 08:11:43 2007
@@ -0,0 +1,103 @@
+package DBD::Gofer::Transport::pipestream;
+
+#   $Id: pipeone.pm 8748 2007-01-29 22:49:42Z timbo $
+#
+#   Copyright (c) 2007, Tim Bunce, Ireland
+#
+#   You may distribute under the terms of either the GNU General Public
+#   License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+use Fcntl;
+
+use base qw(DBD::Gofer::Transport::pipeone);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8748 $ =~ /(\d+)/o);
+
+
+sub nonblock;
+
+sub transmit_request {
+    my ($self, $request) = @_;
+
+    eval { 
+
+        my $connection = $self->connection_info;
+        if (not $connection || ($connection->{pid} && not kill 0, 
$connection->{pid})) {
+            my $cmd = "perl -MDBI::Gofer::Transport::pipestream -e 
run_stdio_hex";
+            #$cmd = "DBI_TRACE=2=/tmp/pipestream.log $cmd";
+            $connection = $self->start_pipe_command($cmd);
+            nonblock($connection->{efh});
+            $self->connection_info($connection);
+        }
+
+        my $frozen_request = unpack("H*", $self->freeze_data($request));
+        $frozen_request .= "\n";
+
+        my $wfh = $connection->{wfh};
+        # send frozen request
+        print $wfh $frozen_request # autoflush enabled
+            or die "Error sending request: $!";
+        warn "Request: $frozen_request" if $self->trace >= 3;
+    };
+    if ($@) {
+        my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ }); 
+        $self->response_info($response);
+        # return undef ?
+    }
+    else {
+        $self->response_info(undef);
+    }
+
+    return 1;
+}
+
+
+sub receive_response {
+    my $self = shift;
+
+    my $response = $self->response_info;
+    return $response if $response; # failed while starting
+
+    my $connection = $self->connection_info || die;
+    my ($pid, $rfh, $efh) = @{$connection}{qw(pid rfh efh)};
+
+    my $frozen_response = <$rfh>; # always one line
+    my $stderr_msg      = do { local $/; <$efh> }; # nonblocking
+
+    chomp $stderr_msg if $stderr_msg;
+
+    if (not $frozen_response) { # no output on stdout at all
+    warn "STDERR err message: $stderr_msg" if $stderr_msg; # XXX do something 
more useful
+        return DBI::Gofer::Response->new({
+            err    => 1,
+            errstr => "Error reading from $connection->{cmd}: $stderr_msg",
+        }); 
+    }
+    chomp $frozen_response if $frozen_response;
+    warn "STDERR additional message: $stderr_msg" if $stderr_msg; # XXX do 
something more useful
+    #warn DBI::neat($frozen_response);
+
+    # XXX need to be able to detect and deal with corruption
+    $response = $self->thaw_data(pack("H*",$frozen_response));
+
+    return $response;
+}
+
+
+# nonblock($fh) puts filehandle into nonblocking mode
+sub nonblock {
+  my $fh = shift;
+  my $flags = fcntl($fh, F_GETFL, 0)
+        or croak "Can't get flags for filehandle $fh: $!";
+  fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+        or croak "Can't make filehandle $fh nonblocking: $!";
+}
+
+1;
+
+__END__
+

Modified: dbi/trunk/lib/DBI/Gofer/Transport/Base.pm
==============================================================================
--- dbi/trunk/lib/DBI/Gofer/Transport/Base.pm   (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/Base.pm   Wed Jan 31 08:11:43 2007
@@ -54,7 +54,7 @@
     my ($self, $label, $data) = @_;
     require Data::Dumper;
     # XXX config dumper format
-    warn "$label=".Dumper($data);
+    warn "$label=".Data::Dumper::Dumper($data);
 }
 
 1;

Copied: dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm (from r8748, 
/dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm)
==============================================================================
--- /dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm  (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm        Wed Jan 31 08:11:43 2007
@@ -1,4 +1,4 @@
-package DBI::Gofer::Transport::pipe;
+package DBI::Gofer::Transport::pipeone;
 
 #   $Id$
 #
@@ -21,7 +21,7 @@
 
 sub run_one_stdio {
 
-    my $self = DBI::Gofer::Transport::pipe->new();
+    my $self = DBI::Gofer::Transport::pipeone->new();
 
     my $frozen_request = do { local $/; <STDIN> };
 

Added: dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm
==============================================================================
--- (empty file)
+++ dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm     Wed Jan 31 08:11:43 2007
@@ -0,0 +1,39 @@
+package DBI::Gofer::Transport::pipestream;
+
+#   $Id: pipestream.pm 8748 2007-01-29 22:49:42Z timbo $
+#
+#   Copyright (c) 2007, Tim Bunce, Ireland
+#
+#   You may distribute under the terms of either the GNU General Public
+#   License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use DBI::Gofer::Execute qw(execute_request);
+
+use base qw(DBI::Gofer::Transport::pipeone Exporter);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8748 $ =~ /(\d+)/o);
+
+our @EXPORT = qw(run_stdio_hex);
+
+
+sub run_stdio_hex {
+
+    my $self = DBI::Gofer::Transport::pipestream->new();
+    local $| = 1;
+
+    while ( my $frozen_request = <STDIN> ) {
+
+        my $request = $self->thaw_data( pack "H*", $frozen_request );
+        my $response = execute_request( $request );
+
+        my $frozen_response = unpack "H*", $self->freeze_data($response);
+
+        print $frozen_response, "\n"; # autoflushed due to $|=1
+    }
+}
+
+
+1;

Reply via email to