For some years now I've been bemoaning the lack of a
`split --filter=COMMAND' feature, which would write to
pipes instead of files, instead of having to split to
a set of files and then run the filter on each of them.
The environment variable FILE is set to the name of the
file that would have been written; it's the command's
responsibility to use that string appropriately.
Sample usage (note proper quoting of $FILE):
$ split --filter 'gzip >$FILE.gz'

The details turned out to be rather trickier than I'd
expected, but I think it's ready to be shared with the
folks in charge of coreutils.  Source patch attached.
I haven't touched the documentation or tests.
*** coreutils-8.9/src/split.c~	Sat Jan  1 13:19:23 2011
--- coreutils-8.9/src/split.c	Thu Jan  6 17:26:30 2011
***************
*** 25,31 ****
--- 25,33 ----
  #include <assert.h>
  #include <stdio.h>
  #include <getopt.h>
+ #include <signal.h>
  #include <sys/types.h>
+ #include <sys/wait.h>
  
  #include "system.h"
  #include "error.h"
***************
*** 35,40 ****
--- 37,43 ----
  #include "full-write.h"
  #include "quote.h"
  #include "safe-read.h"
+ #include "sig2str.h"
  #include "xfreopen.h"
  #include "xstrtol.h"
  
***************
*** 45,50 ****
--- 48,68 ----
    proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \
    proper_name ("Richard M. Stallman")
  
+ /* Shell command to filter through, instead of creating files.  */
+ static char const *filter_command;
+ 
+ /* Process ID of the filter.  */
+ static int filter_pid;
+ 
+ /* Array of open pipes.  */
+ static int *open_pipes;
+ static int open_pipes_size;
+ static int open_pipes_len;
+ 
+ /* Blocked signals.  */
+ static sigset_t oldblocked;
+ static sigset_t newblocked;
+ 
  /* Base name of output files.  */
  static char const *outbase;
  
***************
*** 103,108 ****
--- 121,127 ----
    {"unbuffered", no_argument, NULL, 'u'},
    {"suffix-length", required_argument, NULL, 'a'},
    {"numeric-suffixes", no_argument, NULL, 'd'},
+   {"filter", required_argument, NULL, 'f'},
    {"verbose", no_argument, NULL, VERBOSE_OPTION},
    {"-io-blksize", required_argument, NULL,
      IO_BLKSIZE_OPTION}, /* do not document */
***************
*** 170,175 ****
--- 189,195 ----
    -C, --line-bytes=SIZE   put at most SIZE bytes of lines per output file\n\
    -d, --numeric-suffixes  use numeric suffixes instead of alphabetic\n\
    -e, --elide-empty-files  do not generate empty output files with `-n'\n\
+   -f, --filter=COMMAND    write to shell COMMAND; file name is $FILE\n\
    -l, --lines=NUMBER      put NUMBER lines per output file\n\
    -n, --number=CHUNKS     generate CHUNKS output files.  See below\n\
    -u, --unbuffered        immediately copy input to output with `-n r/...'\n\
