Author: timbo
Date: Wed Jan 31 08:11:43 2007
New Revision: 8773
Added:
dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
- copied, changed from r8748, /dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm
dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm
dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm
- copied, changed from r8748, /dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm
dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm
Removed:
dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm
dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm
Modified:
dbi/trunk/lib/DBD/Gofer.pm
dbi/trunk/lib/DBD/Gofer/Transport/Base.pm
dbi/trunk/lib/DBI/Gofer/Transport/Base.pm
dbi/trunk/t/ (props changed)
Log:
Renamed Gofer transport pipe to pipeone
Added new pipestream transport
Modified: dbi/trunk/lib/DBD/Gofer.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer.pm (original)
+++ dbi/trunk/lib/DBD/Gofer.pm Wed Jan 31 08:11:43 2007
@@ -489,9 +489,9 @@
The C<dsn=$dsn> part I<must> be the last element of the dsn because everything
after C<dsn=> is assumed to be the DSN that the remote DBI should use.
-The C<...> represents attributes that influence the operation of the driver or
-transport. These are described below or in the documentation of the transport
-module being used.
+The C<...> represents attributes that influence the operation of the Gofer
+driver or transport. These are described below or in the documentation of the
+transport module being used.
=head1 DESCRIPTION
@@ -499,11 +499,11 @@
usually in a seperate process, often on a separate machine.
It is very similar to DBD::Proxy. The major difference is that DBD::Gofer
-assumes no state is maintained on the remote end. What does that mean?
-It means that every request contains all the information needed to create the
-required state. (So, for example, every request includes the DSN to connect
to.)
-Each request can be sent to any available server. The server executes
-the request and returns a single response that includes all the data.
+assumes no state is maintained on the remote end. That means every request
+contains all the information needed to create the required state. (So, for
+example, every request includes the DSN to connect to.) Each request can be
+sent to any available server. The server executes the request and returns a
+single response that includes all the data.
This is very similar to the way http works as a stateless protocol for the web.
Each request from your web browser can be handled by a different web server
process.
Modified: dbi/trunk/lib/DBD/Gofer/Transport/Base.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/Base.pm (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/Base.pm Wed Jan 31 08:11:43 2007
@@ -14,6 +14,6 @@
our $VERSION = sprintf("0.%06d", q$Revision$ =~ /(\d+)/o);
-sub _init_debug { $ENV{DBD_GOFER_TRACE} || 0 }
+sub _init_trace { $ENV{DBD_GOFER_TRACE} || 0 }
1;
Copied: dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm (from r8748,
/dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm)
==============================================================================
--- /dbi/trunk/lib/DBD/Gofer/Transport/pipe.pm (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm Wed Jan 31 08:11:43 2007
@@ -1,4 +1,4 @@
-package DBD::Gofer::Transport::pipe;
+package DBD::Gofer::Transport::pipeone;
# $Id$
#
@@ -18,46 +18,63 @@
our $VERSION = sprintf("0.%06d", q$Revision$ =~ /(\d+)/o);
__PACKAGE__->mk_accessors(qw(
+ connection_info
response_info
));
+sub start_pipe_command {
+ my ($self, $cmd) = @_;
+
+ # ensure subprocess will use the same modules as us
+ local $ENV{PERL5LIB} = join ":", @INC;
+
+ # limit various forms of insanity, for now
+ local $ENV{DBI_TRACE};
+ local $ENV{DBI_AUTOPROXY};
+ local $ENV{DBI_PROFILE};
+
+ my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
+ my $pid = open3($wfh, $rfh, $efh, $cmd)
+ or die "error starting $cmd: $!\n";
+ warn "Started pid $pid: $cmd\n" if $self->trace;
+
+ return {
+ cmd=>$cmd,
+ pid=>$pid,
+ wfh=>$wfh, rfh=>$rfh, efh=>$efh,
+ };
+}
+
+
sub transmit_request {
my ($self, $request) = @_;
my $info = eval {
my $frozen_request = $self->freeze_data($request);
- local $ENV{DBI_TRACE};
- local $ENV{DBI_AUTOPROXY};
- local $ENV{DBI_PROFILE};
- local $ENV{PERL5LIB} = join ":", @INC;
- my $cmd = "perl -MDBI::Gofer::Transport::pipe -e run_one_stdio";
-
- my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
- my $pid = open3($wfh, $rfh, $efh, $cmd)
- or die "error starting subprocess: $!\n";
+ my $cmd = "perl -MDBI::Gofer::Transport::pipeone -e run_one_stdio";
+ my $info = $self->start_pipe_command($cmd);
+ my $wfh = delete $info->{wfh};
# send frozen request
print $wfh $frozen_request;
# indicate that there's no more
close $wfh
- or die "error writing to subprocess: $!\n";
+ or die "error writing to $cmd: $!\n";
- # so far so good. return the state info
- { pid=>$pid, rfh=>$rfh, efh=>$efh };
+ $info; # so far so good. return the state info
};
if ($@) {
- warn $@;
- $info = {};
- $info->{response} = DBI::Gofer::Response->new({
- err => 1,
- errstr => $@,
- });
+ my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ });
+ $self->response_info($response);
+ }
+ else {
+ $self->response_info(undef);
}
# record what we need to get a response, ready for receive_response()
- $self->response_info( $info );
+ $self->connection_info( $info );
return 1;
}
@@ -66,11 +83,12 @@
sub receive_response {
my $self = shift;
- my $info = $self->response_info || die;
- my ($response, $pid, $rfh, $efh) = @{$info}{qw(response pid rfh efh)};
-
+ my $response = $self->response_info;
return $response if $response; # failed while starting
+ my $info = $self->connection_info || die;
+ my ($pid, $rfh, $efh) = @{$info}{qw(pid rfh efh)};
+
waitpid $info->{pid}, 0
or warn "waitpid: $!"; # XXX do something more useful?
@@ -80,13 +98,12 @@
if (not $frozen_response) { # no output on stdout at all
return DBI::Gofer::Response->new({
err => 1,
- errstr => "pipe command failed: $stderr_msg",
+ errstr => "pipeone command failed: $stderr_msg",
});
}
warn "STDERR message: $stderr_msg" if $stderr_msg; # XXX do something more
useful
- #warn DBI::neat($frozen_response);
- # XXX may be corrupt
+ # XXX need to be able to detect and deal with corruption
$response = $self->thaw_data($frozen_response);
return $response;
Added: dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm
==============================================================================
--- (empty file)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipestream.pm Wed Jan 31 08:11:43 2007
@@ -0,0 +1,103 @@
+package DBD::Gofer::Transport::pipestream;
+
+# $Id: pipeone.pm 8748 2007-01-29 22:49:42Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use Carp;
+use Fcntl;
+
+use base qw(DBD::Gofer::Transport::pipeone);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8748 $ =~ /(\d+)/o);
+
+
+sub nonblock;
+
+sub transmit_request {
+ my ($self, $request) = @_;
+
+ eval {
+
+ my $connection = $self->connection_info;
+ if (not $connection || ($connection->{pid} && not kill 0,
$connection->{pid})) {
+ my $cmd = "perl -MDBI::Gofer::Transport::pipestream -e
run_stdio_hex";
+ #$cmd = "DBI_TRACE=2=/tmp/pipestream.log $cmd";
+ $connection = $self->start_pipe_command($cmd);
+ nonblock($connection->{efh});
+ $self->connection_info($connection);
+ }
+
+ my $frozen_request = unpack("H*", $self->freeze_data($request));
+ $frozen_request .= "\n";
+
+ my $wfh = $connection->{wfh};
+ # send frozen request
+ print $wfh $frozen_request # autoflush enabled
+ or die "Error sending request: $!";
+ warn "Request: $frozen_request" if $self->trace >= 3;
+ };
+ if ($@) {
+ my $response = DBI::Gofer::Response->new({ err => 1, errstr => $@ });
+ $self->response_info($response);
+ # return undef ?
+ }
+ else {
+ $self->response_info(undef);
+ }
+
+ return 1;
+}
+
+
+sub receive_response {
+ my $self = shift;
+
+ my $response = $self->response_info;
+ return $response if $response; # failed while starting
+
+ my $connection = $self->connection_info || die;
+ my ($pid, $rfh, $efh) = @{$connection}{qw(pid rfh efh)};
+
+ my $frozen_response = <$rfh>; # always one line
+ my $stderr_msg = do { local $/; <$efh> }; # nonblocking
+
+ chomp $stderr_msg if $stderr_msg;
+
+ if (not $frozen_response) { # no output on stdout at all
+ warn "STDERR err message: $stderr_msg" if $stderr_msg; # XXX do something
more useful
+ return DBI::Gofer::Response->new({
+ err => 1,
+ errstr => "Error reading from $connection->{cmd}: $stderr_msg",
+ });
+ }
+ chomp $frozen_response if $frozen_response;
+ warn "STDERR additional message: $stderr_msg" if $stderr_msg; # XXX do
something more useful
+ #warn DBI::neat($frozen_response);
+
+ # XXX need to be able to detect and deal with corruption
+ $response = $self->thaw_data(pack("H*",$frozen_response));
+
+ return $response;
+}
+
+
+# nonblock($fh) puts filehandle into nonblocking mode
+sub nonblock {
+ my $fh = shift;
+ my $flags = fcntl($fh, F_GETFL, 0)
+ or croak "Can't get flags for filehandle $fh: $!";
+ fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+ or croak "Can't make filehandle $fh nonblocking: $!";
+}
+
+1;
+
+__END__
+
Modified: dbi/trunk/lib/DBI/Gofer/Transport/Base.pm
==============================================================================
--- dbi/trunk/lib/DBI/Gofer/Transport/Base.pm (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/Base.pm Wed Jan 31 08:11:43 2007
@@ -54,7 +54,7 @@
my ($self, $label, $data) = @_;
require Data::Dumper;
# XXX config dumper format
- warn "$label=".Dumper($data);
+ warn "$label=".Data::Dumper::Dumper($data);
}
1;
Copied: dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm (from r8748,
/dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm)
==============================================================================
--- /dbi/trunk/lib/DBI/Gofer/Transport/pipe.pm (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/pipeone.pm Wed Jan 31 08:11:43 2007
@@ -1,4 +1,4 @@
-package DBI::Gofer::Transport::pipe;
+package DBI::Gofer::Transport::pipeone;
# $Id$
#
@@ -21,7 +21,7 @@
sub run_one_stdio {
- my $self = DBI::Gofer::Transport::pipe->new();
+ my $self = DBI::Gofer::Transport::pipeone->new();
my $frozen_request = do { local $/; <STDIN> };
Added: dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm
==============================================================================
--- (empty file)
+++ dbi/trunk/lib/DBI/Gofer/Transport/pipestream.pm Wed Jan 31 08:11:43 2007
@@ -0,0 +1,39 @@
+package DBI::Gofer::Transport::pipestream;
+
+# $Id: pipestream.pm 8748 2007-01-29 22:49:42Z timbo $
+#
+# Copyright (c) 2007, Tim Bunce, Ireland
+#
+# You may distribute under the terms of either the GNU General Public
+# License or the Artistic License, as specified in the Perl README file.
+
+use strict;
+use warnings;
+
+use DBI::Gofer::Execute qw(execute_request);
+
+use base qw(DBI::Gofer::Transport::pipeone Exporter);
+
+our $VERSION = sprintf("0.%06d", q$Revision: 8748 $ =~ /(\d+)/o);
+
+our @EXPORT = qw(run_stdio_hex);
+
+
+sub run_stdio_hex {
+
+ my $self = DBI::Gofer::Transport::pipestream->new();
+ local $| = 1;
+
+ while ( my $frozen_request = <STDIN> ) {
+
+ my $request = $self->thaw_data( pack "H*", $frozen_request );
+ my $response = execute_request( $request );
+
+ my $frozen_response = unpack "H*", $self->freeze_data($response);
+
+ print $frozen_response, "\n"; # autoflushed due to $|=1
+ }
+}
+
+
+1;