Hello,

Attached is a patch that implements reservoir sampling in shuf:
when the number of output lines is known in advance (with '-n K'), shuf no
longer loads the entire input into memory, which makes it possible to sample
lines from very large inputs.

I've seen this mentioned once:
 http://lists.gnu.org/archive/html/coreutils/2012-11/msg00079.html
but no follow-up discussion.

shuf's command-line usage is unchanged (barring unexpected bugs...); the only
addition is a developer-only '---debug' option, used in the examples below.

Example (with debug messages):
===
  $ seq 10000 | ./src/shuf ---debug -n 5
  --reservoir_sampling--
  filling reservoir, input line 1 of 5: '1'
  filling reservoir, input line 2 of 5: '2'
  filling reservoir, input line 3 of 5: '3'
  filling reservoir, input line 4 of 5: '4'
  filling reservoir, input line 5 of 5: '5'
  Replacing reservoir sample 4 with line 7 '7'
  Replacing reservoir sample 4 with line 8 '8'
  Replacing reservoir sample 3 with line 9 '9'
  Replacing reservoir sample 2 with line 10 '10'
  Replacing reservoir sample 4 with line 11 '11'
  Replacing reservoir sample 4 with line 16 '16'
  Replacing reservoir sample 4 with line 17 '17'
  Replacing reservoir sample 4 with line 20 '20'
  Replacing reservoir sample 2 with line 22 '22'
  Replacing reservoir sample 0 with line 31 '31'
  Replacing reservoir sample 1 with line 52 '52'
  Replacing reservoir sample 4 with line 55 '55'
  Replacing reservoir sample 3 with line 61 '61'
  Replacing reservoir sample 4 with line 76 '76'
  Replacing reservoir sample 2 with line 169 '169'
  Replacing reservoir sample 2 with line 187 '187'
  Replacing reservoir sample 0 with line 216 '216'
  Replacing reservoir sample 1 with line 340 '340'
  Replacing reservoir sample 4 with line 431 '431'
  Replacing reservoir sample 1 with line 524 '524'
  Replacing reservoir sample 2 with line 942 '942'
  Replacing reservoir sample 1 with line 1096 '1096'
  Replacing reservoir sample 2 with line 1627 '1627'
  Replacing reservoir sample 4 with line 1763 '1763'
  Replacing reservoir sample 2 with line 2679 '2679'
  Replacing reservoir sample 3 with line 4382 '4382'
  Replacing reservoir sample 2 with line 4439 '4439'
  Replacing reservoir sample 3 with line 7748 '7748'
  Replacing reservoir sample 2 with line 9902 '9902'
  -- reservoir lines (begin)--
  216
  1096
  9902
  7748
  1763
  -- reservoir lines (end)--
  216
  1763
  7748
  1096
  9902
===

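The last 5 lines are the final output (the rest are debug messages written to
stderr). After the input has been read completely, the sampled lines are still
re-permuted (using the existing shuf code). This is needed because the reservoir
keeps lines largely in input order, so without that final shuffle the output
would not be a random permutation. In the example below, lines 1, 2, 4 and 5
never move out of their original slots: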

===
  $ seq 6 | ./src/shuf ---debug -n 5
  --reservoir_sampling--
  filling reservoir, input line 1 of 5: '1'
  filling reservoir, input line 2 of 5: '2'
  filling reservoir, input line 3 of 5: '3'
  filling reservoir, input line 4 of 5: '4'
  filling reservoir, input line 5 of 5: '5'
  Replacing reservoir sample 2 with line 6 '6'
  -- reservoir lines (begin)--
  1
  2
  6
  4
  5
  -- reservoir lines (end)--
  4
  2
  1
  6
  5
===


Comments are welcome,
 -gordon
>From b64d5063e26c0f3485d8342a2d5501f655f1063e Mon Sep 17 00:00:00 2001
From: Assaf Gordon <[email protected]>
Date: Wed, 6 Mar 2013 18:25:49 -0500
Subject: [PATCH] shuf: use reservoir-sampling when possible

* src/shuf.c: Use reservoir sampling when the number of output lines
is known (i.e., when the '-n K' option is given).
read_input_reservoir_sampling() - read lines from the input file, keeping
only K lines in memory and replacing them with decreasing probability.
prepare_shuf_lines() - convert reservoir lines to shuf's line-array layout.
main() - if the number of output lines is known, use reservoir sampling
instead of reading the entire input file.
devmsg() - print developer debug messages, enabled by '---debug'.
---
 src/shuf.c |  171 ++++++++++++++++++++++++++++++++++++++++++++++++++++++++++--
 1 files changed, 167 insertions(+), 4 deletions(-)

diff --git a/src/shuf.c b/src/shuf.c
index 71ac3e6..27982e5 100644
--- a/src/shuf.c
+++ b/src/shuf.c
@@ -25,6 +25,7 @@
 #include "error.h"
 #include "fadvise.h"
 #include "getopt.h"
+#include "linebuffer.h"
 #include "quote.h"
 #include "quotearg.h"
 #include "randint.h"
@@ -81,7 +82,8 @@ With no FILE, or when FILE is -, read standard input.\n\
    non-character as a pseudo short option, starting with CHAR_MAX + 1.  */
 enum
 {
-  RANDOM_SOURCE_OPTION = CHAR_MAX + 1
+  RANDOM_SOURCE_OPTION = CHAR_MAX + 1,
+  DEV_DEBUG_OPTION
 };
 
 static struct option const long_opts[] =
@@ -92,11 +94,31 @@ static struct option const long_opts[] =
   {"output", required_argument, NULL, 'o'},
   {"random-source", required_argument, NULL, RANDOM_SOURCE_OPTION},
   {"zero-terminated", no_argument, NULL, 'z'},
+  {"-debug", no_argument, NULL, DEV_DEBUG_OPTION},
   {GETOPT_HELP_OPTION_DECL},
   {GETOPT_VERSION_OPTION_DECL},
   {0, 0, 0, 0},
 };
 
+/* Developer-only debugging, enabled by the ---debug option.  */
+static bool dev_debug = false;
+
+/* When DEV_DEBUG is set, format FMT and the remaining arguments onto
+   standard error as vfprintf does; otherwise do nothing.  Like
+   error (0, 0, ...), but with no program-name prefix or newline added.
+   TODO: Replace with variadic macro in system.h or a separate module.  */
+static inline void
+devmsg (char const *fmt, ...)
+{
+  va_list args;
+
+  if (! dev_debug)
+    return;
+  va_start (args, fmt);
+  vfprintf (stderr, fmt, args);
+  va_end (args);
+}
+
 static bool
 input_numbers_option_used (size_t lo_input, size_t hi_input)
 {
@@ -135,6 +157,123 @@ next_line (char *line, char eolbyte, size_t n)
   return p + 1;
 }
 
+/* Convert the N elements of IN_LINES into the line layout used by the
+   rest of shuf, storing the result in *OUT_LINES.
+
+   All N lines are copied back to back into one newly allocated buffer,
+   and *OUT_LINES is set to a newly allocated array of N + 1 pointers:
+     (*out_lines)[i] - the start of line i, for 0 <= i < N;
+     (*out_lines)[N] - one byte past the end of the last line.
+   (*out_lines)[0] is always the start of the buffer, so freeing it and
+   then *OUT_LINES releases both allocations.
+
+   Lines are copied verbatim from IN_LINES (terminator included), and the
+   buffer is not NUL-terminated, so the length of line i is
+   (*out_lines)[i + 1] - (*out_lines)[i], not strlen (see
+   write_permuted_output() for the expected usage).  EOLBYTE is
+   unused, and IN_LINES is neither modified nor freed.  */
+static void
+prepare_shuf_lines (struct linebuffer *in_lines, size_t n, char ***out_lines,
+                    char eolbyte)
+{
+  size_t i;
+  size_t size = 0;
+  char *p;
+  char *buf;
+
+  for (i = 0; i < n; ++i)
+    size += in_lines[i].length;
+
+  p = buf = xmalloc (size);
+  *out_lines = xnmalloc (n + 1, sizeof (**out_lines));
+  for (i = 0; i < n; ++i)
+    {
+      (*out_lines)[i] = p;
+      memcpy (p, in_lines[i].buffer, in_lines[i].length);
+      p += in_lines[i].length;
+    }
+  (*out_lines)[n] = p;
+
+  /* BUF is not NUL-terminated and may contain '%', so write exactly SIZE
+     bytes rather than passing it to devmsg as a format string.  */
+  devmsg ("-- reservoir lines (begin)--\n");
+  if (dev_debug)
+    fwrite (buf, 1, size, stderr);
+  devmsg ("-- reservoir lines (end)--\n");
+}
+
+/* Read EOLBYTE-delimited lines from IN, keeping a uniform random sample of
+   at most K of them (reservoir sampling, with random numbers from S).
+   Store the sample in *PLINE in the layout built by prepare_shuf_lines and
+   return its size, MIN (K, number of input lines).  Exit on a read error.  */
+static size_t
+read_input_reservoir_sampling (FILE *in, char eolbyte, char ***pline, size_t k,
+                               struct randint_source *s)
+{
+  size_t i;
+  size_t n_lines = 0;
+  size_t n_alloc = 0;  /* grow the reservoir on demand, not K up front */
+  struct linebuffer line;
+  struct linebuffer *rsrv = NULL;
+
+  devmsg ("--reservoir_sampling--\n");
+
+  initbuffer (&line);
+  while (readlinebuffer_delim (&line, in, eolbyte) != NULL)
+    {
+      if (n_lines < k)
+        {
+          /* Fill the reservoir with the first K lines.  */
+          if (dev_debug)
+            {
+              fprintf (stderr, "filling reservoir, input line %zu of %zu: '",
+                       n_lines + 1, k);
+              fwrite (line.buffer, sizeof (char), line.length - 1, stderr);
+              fprintf (stderr, "'\n");
+            }
+          if (n_lines == n_alloc)
+            rsrv = x2nrealloc (rsrv, &n_alloc, sizeof *rsrv);
+          rsrv[n_lines] = line;
+          initbuffer (&line); /* next line-read will allocate a new buffer */
+        }
+      else
+        {
+          /* Input line N_LINES + 1 replaces a random sample with
+             probability K / (N_LINES + 1).  */
+          randint j = randint_choose (s, n_lines + 1);
+          if (j < k)
+            {
+              struct linebuffer evicted = rsrv[j];
+              if (dev_debug)
+                {
+                  fprintf (stderr, "Replacing reservoir sample %zu with "
+                           "line %zu '", (size_t) j, n_lines + 1);
+                  fwrite (line.buffer, sizeof (char), line.length - 1, stderr);
+                  fprintf (stderr, "'\n");
+                }
+              /* Swap, so the evicted buffer is reused rather than leaked.  */
+              rsrv[j] = line;
+              line = evicted;
+            }
+        }
+      ++n_lines;
+    }
+  freebuffer (&line);
+
+  /* no more input lines, or an input error */
+  if (ferror (in))
+    error (EXIT_FAILURE, errno, _("read error"));
+
+  k = MIN (k, n_lines); /* number of lines actually sampled */
+  prepare_shuf_lines (rsrv, k, pline, eolbyte);
+
+  /* free reservoir */
+  for (i = 0; i < k; ++i)
+    freebuffer (&rsrv[i]);
+  free (rsrv);
+  return k;
+}
+
 /* Read data from file IN.  Input lines are delimited by EOLBYTE;
    silently append a trailing EOLBYTE if the file ends in some other
    byte.  Store a pointer to the resulting array of lines into *PLINE.
@@ -209,12 +348,13 @@ main (int argc, char **argv)
   char *random_source = NULL;
   char eolbyte = '\n';
   char **input_lines = NULL;
+  bool use_reservoir_sampling = false;
 
   int optc;
   int n_operands;
   char **operand;
   size_t n_lines;
-  char **line;
+  char **line = NULL;
   struct randint_source *randint_source;
   size_t *permutation;
 
@@ -295,6 +435,10 @@ main (int argc, char **argv)
         eolbyte = '\0';
         break;
 
+      case DEV_DEBUG_OPTION:
+        dev_debug = true;
+        break;
+
       case_GETOPT_HELP_CHAR;
       case_GETOPT_VERSION_CHAR (PROGRAM_NAME, AUTHORS);
       default:
@@ -341,8 +485,16 @@ main (int argc, char **argv)
 
       fadvise (stdin, FADVISE_SEQUENTIAL);
 
-      n_lines = read_input (stdin, eolbyte, &input_lines);
-      line = input_lines;
+      if (head_lines != SIZE_MAX)
+        {
+          use_reservoir_sampling = true;
+          n_lines = SIZE_MAX; /* unknown number of input lines, for now */
+        }
+      else
+        {
+          n_lines = read_input (stdin, eolbyte, &input_lines);
+          line = input_lines;
+        }
     }
 
   head_lines = MIN (head_lines, n_lines);
@@ -352,6 +504,17 @@ main (int argc, char **argv)
   if (! randint_source)
     error (EXIT_FAILURE, errno, "%s", quotearg_colon (random_source));
 
+  if (use_reservoir_sampling)
+    {
+      /* Instead of reading the entire file into 'line',
+         use reservoir-sampling to store just "head_lines" random lines. */
+      n_lines = read_input_reservoir_sampling (stdin, eolbyte,
+                                               &input_lines, head_lines,
+                                               randint_source);
+      line = input_lines;
+      head_lines = MIN (head_lines, n_lines);
+    }
+
   /* Close stdin now, rather than earlier, so that randint_all_new
      doesn't have to worry about opening something other than
      stdin.  */
-- 
1.7.7.4

Reply via email to