***************
*** 256,265 ****
  static int
  create (const char* name)
  {
!   if (verbose)
!     fprintf (stdout, _("creating file %s\n"), quote (name));
!   return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
!                (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
  }
  
  /* Write BYTES bytes at BP to an output file.
--- 276,391 ----
  static int
  create (const char* name)
  {
!   if (!filter_command)
!     {
!       if (verbose)
!         fprintf (stdout, _("creating file %s\n"), quote (name));
!       return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
!                    (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
!     }
!   else
!     {
!       int fd_pair[2];
!       pid_t child_pid;
!       char const *shell_prog = getenv ("SHELL");
!       if (shell_prog == NULL)
!         shell_prog = "/bin/sh";
!       if (setenv ("FILE", name, 1) != 0)
!         error (EXIT_FAILURE, errno, "cannot set environment variable");
!       if (verbose)
!         fprintf (stdout, _("executing with FILE=%s\n"), quote (name));
!       if (pipe (fd_pair) != 0)
!         error (EXIT_FAILURE, errno, _("cannot create pipe"));
!       child_pid = fork ();
!       if (child_pid == 0)
!         {
!           /* This is the child process.  If an error occurs here, the
!              parent will eventually learn about it after doing a wait,
!              at which time it will emit its own error message.  */
!           int j;
!           /* We have to close any pipes that were opened during an
!              earlier call, otherwise this process will be holding a
!              write-pipe that will prevent the earlier process from
!              reading an EOF on the corresponding read-pipe.  */
!           for (j = 0; j < open_pipes_len; ++j)
!             if (close (open_pipes[j]) != 0)
!               error (EXIT_FAILURE, errno, _("closing prior pipe"));
!           if (close (fd_pair[1]))
!             error (EXIT_FAILURE, errno, _("closing output pipe"));
!           if (fd_pair[0] != STDIN_FILENO)
!             {
!               if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO)
!                 error (EXIT_FAILURE, errno, _("moving input pipe"));
!               if (close (fd_pair[0]) != 0)
!                 error (EXIT_FAILURE, errno, _("closing input pipe"));
!             }
!           sigprocmask (SIG_SETMASK, &oldblocked, NULL);
!           execl (shell_prog, last_component (shell_prog), "-c",
!                  filter_command, (char *) NULL);
!           error (EXIT_FAILURE, errno, "%s", shell_prog);
!         }
!       if (child_pid == -1)
!         error (EXIT_FAILURE, errno, _("cannot spawn new process"));
!       if (close (fd_pair[0]) != 0)
!         error (EXIT_FAILURE, errno, _("closing input pipe"));
!       filter_pid = child_pid;
!       if (open_pipes_len == open_pipes_size)
!         open_pipes = (int *) x2nrealloc (open_pipes, &open_pipes_size,
!                                          sizeof *open_pipes);
!       open_pipes[open_pipes_len++] = fd_pair[1];
!       return fd_pair[1];
!     }
! }
! 
! /* Close the output file, and do any associated cleanup.
!    If FP and FD are both specified, they refer to the same open file;
!    in this case FP is closed, but FD is still used in cleanup.  */
! static void
! closeout (FILE *fp, int fd, pid_t pid, char const *name)
! {
!   if (fp != NULL && fclose (fp) != 0 && errno != EPIPE)
!     error (EXIT_FAILURE, errno, "%s", name);
!   if (fd >= 0)
!     {
!       if (fp == NULL && close (fd) < 0)
!         error (EXIT_FAILURE, errno, "%s", name);
!       int j;
!       for (j = 0; j < open_pipes_len; ++j)
!         if (open_pipes[j] == fd) {
!           open_pipes[j] = open_pipes[--open_pipes_len];
!           break;
!         }
!     }
!   if (pid > 0)
!     {
!       int wstatus = 0;
!       if (waitpid (pid, &wstatus, 0) == -1 && errno != ECHILD)
!         error (EXIT_FAILURE, errno, _("waiting for child process"));
!       if (WIFSIGNALED (wstatus))
!         {
!           int sig = WTERMSIG (wstatus);
!           if (sig != SIGPIPE)
!             {
!               char signame[SIG2STR_MAX+1];
!               if (sig2str (sig, signame) != 0)
!                 sprintf(signame, "%d", sig);
!               error (sig + 128, 0,
!                      _("with FILE=%s, signal %s (%s) from command: %s"),
!                      name, signame, strsignal(sig), filter_command);
!             }
!         }
!       else if (WIFEXITED (wstatus))
!         {
!           int ex = WEXITSTATUS (wstatus);
!           if (ex != 0)
!             error (ex, 0, _("with FILE=%s, exit %d from command: %s"),
!                    name, ex, filter_command);
!         }
!       else
!         /* shouldn't happen.  */
!         error (EXIT_FAILURE, 0,
!                _("unknown status from command (0x%X)"), wstatus);
!     }
  }
  
  /* Write BYTES bytes at BP to an output file.
***************
*** 273,285 ****
      {
        if (!bp && bytes == 0 && elide_empty_files)
          return;
!       if (output_desc >= 0 && close (output_desc) < 0)
!         error (EXIT_FAILURE, errno, "%s", outfile);
        next_file_name ();
        if ((output_desc = create (outfile)) < 0)
          error (EXIT_FAILURE, errno, "%s", outfile);
      }
!   if (full_write (output_desc, bp, bytes) != bytes)
      error (EXIT_FAILURE, errno, "%s", outfile);
  }
  
--- 399,410 ----
      {
        if (!bp && bytes == 0 && elide_empty_files)
          return;
!       closeout (NULL, output_desc, filter_pid, outfile);
        next_file_name ();
        if ((output_desc = create (outfile)) < 0)
          error (EXIT_FAILURE, errno, "%s", outfile);
      }
!   if (full_write (output_desc, bp, bytes) != bytes && errno != EPIPE)
      error (EXIT_FAILURE, errno, "%s", outfile);
  }
  
***************
*** 501,507 ****
                /* We don't use the stdout buffer here since we're writing
                   large chunks from an existing file, so it's more efficient
                   to write out directly.  */
