Hi,

I'd like to:

   1. Read whole files as one input each. (If my input files are hi.txt,
   what.txt, and yes.txt, then the whole contents of hi.txt are an element of
   the returned PCollection, the whole contents of what.txt are the next
   element, etc.)
   2. Write elements as individual files. (Rather than smashing thousands
   of outputs into a handful of files as TextIO does: output-00000-of-00005,
   output-00001-of-00005,..., I want to output each thing individually. So if
   I'm given hi.txt, what.txt, yes.txt then I'd like to read those in as whole
   files individually, then write out my processed results as hi.txt-modified,
   what.txt-modified, yes.txt-modified.).

Before reading on, if you have easier ways to do these things, then I'd
love to hear them!

# Part 1

I attempted Part 1 with this PR:
https://github.com/apache/beam/pull/3543

But I overgeneralized.

Specifically, I want:

Pipeline p = Pipeline.create(options);
PCollection<Strings> wholeFilesAsStrings = p.apply("Read Whole Files from
Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));

or

Pipeline p = Pipeline.create(options);
PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from
Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));

Bonus points would include:

   - Keeping the filename somehow


# Part 2

Currently, if you output hundreds of thousands of elements (batch-mode)
with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
output-00000-of-00045, output-00001-of-00045, etc., and each one of those
files contain tens of thousands of outputs back to back. I want them to be
output as individual files.

If appended after code snippets from Part 1, it would look like:

...
p.apply("Write Whole File Outputs",
WholeFileIO.write().to("/path/to/output/dir/"));

Bonus points would include:

   - Writing each element of the given PCollection to the filename they'd
   like to go to.
   - Parallelizable. (This might already be done, I just noticed that my
   Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
   writing the DataSink output even though all other components of my pipeline
   had many TaskManagers working on them simultaneously. I haven't found the
   way to fix that yet. The current arrangement added 15 minutes to the end of
   my pipeline as the lonely TaskManager did all the output.)


I'm available to put the dev work into this. (Actually, I'm putting dev
time into some kind of solution whether this is agreed upon or not :).

Feedback, please,
Chris

Reply via email to