[ 
https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203110#comment-15203110
 ] 

Davor Bonaci commented on BEAM-96:
----------------------------------

Pei, is this done?

> Support composing combine functions
> -----------------------------------
>
>                 Key: BEAM-96
>                 URL: https://issues.apache.org/jira/browse/BEAM-96
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Pei He
>
> The proposal for composed combine functions is as follows:
> pc.apply(
>     Combine.perKey(
>          CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
> PCollection<KV<K, Integer>> latencies = ...;
>
> TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
> TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>
> SimpleFunction<Integer, Integer> identityFn =
>     new SimpleFunction<Integer, Integer>() {
>       @Override
>       public Integer apply(Integer input) {
>           return input;
>       }};
> PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>     Combine.perKey(
>         CombineFns.composeKeyed()
>            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>
> PCollection<T> finalResultCollection = maxAndMean
>     .apply(ParDo.of(
>         new DoFn<KV<K, CoCombineResult>, T>() {
>           @Override
>           public void processElement(ProcessContext c) throws Exception {
>             KV<K, CoCombineResult> e = c.element();
>             Integer maxLatency = e.getValue().get(maxLatencyTag);
>             Double meanLatency = e.getValue().get(meanLatencyTag);
>             .... Do Something ....
>             c.output(...some T...);
>           }
>         }));
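
For readers without the SDK at hand, the idea behind the proposal can be sketched in plain Java: several combiners are fed the same values in a single pass, and each result is looked up by its tag afterwards. This is only an illustration under stated assumptions, not the Beam implementation. ComposedCombineSketch, Accumulator, MaxAccumulator, MeanAccumulator and combineAll are hypothetical names, and string tags stand in for the TupleTag objects.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.function.Supplier;

/** Hypothetical, Beam-free illustration of composing combine functions. */
public class ComposedCombineSketch {

  /** Minimal stand-in for the mutable accumulator of one combine function. */
  interface Accumulator {
    void add(int value);
    Object extract();
  }

  /** Tracks the largest value seen so far. */
  static final class MaxAccumulator implements Accumulator {
    private int max = Integer.MIN_VALUE;
    public void add(int value) { max = Math.max(max, value); }
    public Object extract() { return max; }
  }

  /** Tracks sum and count so the mean can be extracted at the end. */
  static final class MeanAccumulator implements Accumulator {
    private long sum = 0;
    private long count = 0;
    public void add(int value) { sum += value; count++; }
    public Object extract() { return count == 0 ? Double.NaN : (double) sum / count; }
  }

  /** Feeds every value to every component in one pass; results are keyed by tag. */
  static Map<String, Object> combineAll(
      Map<String, Supplier<Accumulator>> components, Iterable<Integer> values) {
    // Create one fresh accumulator per registered component.
    Map<String, Accumulator> accumulators = new LinkedHashMap<>();
    for (Map.Entry<String, Supplier<Accumulator>> e : components.entrySet()) {
      accumulators.put(e.getKey(), e.getValue().get());
    }
    // Single pass over the input: each value reaches all accumulators.
    for (int value : values) {
      for (Accumulator a : accumulators.values()) {
        a.add(value);
      }
    }
    // Extract each component's output under its tag.
    Map<String, Object> results = new LinkedHashMap<>();
    for (Map.Entry<String, Accumulator> e : accumulators.entrySet()) {
      results.put(e.getKey(), e.getValue().extract());
    }
    return results;
  }

  public static void main(String[] args) {
    Map<String, Supplier<Accumulator>> components = new LinkedHashMap<>();
    components.put("maxLatency", MaxAccumulator::new);
    components.put("meanLatency", MeanAccumulator::new);
    // Prints {maxLatency=30, meanLatency=20.0}
    System.out.println(combineAll(components, Arrays.asList(10, 30, 20)));
  }
}
```

The sketch leaves out what a distributed runner would also need, such as merging partial accumulators and the per-component input extraction function (identityFn in the quoted example).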



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)