!               if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
                  error (EXIT_FAILURE, errno, "%s", _("write error"));
              }
            else
--- 626,633 ----
                /* We don't use the stdout buffer here since we're writing
                   large chunks from an existing file, so it's more efficient
                   to write out directly.  */
!               if (full_write (STDOUT_FILENO, bp, to_write) != to_write
!                   && errno != EPIPE)
                  error (EXIT_FAILURE, errno, "%s", _("write error"));
              }
            else
***************
*** 564,570 ****
          error (EXIT_FAILURE, errno, "%s", infile);
        else if (n_read == 0)
          break; /* eof.  */
!       if (full_write (STDOUT_FILENO, buf, n_read) != n_read)
          error (EXIT_FAILURE, errno, "%s", quote ("-"));
        start += n_read;
      }
--- 690,696 ----
          error (EXIT_FAILURE, errno, "%s", infile);
        else if (n_read == 0)
          break; /* eof.  */
!       if (full_write (STDOUT_FILENO, buf, n_read) != n_read && errno != EPIPE)
          error (EXIT_FAILURE, errno, "%s", quote ("-"));
        start += n_read;
      }
***************
*** 575,580 ****
--- 701,707 ----
    char *of_name;
    int ofd;
    FILE* ofile;
+   int opid;
  } of_t;
  
  enum
***************
*** 637,650 ****
                  error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
              }
  
!           if (fclose (files[i_reopen].ofile) != 0)
              error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
            files[i_reopen].ofd = OFD_APPEND;
          }
  
        files[i_check].ofd = fd;
        if (!(files[i_check].ofile = fdopen (fd, "a")))
          error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
      }
  
    return file_limit;
--- 764,780 ----
                  error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
              }
  
!           if (fclose (files[i_reopen].ofile) != 0 && errno != EPIPE)
              error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
+           files[i_reopen].ofile = NULL;
            files[i_reopen].ofd = OFD_APPEND;
          }
  
        files[i_check].ofd = fd;
        if (!(files[i_check].ofile = fdopen (fd, "a")))
          error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
+       files[i_check].opid = filter_pid;
+       filter_pid = 0;
      }
  
    return file_limit;
***************
*** 658,663 ****
--- 788,794 ----
  static void
  lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
  {
+   bool wrapped = false;
    bool file_limit;
    size_t i_file;
    of_t *files;
***************
*** 678,683 ****
--- 809,815 ----
            files[i_file].of_name = xstrdup (outfile);
            files[i_file].ofd = OFD_NEW;
            files[i_file].ofile = NULL;
+           files[i_file].opid = 0;
          }
        i_file = 0;
        file_limit = false;
***************
*** 715,724 ****
              {
                if (line_no == k && unbuffered)
                  {
!                   if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
                      error (EXIT_FAILURE, errno, "%s", _("write error"));
                  }
!               else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1)
                  {
                    clearerr (stdout); /* To silence close_stdout().  */
                    error (EXIT_FAILURE, errno, "%s", _("write error"));
--- 847,858 ----
              {
                if (line_no == k && unbuffered)
                  {
!                   if (full_write (STDOUT_FILENO, bp, to_write) != to_write
!                       && errno != EPIPE)
                      error (EXIT_FAILURE, errno, "%s", _("write error"));
                  }
!               else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1
!                        && errno != EPIPE)
                  {
                    clearerr (stdout); /* To silence close_stdout().  */
                    error (EXIT_FAILURE, errno, "%s", _("write error"));
***************
*** 734,752 ****
                  {
                    /* Note writing to fd, rather than flushing the FILE gives
                       an 8% performance benefit, due to reduced data copying.  */
!                   if (full_write (files[i_file].ofd, bp, to_write) != to_write)
                      error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                  }
!               else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1)
                  error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                if (file_limit)
                  {
!                   if (fclose (files[i_file].ofile) != 0)
                      error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                    files[i_file].ofd = OFD_APPEND;
                  }
                if (next && ++i_file == n)
!                 i_file = 0;
              }
  
            bp = bp_out;
