Hi! Thanks for filing the tickets.

On Mon, Aug 7, 2017 at 1:44 PM Chris Hebert <
[email protected]> wrote:

> Hello again,
>
> I created tickets to capture these requests:
> https://issues.apache.org/jira/browse/BEAM-2750
> https://issues.apache.org/jira/browse/BEAM-2751
>
> I've started working on the Write part.
>
> Robert, after some time working on this, I'm unable to see how these
> objectives can be "entirely done in the context of a DoFn". Could you lend
> a hint?
>
> I assume you didn't mean for me to append to my pipeline a ParDo.of(new
> DoFn... that manually writes out to some file location, did you? That would
> lose all the benefits of the IO/Sink classes.
>
Which benefits? I think nearly everything that FileBasedSink does is
unnecessary for WholeFileIO.write() - I agree with Robert that it can be
implemented with a ParDo (I suppose he meant that WholeFileIO.write() can
be implemented by expanding it into a single ParDo).
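
To make that concrete, here is a rough sketch of the per-element logic such a
ParDo could run. It deliberately uses only java.nio so that it stands alone; in
Beam the body of writeWholeFile would sit inside a DoFn's @ProcessElement
method receiving a KV<String, byte[]>. The class and method names are made up
for illustration, and the write-to-temp-then-rename step is an assumption about
how to handle the race and cleanup concerns raised earlier in the thread, not a
description of any existing Beam code.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical helper: the name and shape are illustrative, not Beam API.
final class WholeFileWriteSketch {

    /** Writes one element (filename + contents) to its own file under outputDir. */
    static Path writeWholeFile(Path outputDir, String filename, byte[] contents)
            throws IOException {
        Path target = outputDir.resolve(filename);
        Files.createDirectories(target.getParent());

        // Each attempt gets a unique temp file, so two workers retrying the
        // same element never write into the same partially written file.
        Path tmp = Files.createTempFile(target.getParent(), ".tmp-", ".part");
        try {
            Files.write(tmp, contents);
            // Publish the finished file under its final name. ATOMIC_MOVE could
            // be added here on file systems that support it.
            Files.move(tmp, target, StandardCopyOption.REPLACE_EXISTING);
        } finally {
            // No-op after a successful move; removes the leftover on failure.
            Files.deleteIfExists(tmp);
        }
        return target;
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("wholefile-out");
        for (String name : new String[] {"hi.txt", "what.txt", "yes.txt"}) {
            byte[] processed = ("processed " + name).getBytes(StandardCharsets.UTF_8);
            System.out.println(writeWholeFile(dir, name + "-modified", processed));
        }
    }
}
```

Since every element carries its own filename, there is no shard numbering and
no FilenamePolicy involved; if two attempts for the same element both finish,
the later rename simply replaces the earlier file with identical contents.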


>
> That said, I've found the "sharding" logic to be deeply embedded in all the
> FileBasedSink derivatives, and my attempt at sidestepping this logic isn't
> going very well. I managed to write a FileBasedSink that writes Byte[] out
> (and that works correctly), but figuring out how to get the FilenamePolicy
> to be different for each element written out seems counter to the intent of
> much of these classes.
>
> Chris
>
> On Wed, Aug 2, 2017 at 10:23 AM, Reuven Lax <[email protected]>
> wrote:
>
> > On Wed, Aug 2, 2017 at 7:49 AM, Chris Hebert <
> > [email protected]> wrote:
> >
> > > Thanks for the feedback!
> > >
> > > Aggregated thoughts:
> > >
> > >    1. Warn users about large files (like 5GB large)
> > >
> >
> > I would set the threshold smaller. Also remember that while you may warn,
> > some runners might simply fail to process the record, causing pipelines to
> > either get stuck or fail altogether.
> >
> >
> > >    2. Filenames can stick with contents via PCollection<KV<filename,
> > >    contents>>
> > >    3. InputStreams can't be encoded directly, but could be referenced
> in
> > a
> > >    FileWrapper object
> > >    4. Be mindful of sink race conditions with multiple workers; make
> sure
> > >    failed workers cleanup incompletely written files
> > >    5. File systems often do better with few large files than many small
> > > ones
> > >    6. Most/all of this can be done in the context of a DoFn
> > >
> > > # Regarding point 1 and point 2
> > > Yes!
> > >
> > > # Regarding point 3:
> > >
> > > ## Approach A:
> > > When the FileWrapper is encoded, it must somehow encode a reference to
> > the
> > > InputStream it is associated with, so that when the FileWrapper is
> > decoded
> > > it can pick up that InputStream again. My Java knowledge isn't deep
> > enough
> > > to know how one would do that with hashcodes and object lookup and
> such,
> > > but I could sidestep that entirely by simply encoding the filepath with
> > the
> > > FileWrapper, then open up a new InputStream on that file path every
> time
> > > the FileWrapper is decoded.
> > >
> > > ## Approach B:
> > > An alternative to the above technique is to simply pass a byte[] array,
> > > like so:
> > > PCollection<KV<String, byte[]>> fileNamesAndBytes = p.apply("Read",
> > > WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > That would solve the encoding problem, allow users to get whatever they
> > > want out of it with a ByteArrayInputStream, AND put a hard limit on the
> > > size of incoming files at just below 2 GB (if my math is right). (This
> is
> > > large enough for my use case, at present.)
> > >
> > >
> > > # Regarding point 4:
> > >
> > > Any examples or guidance I could pull from to protect against this
> > > properly?
> > >
> > >
> > > # Regarding point 5:
> > >
> > > TextIO can read and write with different compressions. Would it be
> > feasible
> > > for this WholeFileIO to read and write these many files to compressed
> zip
> > > files also? (I envision this as a stretch feature that would be added
> > after
> > > the initial iteration anyway.)
> > >
> > >
> > > # Regarding point 6:
> > >
> > > The only prebuilt IO thing I've found in Beam that uses DoFn is
> > > WriteFiles. Do you have any examples to point towards to enlighten me on
> > > the use of DoFn in this context? Unfortunately, we all know the "Authoring
> > > I/O Transforms" documentation is sparse.
> > >
> > >
> > > Keep it coming,
> > > Chris
> > >
> > > On Tue, Aug 1, 2017 at 3:55 PM, Robert Bradshaw
> > > <[email protected]
> > > > wrote:
> > >
> > > > On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
> > > > [email protected]> wrote:
> > > >
> > > > > Hi,
> > > > > As mentioned on the PR - I support the creation of such an IO (both
> > > read
> > > > > and write) with the caveats that Reuven mentioned; we can refine
> the
> > > > naming
> > > > > during code review.
> > > > > Note that you won't be able to create a PCollection<InputStream>
> > > because
> > > > > elements of a PCollection must have a coder and it's not possible
> to
> > > > > provide a coder for InputStream.
> > > >
> > > >
> > > > Well, it's possible, but the fact that InputStream is mutable may
> cause
> > > > issues (e.g. if there's fusion, or when estimating its size).
> > > >
> > > > I would probably let the API consume/produce a
> PCollection<KV<filename,
> > > > contents>>. Alternatively, a FileWrapper object of some kind could
> > > provide
> > > > accessors to InputStream (or otherwise facilitate lazy reading).
> > > >
> > > > Note for the sink one must take care there's no race in case multiple
> > > > workers are attempting to process the same bundle (and ideally
> cleanup
> > in
> > > > the face of failure). Other than that, these could be entirely done
> in
> > > the
> > > > context of a DoFn.
> > > >
> > > > Also note that most filesystems, especially distributed ones, do better
> > > > reading and writing fewer larger files than many, many small ones.
> > > >
> > > >
> > > > > On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <[email protected]
> >
> > > > > wrote:
> > > > >
> > > > > > One thing to keep in mind is that many runners might have issues
> > with
> > > > > huge
> > > > > > elements. If you have a 5gb file, encoding it as a single element
> > > might
> > > > > > give you pain or might simply not work on some runners.
> > > > > >
> > > > > > Reuven
> > > > > >
> > > > > > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > > > > > [email protected]> wrote:
> > > > > >
> > > > > > > Hi,
> > > > > > >
> > > > > > > I'd like to:
> > > > > > >
> > > > > > >    1. Read whole files as one input each. (If my input files
> are
> > > > > hi.txt,
> > > > > > >    what.txt, and yes.txt, then the whole contents of hi.txt are
> > an
> > > > > > element
> > > > > > > of
> > > > > > >    the returned PCollection, the whole contents of what.txt are
> > the
> > > > > next
> > > > > > >    element, etc.)
> > > > > > >    2. Write elements as individual files. (Rather than smashing
> > > > > thousands
> > > > > > >    of outputs into a handful of files as TextIO does:
> > > > > > > output-00000-of-00005,
> > > > > > >    output-00001-of-00005,..., I want to output each thing
> > > > individually.
> > > > > > So
> > > > > > > if
> > > > > > >    I'm given hi.txt, what.txt, yes.txt then I'd like to read
> > those
> > > in
> > > > > as
> > > > > > > whole
> > > > > > >    files individually, then write out my processed results as
> > > > > > > hi.txt-modified,
> > > > > > >    what.txt-modified, yes.txt-modified.).
> > > > > > >
> > > > > > > Before reading on, if you have easier ways to do these things,
> > then
> > > > I'd
> > > > > > > love to hear them!
> > > > > > >
> > > > > > > # Part 1
> > > > > > >
> > > > > > > I attempted Part 1 with this PR:
> > > > > > > https://github.com/apache/beam/pull/3543
> > > > > > >
> > > > > > > But I overgeneralized.
> > > > > > >
> > > > > > > Specifically, I want:
> > > > > > >
> > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > PCollection<String> wholeFilesAsStrings = p.apply(
> > > > > > >     "Read Whole Files from Input Directory",
> > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > >
> > > > > > > or
> > > > > > >
> > > > > > > Pipeline p = Pipeline.create(options);
> > > > > > > PCollection<InputStream> wholeFileStreams = p.apply(
> > > > > > >     "Read Whole Files from Input Directory",
> > > > > > >     WholeFileIO.read().from("/path/to/input/dir/*"));
> > > > > > >
> > > > > > > Bonus points would include:
> > > > > > >
> > > > > > >    - Keeping the filename somehow
> > > > > > >
> > > > > > >
> > > > > > > # Part 2
> > > > > > >
> > > > > > > Currently, if you output hundreds of thousands of elements
> > > > (batch-mode)
> > > > > > > with TextIO in Beam on Flink with, say, 45 TaskManagers, then
> you
> > > get
> > > > > > > output-00000-of-00045, output-00001-of-00045, etc., and each
> one
> > of
> > > > > those
> > > > > > > files contain tens of thousands of outputs back to back. I want
> > > them
> > > > to
> > > > > > be
> > > > > > > output as individual files.
> > > > > > >
> > > > > > > If appended after code snippets from Part 1, it would look
> like:
> > > > > > >
> > > > > > > ...
> > > > > > > p.apply("Write Whole File Outputs",
> > > > > > > WholeFileIO.write().to("/path/to/output/dir/"));
> > > > > > >
> > > > > > > Bonus points would include:
> > > > > > >
> > > > > > >    - Writing each element of the given PCollection to the
> > filename
> > > > > they'd
> > > > > > >    like to go to.
> > > > > > >    - Parallelizable. (This might already be done, I just
> noticed
> > > that
> > > > > my
> > > > > > >    Beam+Flink+YARN pipeline with TextIO.write() only had one
> > > > > TaskManager
> > > > > > >    writing the DataSink output even though all other components
> > of
> > > my
> > > > > > > pipeline
> > > > > > >    had many TaskManagers working on them simultaneously. I
> > haven't
> > > > > found
> > > > > > > the
> > > > > > >    way to fix that yet. The current arrangement added 15
> minutes
> > to
> > > > the
> > > > > > > end of
> > > > > > >    my pipeline as the lonely TaskManager did all the output.)
> > > > > > >
> > > > > > >
> > > > > > > I'm available to put the dev work into this. (Actually, I'm
> > putting
> > > > dev
> > > > > > > time into some kind of solution whether this is agreed upon or
> > not
> > > > :).
> > > > > > >
> > > > > > > Feedback, please,
> > > > > > > Chris
> > > > > > >
> > > > > >
> > > > >
> > > >
> > >
> >
>
