GitHub user cphbrt opened a pull request:

    https://github.com/apache/beam/pull/3717

    [BEAM-2750][BEAM-2751] Implement WholeFileIO

    Follow this checklist to help us incorporate your contribution quickly and
    easily:
    
     - [x] Make sure there is a [JIRA
       issue](https://issues.apache.org/jira/projects/BEAM/issues/) filed for
       the change (usually before you start working on it).  Trivial changes
       like typos do not require a JIRA issue.  Your pull request should
       address just this issue, without pulling in other changes.
     - [x] Each commit in the pull request should have a meaningful subject
       line and body.
     - [x] Format the pull request title like `[BEAM-XXX] Fixes bug in
       ApproximateQuantiles`, where you replace `BEAM-XXX` with the appropriate
       JIRA issue.
     - [x] Write a pull request description that is detailed enough to
       understand what the pull request does, how, and why.
     - [ ] Run `mvn clean verify` to make sure basic checks pass. A more
       thorough check will be performed on your pull request automatically.
     - [ ] If this contribution is large, please file an Apache [Individual
       Contributor License Agreement](https://www.apache.org/licenses/icla.pdf).
    
    ---
    
    ## Narrative
    
    WholeFileIO addresses [BEAM-2750] and [BEAM-2751], which ask for a way to
    read whole files into a PCollection as individual elements and to write
    each element back out to a file with a specific filename.
    
    ## Description
    
    `WholeFileIO.Read` receives a file pattern (glob) of input files. The file
    pattern is expanded into a `PCollection` of `ResourceId`s, each pointing to
    a single file. The bytes of each file are read and paired with the
    originating filename in a `KV`.
    
    `WholeFileIO.Write` receives a `PCollection` of `KV`s, each containing a
    filename and the byte array to store under it. Each byte array is written
    to the output directory under its corresponding filename.
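
To make the read and write semantics described above concrete, here is a minimal sketch in plain `java.nio`, not Beam. It is not the code in this pull request: the class name `WholeFileSketch`, both method names, and the choice to use the bare filename as the key are assumptions made purely for illustration.

```java
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.PathMatcher;
import java.util.AbstractMap.SimpleImmutableEntry;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical local-filesystem analogue of the Read and Write transforms.
public class WholeFileSketch {

    /** Expands a glob under root and returns one (filename, bytes) pair per matching file. */
    public static List<Map.Entry<String, byte[]>> readWholeFiles(Path root, String glob)
            throws IOException {
        PathMatcher matcher = FileSystems.getDefault().getPathMatcher("glob:" + glob);
        List<Path> matches;
        try (Stream<Path> paths = Files.walk(root)) {
            matches = paths.filter(Files::isRegularFile)
                    // Match the glob against the path relative to root.
                    .filter(p -> matcher.matches(root.relativize(p)))
                    .sorted()
                    .collect(Collectors.toList());
        }
        List<Map.Entry<String, byte[]>> files = new ArrayList<>();
        for (Path p : matches) {
            // Assumption: the key is the bare filename, so the directory hierarchy is dropped.
            files.add(new SimpleImmutableEntry<>(
                    p.getFileName().toString(), Files.readAllBytes(p)));
        }
        return files;
    }

    /** Writes each byte array into outputDir under its paired filename. */
    public static void writeWholeFiles(List<Map.Entry<String, byte[]>> files, Path outputDir)
            throws IOException {
        Files.createDirectories(outputDir);
        for (Map.Entry<String, byte[]> file : files) {
            Files.write(outputDir.resolve(file.getKey()), file.getValue());
        }
    }
}
```

In this sketch, two input files that share a filename in different directories would overwrite each other on write, because the output directory is flat.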
    
    ## Example Usage
    
    This example pipeline reads the files matching a given file glob and writes
    them to the specified output directory. The contents are unmodified; only
    "-copy" is appended to each filename. If the glob matches files spread
    through a directory hierarchy, they are still all written into the same
    flat output directory.
    
    Example pipeline:
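    ```java
    package com.example;
    
    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.Description;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.options.Validation;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    // Also import WholeFileIO from the package where this pull request adds it.
    
    public class WholeFileIOPipeline {
    
        public interface FileIOOptions extends PipelineOptions {
            @Description("File glob of the files to read from")
            @Validation.Required
            String getInputFiles();
            void setInputFiles(String value);
    
            @Description("Path of the directory to write files to")
            @Validation.Required
            String getOutputDir();
            void setOutputDir(String value);
        }
    
        public static void main(String[] args) {
            final FileIOOptions options =
                    PipelineOptionsFactory.fromArgs(args).withValidation()
                            .as(FileIOOptions.class);
            Pipeline p = Pipeline.create(options);
    
            // Each element pairs a filename (key) with that file's bytes (value).
            PCollection<KV<String, byte[]>> files = p.apply(
                    "Read Bytes and filenames of input files",
                    WholeFileIO.read().from(options.getInputFiles())
            );
    
            // Append "-copy" to every filename; the bytes pass through unchanged.
            PCollection<KV<String, byte[]>> renamedFiles = files.apply(
                    ParDo.of(
                            new DoFn<KV<String, byte[]>, KV<String, byte[]>>() {
                                @ProcessElement
                                public void processElement(ProcessContext c) {
                                    KV<String, byte[]> file = c.element();
                                    c.output(KV.of(file.getKey() + "-copy",
                                            file.getValue()));
                                }
                            }
                    )
            );
    
            renamedFiles.apply(
                    "Write Bytes to filenames in Output Directory",
                    WholeFileIO.write().to(options.getOutputDir())
            );
    
            p.run().waitUntilFinish();
        }
    }
    ```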
    
    Example command to run example pipeline:
    ```bash
    mvn clean compile exec:java \
      -Dexec.mainClass=com.example.WholeFileIOPipeline \
      -Dexec.args=" \
        --inputFiles=/path/to/input/files/** \
        --outputDir=/path/to/output/directory/ \
        " \
      -Pdirect-runner
    ```
    
    ## ToDo
    
    - [ ] Add comments
    - [ ] Add unit tests
    - [ ] Scale test for performance
    - [ ] Find out whether `FileSystems.resolve()` will resolve multiple
      intermediate directories when a user provides a path that does not fully
      exist yet. (`WholeFileIO -> Write -> expand() -> ParDo -> ResourceId logic`)
    - [ ] Make sure that the `OutputStream` closes automatically when an
      `Exception` occurs in `WholeFileIO -> Write -> expand() -> ParDo -> try/catch`.
      If it does not, close it in a `finally` block.
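
For the last two items, the plain-Java pattern below may be a useful reference point. It is only a sketch against the local filesystem with `java.nio`, not the Beam `FileSystems` API, and it does not answer how `FileSystems.resolve()` behaves. The class name `SafeWriteSketch` and both method names are hypothetical.

```java
import java.io.IOException;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;

// Hypothetical helper showing guaranteed stream closing and parent-directory creation.
public class SafeWriteSketch {

    /** Writes bytes to the stream and closes it, even if the write throws. */
    public static void writeAll(OutputStream stream, byte[] bytes) throws IOException {
        // try-with-resources calls close() on both the normal and the exceptional
        // path, which replaces an explicit finally block.
        try (OutputStream out = stream) {
            out.write(bytes);
        }
    }

    /** Writes bytes to outputDir/filename, creating any missing directories first. */
    public static Path writeFile(Path outputDir, String filename, byte[] bytes)
            throws IOException {
        Path target = outputDir.resolve(filename);
        Path parent = target.getParent();
        if (parent != null) {
            // Creates every missing directory along the path; does nothing if they exist.
            Files.createDirectories(parent);
        }
        writeAll(Files.newOutputStream(target), bytes);
        return target;
    }
}
```

If the stream in the `ParDo` is opened in the header of a try-with-resources statement like this, no separate `finally` block is needed to close it.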

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/cphbrt/beam master+WholeFileIO

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/beam/pull/3717.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #3717
    
----
commit 42f8b991512aa01019013d64e167b5e5782f87bf
Author: Chris Hebert <chris.hebert-...@digitalreasoning.com>
Date:   2017-08-10T19:56:32Z

    [BEAM-2750][BEAM-2751] Implement WholeFileIO

----