--- 868,892 ----
                  {
                    /* Note writing to fd, rather than flushing the FILE gives
                       an 8% performance benefit, due to reduced data copying.  */
!                   if (full_write (files[i_file].ofd, bp, to_write) != to_write
!                       && errno != EPIPE)
                      error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                  }
!               else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1
!                        && errno != EPIPE)
                  error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
                if (file_limit)
                  {
!                   if (fclose (files[i_file].ofile) != 0 && errno != EPIPE)
                      error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+                   files[i_file].ofile = NULL;
                    files[i_file].ofd = OFD_APPEND;
                  }
                if (next && ++i_file == n)
!                 {
!                   wrapped = true;
!                   i_file = 0;
!                 }
              }
  
            bp = bp_out;
***************
*** 757,767 ****
       and to signal any waiting fifo consumers.
       Also, close any open file descriptors.
       FIXME: Should we do this before EXIT_FAILURE?  */
!   for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++)
      {
!       file_limit |= ofile_open (files, i_file, n);
!       if (fclose (files[i_file].ofile) != 0)
!         error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
      }
  }
  
--- 897,914 ----
       and to signal any waiting fifo consumers.
       Also, close any open file descriptors.
       FIXME: Should we do this before EXIT_FAILURE?  */
!   if (!k)
      {
!       int ceiling = (wrapped ? n : i_file);
!       for (i_file = 0; i_file < n; i_file++)
!         {
!           if (i_file >= ceiling && !elide_empty_files)
!             file_limit |= ofile_open (files, i_file, n);
!           if (files[i_file].ofd >= 0)
!             closeout (files[i_file].ofile, files[i_file].ofd,
!                       files[i_file].opid, files[i_file].of_name);
!           files[i_file].ofd = OFD_APPEND;
!         }
      }
  }
  
***************
*** 824,830 ****
        int this_optind = optind ? optind : 1;
        char *slash;
  
!       c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL);
        if (c == -1)
          break;
  
--- 971,978 ----
        int this_optind = optind ? optind : 1;
        char *slash;
  
!       c = getopt_long (argc, argv, "0123456789C:a:b:def:l:n:u",
!                        longopts, NULL);
        if (c == -1)
          break;
  
***************
*** 955,960 ****
--- 1103,1112 ----
            elide_empty_files = true;
            break;
  
+         case 'f':
+           filter_command = optarg;
+           break;
+ 
          case IO_BLKSIZE_OPTION:
            {
              uintmax_t tmp_blk_size;
***************
*** 1048,1053 ****
--- 1200,1217 ----
  
    buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size);
  
+   /* When filtering, closure of one pipe must not terminate the process,
+      as there may still be other streams expecting input from us.  */
+   sigemptyset (&newblocked);
+   if (filter_command)
+     {
+       struct sigaction act;
+       sigaction (SIGPIPE, NULL, &act);
+       if (act.sa_handler != SIG_IGN)
+         sigaddset (&newblocked, SIGPIPE);
+     }
+   sigprocmask (SIG_BLOCK, &newblocked, &oldblocked);
+ 
    switch (split_type)
      {
      case type_digits:
***************
*** 1084,1093 ****
        abort ();
      }
  
    if (close (STDIN_FILENO) != 0)
      error (EXIT_FAILURE, errno, "%s", infile);
!   if (output_desc >= 0 && close (output_desc) < 0)
!     error (EXIT_FAILURE, errno, "%s", outfile);
  
    exit (EXIT_SUCCESS);
  }
--- 1248,1258 ----
        abort ();
      }
  
+   sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+ 
    if (close (STDIN_FILENO) != 0)
      error (EXIT_FAILURE, errno, "%s", infile);
!   closeout (NULL, output_desc, filter_pid, outfile);
  
    exit (EXIT_SUCCESS);
  }

Reply via email to