Author: timbo
Date: Tue Apr 10 16:14:44 2007
New Revision: 9393
Modified:
dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
dbi/trunk/lib/DBD/Gofer/Transport/stream.pm
dbi/trunk/lib/DBI/Gofer/Transport/stream.pm
Log:
Use proper select() code to read stdout+stderr from child process
to fix hangs from pipeone and stream transports when a pipe fills.
(Still not ideal, but far better than it was.)
Add '-Mblib' to default go_perl if blib module has been loaded.
Bit of a hack to address failures of zvg_* tests some people get.
Replace use of \n with \015\012 for stream packet terminator.
Modified: dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm Tue Apr 10 16:14:44 2007
@@ -10,6 +10,9 @@
use strict;
use warnings;
+use Carp;
+use Fcntl;
+use IO::Select;
use IPC::Open3 qw(open3);
use Symbol qw(gensym);
@@ -25,7 +28,9 @@
sub new {
my ($self, $args) = @_;
- $args->{go_perl} ||= [ $^X ];
+ $args->{go_perl} ||= do {
+ ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
+ };
if (not ref $args->{go_perl}) {
# user can override the perl to be used, either with an array ref
# containing the command name and args to use, or with a string
@@ -38,6 +43,16 @@
}
+# nonblock($fh) puts filehandle into nonblocking mode
+sub nonblock {
+ my $fh = shift;
+ my $flags = fcntl($fh, F_GETFL, 0)
+ or croak "Can't get flags for filehandle $fh: $!";
+ fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+ or croak "Can't make filehandle $fh nonblocking: $!";
+}
+
+
sub start_pipe_command {
my ($self, $cmd) = @_;
$cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
@@ -47,26 +62,32 @@
# set PERL5LIB itself.
# limit various forms of insanity, for now
- local $ENV{DBI_TRACE};
+ local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
local $ENV{DBI_AUTOPROXY};
local $ENV{DBI_PROFILE};
my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
my $pid = open3($wfh, $rfh, $efh, @$cmd)
or die "error starting @$cmd: $!\n";
- $self->trace_msg("Started pid $pid: @$cmd\n",0) if $self->trace;
+ if ($self->trace) {
+ $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d, r%d,
e%d}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
+ }
+ nonblock($rfh);
+ nonblock($efh);
+ my $ios = IO::Select->new($rfh, $efh);
return {
cmd=>$cmd,
pid=>$pid,
wfh=>$wfh, rfh=>$rfh, efh=>$efh,
+ ios=>$ios,
};
}
sub cmd_as_string {
my $self = shift;
- # XXX meant to return a prroperly shell-escaped string suitable for system
+ # XXX meant to return a properly shell-escaped string suitable for system
# but its only for debugging so that can wait
my $connection_info = $self->connection_info;
return join " ", map { "'$_'" } @{$connection_info->{cmd}};
@@ -83,24 +104,78 @@
my $wfh = delete $info->{wfh};
# send frozen request
- print $wfh $frozen_request;
+ print $wfh $frozen_request
+ or warn "error writing to @$cmd: $!\n";
# indicate that there's no more
close $wfh
- or die "error writing to @$cmd: $!\n";
+ or die "error closing pipe to @$cmd: $!\n";
$self->connection_info( $info );
return;
}
+sub read_response_from_fh {
+ my ($self, $fh_actions) = @_;
+
+ my $info = $self->connection_info || die;
+ my ($ios) = @{$info}{qw(ios)};
+ my $errors = 0;
+ my $complete;
+
+ die "No handles to read response from" unless $ios->count;
+
+ while ($ios->count) {
+ my @readable = $ios->can_read();
+ for my $fh (@readable) {
+ local $_;
+ my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
+ my $rv = sysread($fh, $_='', 512 * 32);
+ unless ($rv) { # error (undef) or end of file (0)
+ my $action;
+ unless (defined $rv) { # was an error
+ $action = $actions->{error} || $actions->{eof};
+ ++$errors;
+ # XXX an error may be a permenant condition of the handle
+ # if so we'll loop here - not good
+ }
+ else {
+ $action = $actions->{eof};
+ }
+ $action->($fh) && $ios->remove($fh);
+ next;
+ }
+ # action returns true if the response is now complete
+ # (we finish all handles
+ $actions->{read}->($fh) && ++$complete;
+ }
+ last if $complete;
+ }
+ return $errors;
+}
+
+
sub receive_response_by_transport {
my $self = shift;
my $info = $self->connection_info || die;
- my ($pid, $rfh, $efh, $cmd) = @{$info}{qw(pid rfh efh cmd)};
+ my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
+
+ my $frozen_response;
+ my $stderr_msg;
- my $frozen_response = do { local $/; <$rfh> };
- my $stderr_msg = do { local $/; <$efh> };
+ $self->read_response_from_fh( {
+ $efh => {
+ error => sub { warn "error reading response stderr: $!"; 1 },
+ eof => sub { warn "eof on stderr" if 0; 1 },
+ read => sub { $stderr_msg .= $_; 0 },
+ },
+ $rfh => {
+ error => sub { warn "error reading response: $!"; 1 },
+ eof => sub { warn "eof on stdout" if 0; 1 },
+ read => sub { $frozen_response .= $_; 0 },
+ },
+ });
waitpid $info->{pid}, 0
or warn "waitpid: $!"; # XXX do something more useful?
Modified: dbi/trunk/lib/DBD/Gofer/Transport/stream.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/stream.pm (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/stream.pm Tue Apr 10 16:14:44 2007
@@ -120,7 +120,7 @@
};
my $frozen_request = unpack("H*", $self->freeze_data($request));
- $frozen_request .= "\n";
+ $frozen_request .= "\015\012";
my $wfh = $connection->{wfh};
# send frozen request
@@ -144,22 +144,32 @@
# blocks till a newline has been read
$! = 0;
- my $frozen_response = <$rfh>; # always one line
- my $frozen_response_errno = $!;
- # must read any stderr output _afterwards_
- # warnings during execution are caught and returned as part
- # of the response object. So stderr should be silent.
- my $stderr_msg = do { local $/; <$efh> }; # nonblocking
+ my $errno;
+ my $frozen_response;
+ my $stderr_msg;
+
+ $self->read_response_from_fh( {
+ $efh => {
+ error => sub { warn "error reading response stderr: $!";
$errno||=$!; 1 },
+ eof => sub { warn "eof on stderr" if 0; 1 },
+ read => sub { $stderr_msg .= $_; 0 },
+ },
+ $rfh => {
+ error => sub { warn "error reading response: $!"; $errno||=$!; 1 },
+ eof => sub { warn "eof on stdout" if 0; 1 },
+ read => sub { $frozen_response .= $_;
($frozen_response=~s/\015\012$//) ? 1 : 0 },
+ },
+ });
# if we got no output on stdout at all then the command has
# probably exited, possibly with an error to stderr.
# Turn this situation into a reasonably useful DBI error.
- if (not $frozen_response or !chomp $frozen_response) {
+ if (not $frozen_response) {
chomp $stderr_msg if $stderr_msg;
my $msg = sprintf("Error reading from %s (pid %d%s): ",
$self->cmd_as_string, $pid, (kill 0, $pid) ? "" : ", exited");
- $msg .= $stderr_msg || $frozen_response_errno;
+ $msg .= $stderr_msg || $errno || "(no error message)";
die "$msg\n";
}
Modified: dbi/trunk/lib/DBI/Gofer/Transport/stream.pm
==============================================================================
--- dbi/trunk/lib/DBI/Gofer/Transport/stream.pm (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/stream.pm Tue Apr 10 16:14:44 2007
@@ -28,13 +28,14 @@
#warn "STARTED $$";
while ( my $frozen_request = <STDIN> ) {
+ $frozen_request =~ s/\015?\012$//;
my $request = $self->thaw_data( pack "H*", $frozen_request );
my $response = $executor->execute_request( $request );
my $frozen_response = unpack "H*", $self->freeze_data($response);
- print $frozen_response, "\n"; # autoflushed due to $|=1
+ print $frozen_response, "\015\012"; # autoflushed due to $|=1
}
}