Hi, I'd like to:
1. Read whole files as one input each. (If my input files are hi.txt, what.txt, and yes.txt, then the whole contents of hi.txt are one element of the returned PCollection, the whole contents of what.txt are the next element, and so on.)
2. Write elements as individual files. (Rather than smashing thousands of outputs into a handful of files the way TextIO does, e.g. output-00000-of-00005, output-00001-of-00005, ..., I want to output each element individually. So if I'm given hi.txt, what.txt, and yes.txt, I'd like to read each one in as a whole file, then write out my processed results as hi.txt-modified, what.txt-modified, and yes.txt-modified.)

Before reading on: if you know easier ways to do these things, I'd love to hear them!

# Part 1

I attempted Part 1 with this PR: https://github.com/apache/beam/pull/3543

But I overgeneralized. Specifically, I want:

    Pipeline p = Pipeline.create(options);
    PCollection<String> wholeFilesAsStrings = p.apply("Read Whole Files from Input Directory",
        WholeFileIO.read().from("/path/to/input/dir/*"));

or

    Pipeline p = Pipeline.create(options);
    PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from Input Directory",
        WholeFileIO.read().from("/path/to/input/dir/*"));

Bonus points would include:
- Keeping the filename somehow

# Part 2

Currently, if you output hundreds of thousands of elements (batch mode) with TextIO in Beam on Flink with, say, 45 TaskManagers, you get output-00000-of-00045, output-00001-of-00045, etc., and each of those files contains tens of thousands of outputs back to back. I want each element written out as an individual file. Appended after the code snippets from Part 1, it would look like:

    ...
    p.apply("Write Whole File Outputs", WholeFileIO.write().to("/path/to/output/dir/"));

Bonus points would include:
- Writing each element of the given PCollection to the filename it should go to.
- Parallelizable.
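To pin down the semantics asked for in Parts 1 and 2, here is a minimal single-machine sketch in plain Java. It is not Beam code and not the proposed WholeFileIO; `WholeFileSketch`, `readWholeFiles`, and `writeEachAsFile` are hypothetical names, and it assumes UTF-8 text files in a local directory. Each file matching a glob becomes one element keyed by its filename (the "keeping the filename" bonus), and each processed element is written to its own `<filename>-modified` file.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;

/**
 * Hypothetical sketch of the requested WholeFileIO semantics (not a Beam transform):
 * one element per input file, one output file per element.
 */
public class WholeFileSketch {

    /** Reads every regular file in dir whose name matches glob as one String, keyed by filename. */
    static Map<String, String> readWholeFiles(Path dir, String glob) throws IOException {
        Map<String, String> elements = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(dir, glob)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    // The whole file is a single element; the filename is kept as its key.
                    elements.put(file.getFileName().toString(),
                        new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
                }
            }
        }
        return elements;
    }

    /** Writes each element to its own file, e.g. hi.txt -> hi.txt-modified, instead of shards. */
    static void writeEachAsFile(Map<String, String> elements, Path outDir) throws IOException {
        Files.createDirectories(outDir);
        for (Map.Entry<String, String> e : elements.entrySet()) {
            Files.write(outDir.resolve(e.getKey() + "-modified"),
                e.getValue().getBytes(StandardCharsets.UTF_8));
        }
    }

    public static void main(String[] args) throws IOException {
        Path in = Files.createTempDirectory("wholefile-in");
        Path out = Files.createTempDirectory("wholefile-out");
        for (String name : new String[] {"hi.txt", "what.txt", "yes.txt"}) {
            Files.write(in.resolve(name), ("contents of " + name).getBytes(StandardCharsets.UTF_8));
        }
        Map<String, String> elements = readWholeFiles(in, "*.txt");
        // Hypothetical processing step: upper-case each whole-file element.
        elements.replaceAll((name, contents) -> contents.toUpperCase());
        writeEachAsFile(elements, out);
        // Directory listing order is unspecified, so names are printed in whatever order they come.
        try (DirectoryStream<Path> written = Files.newDirectoryStream(out)) {
            for (Path p : written) {
                System.out.println(p.getFileName());
            }
        }
    }
}
```

In a pipeline, the per-file read and write bodies would presumably live inside DoFns applied to a PCollection of filenames, which is what would let the write side run on many workers at once; that wiring is deliberately left out of this sketch.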
(On parallelism: this might already be possible. I just noticed that my Beam+Flink+YARN pipeline with TextIO.write() had only one TaskManager writing the DataSink output, even though all other components of my pipeline had many TaskManagers working on them simultaneously. I haven't found a way to fix that yet; the current arrangement added 15 minutes to the end of my pipeline while the lone TaskManager did all the output.)

I'm available to put the dev work into this. Actually, I'm putting dev time into some kind of solution whether or not this is agreed upon :)

Feedback, please!
Chris
