Hi! Thanks for filing the tickets.

On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <[email protected]> wrote:

> Hello again,
>
> I created tickets to capture these requests:
> https://issues.apache.org/jira/browse/BEAM-2750
> https://issues.apache.org/jira/browse/BEAM-2751
>
> I've started working on the Write part.
>
> Robert, after some time working on this, I'm unable to see how these
> objectives can be "entirely done in the context of a DoFn". Could you
> lend a hint?
>
> I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> DoFn... that manually writes out to some file location, did you? That
> would lose all the benefits of the IO/Sink classes.

Which benefits? I think nearly everything that FileBasedSink does is
unnecessary for WholeFileIO.write() - I agree with Robert that it can be
implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
be implemented by expanding it into a single ParDo).

> That said, I've found the "sharding" logic to be deeply embedded in all
> the FileBasedSink derivatives, and my attempt at sidestepping this logic
> isn't going very well. I managed to write a FileBasedSink that writes
> byte[] out (and that works correctly), but figuring out how to get the
> FilenamePolicy to be different for each element written out seems counter
> to the intent of much of these classes.
>
> Chris
>
> On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax <[email protected]> wrote:
>
> > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert
> > <[email protected]> wrote:
> >
> > > Thanks for the feedback!
> > >
> > > Aggregated thoughts:
> > >
> > > 1. Warn users about large files (like 5 GB large)
> >
> > I would set the threshold smaller. Also remember that while you may
> > warn, some runners might simply fail to process the record, causing
> > pipelines to either get stuck or fail altogether.
> >
> > > 2. Filenames can stick with contents via
> > >    PCollection<KV<filename, contents>>
> > > 3. InputStreams can't be encoded directly, but could be referenced
> > >    in a FileWrapper object
> > > 4. Be mindful of sink race conditions with multiple workers; make
> > >    sure failed workers clean up incompletely written files
> > > 5. File systems often do better with few large files than many
> > >    small ones
> > > 6. Most/all of this can be done in the context of a DoFn
> > >
> > > # Regarding point 1 and point 2
> > > Yes!
> > >
> > > # Regarding point 3:
> > >
> > > ## Approach A:
> > > When the FileWrapper is encoded, it must somehow encode a reference
> > > to the InputStream it is associated with, so that when the
> > > FileWrapper is decoded it can pick up that InputStream again. My
> > > Java knowledge isn't deep enough to know how one would do that with
> > > hashcodes and object lookup and such, but I could sidestep that
> > > entirely by simply encoding the filepath with the FileWrapper, then
> > > open up a new InputStream on that file path every time the
> > > FileWrapper is decoded.
> > >
> > > ## Approach B:
> > > An alternative to the above technique is to simply pass a byte[]
> > > array, like so:
> > >
> > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > That would solve the encoding problem, allow users to get whatever
> > > they want out of it with a ByteArrayInputStream, AND put a hard
> > > limit on the size of incoming files at just below 2 GB (if my math
> > > is right). (This is large enough for my use case, at present.)
> > >
> > > # Regarding point 4:
> > >
> > > Any examples or guidance I could pull from to protect against this
> > > properly?
> > >
> > > # Regarding point 5:
> > >
> > > TextIO can read and write with different compressions. Would it be
> > > feasible for this WholeFileIO to read and write these many files to
> > > compressed zip files also? (I envision this as a stretch feature
> > > that would be added after the initial iteration anyway.)
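On the zip question in point 5: plain java.util.zip is already enough to expand an archive into the same (filename, contents) shape as Approach B's PCollection<KV<String, byte[]>>, so zip support could plausibly be layered on later without changing the proposed API. A rough sketch, with the caveat that `readZipEntries` is a hypothetical helper and not Beam API:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;

public class ZipRead {
  // Hypothetical helper: expand a zip stream into (filename, contents)
  // pairs, the same shape as the proposed PCollection<KV<String, byte[]>>.
  static Map<String, byte[]> readZipEntries(InputStream in) throws IOException {
    Map<String, byte[]> files = new LinkedHashMap<>();
    try (ZipInputStream zip = new ZipInputStream(in)) {
      ZipEntry entry;
      while ((entry = zip.getNextEntry()) != null) {
        if (entry.isDirectory()) {
          continue;
        }
        // Buffer each entry fully; whole-file semantics imply the entry
        // fits in memory anyway.
        ByteArrayOutputStream buf = new ByteArrayOutputStream();
        byte[] chunk = new byte[8192];
        int n;
        while ((n = zip.read(chunk)) != -1) {
          buf.write(chunk, 0, n);
        }
        files.put(entry.getName(), buf.toByteArray());
      }
    }
    return files;
  }
}
```

A read expansion could emit one element per entry, which would also help with the many-small-files concern: one zip on disk, many logical files in the pipeline.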
> > > # Regarding point 6:
> > >
> > > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > > WriteFiles. Do you have any examples to point towards to enlighten me
> > > on the use of DoFn in this context? Unfortunately, we all know the
> > > "Authoring I/O Transforms" documentation is sparse.
> > >
> > > Keep it coming,
> > > Chris
> > >
> > > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
> > > <[email protected]> wrote:
> > >
> > > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov
> > > > <[email protected]> wrote:
> > > >
> > > > > Hi,
> > > > > As mentioned on the PR - I support the creation of such an IO
> > > > > (both read and write) with the caveats that Reuven mentioned; we
> > > > > can refine the naming during code review.
> > > > > Note that you won't be able to create a PCollection<InputStream>
> > > > > because elements of a PCollection must have a coder and it's not
> > > > > possible to provide a coder for InputStream.
> > > >
> > > > Well, it's possible, but the fact that InputStream is mutable may
> > > > cause issues (e.g. if there's fusion, or when estimating its size).
> > > >
> > > > I would probably let the API consume/produce a
> > > > PCollection<KV<filename, contents>>. Alternatively, a FileWrapper
> > > > object of some kind could provide accessors to InputStream (or
> > > > otherwise facilitate lazy reading).
> > > >
> > > > Note for the sink one must take care there's no race in case
> > > > multiple workers are attempting to process the same bundle (and
> > > > ideally clean up in the face of failure). Other than that, these
> > > > could be entirely done in the context of a DoFn.
> > > >
> > > > Also note that most filesystems, especially distributed ones, do
> > > > better reading and writing fewer larger files than many, many
> > > > small ones.
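Robert's race caveat has a common cure: write each element to a uniquely named temporary file and atomically rename it into place, so the final file either exists complete or not at all. A sketch of the per-element sink logic using only java.nio; the class and method names are illustrative, not Beam API:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

public class RaceSafeWrite {
  // Hypothetical per-element sink logic: write to a uniquely named temp
  // file first, then atomically rename into place. If two workers process
  // the same bundle, both renames target the same final path and the last
  // one wins cleanly; a failed worker leaves behind only a temp file that
  // a cleanup pass can sweep up.
  static void writeAtomically(Path finalPath, byte[] contents) throws IOException {
    Path tmp = Files.createTempFile(
        finalPath.getParent(), finalPath.getFileName().toString(), ".tmp");
    try {
      Files.write(tmp, contents);
      Files.move(tmp, finalPath,
          StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
    } finally {
      Files.deleteIfExists(tmp); // no-op if the move succeeded
    }
  }
}
```

ATOMIC_MOVE can throw AtomicMoveNotSupportedException when the temp directory and destination are on different filesystems, which is why the temp file is created next to the final path.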
> > > > > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <[email protected]>
> > > > > wrote:
> > > > >
> > > > > > One thing to keep in mind is that many runners might have
> > > > > > issues with huge elements. If you have a 5 GB file, encoding it
> > > > > > as a single element might give you pain or might simply not
> > > > > > work on some runners.
> > > > > >
> > > > > > Reuven
> > > > > >
> > > > > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert
> > > > > > <[email protected]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'd like to:
> > > > > > >
> > > > > > > 1. Read whole files as one input each. (If my input files are
> > > > > > >    hi.txt, what.txt, and yes.txt, then the whole contents of
> > > > > > >    hi.txt are an element of the returned PCollection, the
> > > > > > >    whole contents of what.txt are the next element, etc.)
> > > > > > > 2. Write elements as individual files. (Rather than smashing
> > > > > > >    thousands of outputs into a handful of files as TextIO
> > > > > > >    does: output-00000-of-00005, output-00001-of-00005, ...,
> > > > > > >    I want to output each thing individually. So if I'm given
> > > > > > >    hi.txt, what.txt, yes.txt, then I'd like to read those in
> > > > > > >    as whole files individually, then write out my processed
> > > > > > >    results as hi.txt-modified, what.txt-modified,
> > > > > > >    yes.txt-modified.)
> > > > > > >
> > > > > > > Before reading on, if you have easier ways to do these
> > > > > > > things, then I'd love to hear them!
> > > > > > >
> > > > > > > # Part 1
> > > > > > >
> > > > > > > I attempted Part 1 with this PR:
> > > > > > > https://github.com/apache/beam/pull/3543
> > > > > > >
> > > > > > > But I overgeneralized.
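One way to act on Reuven's huge-element warning is a size check at read time. A byte[] element is hard-capped just under 2 GB (Integer.MAX_VALUE bytes, which is presumably the math behind Approach B's limit), but a much smaller configurable threshold is likely safer in practice. A hypothetical guard; the class name and threshold value are illustrative only:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class GuardedRead {
  // Illustrative threshold; byte[] itself caps a single element just under
  // 2 GB (Integer.MAX_VALUE bytes), but runners may struggle long before
  // that, so a configurable limit would belong on the transform.
  static final long MAX_ELEMENT_BYTES = 64L * 1024 * 1024;

  // Hypothetical read-side guard: refuse files too large to be sensible
  // single elements instead of letting the runner fail opaquely later.
  static byte[] readWholeFile(Path path) throws IOException {
    long size = Files.size(path);
    if (size > MAX_ELEMENT_BYTES) {
      throw new IOException(
          "File " + path + " is " + size + " bytes; too large for a single element");
    }
    return Files.readAllBytes(path);
  }
}
```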
> > > > > > > Specifically, I want:
> > > > > > >
> > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > PCollection<String> wholeFilesAsStrings =
> > > > > > >     p.apply("Read Whole Files from Input Directory",
> > > > > > >         WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > >
> > > > > > > or
> > > > > > >
> > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > PCollection<InputStream> wholeFileStreams =
> > > > > > >     p.apply("Read Whole Files from Input Directory",
> > > > > > >         WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > >
> > > > > > > Bonus points would include:
> > > > > > >
> > > > > > > - Keeping the filename somehow
> > > > > > >
> > > > > > > # Part 2
> > > > > > >
> > > > > > > Currently, if you output hundreds of thousands of elements
> > > > > > > (batch mode) with TextIO in Beam on Flink with, say, 45
> > > > > > > TaskManagers, then you get output-00000-of-00045,
> > > > > > > output-00001-of-00045, etc., and each one of those files
> > > > > > > contains tens of thousands of outputs back to back. I want
> > > > > > > them to be output as individual files.
> > > > > > >
> > > > > > > If appended after the code snippets from Part 1, it would
> > > > > > > look like:
> > > > > > >
> > > > > > > ...
> > > > > > > p.apply("Write Whole File Outputs",
> > > > > > >     WholeFileIO.write().to("/path/to/output/dir/"));
> > > > > > >
> > > > > > > Bonus points would include:
> > > > > > >
> > > > > > > - Writing each element of the given PCollection to the
> > > > > > >   filename it would like to go to.
> > > > > > > - Parallelizable. (This might already be done; I just noticed
> > > > > > >   that my Beam+Flink+YARN pipeline with TextIO.write() only
> > > > > > >   had one TaskManager writing the DataSink output even though
> > > > > > >   all other components of my pipeline had many TaskManagers
> > > > > > >   working on them simultaneously. I haven't found a way to
> > > > > > >   fix that yet. The current arrangement added 15 minutes to
> > > > > > >   the end of my pipeline as the lonely TaskManager did all
> > > > > > >   the output.)
> > > > > > >
> > > > > > > I'm available to put the dev work into this. (Actually, I'm
> > > > > > > putting dev time into some kind of solution whether this is
> > > > > > > agreed upon or not :).
> > > > > > >
> > > > > > > Feedback, please,
> > > > > > > Chris
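For concreteness, the per-element work of the single-ParDo write discussed at the top of this thread can be very small. A hypothetical sketch of that core, using only java.nio; the Beam DoFn/@ProcessElement boilerplate around it is omitted, and the "-modified" suffix simply mirrors Chris's hi.txt-modified example:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class WholeFileWrite {
  // Sketch of what a WholeFileIO.write() ParDo would do per element: each
  // KV<filename, contents> becomes its own output file, so a bundle of N
  // elements yields N files rather than one sharded output-XXXXX-of-YYYYY.
  static Path writeElement(Path outputDir, String filename, byte[] contents)
      throws IOException {
    Path target = outputDir.resolve(filename + "-modified");
    Files.createDirectories(outputDir);
    Files.write(target, contents);
    return target;
  }
}
```

Because each element carries its own filename, this is trivially parallelizable across workers, which also addresses the single-TaskManager bottleneck Chris describes; the temp-file-plus-atomic-rename refinement mentioned earlier in the thread would slot in where Files.write is called.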
