On Tue, Aug 1, 2017 at 1:42 PM, Eugene Kirpichov <
[email protected]> wrote:

> Hi,
> As mentioned on the PR - I support the creation of such an IO (both read
> and write) with the caveats that Reuven mentioned; we can refine the naming
> during code review.
> Note that you won't be able to create a PCollection<InputStream> because
> elements of a PCollection must have a coder and it's not possible to
> provide a coder for InputStream.


Well, it's possible, but the fact that InputStream is mutable may cause
issues (e.g. if there's fusion, or when estimating its size).

I would probably let the API consume/produce a PCollection<KV<filename,
contents>>. Alternatively, a FileWrapper object of some kind could provide
accessors to InputStream (or otherwise facilitate lazy reading).
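
To make that concrete, here's a rough sketch of what the read side could look
like under the hood, written against the existing FileSystems API
(ReadWholeFilesFn and the surrounding names are made up for illustration, not
a proposed WholeFileIO surface):

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Expands a file pattern and emits one KV<filename, contents> per matched file.
class ReadWholeFilesFn extends DoFn<String, KV<String, String>> {
  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    MatchResult match = FileSystems.match(c.element());
    for (MatchResult.Metadata metadata : match.metadata()) {
      String filename = metadata.resourceId().toString();
      try (InputStream in =
          Channels.newInputStream(FileSystems.open(metadata.resourceId()))) {
        // Reads the entire file into memory; this is where huge files bite you.
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        byte[] buffer = new byte[8192];
        int n;
        while ((n = in.read(buffer)) != -1) {
          bytes.write(buffer, 0, n);
        }
        c.output(KV.of(filename,
            new String(bytes.toByteArray(), StandardCharsets.UTF_8)));
      }
    }
  }
}

You'd apply it to a PCollection of file patterns, e.g. Create.of("/path/to/input/dir/*"),
and keeping the filename in the key covers the "keeping the filename" bonus
point below for free.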

Note that for the sink one must take care that there's no race when multiple
workers attempt to process the same bundle (and, ideally, that leftover output
is cleaned up in the face of failure). Other than that, these could be done
entirely in the context of a DoFn.
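
As a rough sketch of a temp-file-then-rename approach to that (again against
FileSystems, with made-up names; it assumes each element's key is a bare
filename rather than a full path, and it omits cleaning up leftover temp files
on failure):

import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.UUID;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

// Writes each KV<filename, contents> to its own file. Each attempt writes to a
// uniquely named temp file and then renames it into place, so retried bundles
// don't clobber each other's partial output.
class WriteWholeFilesFn extends DoFn<KV<String, String>, Void> {
  private final String outputDir;

  WriteWholeFilesFn(String outputDir) {
    this.outputDir = outputDir;
  }

  @ProcessElement
  public void process(ProcessContext c) throws Exception {
    ResourceId dir = FileSystems.matchNewResource(outputDir, true /* isDirectory */);
    ResourceId finalFile =
        dir.resolve(c.element().getKey(), StandardResolveOptions.RESOLVE_FILE);
    ResourceId tempFile =
        dir.resolve(c.element().getKey() + "." + UUID.randomUUID(),
            StandardResolveOptions.RESOLVE_FILE);
    try (WritableByteChannel out = FileSystems.create(tempFile, "text/plain")) {
      out.write(ByteBuffer.wrap(
          c.element().getValue().getBytes(StandardCharsets.UTF_8)));
    }
    // The rename is the "commit"; the last successful attempt wins.
    FileSystems.rename(
        Collections.singletonList(tempFile), Collections.singletonList(finalFile));
  }
}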

Also note that most filesystems, especially distributed ones, do better
reading and writing fewer, larger files than many, many small ones.


> On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <[email protected]>
> wrote:
>
> > One thing to keep in mind is that many runners might have issues with
> > huge elements. If you have a 5 GB file, encoding it as a single element
> > might give you pain or might simply not work on some runners.
> >
> > Reuven
> >
> > On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> > [email protected]> wrote:
> >
> > > Hi,
> > >
> > > I'd like to:
> > >
> > >    1. Read whole files as one input each. (If my input files are hi.txt,
> > >    what.txt, and yes.txt, then the whole contents of hi.txt are an element
> > >    of the returned PCollection, the whole contents of what.txt are the
> > >    next element, etc.)
> > >    2. Write elements as individual files. (Rather than smashing thousands
> > >    of outputs into a handful of files as TextIO does:
> > >    output-00000-of-00005, output-00001-of-00005, ..., I want to output
> > >    each thing individually. So if I'm given hi.txt, what.txt, yes.txt,
> > >    then I'd like to read those in as whole files individually, then write
> > >    out my processed results as hi.txt-modified, what.txt-modified,
> > >    yes.txt-modified.)
> > >
> > > Before reading on, if you have easier ways to do these things, then I'd
> > > love to hear them!
> > >
> > > # Part 1
> > >
> > > I attempted Part 1 with this PR:
> > > https://github.com/apache/beam/pull/3543
> > >
> > > But I overgeneralized.
> > >
> > > Specifically, I want:
> > >
> > > Pipeline p = Pipeline.create(options);
> > > PCollection<String> wholeFilesAsStrings =
> > >     p.apply("Read Whole Files from Input Directory",
> > >         WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > or
> > >
> > > Pipeline p = Pipeline.create(options);
> > > PCollection<InputStream> wholeFileStreams =
> > >     p.apply("Read Whole Files from Input Directory",
> > >         WholeFileIO.read().from("/path/to/input/dir/*"));
> > >
> > > Bonus points would include:
> > >
> > >    - Keeping the filename somehow
> > >
> > >
> > > # Part 2
> > >
> > > Currently, if you output hundreds of thousands of elements (batch mode)
> > > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > > output-00000-of-00045, output-00001-of-00045, etc., and each one of those
> > > files contains tens of thousands of outputs back to back. I want them to
> > > be output as individual files.
> > >
> > > If appended after the code snippets from Part 1, it would look like:
> > >
> > > ...
> > > p.apply("Write Whole File Outputs",
> > > WholeFileIO.write().to("/path/to/output/dir/"));
> > >
> > > Bonus points would include:
> > >
> > >    - Writing each element of the given PCollection to the filename it
> > >    should go to.
> > >    - Parallelizable. (This might already be done; I just noticed that my
> > >    Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
> > >    writing the DataSink output even though all other components of my
> > >    pipeline had many TaskManagers working on them simultaneously. I
> > >    haven't found a way to fix that yet. The current arrangement added 15
> > >    minutes to the end of my pipeline as the lonely TaskManager did all
> > >    the output.)
> > >
> > >
> > > I'm available to put the dev work into this. (Actually, I'm putting dev
> > > time into some kind of solution whether this is agreed upon or not :).
> > >
> > > Feedback, please,
> > > Chris
> > >
> >
>
