[ https://issues.apache.org/jira/browse/CRUNCH-258?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Brandon Inman updated CRUNCH-258:
---------------------------------
Attachment: 0001-CRUNCH-258-support-for-multiple-output-channels.patch
Proposed implementation based on the design discussed on the mailing list.
Any feedback, but especially on naming, is appreciated.
> Multiple output channels from DoFn
> ----------------------------------
>
> Key: CRUNCH-258
> URL: https://issues.apache.org/jira/browse/CRUNCH-258
> Project: Crunch
> Issue Type: New Feature
> Components: Core
> Reporter: Brandon Inman
> Assignee: Josh Wills
> Attachments:
> 0001-CRUNCH-258-support-for-multiple-output-channels.patch
>
>
> As discussed on the mailing list[1], add support for multiple output channels
> from a DoFn.
> Summarized outcome of the thread:
> A DoFn that has multiple outputs emits a Pair<T,U>. For example, this might
> be used to separate objects representing an error condition from those
> representing expected outputs. In a typical case, one element in the pair
> will be null and the other non-null, although both may be non-null. Both
> may also be null, in which case the pair is effectively discarded without
> error.
> A utility method is provided that takes a PCollection<Pair<T,U>> and returns
> a Pair<PCollection<T>, PCollection<U>>. The caller will write, call
> parallelDo on, and/or materialize these collections as needed.
> Pseudocode for the utility method:
> public static <T, U> Pair<PCollection<T>, PCollection<U>> filterChannels(
>     final PCollection<Pair<T, U>> pCollection,
>     final PType<T> firstPType, final PType<U> secondPType) {
>   // Both branches read the same input; each keeps one side of the pair.
>   final PCollection<T> stdout = pCollection.parallelDo(
>       new FirstEmittingDoFn<T, U>(), firstPType);
>   final PCollection<U> stderr = pCollection.parallelDo(
>       new SecondEmittingDoFn<T, U>(), secondPType);
>   return Pair.of(stdout, stderr);
> }
> Pseudocode for the FirstEmittingDoFn (SecondEmittingDoFn follows a similar
> pattern):
> public static class FirstEmittingDoFn<T, U> extends DoFn<Pair<T, U>, T> {
>   @Override
>   public void process(Pair<T, U> input, Emitter<T> emitter) {
>     // Emit only non-null first elements; null ones are silently skipped.
>     final T first = input.first();
>     if (first != null) {
>       emitter.emit(first);
>     }
>   }
> }
> [1]
> https://mail-archives.apache.org/mod_mbox/crunch-user/201308.mbox/%3CCAH29n6NK3bypEc9f2KADfkFFYL-b8%2B-HP3tXNW1_yF6f3TA65w%40mail.gmail.com%3E
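
To make the null-handling rules in the description concrete, here is a minimal sketch that mimics the split using plain java.util lists in place of PCollections. It does not use Crunch at all: the ChannelSketch class, its nested Pair, and the split method are hypothetical stand-ins introduced only for illustration, so treat it as a model of the intended semantics rather than the patch itself.

```java
import java.util.ArrayList;
import java.util.List;

public class ChannelSketch {

    /** Hypothetical stand-in for Crunch's Pair; either side may be null. */
    static final class Pair<A, B> {
        final A first;
        final B second;

        Pair(A first, B second) {
            this.first = first;
            this.second = second;
        }

        static <A, B> Pair<A, B> of(A first, B second) {
            return new Pair<>(first, second);
        }
    }

    /**
     * Splits a list of pairs into two lists, one per channel.
     * Null elements are skipped, so a (null, null) pair contributes nothing.
     */
    static <T, U> Pair<List<T>, List<U>> split(List<Pair<T, U>> input) {
        List<T> firsts = new ArrayList<>();
        List<U> seconds = new ArrayList<>();
        for (Pair<T, U> p : input) {
            if (p.first != null) {
                firsts.add(p.first);
            }
            if (p.second != null) {
                seconds.add(p.second);
            }
        }
        return Pair.of(firsts, seconds);
    }

    public static void main(String[] args) {
        List<Pair<Integer, String>> emitted = new ArrayList<>();
        emitted.add(Pair.of(1, null));            // expected output only
        emitted.add(Pair.of(null, "bad record")); // error output only
        emitted.add(Pair.of(2, "warning"));       // both channels populated
        emitted.add(Pair.of(null, null));         // discarded without error

        Pair<List<Integer>, List<String>> channels = split(emitted);
        System.out.println(channels.first);  // prints [1, 2]
        System.out.println(channels.second); // prints [bad record, warning]
    }
}
```

In the real utility each channel would be a separate parallelDo over the same input PCollection, as in the pseudocode above; the single loop here is only a convenience of the in-memory model.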