Hi,
As mentioned on the PR, I support the creation of such an IO (both read
and write), with the caveats that Reuven mentioned; we can refine the naming
during code review.
Note that you won't be able to create a PCollection<InputStream>: elements
of a PCollection must have a coder, and it's not possible to provide a
coder for InputStream.
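
For Part 1, here is a rough sketch of the workaround I'd use today with the
existing FileSystems API. This is not a proposed WholeFileIO implementation;
the two-step match/read structure, the KV<filename, contents> output shape,
and the assumption that files are UTF-8 text that fit in memory are all mine:

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.Channels;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.MatchResult;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

// Read each matched file as a single (filename, contents) element.
PCollection<KV<String, String>> wholeFiles = p
    .apply("File pattern", Create.of("/path/to/input/dir/*"))
    .apply("Expand glob", ParDo.of(new DoFn<String, String>() {
      @ProcessElement
      public void expand(ProcessContext c) throws IOException {
        // One output element per file that matches the pattern.
        for (MatchResult.Metadata md :
            FileSystems.match(Collections.singletonList(c.element()))
                .get(0).metadata()) {
          c.output(md.resourceId().toString());
        }
      }
    }))
    .apply("Read whole files", ParDo.of(new DoFn<String, KV<String, String>>() {
      @ProcessElement
      public void read(ProcessContext c) throws IOException {
        MatchResult.Metadata md = FileSystems.matchSingleFileSpec(c.element());
        try (InputStream in =
            Channels.newInputStream(FileSystems.open(md.resourceId()))) {
          // Buffer the whole file in memory; this is exactly where Reuven's
          // huge-element caveat bites.
          ByteArrayOutputStream bytes = new ByteArrayOutputStream();
          byte[] buf = new byte[8192];
          int n;
          while ((n = in.read(buf)) != -1) {
            bytes.write(buf, 0, n);
          }
          c.output(KV.of(md.resourceId().getFilename(),
              new String(bytes.toByteArray(), StandardCharsets.UTF_8)));
        }
      }
    }));

Keying each element by its filename also covers your "keeping the filename"
bonus point, and both halves of the KV are Strings, so they have coders.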
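
For Part 2, a similarly hedged sketch: an ordinary DoFn that writes one
output file per element via FileSystems.create. The "-modified" suffix and
the output directory come from your example; everything else is my
assumption, and note that this bypasses TextIO's temp-file-then-rename
finalization, so bundle retries are handled less carefully:

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import org.apache.beam.sdk.io.FileSystems;
import org.apache.beam.sdk.io.fs.ResolveOptions.StandardResolveOptions;
import org.apache.beam.sdk.io.fs.ResourceId;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.util.MimeTypes;
import org.apache.beam.sdk.values.KV;

// Write each (filename, contents) element to its own file, e.g.
// hi.txt -> /path/to/output/dir/hi.txt-modified.
wholeFiles.apply("Write one file per element",
    ParDo.of(new DoFn<KV<String, String>, Void>() {
      @ProcessElement
      public void write(ProcessContext c) throws IOException {
        ResourceId outDir = FileSystems.matchNewResource(
            "/path/to/output/dir/", true /* isDirectory */);
        ResourceId outFile = outDir.resolve(
            c.element().getKey() + "-modified",
            StandardResolveOptions.RESOLVE_FILE);
        try (WritableByteChannel out =
            FileSystems.create(outFile, MimeTypes.TEXT)) {
          out.write(ByteBuffer.wrap(
              c.element().getValue().getBytes(StandardCharsets.UTF_8)));
        }
      }
    }));

Because this is a plain ParDo, the writes run on whichever workers hold the
elements rather than being funneled through a single sink.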

On Tue, Aug 1, 2017 at 1:33 PM Reuven Lax <re...@google.com.invalid> wrote:

> One thing to keep in mind is that many runners might have issues with huge
> elements. If you have a 5gb file, encoding it as a single element might
> give you pain or might simply not work on some runners.
>
> Reuven
>
> On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <
> chris.hebert-...@digitalreasoning.com> wrote:
>
> > Hi,
> >
> > I'd like to:
> >
> >    1. Read whole files as one input each. (If my input files are hi.txt,
> >    what.txt, and yes.txt, then the whole contents of hi.txt are an
> >    element of the returned PCollection, the whole contents of what.txt
> >    are the next element, etc.)
> >    2. Write elements as individual files. (Rather than smashing thousands
> >    of outputs into a handful of files as TextIO does:
> >    output-00000-of-00005, output-00001-of-00005, ..., I want to output
> >    each thing individually. So if I'm given hi.txt, what.txt, and yes.txt,
> >    then I'd like to read those in as whole files individually, then write
> >    out my processed results as hi.txt-modified, what.txt-modified, and
> >    yes.txt-modified.)
> >
> > Before reading on, if you have easier ways to do these things, then I'd
> > love to hear them!
> >
> > # Part 1
> >
> > I attempted Part 1 with this PR:
> > https://github.com/apache/beam/pull/3543
> >
> > But I overgeneralized.
> >
> > Specifically, I want:
> >
> > Pipeline p = Pipeline.create(options);
> > PCollection<String> wholeFilesAsStrings = p.apply("Read Whole Files from
> > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > or
> >
> > Pipeline p = Pipeline.create(options);
> > PCollection<InputStream> wholeFileStreams = p.apply("Read Whole Files from
> > Input Directory", WholeFileIO.read().from("/path/to/input/dir/*"));
> >
> > Bonus points would include:
> >
> >    - Keeping the filename somehow
> >
> >
> > # Part 2
> >
> > Currently, if you output hundreds of thousands of elements (batch mode)
> > with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> > output-00000-of-00045, output-00001-of-00045, etc., and each one of those
> > files contains tens of thousands of outputs back to back. I want them to
> > be output as individual files.
> >
> > If appended after the code snippets from Part 1, it would look like:
> >
> > ...
> > p.apply("Write Whole File Outputs",
> > WholeFileIO.write().to("/path/to/output/dir/"));
> >
> > Bonus points would include:
> >
> >    - Writing each element of the given PCollection to the filename they'd
> >    like to go to.
> >    - Parallelizable. (This might already be done, I just noticed that my
> >    Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
> >    writing the DataSink output even though all other components of my
> > pipeline
> >    had many TaskManagers working on them simultaneously. I haven't found
> > the
> >    way to fix that yet. The current arrangement added 15 minutes to the
> > end of
> >    my pipeline as the lonely TaskManager did all the output.)
> >
> >
> > I'm available to put in the dev work for this. (Actually, I'm putting dev
> > time into some kind of solution whether this is agreed upon or not. :)
> >
> > Feedback, please,
> > Chris
> >
>
