One thing to keep in mind: many runners may have trouble with very large elements. If you have a 5 GB file, encoding it as a single element may cause problems, or may simply not work on some runners.
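To make the size concern concrete, a whole-file reader can check a file's size before turning it into one element and refuse anything above a cap. This is a minimal plain-Java (java.nio) sketch, not a Beam API. The class name `WholeFileGuard` and the 64 MiB default are hypothetical, and a suitable limit depends on the runner and its worker memory.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

/** Hypothetical helper: reads a whole file as one String, refusing files above a size cap. */
final class WholeFileGuard {
  // Illustrative cap only (64 MiB); the right value depends on the runner and worker memory.
  static final long DEFAULT_MAX_BYTES = 64L * 1024 * 1024;

  static String readWholeFile(Path file, long maxBytes) throws IOException {
    long size = Files.size(file);
    if (size > maxBytes) {
      // Fail fast instead of materializing a multi-gigabyte element in memory.
      throw new IOException(file + " is " + size + " bytes; cap is " + maxBytes + " bytes");
    }
    return new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
  }

  static String readWholeFile(Path file) throws IOException {
    return readWholeFile(file, DEFAULT_MAX_BYTES);
  }
}
```

A cap does not make huge files work; it only makes the limit explicit at read time.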
Reuven

On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert wrote:
> Hi,
>
> I'd like to:
>
> 1. Read whole files as one input each. (If my input files are hi.txt,
>    what.txt, and yes.txt, then the whole contents of hi.txt are one
>    element of the returned PCollection, the whole contents of what.txt
>    are the next element, etc.)
> 2. Write elements as individual files. (Rather than smashing thousands
>    of outputs into a handful of files as TextIO does, e.g.
>    output-00000-of-00005, output-00001-of-00005, ..., I want to output
>    each thing individually. So if I'm given hi.txt, what.txt, and
>    yes.txt, then I'd like to read those in as whole files individually,
>    then write out my processed results as hi.txt-modified,
>    what.txt-modified, and yes.txt-modified.)
>
> Before reading on, if you have easier ways to do these things, I'd
> love to hear them!
>
> # Part 1
>
> I attempted Part 1 with this PR:
> https://github.com/apache/beam/pull/3543
>
> But I overgeneralized.
>
> Specifically, I want:
>
>     Pipeline p = Pipeline.create(options);
>     PCollection<String> wholeFilesAsStrings = p.apply(
>         "Read Whole Files from Input Directory",
>         WholeFileIO.read().from("/path/to/input/dir/*"));
>
> or
>
>     Pipeline p = Pipeline.create(options);
>     PCollection<InputStream> wholeFileStreams = p.apply(
>         "Read Whole Files from Input Directory",
>         WholeFileIO.read().from("/path/to/input/dir/*"));
>
> Bonus points would include:
>
> - Keeping the filename somehow
>
>
> # Part 2
>
> Currently, if you output hundreds of thousands of elements (batch mode)
> with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> output-00000-of-00045, output-00001-of-00045, etc., and each of those
> files contains tens of thousands of outputs back to back. I want them
> to be output as individual files.
>
> If appended after the code snippets from Part 1, it would look like:
>
>     ...
>     p.apply("Write Whole File Outputs",
>         WholeFileIO.write().to("/path/to/output/dir/"));
>
> Bonus points would include:
>
> - Writing each element of the given PCollection to the filename it
>   should go to.
> - Parallelizable. (This might already be done; I just noticed that my
>   Beam+Flink+YARN pipeline with TextIO.write() had only one TaskManager
>   writing the DataSink output, even though all other components of my
>   pipeline had many TaskManagers working on them simultaneously. I
>   haven't found a way to fix that yet. The current arrangement added 15
>   minutes to the end of my pipeline as the lone TaskManager did all the
>   output.)
>
>
> I'm available to put the dev work into this. (Actually, I'm putting dev
> time into some kind of solution whether or not this is agreed upon.) :)
>
> Feedback, please,
> Chris
>
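The per-file semantics requested in Parts 1 and 2 (one element per input file, keyed by its filename, and one output file per element named `<input>-modified`) can be sketched on a single machine in plain Java. This is not Beam code and not distributed: `WholeFileIO` is only the proposal in the message, and the names `WholeFileSemantics`, `readWholeFiles`, and `writeEach` are hypothetical. (Later Beam releases added `FileIO.match()`, `FileIO.readMatches()`, and `FileIO.writeDynamic()` for doing this inside a pipeline; check the Beam documentation for the exact API.)

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical local sketch of the requested whole-file read/write semantics (not Beam). */
final class WholeFileSemantics {

  /** Part 1: one (filename, whole contents) entry per regular file in dir matching glob. */
  static Map<String, String> readWholeFiles(Path dir, String glob) throws IOException {
    Map<String, String> elements = new TreeMap<>(); // sorted by filename for a deterministic order
    try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, glob)) {
      for (Path file : files) {
        if (Files.isRegularFile(file)) {
          // The entire file becomes a single element, so the size caveat above applies.
          String contents = new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
          elements.put(file.getFileName().toString(), contents);
        }
      }
    }
    return elements;
  }

  /** Part 2: one output file per element, named after its input file plus "-modified". */
  static void writeEach(Map<String, String> elements, Path outDir) throws IOException {
    Files.createDirectories(outDir);
    for (Map.Entry<String, String> e : elements.entrySet()) {
      Path out = outDir.resolve(e.getKey() + "-modified");
      Files.write(out, e.getValue().getBytes(StandardCharsets.UTF_8));
    }
  }
}
```

Keying each element by its filename covers the "keeping the filename" bonus point, and deriving each output path from that key covers writing each element to the file it should go to.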
