For some years now I've been bemoaning the lack of a
`split --filter=COMMAND' feature, which would write each
output chunk to a pipe feeding COMMAND rather than to a file,
so that one no longer has to split to a set of files and
then run the filter on each of them.
The environment variable FILE is set to the name of the
file that would have been written; it's the command's
responsibility to use that string appropriately.
Sample usage (note the single quotes, which keep the invoking
shell from expanding $FILE before split can set it):
$ split --filter 'gzip >$FILE.gz'
The details turned out to be rather trickier than I'd
expected, but I think it's ready to be shared with the
folks in charge of coreutils. Source patch attached.
I haven't touched the documentation or tests.
*** coreutils-8.9/src/split.c~ Sat Jan 1 13:19:23 2011
--- coreutils-8.9/src/split.c Thu Jan 6 17:26:30 2011
***************
*** 25,31 ****
--- 25,33 ----
#include <assert.h>
#include <stdio.h>
#include <getopt.h>
+ #include <signal.h>
#include <sys/types.h>
+ #include <sys/wait.h>
#include "system.h"
#include "error.h"
***************
*** 35,40 ****
--- 37,43 ----
#include "full-write.h"
#include "quote.h"
#include "safe-read.h"
+ #include "sig2str.h"
#include "xfreopen.h"
#include "xstrtol.h"
***************
*** 45,50 ****
--- 48,68 ----
proper_name_utf8 ("Torbjorn Granlund", "Torbj\303\266rn Granlund"), \
proper_name ("Richard M. Stallman")
+ /* Shell command to filter through, instead of creating files.
+ NULL when --filter was not given. */
+ static char const *filter_command;
+
+ /* Process ID of the most recently started filter. It has type pid_t
+ to match the value returned by fork and the PID parameter of closeout. */
+ static pid_t filter_pid;
+
+ /* Array of the write ends of the pipes that are currently open.
+ OPEN_PIPES_SIZE is the number of slots allocated and OPEN_PIPES_LEN
+ the number in use. OPEN_PIPES_SIZE must be a size_t because create
+ passes its address to x2nrealloc, which takes a size_t pointer;
+ OPEN_PIPES_LEN has the same type so the two compare without conversion. */
+ static int *open_pipes;
+ static size_t open_pipes_size;
+ static size_t open_pipes_len;
+
+ /* Blocked signals: OLDBLOCKED receives the mask that was in effect
+ before main blocks the signals in NEWBLOCKED, and is what main and
+ each filter child restore afterwards. */
+ static sigset_t oldblocked;
+ static sigset_t newblocked;
+
/* Base name of output files. */
static char const *outbase;
***************
*** 103,108 ****
--- 121,127 ----
{"unbuffered", no_argument, NULL, 'u'},
{"suffix-length", required_argument, NULL, 'a'},
{"numeric-suffixes", no_argument, NULL, 'd'},
+ {"filter", required_argument, NULL, 'f'},
{"verbose", no_argument, NULL, VERBOSE_OPTION},
{"-io-blksize", required_argument, NULL,
IO_BLKSIZE_OPTION}, /* do not document */
***************
*** 170,175 ****
--- 189,195 ----
-C, --line-bytes=SIZE put at most SIZE bytes of lines per output file\n\
-d, --numeric-suffixes use numeric suffixes instead of alphabetic\n\
-e, --elide-empty-files do not generate empty output files with `-n'\n\
+ -f, --filter=COMMAND write to shell COMMAND; file name is $FILE\n\
-l, --lines=NUMBER put NUMBER lines per output file\n\
-n, --number=CHUNKS generate CHUNKS output files. See below\n\
-u, --unbuffered immediately copy input to output with `-n r/...'\n\
***************
*** 256,265 ****
static int
create (const char* name)
{
! if (verbose)
! fprintf (stdout, _("creating file %s\n"), quote (name));
! return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
! (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
}
/* Write BYTES bytes at BP to an output file.
--- 276,391 ----
static int
create (const char* name)
{
! /* Open the output named NAME and return a descriptor for writing to it.
! Without --filter, create or truncate the file NAME and return the
! result of open, which is negative on failure. With --filter, start
! $SHELL -c FILTER_COMMAND (or /bin/sh if SHELL is unset) with FILE=NAME
! in its environment and its standard input connected to a new pipe,
! record the child in FILTER_PID and the pipe in OPEN_PIPES, and return
! the pipe's write end; failures on this path are reported via
! error (EXIT_FAILURE, ...). */
! if (!filter_command)
! {
! if (verbose)
! fprintf (stdout, _("creating file %s\n"), quote (name));
! return open (name, O_WRONLY | O_CREAT | O_TRUNC | O_BINARY,
! (S_IRUSR | S_IWUSR | S_IRGRP | S_IWGRP | S_IROTH | S_IWOTH));
! }
! else
! {
! int fd_pair[2];
! pid_t child_pid;
! char const *shell_prog = getenv ("SHELL");
! if (shell_prog == NULL)
! shell_prog = "/bin/sh";
! /* The child inherits this process's environment, so setting FILE
! here is what makes $FILE visible to the filter command. */
! if (setenv ("FILE", name, 1) != 0)
! error (EXIT_FAILURE, errno, _("cannot set environment variable"));
! if (verbose)
! fprintf (stdout, _("executing with FILE=%s\n"), quote (name));
! if (pipe (fd_pair) != 0)
! error (EXIT_FAILURE, errno, _("cannot create pipe"));
! child_pid = fork ();
! if (child_pid == 0)
! {
! /* This is the child process. If an error occurs here, the
! parent will eventually learn about it after doing a wait,
! at which time it will emit its own error message. */
! int j;
! /* We have to close any pipes that were opened during an
! earlier call, otherwise this process will be holding a
! write-pipe that will prevent the earlier process from
! reading an EOF on the corresponding read-pipe. */
! for (j = 0; j < open_pipes_len; ++j)
! if (close (open_pipes[j]) != 0)
! error (EXIT_FAILURE, errno, _("closing prior pipe"));
! if (close (fd_pair[1]))
! error (EXIT_FAILURE, errno, _("closing output pipe"));
! /* Make the read end of the new pipe the filter's standard input. */
! if (fd_pair[0] != STDIN_FILENO)
! {
! if (dup2 (fd_pair[0], STDIN_FILENO) != STDIN_FILENO)
! error (EXIT_FAILURE, errno, _("moving input pipe"));
! if (close (fd_pair[0]) != 0)
! error (EXIT_FAILURE, errno, _("closing input pipe"));
! }
! /* Restore the signal mask saved in OLDBLOCKED before running
! the filter. */
! sigprocmask (SIG_SETMASK, &oldblocked, NULL);
! execl (shell_prog, last_component (shell_prog), "-c",
! filter_command, (char *) NULL);
! /* Reached only if execl failed. */
! error (EXIT_FAILURE, errno, "%s", shell_prog);
! }
! if (child_pid == -1)
! error (EXIT_FAILURE, errno, _("cannot spawn new process"));
! /* The parent only writes; the read end belongs to the child. */
! if (close (fd_pair[0]) != 0)
! error (EXIT_FAILURE, errno, _("closing input pipe"));
! filter_pid = child_pid;
! if (open_pipes_len == open_pipes_size)
! {
! /* x2nrealloc reads and updates the slot count through a
! `size_t *'. Go through a size_t temporary so the call is
! well defined whatever integer type OPEN_PIPES_SIZE has. */
! size_t n_alloc = open_pipes_size;
! open_pipes = x2nrealloc (open_pipes, &n_alloc, sizeof *open_pipes);
! open_pipes_size = n_alloc;
! }
! open_pipes[open_pipes_len++] = fd_pair[1];
! return fd_pair[1];
! }
! }
!
! /* Finish with one output: close it, forget its pipe, and reap its filter.
! FP, if non-NULL, is a stdio stream open on descriptor FD; in that case
! FP is what gets closed and FD is used only to find the entry in
! OPEN_PIPES. With FP NULL, a nonnegative FD is closed directly.
! If PID is positive, wait for that child; a nonzero exit status, or
! death by a signal other than SIGPIPE, is reported with a diagnostic
! mentioning NAME and FILTER_COMMAND. */
! static void
! closeout (FILE *fp, int fd, pid_t pid, char const *name)
! {
! int status = 0;
! int slot;
!
! /* EPIPE from fclose is tolerated; any other close failure is fatal. */
! if (fp == NULL)
! {
! if (fd >= 0 && close (fd) < 0)
! error (EXIT_FAILURE, errno, "%s", name);
! }
! else if (fclose (fp) != 0 && errno != EPIPE)
! error (EXIT_FAILURE, errno, "%s", name);
!
! if (fd >= 0)
! {
! /* Drop FD from the table by moving the last entry into its slot. */
! for (slot = 0; slot < open_pipes_len; ++slot)
! if (open_pipes[slot] == fd)
! {
! open_pipes[slot] = open_pipes[--open_pipes_len];
! break;
! }
! }
!
! if (pid <= 0)
! return;
!
! /* ECHILD is tolerated; STATUS then keeps its initial value of 0. */
! if (waitpid (pid, &status, 0) == -1 && errno != ECHILD)
! error (EXIT_FAILURE, errno, _("waiting for child process"));
!
! if (WIFSIGNALED (status))
! {
! int sig = WTERMSIG (status);
! char signame[SIG2STR_MAX+1];
! /* A filter killed by SIGPIPE is not reported. */
! if (sig != SIGPIPE)
! {
! if (sig2str (sig, signame) != 0)
! sprintf(signame, "%d", sig);
! error (sig + 128, 0,
! _("with FILE=%s, signal %s (%s) from command: %s"),
! name, signame, strsignal(sig), filter_command);
! }
! }
! else if (!WIFEXITED (status))
! /* shouldn't happen. */
! error (EXIT_FAILURE, 0,
! _("unknown status from command (0x%X)"), status);
! else if (WEXITSTATUS (status) != 0)
! {
! int ex = WEXITSTATUS (status);
! error (ex, 0, _("with FILE=%s, exit %d from command: %s"),
! name, ex, filter_command);
! }
}
/* Write BYTES bytes at BP to an output file.
***************
*** 273,285 ****
{
if (!bp && bytes == 0 && elide_empty_files)
return;
! if (output_desc >= 0 && close (output_desc) < 0)
! error (EXIT_FAILURE, errno, "%s", outfile);
next_file_name ();
if ((output_desc = create (outfile)) < 0)
error (EXIT_FAILURE, errno, "%s", outfile);
}
! if (full_write (output_desc, bp, bytes) != bytes)
error (EXIT_FAILURE, errno, "%s", outfile);
}
--- 399,410 ----
{
if (!bp && bytes == 0 && elide_empty_files)
return;
! closeout (NULL, output_desc, filter_pid, outfile);
next_file_name ();
if ((output_desc = create (outfile)) < 0)
error (EXIT_FAILURE, errno, "%s", outfile);
}
! if (full_write (output_desc, bp, bytes) != bytes && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", outfile);
}
***************
*** 501,507 ****
/* We don't use the stdout buffer here since we're writing
large chunks from an existing file, so it's more efficient
to write out directly. */
! if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
else
--- 626,633 ----
/* We don't use the stdout buffer here since we're writing
large chunks from an existing file, so it's more efficient
to write out directly. */
! if (full_write (STDOUT_FILENO, bp, to_write) != to_write
! && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
else
***************
*** 564,570 ****
error (EXIT_FAILURE, errno, "%s", infile);
else if (n_read == 0)
break; /* eof. */
! if (full_write (STDOUT_FILENO, buf, n_read) != n_read)
error (EXIT_FAILURE, errno, "%s", quote ("-"));
start += n_read;
}
--- 690,696 ----
error (EXIT_FAILURE, errno, "%s", infile);
else if (n_read == 0)
break; /* eof. */
! if (full_write (STDOUT_FILENO, buf, n_read) != n_read && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", quote ("-"));
start += n_read;
}
***************
*** 575,580 ****
--- 701,707 ----
char *of_name;
int ofd;
FILE* ofile;
+ int opid;
} of_t;
enum
***************
*** 637,650 ****
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
}
! if (fclose (files[i_reopen].ofile) != 0)
error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
files[i_reopen].ofd = OFD_APPEND;
}
files[i_check].ofd = fd;
if (!(files[i_check].ofile = fdopen (fd, "a")))
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
}
return file_limit;
--- 764,780 ----
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
}
! if (fclose (files[i_reopen].ofile) != 0 && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_reopen].of_name);
+ files[i_reopen].ofile = NULL;
files[i_reopen].ofd = OFD_APPEND;
}
files[i_check].ofd = fd;
if (!(files[i_check].ofile = fdopen (fd, "a")))
error (EXIT_FAILURE, errno, "%s", files[i_check].of_name);
+ files[i_check].opid = filter_pid;
+ filter_pid = 0;
}
return file_limit;
***************
*** 658,663 ****
--- 788,794 ----
static void
lines_rr (uintmax_t k, uintmax_t n, char *buf, size_t bufsize)
{
+ bool wrapped = false;
bool file_limit;
size_t i_file;
of_t *files;
***************
*** 678,683 ****
--- 809,815 ----
files[i_file].of_name = xstrdup (outfile);
files[i_file].ofd = OFD_NEW;
files[i_file].ofile = NULL;
+ files[i_file].opid = 0;
}
i_file = 0;
file_limit = false;
***************
*** 715,724 ****
{
if (line_no == k && unbuffered)
{
! if (full_write (STDOUT_FILENO, bp, to_write) != to_write)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
! else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1)
{
clearerr (stdout); /* To silence close_stdout(). */
error (EXIT_FAILURE, errno, "%s", _("write error"));
--- 847,858 ----
{
if (line_no == k && unbuffered)
{
! if (full_write (STDOUT_FILENO, bp, to_write) != to_write
! && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", _("write error"));
}
! else if (line_no == k && fwrite (bp, to_write, 1, stdout) != 1
! && errno != EPIPE)
{
clearerr (stdout); /* To silence close_stdout(). */
error (EXIT_FAILURE, errno, "%s", _("write error"));
***************
*** 734,752 ****
{
/* Note writing to fd, rather than flushing the FILE gives
an 8% performance benefit, due to reduced data copying. */
! if (full_write (files[i_file].ofd, bp, to_write) != to_write)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
}
! else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
if (file_limit)
{
! if (fclose (files[i_file].ofile) != 0)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
files[i_file].ofd = OFD_APPEND;
}
if (next && ++i_file == n)
! i_file = 0;
}
bp = bp_out;
--- 868,892 ----
{
/* Note writing to fd, rather than flushing the FILE gives
an 8% performance benefit, due to reduced data copying. */
! if (full_write (files[i_file].ofd, bp, to_write) != to_write
! && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
}
! else if (fwrite (bp, to_write, 1, files[i_file].ofile) != 1
! && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
if (file_limit)
{
! if (fclose (files[i_file].ofile) != 0 && errno != EPIPE)
error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
+ files[i_file].ofile = NULL;
files[i_file].ofd = OFD_APPEND;
}
if (next && ++i_file == n)
! {
! wrapped = true;
! i_file = 0;
! }
}
bp = bp_out;
***************
*** 757,767 ****
and to signal any waiting fifo consumers.
Also, close any open file descriptors.
FIXME: Should we do this before EXIT_FAILURE? */
! for (i_file = 0; !k && !elide_empty_files && i_file < n; i_file++)
{
! file_limit |= ofile_open (files, i_file, n);
! if (fclose (files[i_file].ofile) != 0)
! error (EXIT_FAILURE, errno, "%s", files[i_file].of_name);
}
}
--- 897,914 ----
and to signal any waiting fifo consumers.
Also, close any open file descriptors.
FIXME: Should we do this before EXIT_FAILURE? */
! if (!k)
{
! int ceiling = (wrapped ? n : i_file);
! for (i_file = 0; i_file < n; i_file++)
! {
! if (i_file >= ceiling && !elide_empty_files)
! file_limit |= ofile_open (files, i_file, n);
! if (files[i_file].ofd >= 0)
! closeout (files[i_file].ofile, files[i_file].ofd,
! files[i_file].opid, files[i_file].of_name);
! files[i_file].ofd = OFD_APPEND;
! }
}
}
***************
*** 824,830 ****
int this_optind = optind ? optind : 1;
char *slash;
! c = getopt_long (argc, argv, "0123456789C:a:b:del:n:u", longopts, NULL);
if (c == -1)
break;
--- 971,978 ----
int this_optind = optind ? optind : 1;
char *slash;
! c = getopt_long (argc, argv, "0123456789C:a:b:def:l:n:u",
! longopts, NULL);
if (c == -1)
break;
***************
*** 955,960 ****
--- 1103,1112 ----
elide_empty_files = true;
break;
+ case 'f':
+ filter_command = optarg;
+ break;
+
case IO_BLKSIZE_OPTION:
{
uintmax_t tmp_blk_size;
***************
*** 1048,1053 ****
--- 1200,1217 ----
buf = ptr_align (xmalloc (in_blk_size + 1 + page_size - 1), page_size);
+ /* When filtering, closure of one pipe must not terminate the process,
+ as there may still be other streams expecting input from us. */
+ sigemptyset (&newblocked);
+ if (filter_command)
+ {
+ struct sigaction act;
+ sigaction (SIGPIPE, NULL, &act);
+ if (act.sa_handler != SIG_IGN)
+ sigaddset (&newblocked, SIGPIPE);
+ }
+ sigprocmask (SIG_BLOCK, &newblocked, &oldblocked);
+
switch (split_type)
{
case type_digits:
***************
*** 1084,1093 ****
abort ();
}
if (close (STDIN_FILENO) != 0)
error (EXIT_FAILURE, errno, "%s", infile);
! if (output_desc >= 0 && close (output_desc) < 0)
! error (EXIT_FAILURE, errno, "%s", outfile);
exit (EXIT_SUCCESS);
}
--- 1248,1258 ----
abort ();
}
+ sigprocmask (SIG_SETMASK, &oldblocked, NULL);
+
if (close (STDIN_FILENO) != 0)
error (EXIT_FAILURE, errno, "%s", infile);
! closeout (NULL, output_desc, filter_pid, outfile);
exit (EXIT_SUCCESS);
}