[ https://issues.apache.org/jira/browse/CRUNCH-258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Brandon Inman updated CRUNCH-258:
---------------------------------

    Attachment: 0001-CRUNCH-258-support-for-multiple-output-channels.patch

Proposed implementation based on the design discussed on the mailing list. Any
feedback, especially on naming, is appreciated.
                
> Multiple output channels from DoFn
> ----------------------------------
>
>                 Key: CRUNCH-258
>                 URL: https://issues.apache.org/jira/browse/CRUNCH-258
>             Project: Crunch
>          Issue Type: New Feature
>          Components: Core
>            Reporter: Brandon Inman
>            Assignee: Josh Wills
>         Attachments: 0001-CRUNCH-258-support-for-multiple-output-channels.patch
>
>
> As discussed on the mailing list[1], add support for multiple output channels 
> from a DoFn.
> Summarized outcome of the thread:
> A DoFn that has multiple outputs emits a Pair<T,U>. For example, this might
> be used to separate objects representing an error condition from those
> representing expected outputs. In a typical case, one element in the pair
> will be null and the other non-null, although both may be non-null. (Both
> may also be null, in which case the pair is effectively discarded without
> error.)
> A utility method is provided that takes a PCollection<Pair<T,U>> and returns 
> a Pair<PCollection<T>, PCollection<U>>.  The caller will write, call 
> parallelDo on, and/or materialize these collections as needed.
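> Pseudocode for the utility method:
> // Imports used by both snippets below.
> import org.apache.crunch.DoFn;
> import org.apache.crunch.Emitter;
> import org.apache.crunch.PCollection;
> import org.apache.crunch.Pair;
> import org.apache.crunch.types.PType;
>
> public static <T, U> Pair<PCollection<T>, PCollection<U>> filterChannels(
>     final PCollection<Pair<T, U>> pCollection, final PType<T> firstPType,
>     final PType<U> secondPType) {
>   // Each DoFn keeps only its own side of the pair and drops nulls.
>   final PCollection<T> stdout =
>       pCollection.parallelDo(new FirstEmittingDoFn<T, U>(), firstPType);
>   final PCollection<U> stderr =
>       pCollection.parallelDo(new SecondEmittingDoFn<T, U>(), secondPType);
>   return Pair.of(stdout, stderr);
> }
> Pseudocode for FirstEmittingDoFn (SecondEmittingDoFn follows a similar
> pattern, extending DoFn<Pair<T, U>, U> and emitting input.second()):
> public static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
>   @Override
>   public void process(Pair<T, U> input, Emitter<T> emitter) {
>     // Forward the first element only when it is present.
>     final T first = input.first();
>     if (first != null) {
>       emitter.emit(first);
>     }
>   }
> }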
> [1] 
> https://mail-archives.apache.org/mod_mbox/crunch-user/201308.mbox/%3CCAH29n6NK3bypEc9f2KADfkFFYL-b8%2B-HP3tXNW1_yF6f3TA65w%40mail.gmail.com%3E
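To make the channel semantics concrete without a Hadoop cluster, here is a plain-Java model of the design summarized above. It is a sketch, not Crunch code, and needs no Crunch dependency: the nested Pair record is a hypothetical stand-in for org.apache.crunch.Pair, and the hypothetical parseAll and filterChannels helpers work on in-memory Lists instead of PCollections. Parse failures are routed to the second (error) channel and successes to the first; null sides are skipped, so a pair whose elements are both null contributes to neither channel.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelSplitSketch {

  // Hypothetical stand-in for org.apache.crunch.Pair; either side may be null.
  public record Pair<A, B>(A first, B second) {
    public static <X, Y> Pair<X, Y> of(X first, Y second) {
      return new Pair<>(first, second);
    }
  }

  // Hypothetical producer: emits (value, null) on success and
  // (null, errorMessage) on failure, mirroring a two-channel DoFn.
  public static List<Pair<Integer, String>> parseAll(List<String> lines) {
    List<Pair<Integer, String>> out = new ArrayList<>();
    for (String line : lines) {
      try {
        out.add(Pair.of(Integer.parseInt(line.trim()), null));
      } catch (NumberFormatException e) {
        out.add(Pair.of(null, "bad record: " + line));
      }
    }
    return out;
  }

  // In-memory analogue of filterChannels: each non-null side goes to its own list.
  public static <T, U> Pair<List<T>, List<U>> filterChannels(List<Pair<T, U>> pairs) {
    List<T> firsts = new ArrayList<>();
    List<U> seconds = new ArrayList<>();
    for (Pair<T, U> p : pairs) {
      if (p.first() != null) {
        firsts.add(p.first());
      }
      if (p.second() != null) {
        seconds.add(p.second());
      }
    }
    return Pair.of(firsts, seconds);
  }

  public static void main(String[] args) {
    Pair<List<Integer>, List<String>> channels =
        filterChannels(parseAll(List.of("1", "x", " 42 ")));
    System.out.println(channels.first());  // [1, 42]
    System.out.println(channels.second()); // [bad record: x]
  }
}
```

In the Crunch version, the two loops over the list become the two parallelDo calls, so the caller can write or materialize each channel independently.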

