I opened a pull request with my initial implementation of WholeFileIO: https://github.com/apache/beam/pull/3717
It has a few TODO items left, but feel free to review it and comment on my approach.

On Tue, Aug 8, 2017 at 4:37 PM, Chamikara Jayalath wrote:

> I also agree that it'll be cleaner to write your elements to files using a
> ParDo instead of trying to use FileBasedSink. FileBasedSink is specifically
> designed for writing bundles of elements to single files of a given
> file type. The same goes for FileBasedSource when reading. You can use the
> FileSystem abstraction to abstract out reading from and writing to different
> file systems.
>
> BTW, I'd appreciate it if you could create Python SDK versions of these JIRAs
> as well and update those JIRAs once the Java SDK versions are finalized. Even
> better if you can implement the Python SDK versions as well :).
>
> Thanks,
> Cham
>
> On Mon, Aug 7, 2017 at 3:14 PM Eugene Kirpichov wrote:
>
> > Hi! Thanks for filing the tickets.
> >
> > On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert wrote:
> >
> > > Hello again,
> > >
> > > I created tickets to capture these requests:
> > > https://issues.apache.org/jira/browse/BEAM-2750
> > > https://issues.apache.org/jira/browse/BEAM-2751
> > >
> > > I've started working on the Write part.
> > >
> > > Robert, after some time working on this, I'm unable to see how these
> > > objectives can be "entirely done in the context of a DoFn". Could you lend
> > > a hint?
> > >
> > > I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> > > DoFn... that manually writes out to some file location, did you? That would
> > > lose all the benefits of the IO/Sink classes.
> >
> > Which benefits? I think nearly everything that FileBasedSink does is
> > unnecessary for WholeFileIO.write() - I agree with Robert that it can be
> > implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
> > be implemented by expanding it into a single ParDo).
> >
> > > That said, I've found the "sharding" logic to be deeply embedded in all the
> > > FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
> > > going very well. I managed to write a FileBasedSink that writes Byte[] out
> > > (and that works correctly), but figuring out how to get the FilenamePolicy
> > > to be different for each element written out seems counter to the intent of
> > > much of these classes.
> > >
> > > Chris
> > >
> > > On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax wrote:
> > >
> > > > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert wrote:
> > > >
> > > > > Thanks for the feedback!
> > > > >
> > > > > Aggregated thoughts:
> > > > >
> > > > > 1. Warn users about large files (like 5GB large)
> > > >
> > > > I would set the threshold smaller. Also remember that while you may warn,
> > > > some runners might simply fail to process the record, causing pipelines to
> > > > either get stuck or fail altogether.
> > > >
> > > > > 2. Filenames can stick with contents via
> > > > >    PCollection<KV<filename, contents>>
> > > > > 3. InputStreams can't be encoded directly, but could be referenced in a
> > > > >    FileWrapper object
> > > > > 4. Be mindful of sink race conditions with multiple workers; make sure
> > > > >    failed workers clean up incompletely written files
> > > > > 5. File systems often do better with few large files than many small ones
> > > > > 6. Most/all of this can be done in the context of a DoFn
> > > > >
> > > > > # Regarding point 1 and point 2
> > > > > Yes!
> > > > >
> > > > > # Regarding point 3:
> > > > >
> > > > > ## Approach A:
> > > > > When the FileWrapper is encoded, it must somehow encode a reference to the
> > > > > InputStream it is associated with, so that when the FileWrapper is decoded
> > > > > it can pick up that InputStream again. My Java knowledge isn't deep enough
> > > > > to know how one would do that with hashcodes and object lookup and such,
> > > > > but I could sidestep that entirely by simply encoding the file path with
> > > > > the FileWrapper, then opening a new InputStream on that file path every
> > > > > time the FileWrapper is decoded.
> > > > >
> > > > > ## Approach B:
> > > > > An alternative to the above technique is to simply pass a byte[] array,
> > > > > like so:
> > > > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > >
> > > > > That would solve the encoding problem, allow users to get whatever they
> > > > > want out of it with a ByteArrayInputStream, AND put a hard limit on the
> > > > > size of incoming files at just below 2 GB (if my math is right). (This is
> > > > > large enough for my use case, at present.)
> > > > >
> > > > > # Regarding point 4:
> > > > >
> > > > > Any examples or guidance I could pull from to protect against this
> > > > > properly?
> > > > >
> > > > > # Regarding point 5:
> > > > >
> > > > > TextIO can read and write with different compressions. Would it be feasible
> > > > > for this WholeFileIO to read and write these many files to compressed zip
> > > > > files also? (I envision this as a stretch feature that would be added after
> > > > > the initial iteration anyway.)
> > > > >
> > > > > # Regarding point 6:
> > > > >
> > > > > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > > > > WriteFiles.
> > > > > Do you have any examples to point towards to enlighten me on
> > > > > the use of DoFn in this context? Unfortunately, we all know the "Authoring
> > > > > I/O Transforms" documentation is sparse.
> > > > >
> > > > > Keep it coming,
> > > > > Chris
> > > > >
> > > > > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw wrote:
> > > > >
> > > > > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > As mentioned on the PR - I support the creation of such an IO (both
> > > > > > > read and write) with the caveats that Reuven mentioned; we can refine
> > > > > > > the naming during code review.
> > > > > > > Note that you won't be able to create a PCollection<InputStream>
> > > > > > > because elements of a PCollection must have a coder and it's not
> > > > > > > possible to provide a coder for InputStream.
> > > > > >
> > > > > > Well, it's possible, but the fact that InputStream is mutable may cause
> > > > > > issues (e.g. if there's fusion, or when estimating its size).
> > > > > >
> > > > > > I would probably let the API consume/produce a PCollection<KV<filename,
> > > > > > contents>>. Alternatively, a FileWrapper object of some kind could provide
> > > > > > accessors to InputStream (or otherwise facilitate lazy reading).
> > > > > >
> > > > > > Note for the sink one must take care there's no race in case multiple
> > > > > > workers are attempting to process the same bundle (and ideally clean up in
> > > > > > the face of failure). Other than that, these could be entirely done in the
> > > > > > context of a DoFn.
> > > > > >
> > > > > > Also note that most filesystems, especially distributed ones, do better
> > > > > > reading and writing fewer larger files than many, many small ones.
> > > > > >
> > > > > > > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax wrote:
> > > > > > >
> > > > > > > > One thing to keep in mind is that many runners might have issues with
> > > > > > > > huge elements. If you have a 5 GB file, encoding it as a single element
> > > > > > > > might give you pain or might simply not work on some runners.
> > > > > > > >
> > > > > > > > Reuven
> > > > > > > >
> > > > > > > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I'd like to:
> > > > > > > > >
> > > > > > > > > 1. Read whole files as one input each. (If my input files are hi.txt,
> > > > > > > > >    what.txt, and yes.txt, then the whole contents of hi.txt are an
> > > > > > > > >    element of the returned PCollection, the whole contents of what.txt
> > > > > > > > >    are the next element, etc.)
> > > > > > > > > 2. Write elements as individual files. (Rather than smashing thousands
> > > > > > > > >    of outputs into a handful of files as TextIO does:
> > > > > > > > >    output-00000-of-00005, output-00001-of-00005, ..., I want to output
> > > > > > > > >    each thing individually. So if I'm given hi.txt, what.txt, yes.txt
> > > > > > > > >    then I'd like to read those in as whole files individually, then
> > > > > > > > >    write out my processed results as hi.txt-modified,
> > > > > > > > >    what.txt-modified, yes.txt-modified.)
> > > > > > > > >
> > > > > > > > > Before reading on, if you have easier ways to do these things, then I'd
> > > > > > > > > love to hear them!
> > > > > > > > >
> > > > > > > > > # Part 1
> > > > > > > > >
> > > > > > > > > I attempted Part 1 with this PR:
> > > > > > > > > https://github.com/apache/beam/pull/3543
> > > > > > > > >
> > > > > > > > > But I overgeneralized.
> > > > > > > > >
> > > > > > > > > Specifically, I want:
> > > > > > > > >
> > > > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > > > PCollection<String> wholeFilesAsStrings = p.apply(
> > > > > > > > >     "Read Whole Files from Input Directory",
> > > > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > > > >
> > > > > > > > > or
> > > > > > > > >
> > > > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > > > PCollection<InputStream> wholeFileStreams = p.apply(
> > > > > > > > >     "Read Whole Files from Input Directory",
> > > > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > > > >
> > > > > > > > > Bonus points would include:
> > > > > > > > >
> > > > > > > > > - Keeping the filename somehow
> > > > > > > > >
> > > > > > > > > # Part 2
> > > > > > > > >
> > > > > > > > > Currently, if you output hundreds of thousands of elements (batch-mode)
> > > > > > > > > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > > > > > > > > output-00000-of-00045, output-00001-of-00045, etc., and each one of
> > > > > > > > > those files contains tens of thousands of outputs back to back. I want
> > > > > > > > > them to be output as individual files.
> > > > > > > > >
> > > > > > > > > If appended after code snippets from Part 1, it would look like:
> > > > > > > > >
> > > > > > > > > ...
> > > > > > > > > p.apply("Write Whole File Outputs",
> > > > > > > > >     WholeFileIO.write().to("/path/to/output/dir/"));
> > > > > > > > >
> > > > > > > > > Bonus points would include:
> > > > > > > > >
> > > > > > > > > - Writing each element of the given PCollection to the filename they'd
> > > > > > > > >   like to go to.
> > > > > > > > > - Parallelizable. (This might already be done, I just noticed that my
> > > > > > > > >   Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
> > > > > > > > >   writing the DataSink output even though all other components of my
> > > > > > > > >   pipeline had many TaskManagers working on them simultaneously. I
> > > > > > > > >   haven't found the way to fix that yet. The current arrangement added
> > > > > > > > >   15 minutes to the end of my pipeline as the lonely TaskManager did all
> > > > > > > > >   the output.)
> > > > > > > > >
> > > > > > > > > I'm available to put the dev work into this. (Actually, I'm putting dev
> > > > > > > > > time into some kind of solution whether this is agreed upon or not :).
> > > > > > > > >
> > > > > > > > > Feedback, please,
> > > > > > > > > Chris
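
For anyone following the thread, here is a rough, JDK-only sketch of two ideas discussed above: reading a whole file into a byte[] with a size guard (point 1 and Approach B), and writing one element per file through a temp file plus rename so that a failed or duplicate worker does not leave a half-written output (point 4). This is not the code in the PR and it uses no Beam APIs. WholeFileSketch, readWholeFile, writeElement and the maxBytes parameter are hypothetical names made up for illustration. In a real pipeline the method bodies would presumably sit inside a DoFn and go through Beam's FileSystem abstraction rather than java.nio, and rename semantics on a distributed filesystem may well differ.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

/** JDK-only illustration; all names here are hypothetical, not Beam APIs. */
final class WholeFileSketch {
    private WholeFileSketch() {}

    /** Reads one whole file as a single element, refusing files larger than maxBytes. */
    static byte[] readWholeFile(Path file, long maxBytes) throws IOException {
        long size = Files.size(file);
        if (size > maxBytes) {
            throw new IOException(file + " is " + size + " bytes; limit is " + maxBytes);
        }
        // A Java array cannot hold more than Integer.MAX_VALUE bytes (just under 2 GB).
        return Files.readAllBytes(file);
    }

    /** Writes one element to its own file: temp file first, then an atomic rename. */
    static Path writeElement(Path outputDir, String filename, byte[] contents)
            throws IOException {
        Files.createDirectories(outputDir);
        Path target = outputDir.resolve(filename);
        // The temp file lives in the same directory so the rename stays on one filesystem.
        Path temp = Files.createTempFile(outputDir, filename + ".", ".tmp");
        try {
            Files.write(temp, contents);
            // Whether an existing target is replaced is implementation specific.
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
            return target;
        } finally {
            // No-op after a successful move; removes the partial file after a failure.
            Files.deleteIfExists(temp);
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("whole-file-sketch");
        Path in = Files.write(dir.resolve("hi.txt"), "hi".getBytes(StandardCharsets.UTF_8));
        byte[] contents = readWholeFile(in, 1L << 30);
        Path out = writeElement(dir.resolve("out"), "hi.txt-modified", contents);
        System.out.println("wrote " + out + " (" + Files.size(out) + " bytes)");
    }
}
```

The size guard is only a warning mechanism of the kind Reuven suggested; the threshold of 1 GB in main is an arbitrary value chosen for the example.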
