[ 
https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180626#comment-15180626
 ] 

ASF GitHub Bot commented on BEAM-96:
------------------------------------

GitHub user peihe opened a pull request:

    https://github.com/apache/incubator-beam/pull/23

    [BEAM-96] Add composed combine functions builders in CombineFns

    * compose() or composeKeyed() are used to start composition
    * with() is used to add an input-transformation, a combine fn and an output 
TupleTag
    * A non-CombineFn builder is used to ensure that every composition includes 
at least one item
    * Duplicate output tags are not allowed in the same composition

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/peihe/incubator-beam composed-combine

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/incubator-beam/pull/23.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #23
    
----
commit 5ceefe0a78add3742f01ec9fdb6d17cf59192cf3
Author: Pei He <[email protected]>
Date:   2016-03-04T21:54:34Z

    [BEAM-96] Add composed combine functions builders in CombineFns
    
    * compose() or composeKeyed() are used to start composition
    * with() is used to add an input-transformation, a combine fn and an
    * output TupleTag
    * A non-CombineFn builder is used to ensure that every composition
    * includes at least one item
    * Duplicate output tags are not allowed in the same composition

----


> Support composing combine functions
> -----------------------------------
>
>                 Key: BEAM-96
>                 URL: https://issues.apache.org/jira/browse/BEAM-96
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Davor Bonaci
>
> The proposal of composed combine functions is following:
> pc.apply(
>     Combine.perKey(
>          CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
>    * PCollection<KV<K, Integer>> latencies = ...;
>    *
>    * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
>    * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>    *
>    * SimpleFunction<Integer, Integer> identityFn =
>    *     new SimpleFunction<Integer, Integer>() {
>    *       @Override
>    *       public Integer apply(Integer input) {
>    *           return input;
>    *       }};
>    * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>    *     Combine.perKey(
>    *         CombineFns.composeKeyed()
>    *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>    *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>    *
>    * PCollection<T> finalResultCollection = maxAndMean
>    *     .apply(ParDo.of(
>    *         new DoFn<KV<K, CoCombineResult>, T>() {
>    *           @Override
>    *           public void processElement(ProcessContext c) throws Exception {
>    *             KV<K, CoCombineResult> e = c.element();
>    *             Integer maxLatency = e.getValue().get(maxLatencyTag);
>    *             Double meanLatency = e.getValue().get(meanLatencyTag);
>    *             .... Do Something ....
>    *             c.output(...some T...);
>    *           }
>    *         }));



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to