One thing to keep in mind is that many runners may have trouble with very
large elements. If you have a 5 GB file, encoding it as a single element
might be painful, or might simply not work on some runners.
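
To make the size concern concrete, here is a minimal sketch of a guard that
reads a file as one element only when it is below a size limit. It is plain
Java, not Beam API: the class name WholeFileGuard, its methods, and the 64 MB
default are all assumptions for illustration, and the limit that is actually
safe depends on the runner.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Optional;

/** Hypothetical helper (not part of Beam): avoids creating oversized whole-file elements. */
final class WholeFileGuard {
    // Assumed limit, for illustration only; real limits depend on the runner.
    static final long MAX_ELEMENT_BYTES = 64L * 1024 * 1024;

    private WholeFileGuard() {}

    /** Returns true when a payload of sizeBytes is within maxBytes. */
    static boolean fitsInOneElement(long sizeBytes, long maxBytes) {
        return sizeBytes >= 0 && sizeBytes <= maxBytes;
    }

    /** Reads the file as one UTF-8 string, or returns empty if it is larger than maxBytes. */
    static Optional<String> readIfSmallEnough(Path file, long maxBytes) {
        try {
            if (!fitsInOneElement(Files.size(file), maxBytes)) {
                return Optional.empty(); // caller decides: skip, split, or stream the file
            }
            return Optional.of(new String(Files.readAllBytes(file), StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

A caller could pass WholeFileGuard.MAX_ELEMENT_BYTES as the limit and fall
back to some other strategy for the files that come back empty.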

Reuven

On Tue, Aug 1, 2017 at 1:22 PM, Chris Hebert <[email protected]> wrote:

> Hi,
>
> I'd like to:
>
>    1. Read whole files as one input each. (If my input files are hi.txt,
>    what.txt, and yes.txt, then the whole contents of hi.txt are an element
>    of the returned PCollection, the whole contents of what.txt are the
>    next element, etc.)
>    2. Write elements as individual files. (Rather than smashing thousands
>    of outputs into a handful of files as TextIO does:
>    output-00000-of-00005, output-00001-of-00005,..., I want to output each
>    thing individually. So if I'm given hi.txt, what.txt, yes.txt then I'd
>    like to read those in as whole files individually, then write out my
>    processed results as hi.txt-modified, what.txt-modified,
>    yes.txt-modified.)
>
> Before reading on, if you have easier ways to do these things, then I'd
> love to hear them!
>
> # Part 1
>
> I attempted Part 1 with this PR:
> https://github.com/apache/beam/pull/3543
>
> But I overgeneralized.
>
> Specifically, I want:
>
> Pipeline p = Pipeline.create(options);
> PCollection<String> wholeFilesAsStrings = p.apply(
>     "Read Whole Files from Input Directory",
>     WholeFileIO.read().from("/path/to/input/dir/*"));
>
> or
>
> Pipeline p = Pipeline.create(options);
> PCollection<InputStream> wholeFileStreams = p.apply(
>     "Read Whole Files from Input Directory",
>     WholeFileIO.read().from("/path/to/input/dir/*"));
>
> Bonus points would include:
>
>    - Keeping the filename somehow
>
>
> # Part 2
>
> Currently, if you output hundreds of thousands of elements (in batch mode)
> with TextIO in Beam on Flink with, say, 45 TaskManagers, then you get
> output-00000-of-00045, output-00001-of-00045, etc., and each one of those
> files contains tens of thousands of outputs back to back. I want them to be
> output as individual files.
>
> If appended after code snippets from Part 1, it would look like:
>
> ...
> p.apply("Write Whole File Outputs",
> WholeFileIO.write().to("/path/to/output/dir/"));
>
> Bonus points would include:
>
>    - Writing each element of the given PCollection to the filename they'd
>    like to go to.
>    - Parallelizable. (This might already be done, I just noticed that my
>    Beam+Flink+YARN pipeline with TextIO.write() only had one TaskManager
>    writing the DataSink output even though all other components of my
>    pipeline had many TaskManagers working on them simultaneously. I
>    haven't found the way to fix that yet. The current arrangement added
>    15 minutes to the end of my pipeline as the lonely TaskManager did all
>    the output.)
>
>
> I'm available to put the dev work into this. (Actually, I'm putting dev
> time into some kind of solution whether this is agreed upon or not :).
>
> Feedback, please,
> Chris
>
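
For reference, the behaviour requested in the quoted message (whole file in,
one file out, filename kept) can be sketched in plain Java without any Beam
API. This is only an illustration of the semantics, not the proposed
WholeFileIO: the class WholeFileSketch and its methods are hypothetical, it
runs on a single machine, and it loads every file fully into memory, so it
has exactly the large-element problem mentioned at the top.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

/** Hypothetical single-machine illustration; not Beam and not the proposed WholeFileIO. */
final class WholeFileSketch {
    private WholeFileSketch() {}

    /** Reads each regular file in inputDir as one UTF-8 string, keyed by its filename. */
    static Map<String, String> readWholeFiles(Path inputDir) {
        Map<String, String> contentsByName = new TreeMap<>();
        try (DirectoryStream<Path> files = Files.newDirectoryStream(inputDir)) {
            for (Path file : files) {
                if (Files.isRegularFile(file)) {
                    String contents =
                        new String(Files.readAllBytes(file), StandardCharsets.UTF_8);
                    contentsByName.put(file.getFileName().toString(), contents);
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return contentsByName;
    }

    /** Writes each transformed element to its own file, named "<original name>-modified". */
    static void writeWholeFiles(
            Map<String, String> contentsByName, Path outputDir, UnaryOperator<String> transform) {
        try {
            Files.createDirectories(outputDir);
            for (Map.Entry<String, String> entry : contentsByName.entrySet()) {
                Path target = outputDir.resolve(entry.getKey() + "-modified");
                String result = transform.apply(entry.getValue());
                Files.write(target, result.getBytes(StandardCharsets.UTF_8));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Given hi.txt and what.txt in the input directory, this writes hi.txt-modified
and what.txt-modified to the output directory, one output file per input file.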
