Hi,

As mentioned on the PR, I support the creation of such an IO (both read and write), with the caveats that Reuven mentioned; we can refine the naming during code review. Note that you won't be able to create a PCollection<InputStream>, because every element of a PCollection must have a coder, and it's not possible to provide a coder for InputStream.
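To make the coder point concrete, here is one way a whole file could become an element: materialize it as a value (its filename plus its full contents as bytes) that can be encoded and decoded deterministically, which an open InputStream cannot be. This is a plain-Java sketch under stated assumptions. `WholeFile` is a hypothetical type, its length-prefixed encoding only mimics what a coder does and is not Beam's `Coder` API, and `maxBytes` is a hypothetical limit reflecting Reuven's caveat about huge elements. Also, `Files.readAllBytes` cannot return more than roughly 2 GB, because Java arrays are int-indexed, so a 5 GB file could not be held as a single `byte[]` element at all.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical element type: one whole file as its name plus its full contents.
// Unlike an open InputStream, this is a plain value that can be encoded to bytes.
final class WholeFile {
  final String filename;
  final byte[] contents;

  WholeFile(String filename, byte[] contents) {
    this.filename = filename;
    this.contents = contents;
  }

  // Reads a file fully, refusing anything above maxBytes (an illustrative limit,
  // not a Beam setting). Files.readAllBytes itself fails above ~2 GB.
  static WholeFile read(Path path, long maxBytes) {
    try {
      long size = Files.size(path);
      if (size > maxBytes) {
        throw new IllegalArgumentException(
            path + " is " + size + " bytes, over the " + maxBytes + "-byte limit");
      }
      return new WholeFile(path.getFileName().toString(), Files.readAllBytes(path));
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Hand-rolled, coder-like encoding (NOT Beam's Coder API): a length-prefixed
  // UTF-8 filename followed by length-prefixed contents.
  byte[] encode() {
    try {
      ByteArrayOutputStream buffer = new ByteArrayOutputStream();
      DataOutputStream out = new DataOutputStream(buffer);
      byte[] name = filename.getBytes(StandardCharsets.UTF_8);
      out.writeInt(name.length);
      out.write(name);
      out.writeInt(contents.length);
      out.write(contents);
      out.flush();
      return buffer.toByteArray();
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Inverse of encode(): rebuilds an equal WholeFile from its encoded bytes.
  static WholeFile decode(byte[] encoded) {
    try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(encoded))) {
      byte[] name = new byte[in.readInt()];
      in.readFully(name);
      byte[] contents = new byte[in.readInt()];
      in.readFully(contents);
      return new WholeFile(new String(name, StandardCharsets.UTF_8), contents);
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }
}
```

Because the filename travels inside the element, this shape would also cover the "keeping the filename somehow" bonus point from the original request.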
On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <re...@google.com.invalid> wrote:

> One thing to keep in mind is that many runners might have issues with huge
> elements. If you have a 5 GB file, encoding it as a single element might
> give you pain or might simply not work on some runners.
>
> Reuven
>
> On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> chris.hebert-...@digitalreasoning.com> wrote:
>
> > Hi,
> >
> > I'd like to:
> >
> > 1. Read whole files as one input each. (If my input files are hi.txt,
> >    what.txt, and yes.txt, then the whole contents of hi.txt are one
> >    element of the returned PCollection, the whole contents of what.txt
> >    are the next element, etc.)
> > 2. Write elements as individual files. (Rather than smashing thousands of
> >    outputs into a handful of files as TextIO does: output-00000-of-00005,
> >    output-00001-of-00005, ..., I want to output each thing individually.
> >    So if I'm given hi.txt, what.txt, and yes.txt, I'd like to read those
> >    in as whole files individually, then write out my processed results as
> >    hi.txt-modified, what.txt-modified, and yes.txt-modified.)
> >
> > Before reading on, if you have easier ways to do these things, I'd love
> > to hear them!
> >
> > # Part 1
> >
> > I attempted Part 1 with this PR:
> > https://github.com/apache/beam/pull/3543
> >
> > But I overgeneralized.
> >
> > Specifically, I want:
> >
> >     Pipeline p = Pipeline.create(options);
> >     PCollection<String> wholeFilesAsStrings = p.apply(
> >         "Read Whole Files from Input Directory",
> >         WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > or
> >
> >     Pipeline p = Pipeline.create(options);
> >     PCollection<InputStream> wholeFileStreams = p.apply(
> >         "Read Whole Files from Input Directory",
> >         WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > Bonus points would include:
> >
> > - Keeping the filename somehow
> >
> >
> > # Part 2
> >
> > Currently, if you output hundreds of thousands of elements (batch mode)
> > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > output-00000-of-00045, output-00001-of-00045, etc., and each one of those
> > files contains tens of thousands of outputs back to back. I want them to
> > be output as individual files.
> >
> > If appended after the code snippets from Part 1, it would look like:
> >
> >     ...
> >     wholeFilesAsStrings.apply("Write Whole File Outputs",
> >         WholeFileIO.write().to("/path/to/output/dir/"));
> >
> > Bonus points would include:
> >
> > - Writing each element of the given PCollection to the filename it should
> >   go to.
> > - Parallelizable. (This might already be done; I just noticed that my
> >   Beam+Flink+YARN pipeline with TextIO.write() had only one TaskManager
> >   writing the DataSink output, even though all other components of my
> >   pipeline had many TaskManagers working on them simultaneously. I haven't
> >   found a way to fix that yet. The current arrangement added 15 minutes
> >   to the end of my pipeline while the lonely TaskManager did all the
> >   output.)
> >
> >
> > I'm available to put the dev work into this. (Actually, I'm putting dev
> > time into some kind of solution whether or not this is agreed upon :) )
> >
> > Feedback, please,
> > Chris
> >
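For comparison, the per-file behavior described in Parts 1 and 2 (hi.txt in, hi.txt-modified out) can be sketched on a single machine with plain java.nio. This is not a Beam transform and says nothing about parallelism or runner behavior. `PerFileRewrite`, `readWholeFiles`, `writeEach`, and `run` are hypothetical names; the sketch assumes the glob is resolved against a single directory and that file contents are UTF-8 text.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.UnaryOperator;

// Single-machine sketch, NOT a Beam transform: one whole file in, one file out.
final class PerFileRewrite {
  // Part 1: each regular file in inputDir matching the glob becomes one entry,
  // keyed by its filename so the name is kept alongside the contents.
  static Map<String, String> readWholeFiles(Path inputDir, String glob) {
    Map<String, String> files = new LinkedHashMap<>();
    try (DirectoryStream<Path> matches = Files.newDirectoryStream(inputDir, glob)) {
      for (Path path : matches) {
        if (Files.isRegularFile(path)) {
          byte[] bytes = Files.readAllBytes(path);
          files.put(path.getFileName().toString(), new String(bytes, StandardCharsets.UTF_8));
        }
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
    return files;
  }

  // Part 2: each entry is written to its own "<name>-modified" file instead of
  // being packed into a few output-0000N-of-0000M shards.
  static void writeEach(Map<String, String> outputs, Path outputDir) {
    try {
      Files.createDirectories(outputDir);
      for (Map.Entry<String, String> entry : outputs.entrySet()) {
        Path target = outputDir.resolve(entry.getKey() + "-modified");
        Files.write(target, entry.getValue().getBytes(StandardCharsets.UTF_8));
      }
    } catch (IOException e) {
      throw new UncheckedIOException(e);
    }
  }

  // Reads every matching file, applies `process` to each file's contents, and
  // writes one output file per input file.
  static void run(Path inputDir, String glob, Path outputDir, UnaryOperator<String> process) {
    Map<String, String> results = new LinkedHashMap<>();
    readWholeFiles(inputDir, glob).forEach((name, text) -> results.put(name, process.apply(text)));
    writeEach(results, outputDir);
  }
}
```

Calling `PerFileRewrite.run(Paths.get("/path/to/input/dir"), "*", Paths.get("/path/to/output/dir"), String::trim)` would write hi.txt-modified, what.txt-modified, and yes.txt-modified for the three example inputs. A real WholeFileIO would need to distribute the reads and writes across workers, which this sketch deliberately does not attempt.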