Author: timbo
Date: Tue Apr 10 16:14:44 2007
New Revision: 9393

Modified:
   dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
   dbi/trunk/lib/DBD/Gofer/Transport/stream.pm
   dbi/trunk/lib/DBI/Gofer/Transport/stream.pm

Log:
Use proper select() code to read stdout+stderr from child process
to fix hangs from pipeone and stream transports when a pipe fills.
(Still not ideal, but far better than it was.)
Add '-Mblib' to default go_perl if blib module has been loaded.
Bit of a hack to address failures of zvg_* tests some people get.
Replace use of \n with \015\012 for stream packet terminator.


Modified: dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm        (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/pipeone.pm        Tue Apr 10 16:14:44 2007
@@ -10,6 +10,9 @@
 use strict;
 use warnings;
 
+use Carp;
+use Fcntl;
+use IO::Select;
 use IPC::Open3 qw(open3);
 use Symbol qw(gensym);
 
@@ -25,7 +28,9 @@
 
 sub new {
     my ($self, $args) = @_;
-    $args->{go_perl} ||= [ $^X ];
+    $args->{go_perl} ||= do {
+        ($INC{"blib.pm"}) ? [ $^X, '-Mblib' ] : [ $^X ];
+    };
     if (not ref $args->{go_perl}) {
         # user can override the perl to be used, either with an array ref
         # containing the command name and args to use, or with a string
@@ -38,6 +43,16 @@
 }
 
 
+# nonblock($fh) puts filehandle into nonblocking mode
+sub nonblock { 
+  my $fh = shift;
+  my $flags = fcntl($fh, F_GETFL, 0)
+        or croak "Can't get flags for filehandle $fh: $!";
+  fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
+        or croak "Can't make filehandle $fh nonblocking: $!";
+}
+
+
 sub start_pipe_command {
     my ($self, $cmd) = @_;
     $cmd = [ $cmd ] unless ref $cmd eq 'ARRAY';
@@ -47,26 +62,32 @@
     # set PERL5LIB itself.
 
     # limit various forms of insanity, for now
-    local $ENV{DBI_TRACE};
+    local $ENV{DBI_TRACE}; # use DBI_GOFER_TRACE instead
     local $ENV{DBI_AUTOPROXY};
     local $ENV{DBI_PROFILE};
 
     my ($wfh, $rfh, $efh) = (gensym, gensym, gensym);
     my $pid = open3($wfh, $rfh, $efh, @$cmd)
         or die "error starting @$cmd: $!\n";
-    $self->trace_msg("Started pid $pid: @$cmd\n",0) if $self->trace;
+    if ($self->trace) {
+        $self->trace_msg(sprintf("Started pid $pid: @$cmd {fd: w%d, r%d, e%d}\n", fileno $wfh, fileno $rfh, fileno $efh),0);
+    }
+    nonblock($rfh);
+    nonblock($efh);
+    my $ios = IO::Select->new($rfh, $efh);
 
     return {
         cmd=>$cmd,
         pid=>$pid,
         wfh=>$wfh, rfh=>$rfh, efh=>$efh,
+        ios=>$ios,
     };
 }
 
 
 sub cmd_as_string {
     my $self = shift;
-    # XXX meant to return a prroperly shell-escaped string suitable for system
+    # XXX meant to return a properly shell-escaped string suitable for system
     # but its only for debugging so that can wait
     my $connection_info = $self->connection_info;
     return join " ", map { "'$_'" } @{$connection_info->{cmd}};
@@ -83,24 +104,78 @@
 
     my $wfh = delete $info->{wfh};
     # send frozen request
-    print $wfh $frozen_request;
+    print $wfh $frozen_request
+        or warn "error writing to @$cmd: $!\n";
     # indicate that there's no more
     close $wfh
-        or die "error writing to @$cmd: $!\n";
+        or die "error closing pipe to @$cmd: $!\n";
 
     $self->connection_info( $info );
     return;
 }
 
 
+sub read_response_from_fh {
+    my ($self, $fh_actions) = @_;
+
+    my $info = $self->connection_info || die;
+    my ($ios) = @{$info}{qw(ios)};
+    my $errors = 0;
+    my $complete;
+
+    die "No handles to read response from" unless $ios->count;
+
+    while ($ios->count) {
+        my @readable = $ios->can_read();
+        for my $fh (@readable) {
+            local $_;
+            my $actions = $fh_actions->{$fh} || die "panic: no action for $fh";
+            my $rv = sysread($fh, $_='', 512 * 32);
+            unless ($rv) {              # error (undef) or end of file (0)
+                my $action;
+                unless (defined $rv) {  # was an error
+                    $action = $actions->{error} || $actions->{eof};
+                    ++$errors;
+                    # XXX an error may be a permanent condition of the handle
+                    # if so we'll loop here - not good
+                }
+                else {
+                    $action = $actions->{eof};
+                }
+                $action->($fh) && $ios->remove($fh);
+                next;
+            }
+            # action returns true if the response is now complete
+            # (we finish all handles)
+            $actions->{read}->($fh) && ++$complete;
+        }
+        last if $complete;
+    }
+    return $errors;
+}
+
+
 sub receive_response_by_transport {
     my $self = shift;
 
     my $info = $self->connection_info || die;
-    my ($pid, $rfh, $efh, $cmd) = @{$info}{qw(pid rfh efh cmd)};
+    my ($pid, $rfh, $efh, $ios, $cmd) = @{$info}{qw(pid rfh efh ios cmd)};
+
+    my $frozen_response;
+    my $stderr_msg;
 
-    my $frozen_response = do { local $/; <$rfh> };
-    my $stderr_msg      = do { local $/; <$efh> };
+    $self->read_response_from_fh( {
+        $efh => {
+            error => sub { warn "error reading response stderr: $!"; 1 },
+            eof   => sub { warn "eof on stderr" if 0; 1 },
+            read  => sub { $stderr_msg .= $_; 0 },
+        },
+        $rfh => {
+            error => sub { warn "error reading response: $!"; 1 },
+            eof   => sub { warn "eof on stdout" if 0; 1 },
+            read  => sub { $frozen_response .= $_; 0 },
+        },
+    });
 
     waitpid $info->{pid}, 0
         or warn "waitpid: $!"; # XXX do something more useful?

Modified: dbi/trunk/lib/DBD/Gofer/Transport/stream.pm
==============================================================================
--- dbi/trunk/lib/DBD/Gofer/Transport/stream.pm (original)
+++ dbi/trunk/lib/DBD/Gofer/Transport/stream.pm Tue Apr 10 16:14:44 2007
@@ -120,7 +120,7 @@
     };
 
     my $frozen_request = unpack("H*", $self->freeze_data($request));
-    $frozen_request .= "\n";
+    $frozen_request .= "\015\012";
 
     my $wfh = $connection->{wfh};
     # send frozen request
@@ -144,22 +144,32 @@
 
     # blocks till a newline has been read
     $! = 0;
-    my $frozen_response = <$rfh>; # always one line
-    my $frozen_response_errno = $!;
 
-    # must read any stderr output _afterwards_
-    # warnings during execution are caught and returned as part
-    # of the response object. So stderr should be silent.
-    my $stderr_msg = do { local $/; <$efh> }; # nonblocking
+    my $errno;
+    my $frozen_response;
+    my $stderr_msg;
+
+    $self->read_response_from_fh( {
+        $efh => {
+            error => sub { warn "error reading response stderr: $!"; $errno||=$!; 1 },
+            eof   => sub { warn "eof on stderr" if 0; 1 },
+            read  => sub { $stderr_msg .= $_; 0 },
+        },
+        $rfh => {
+            error => sub { warn "error reading response: $!"; $errno||=$!; 1 },
+            eof   => sub { warn "eof on stdout" if 0; 1 },
+            read  => sub { $frozen_response .= $_; ($frozen_response=~s/\015\012$//) ? 1 : 0 },
+        },
+    });
 
     # if we got no output on stdout at all then the command has
     # probably exited, possibly with an error to stderr.
     # Turn this situation into a reasonably useful DBI error.
-    if (not $frozen_response or !chomp $frozen_response) {
+    if (not $frozen_response) {
         chomp $stderr_msg if $stderr_msg;
         my $msg = sprintf("Error reading from %s (pid %d%s): ",
             $self->cmd_as_string, $pid, (kill 0, $pid) ? "" : ", exited");
-        $msg .= $stderr_msg || $frozen_response_errno;
+        $msg .= $stderr_msg || $errno || "(no error message)";
         die "$msg\n";
     }
 

Modified: dbi/trunk/lib/DBI/Gofer/Transport/stream.pm
==============================================================================
--- dbi/trunk/lib/DBI/Gofer/Transport/stream.pm (original)
+++ dbi/trunk/lib/DBI/Gofer/Transport/stream.pm Tue Apr 10 16:14:44 2007
@@ -28,13 +28,14 @@
     #warn "STARTED $$";
 
     while ( my $frozen_request = <STDIN> ) {
+        $frozen_request =~ s/\015?\012$//;
 
         my $request = $self->thaw_data( pack "H*", $frozen_request );
         my $response = $executor->execute_request( $request );
 
         my $frozen_response = unpack "H*", $self->freeze_data($response);
 
-        print $frozen_response, "\n"; # autoflushed due to $|=1
+        print $frozen_response, "\015\012"; # autoflushed due to $|=1
     }
 }
 

Reply via email to