On Sat, Jan 13, 2007 at 10:07:59PM -0800, Paul Eggert wrote: > 3. I can see where the user might be able to specify a better > algorithm, for a particular data set. For that, how about if we have > a --compress-program=PROGRAM option, which lets the user plug in any > program that works as a pipeline? E.g., --compress-program=gzip would > use gzip. The default would be to use "PROGRAM -d" to decompress; we > could have another option if that doesn't suffice.
This is pretty close. There are just a few more problems that I see: * It doesn't respond well to SIGPIPE. It terminates without success, so at least the user knows something went wrong, if they check the status, but it doesn't print an error message. * It checks the exit status of decompression processes at the very end, so the user is notified (with an error message and non-successful exit code) if a decompression process fails, but only after the output has been generated. I don't think there's anything we can do about this in general, though. * If we can't fork/exec a decompression process, we're hosed, but if we can't fork/exec a compression process, a possible improvement might be to just proceed without compression (and maybe print a warning). On the other hand, the user might want us to terminate (e.g., if they just spelled the name of the program wrong). * Like Paul said, we might want a separate decompress program, and the user might want to pass in options (e.g., --compress-program="gzip -1"). I'm still testing, but so far it's looking pretty good. If you think this patch has a good chance of making it into coreutils, then I'll keep working on it. It does a good job of compression, so I think it will solve (most of) the OP's problem, but it's not a speed booster on my machine. If someone with a multi-processor computer wants to test it, maybe we can get some good news on that front. Here's the patch for comments. Thanks, Dan Index: sort.c =================================================================== RCS file: /sources/coreutils/coreutils/src/sort.c,v retrieving revision 1.344 diff -p -u -r1.344 sort.c --- sort.c 13 Dec 2006 21:27:05 -0000 1.344 +++ sort.c 16 Jan 2007 01:34:50 -0000 @@ -1,5 +1,5 @@ /* sort - sort lines of text (with all kinds of options). - Copyright (C) 1988, 1991-2006 Free Software Foundation, Inc. + Copyright (C) 1988, 1991-2007 Free Software Foundation, Inc. 
This program is free software; you can redistribute it and/or modify it under the terms of the GNU General Public License as published by @@ -25,6 +25,7 @@ #include <getopt.h> #include <sys/types.h> +#include <sys/wait.h> #include <signal.h> #include "system.h" #include "error.h" @@ -261,6 +262,9 @@ static bool have_read_stdin; /* List of key field comparisons to be tried. */ static struct keyfield *keylist; +/* Program used to (de)compress temp files. Must accept -d. */ +static const char *compress_program; + static void sortlines_temp (struct line *, size_t, struct line *); /* Report MESSAGE for FILE, then clean up and exit. @@ -328,6 +332,9 @@ Other options:\n\ multiple options specify multiple directories\n\ -u, --unique with -c, check for strict ordering;\n\ without -c, output only the first of an equal run\n\ + --compress-progam=PROG (de)compress temporaries with PROG, which must\n\ + (de)compress stdin to stdout, and accept the -d\n\ + command line option for decompressing\n\ "), DEFAULT_TMPDIR); fputs (_("\ -z, --zero-terminated end lines with 0 byte, not newline\n\ @@ -364,7 +371,8 @@ native byte values.\n\ non-character as a pseudo short option, starting with CHAR_MAX + 1. */ enum { - RANDOM_SOURCE_OPTION = CHAR_MAX + 1 + RANDOM_SOURCE_OPTION = CHAR_MAX + 1, + COMPRESS_PROGRAM }; static char const short_options[] = "-bcdfgik:mMno:rRsS:t:T:uy:z"; @@ -373,6 +381,7 @@ static struct option const long_options[ { {"ignore-leading-blanks", no_argument, NULL, 'b'}, {"check", no_argument, NULL, 'c'}, + {"compress-program", required_argument, NULL, COMPRESS_PROGRAM}, {"dictionary-order", no_argument, NULL, 'd'}, {"ignore-case", no_argument, NULL, 'f'}, {"general-numeric-sort", no_argument, NULL, 'g'}, @@ -403,6 +412,7 @@ static sigset_t caught_signals; struct tempnode { struct tempnode *volatile next; + pid_t pid; /* If compressed, the pid of compressor, else zero */ char name[1]; /* Actual size is 1 + file name length. 
*/ }; static struct tempnode *volatile temphead; @@ -422,8 +432,8 @@ cleanup (void) /* Create a new temporary file, returning its newly allocated name. Store into *PFP a stream open for writing. */ -static char * -create_temp_file (FILE **pfp) +static struct tempnode * +create_temp_file (int *pfd) { static char const slashbase[] = "/sortXXXXXX"; static size_t temp_dir_index; @@ -439,6 +449,7 @@ create_temp_file (FILE **pfp) memcpy (file, temp_dir, len); memcpy (file + len, slashbase, sizeof slashbase); node->next = NULL; + node->pid = 0; if (++temp_dir_index == temp_dir_count) temp_dir_index = 0; @@ -454,10 +465,11 @@ create_temp_file (FILE **pfp) sigprocmask (SIG_SETMASK, &oldset, NULL); errno = saved_errno; - if (fd < 0 || (*pfp = fdopen (fd, "w")) == NULL) + if (fd < 0) die (_("cannot create temporary file"), file); - return file; + *pfd = fd; + return node; } /* Return a stream for FILE, opened with mode HOW. A null FILE means @@ -515,6 +527,169 @@ xfclose (FILE *fp, char const *file) } static void +close_or_die (int fd, const char *name) +{ + if (close (fd) < 0) + die (_("close failed"), name); +} + +static void +dup2_or_die (int oldfd, int newfd) +{ + if (dup2 (oldfd, newfd) < 0) + error (SORT_FAILURE, errno, _("dup2 failed")); +} + +/* Fork a child process for piping to and do common cleanup */ + +static pid_t +pipe_fork (int pipefds[2]) +{ + struct tempnode *saved_temphead; + sigset_t oldset; + pid_t pid; + + if (pipe (pipefds) < 0) + error (SORT_FAILURE, errno, _("couldn't create pipe")); + + /* This is so the child process won't delete our temp files + if it receives a signal before exec-ing. 
*/ + sigprocmask (SIG_BLOCK, &caught_signals, &oldset); + saved_temphead = temphead; + temphead = NULL; + + if ((pid = fork ()) < 0) + error (SORT_FAILURE, errno, _("couldn't fork")); + else if (0 < pid) + { + temphead = saved_temphead; + sigprocmask (SIG_SETMASK, &oldset, NULL); + } + else + { + sigprocmask (SIG_SETMASK, &oldset, NULL); + + close_or_die (STDIN_FILENO, _("standard input")); + close_or_die (STDOUT_FILENO, _("standard output")); + } + + return pid; +} + +/* Creates a temp file and compression program to filter output to it. */ + +static char * +create_temp (FILE **pfp, pid_t *ppid) +{ + int tempfd; + struct tempnode *node = create_temp_file (&tempfd); + char *name = node->name; + + if (compress_program) + { + int pipefds[2]; + + if ((node->pid = pipe_fork (pipefds))) + { + close_or_die (tempfd, name); + close_or_die (pipefds[0], _("input end of pipe")); + + tempfd = pipefds[1]; + } + else + { + close_or_die (pipefds[1], _("output end of pipe")); + dup2_or_die (tempfd, STDOUT_FILENO); + close_or_die (tempfd, name); + dup2_or_die (pipefds[0], STDIN_FILENO); + close_or_die (pipefds[0], _("input end of pipe")); + + if (execlp (compress_program, compress_program, + (char *) NULL) < 0) + error (SORT_FAILURE, errno, _("couldn't execute %s"), + compress_program); + } + } + + if ((*pfp = fdopen (tempfd, "w")) == NULL) + die (_("couldn't create temp"), name); + + if (ppid) + *ppid = node->pid; + + return name; +} + +static bool +reap (pid_t pid) +{ + int status; + pid_t cpid; + + if ((cpid = waitpid (pid, &status, 0)) < 0 + && (0 <= pid || errno != ECHILD)) + error (SORT_FAILURE, errno, _("waiting for %s"), + compress_program); + + if (0 < cpid) + { + if (! WIFEXITED (status) || WEXITSTATUS (status)) + error (SORT_FAILURE, 0, _("%s terminated abnormally"), + compress_program); + return true; + } + + return false; +} + +/* Open a temp file for decompression and reading. 
*/ + +static FILE * +open_temp (const char *name, pid_t pid) +{ + int tempfd; + FILE *fp; + + if (pid) + reap (pid); + + tempfd = open (name, O_RDONLY); + if (tempfd < 0) + die (_("couldn't open temp"), name); + + if (compress_program) + { + int pipefds[2]; + + if (pipe_fork (pipefds)) + { + close_or_die (tempfd, name); + close_or_die (pipefds[1], _("output end of pipe")); + + tempfd = pipefds[0]; + } + else + { + close_or_die (pipefds[0], _("input end of pipe")); + dup2_or_die (tempfd, STDIN_FILENO); + close_or_die (tempfd, name); + dup2_or_die (pipefds[1], STDOUT_FILENO); + close_or_die (pipefds[1], _("output end of pipe")); + + if (execlp (compress_program, compress_program, + "-d", (char *) NULL) < 0) + error (SORT_FAILURE, errno, _("couldn't execute %s -d"), + compress_program); + } + } + + if ((fp = fdopen (tempfd, "r")) == NULL) + die (_("couldn't create temp"), name); + + return fp; +} + +static void write_bytes (const char *buf, size_t n_bytes, FILE *fp, const char *output_file) { if (fwrite (buf, 1, n_bytes, fp) != n_bytes) @@ -1586,7 +1761,7 @@ check (char const *file_name) file has not been opened yet (or written to, if standard output). */ static void -mergefps (char **files, size_t ntemps, size_t nfiles, +mergefps (char **files, pid_t *pids, size_t ntemps, size_t nfiles, FILE *ofp, char const *output_file) { FILE *fps[NMERGE]; /* Input streams for each file. */ @@ -1609,7 +1784,8 @@ mergefps (char **files, size_t ntemps, s /* Read initial lines from each input file. */ for (i = 0; i < nfiles; ) { - fps[i] = xfopen (files[i], "r"); + fps[i] = + pids[i] ? 
open_temp (files[i], pids[i]) : xfopen (files[i], "r"); initbuf (&buffer[i], sizeof (struct line), MAX (merge_buffer_size, sort_size / nfiles)); if (fillbuf (&buffer[i], fps[i], files[i])) @@ -1631,7 +1807,10 @@ mergefps (char **files, size_t ntemps, s free (buffer[i].buf); --nfiles; for (j = i; j < nfiles; ++j) - files[j] = files[j + 1]; + { + files[j] = files[j + 1]; + pids[j] = pids[j + 1]; + } } } @@ -1719,6 +1898,7 @@ mergefps (char **files, size_t ntemps, s { fps[i] = fps[i + 1]; files[i] = files[i + 1]; + pids[i] = pids[i + 1]; buffer[i] = buffer[i + 1]; cur[i] = cur[i + 1]; base[i] = base[i + 1]; @@ -1893,8 +2073,8 @@ sortlines_temp (struct line *lines, size common cases. */ static size_t -avoid_trashing_input (char **files, size_t ntemps, size_t nfiles, - char const *outfile) +avoid_trashing_input (char **files, pid_t *pids, size_t ntemps, + size_t nfiles, char const *outfile) { size_t i; bool got_outstat = false; @@ -1930,9 +2110,11 @@ avoid_trashing_input (char **files, size if (same) { FILE *tftp; - char *temp = create_temp_file (&tftp); - mergefps (&files[i], 0, nfiles - i, tftp, temp); + pid_t pid; + char *temp = create_temp (&tftp, &pid); + mergefps (&files[i], &pids[i], 0, nfiles - i, tftp, temp); files[i] = temp; + pids[i] = pid; return i + 1; } } @@ -1946,7 +2128,8 @@ avoid_trashing_input (char **files, size OUTPUT_FILE; a null OUTPUT_FILE stands for standard output. 
*/ static void -merge (char **files, size_t ntemps, size_t nfiles, char const *output_file) +merge (char **files, pid_t *pids, size_t ntemps, size_t nfiles, + char const *output_file) { while (NMERGE < nfiles) { @@ -1967,10 +2150,12 @@ merge (char **files, size_t ntemps, size for (out = in = 0; out < nfiles / NMERGE; out++, in += NMERGE) { FILE *tfp; - char *temp = create_temp_file (&tfp); + pid_t pid; + char *temp = create_temp (&tfp, &pid); size_t nt = MIN (ntemps, NMERGE); ntemps -= nt; - mergefps (&files[in], nt, NMERGE, tfp, temp); + mergefps (&files[in], &pids[in], nt, NMERGE, tfp, temp); + pids[out] = pid; files[out] = temp; } @@ -1984,10 +2169,12 @@ merge (char **files, size_t ntemps, size files as possible, to avoid needless I/O. */ size_t nshortmerge = remainder - cheap_slots + 1; FILE *tfp; - char *temp = create_temp_file (&tfp); + pid_t pid; + char *temp = create_temp (&tfp, &pid); size_t nt = MIN (ntemps, nshortmerge); ntemps -= nt; - mergefps (&files[in], nt, nshortmerge, tfp, temp); + mergefps (&files[in], &pids[in], nt, nshortmerge, tfp, temp); + pids[out] = pid; files[out++] = temp; in += nshortmerge; } @@ -1995,12 +2182,13 @@ merge (char **files, size_t ntemps, size /* Put the remaining input files into the last NMERGE-sized output window, so they will be merged in the next pass. */ memmove(&files[out], &files[in], (nfiles - in) * sizeof *files); + memmove(&pids[out], &pids[in], (nfiles - in) * sizeof *pids); ntemps += out; nfiles -= in - out; } - nfiles = avoid_trashing_input (files, ntemps, nfiles, output_file); - mergefps (files, ntemps, nfiles, NULL, output_file); + nfiles = avoid_trashing_input (files, pids, ntemps, nfiles, output_file); + mergefps (files, pids, ntemps, nfiles, NULL, output_file); } /* Sort NFILES FILES onto OUTPUT_FILE. 
*/ @@ -2060,7 +2248,7 @@ sort (char * const *files, size_t nfiles else { ++ntemps; - temp_output = create_temp_file (&tfp); + temp_output = create_temp (&tfp, NULL); } do @@ -2088,13 +2276,16 @@ sort (char * const *files, size_t nfiles { size_t i; struct tempnode *node = temphead; - char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles); + pid_t *pids; + char **tempfiles = xnmalloc (ntemps, sizeof *tempfiles + sizeof *pids); + pids = (pid_t *) (tempfiles + ntemps); for (i = 0; node; i++) { tempfiles[i] = node->name; + pids[i] = node->pid; node = node->next; } - merge (tempfiles, ntemps, ntemps, output_file); + merge (tempfiles, pids, ntemps, ntemps, output_file); free (tempfiles); } } @@ -2463,6 +2654,10 @@ main (int argc, char **argv) checkonly = true; break; + case COMPRESS_PROGRAM: + compress_program = optarg; + break; + case 'k': key = key_init (&key_buf); @@ -2679,10 +2874,17 @@ main (int argc, char **argv) } if (mergeonly) - merge (files, 0, nfiles, outfile); + { + pid_t *pids = xnmalloc (nfiles, sizeof *pids); + memset (pids, 0, nfiles * sizeof *pids); + merge (files, pids, 0, nfiles, outfile); + } else sort (files, nfiles, outfile); + while (reap (-1)) + continue; + if (have_read_stdin && fclose (stdin) == EOF) die (_("close failed"), "-"); _______________________________________________ Bug-coreutils mailing list Bug-coreutils@gnu.org http://lists.gnu.org/mailman/listinfo/bug-coreutils