I opened a pull request with my initial implementation of WholeFileIO:
https://github.com/apache/beam/pull/3717

It has a few TODO items left, but feel free to review it and comment on my
approach.
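
As a rough illustration of point 4 in the quoted discussion below (avoiding races between workers and cleaning up incompletely written files), here is a minimal sketch of the "write to a temporary file, then rename" pattern. It uses plain java.nio on a local filesystem only, not Beam's FileSystems abstraction, and the class and method names (WholeFileWriter, writeAtomically) are hypothetical and not taken from the pull request.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper, for illustration only; not part of Beam.
class WholeFileWriter {

    /**
     * Writes contents to outputDir/filename by first writing a uniquely named
     * temp file in the same directory and then renaming it into place.
     * Readers never observe a half-written target, and a failed attempt
     * removes its own temp file.
     */
    static Path writeAtomically(Path outputDir, String filename, byte[] contents)
            throws IOException {
        Files.createDirectories(outputDir);
        Path target = outputDir.resolve(filename);
        // Unique temp name per attempt, so concurrent retries do not collide.
        Path temp = Files.createTempFile(outputDir, filename + ".", ".tmp");
        try {
            Files.write(temp, contents);
            // ATOMIC_MOVE: whether an existing target is replaced is
            // implementation specific (on POSIX filesystems it is replaced).
            Files.move(temp, target, StandardCopyOption.ATOMIC_MOVE);
        } finally {
            // No-op after a successful move; cleans up after a failure.
            Files.deleteIfExists(temp);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("wholefile-demo");
        Path out = writeAtomically(
                dir, "hi.txt-modified", "hello".getBytes(StandardCharsets.UTF_8));
        System.out.println("wrote " + out);
    }
}
```

Inside a DoFn, a call like this would presumably run once per KV<filename, contents> element; since retried bundles rewrite the same target name with the same bytes, the rename makes the last successful attempt win.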

On Tue, Aug 8, 2017 at 4:37 PM, Chamikara Jayalath <[email protected]>
wrote:

> I also agree that it'll be cleaner to write your elements to files using a
> ParDo instead of trying to use FileBasedSink. FileBasedSink is specifically
> designed for writing bundles of elements to single files of a given
> file-type. Same goes for FileBasedSource when reading. You can use the
> FileSystem abstraction to abstract out reading from/writing to different
> file-systems.
>
> BTW, I'd appreciate it if you could create Python SDK versions of these
> JIRAs as well and update those JIRAs once the Java SDK versions are
> finalized. Even better if you can implement Python SDK versions as well :).
>
> Thanks,
> Cham
>
> On Mon, Aug 7, 2017 at 3:14 PM Eugene Kirpichov
> <[email protected]> wrote:
>
> > Hi! Thanks for filing the tickets.
> >
> > On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <
> > [email protected]> wrote:
> >
> > > Hello again,
> > >
> > > I created tickets to capture these requests:
> > > https://issues.apache.org/jira/browse/BEAM-2750
> > > https://issues.apache.org/jira/browse/BEAM-2751
> > >
> > > I've started working on the Write part.
> > >
> > > Robert, after some time working on this, I'm unable to see how these
> > > objectives can be "entirely done in the context of a DoFn". Could you
> > > lend a hint?
> > >
> > > I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> > > DoFn... that manually writes out to some file location, did you? That
> > > would lose all the benefits of the IO/Sink classes.
> > >
> > Which benefits? I think nearly everything that FileBasedSink does is
> > unnecessary for WholeFileIO.write() - I agree with Robert that it can be
> > implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
> > be implemented by expanding it into a single ParDo).
> >
> >
> > >
> > > That said, I've found the "sharding" logic to be deeply embedded in all
> > > the FileBasedSink derivatives, and my attempt at sidestepping this logic
> > > isn't going very well. I managed to write a FileBasedSink that writes
> > > Byte[] out (and that works correctly), but figuring out how to get the
> > > FilenamePolicy to be different for each element written out seems
> > > counter to the intent of much of these classes.
> > >
> > > Chris
> > >
> > > On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax <[email protected]>
> > > wrote:
> > >
> > > > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> > > > [email protected]> wrote:
> > > >
> > > > > Thanks for the feedback!
> > > > >
> > > > > Aggregated thoughts:
> > > > >
> > > > >    1. Warn users about large files (like 5GB large)
> > > > >
> > > >
> > > > I would set the threshold smaller. Also remember that, while you may
> > > > warn, some runners might simply fail to process the record, causing
> > > > pipelines to either get stuck or fail altogether.
> > > >
> > > >
> > > > >    2. Filenames can stick with contents via
> > > > >    PCollection<KV<filename, contents>>
> > > > >    3. InputStreams can't be encoded directly, but could be
> > > > >    referenced in a FileWrapper object
> > > > >    4. Be mindful of sink race conditions with multiple workers;
> > > > >    make sure failed workers clean up incompletely written files
> > > > >    5. File systems often do better with few large files than many
> > > > >    small ones
> > > > >    6. Most/all of this can be done in the context of a DoFn
> > > > >
> > > > > # Regarding point 1 and point 2
> > > > > Yes!
> > > > >
> > > > > # Regarding point 3:
> > > > >
> > > > > ## Approach A:
> > > > > When the FileWrapper is encoded, it must somehow encode a reference
> > > > > to the InputStream it is associated with, so that when the
> > > > > FileWrapper is decoded it can pick up that InputStream again. My Java
> > > > > knowledge isn't deep enough to know how one would do that with
> > > > > hashcodes and object lookup and such, but I could sidestep that
> > > > > entirely by simply encoding the filepath with the FileWrapper, then
> > > > > opening a new InputStream on that file path every time the
> > > > > FileWrapper is decoded.
> > > > >
> > > > > ## Approach B:
> > > > > An alternative to the above technique is to simply pass a byte[]
> > > > > array, like so:
> > > > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > >
> > > > > That would solve the encoding problem, allow users to get whatever
> > > > > they want out of it with a ByteArrayInputStream, AND put a hard limit
> > > > > on the size of incoming files at just below 2 GB (if my math is
> > > > > right). (This is large enough for my use case, at present.)
> > > > >
> > > > >
> > > > > # Regarding point 4:
> > > > >
> > > > > Any examples or guidance I could pull from to protect against this
> > > > > properly?
> > > > >
> > > > >
> > > > > # Regarding point 5:
> > > > >
> > > > > TextIO can read and write with different compressions. Would it be
> > > > > feasible for this WholeFileIO to read and write these many files to
> > > > > compressed zip files also? (I envision this as a stretch feature that
> > > > > would be added after the initial iteration anyway.)
> > > > >
> > > > >
> > > > > # Regarding point 6:
> > > > >
> > > > > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > > > > WriteFiles. Do you have any examples to point towards to enlighten me
> > > > > on the use of DoFn in this context? Unfortunately, we all know the
> > > > > "Authoring I/O Transforms" documentation is sparse.
> > > > >
> > > > >
> > > > > Keep it coming,
> > > > > Chris
> > > > >
> > > > > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
> > > > > <[email protected]> wrote:
> > > > >
> > > > > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> > > > > > [email protected]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > > As mentioned on the PR - I support the creation of such an IO
> > > > > > > (both read and write) with the caveats that Reuven mentioned; we
> > > > > > > can refine the naming during code review.
> > > > > > > Note that you won't be able to create a PCollection<InputStream>
> > > > > > > because elements of a PCollection must have a coder and it's not
> > > > > > > possible to provide a coder for InputStream.
> > > > > >
> > > > > >
> > > > > > Well, it's possible, but the fact that InputStream is mutable may
> > > > > > cause issues (e.g. if there's fusion, or when estimating its size).
> > > > > >
> > > > > > I would probably let the API consume/produce a
> > > > > > PCollection<KV<filename, contents>>. Alternatively, a FileWrapper
> > > > > > object of some kind could provide accessors to InputStream (or
> > > > > > otherwise facilitate lazy reading).
> > > > > >
> > > > > > Note that for the sink, one must take care that there's no race in
> > > > > > case multiple workers are attempting to process the same bundle
> > > > > > (and ideally clean up in the face of failure). Other than that,
> > > > > > these could be entirely done in the context of a DoFn.
> > > > > >
> > > > > > Also note that most filesystems, especially distributed ones, do
> > > > > > better reading and writing fewer larger files than many, many small
> > > > > > ones.
> > > > > >
> > > > > >
> > > > > > > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <[email protected]>
> > > > > > > wrote:
> > > > > > >
> > > > > > > > One thing to keep in mind is that many runners might have
> > > > > > > > issues with huge elements. If you have a 5 GB file, encoding it
> > > > > > > > as a single element might give you pain or might simply not
> > > > > > > > work on some runners.
> > > > > > > >
> > > > > > > > Reuven
> > > > > > > >
> > > > > > > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > > > > > > > [email protected]> wrote:
> > > > > > > >
> > > > > > > > > Hi,
> > > > > > > > >
> > > > > > > > > I'd like to:
> > > > > > > > >
> > > > > > > > >    1. Read whole files as one input each. (If my input files
> > > > > > > > >    are hi.txt, what.txt, and yes.txt, then the whole contents
> > > > > > > > >    of hi.txt are an element of the returned PCollection, the
> > > > > > > > >    whole contents of what.txt are the next element, etc.)
> > > > > > > > >    2. Write elements as individual files. (Rather than
> > > > > > > > >    smashing thousands of outputs into a handful of files as
> > > > > > > > >    TextIO does: output-00000-of-00005, output-00001-of-00005,
> > > > > > > > >    ..., I want to output each thing individually. So if I'm
> > > > > > > > >    given hi.txt, what.txt, yes.txt then I'd like to read those
> > > > > > > > >    in as whole files individually, then write out my processed
> > > > > > > > >    results as hi.txt-modified, what.txt-modified,
> > > > > > > > >    yes.txt-modified.)
> > > > > > > > >
> > > > > > > > > Before reading on, if you have easier ways to do these
> > > > > > > > > things, then I'd love to hear them!
> > > > > > > > >
> > > > > > > > > # Part 1
> > > > > > > > >
> > > > > > > > > I attempted Part 1 with this PR:
> > > > > > > > > https://github.com/apache/beam/pull/3543
> > > > > > > > >
> > > > > > > > > But I overgeneralized.
> > > > > > > > >
> > > > > > > > > Specifically, I want:
> > > > > > > > >
> > > > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > > > PCollection<String> wholeFilesAsStrings = p.apply(
> > > > > > > > >     "Read Whole Files from Input Directory",
> > > > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > > > >
> > > > > > > > > or
> > > > > > > > >
> > > > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > > > PCollection<InputStream> wholeFileStreams = p.apply(
> > > > > > > > >     "Read Whole Files from Input Directory",
> > > > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > > > >
> > > > > > > > > Bonus points would include:
> > > > > > > > >
> > > > > > > > >    - Keeping the filename somehow
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > # Part 2
> > > > > > > > >
> > > > > > > > > Currently, if you output hundreds of thousands of elements
> > > > > > > > > (batch-mode) with TextIO in Beam on Flink with, say, 45
> > > > > > > > > TaskManagers, then you get output-00000-of-00045,
> > > > > > > > > output-00001-of-00045, etc., and each one of those files
> > > > > > > > > contains tens of thousands of outputs back to back. I want
> > > > > > > > > them to be output as individual files.
> > > > > > > > >
> > > > > > > > > If appended after code snippets from Part 1, it would look
> > > > > > > > > like:
> > > > > > > > >
> > > > > > > > > ...
> > > > > > > > > p.apply("Write Whole File Outputs",
> > > > > > > > > WholeFileIO.write().to("/path/to/output/dir/"));
> > > > > > > > >
> > > > > > > > > Bonus points would include:
> > > > > > > > >
> > > > > > > > >    - Writing each element of the given PCollection to the
> > > > > > > > >    filename they'd like to go to.
> > > > > > > > >    - Parallelizable. (This might already be done, I just
> > > > > > > > >    noticed that my Beam+Flink+YARN pipeline with
> > > > > > > > >    TextIO.write() only had one TaskManager writing the
> > > > > > > > >    DataSink output even though all other components of my
> > > > > > > > >    pipeline had many TaskManagers working on them
> > > > > > > > >    simultaneously. I haven't found the way to fix that yet.
> > > > > > > > >    The current arrangement added 15 minutes to the end of my
> > > > > > > > >    pipeline as the lonely TaskManager did all the output.)
> > > > > > > > >
> > > > > > > > >
> > > > > > > > > I'm available to put the dev work into this. (Actually, I'm
> > > > > > > > > putting dev time into some kind of solution whether this is
> > > > > > > > > agreed upon or not :).
> > > > > > > > >
> > > > > > > > > Feedback, please,
> > > > > > > > > Chris
> > > > > > > > >
> > > > > > > >
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
