[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15203110#comment-15203110 ]

Davor Bonaci commented on BEAM-96:
----------------------------------

Pei, is this done?

> Support composing combine functions
> -----------------------------------
>
>                 Key: BEAM-96
>                 URL: https://issues.apache.org/jira/browse/BEAM-96
>             Project: Beam
>          Issue Type: New Feature
>          Components: sdk-java-core
>            Reporter: Pei He
>            Assignee: Pei He
>
> The proposal for composed combine functions is as follows:
> pc.apply(
>     Combine.perKey(
>         CombineFns.composeKeyed()
>             .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>             .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
> Example code:
>  * PCollection<KV<K, Integer>> latencies = ...;
>  *
>  * TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
>  * TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>  *
>  * SimpleFunction<Integer, Integer> identityFn =
>  *     new SimpleFunction<Integer, Integer>() {
>  *       @Override
>  *       public Integer apply(Integer input) {
>  *           return input;
>  *       }};
>  * PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>  *     Combine.perKey(
>  *         CombineFns.composeKeyed()
>  *            .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>  *            .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>  *
>  * PCollection<T> finalResultCollection = maxAndMean
>  *     .apply(ParDo.of(
>  *         new DoFn<KV<K, CoCombineResult>, T>() {
>  *           @Override
>  *           public void processElement(ProcessContext c) throws Exception {
>  *             KV<K, CoCombineResult> e = c.element();
>  *             Integer maxLatency = e.getValue().get(maxLatencyTag);
>  *             Double meanLatency = e.getValue().get(meanLatencyTag);
>  *             .... Do Something ....
>  *             c.output(...some T...);
>  *           }
>  *         }));

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
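
For readers following the proposal quoted above, here is a minimal plain-Java sketch of the idea behind composing combine functions: several combiners share one pass over the input, and each result is retrieved through a typed tag. It does not use the Beam SDK. The `Tag`, `CombineFn`, `Composed`, `Result`, and `MeanIntegerFn` types below are hypothetical stand-ins written for this illustration (only `MaxIntegerFn` borrows its name from the proposal), and the sketch ignores keys, accumulator merging, and coders, which a real implementation would need.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.function.Supplier;

public class ComposedCombineSketch {
    /** Hypothetical typed tag; two tags are equal only if they are the same object. */
    static final class Tag<T> {}

    /** Hypothetical minimal combine function (not Beam's interface). */
    interface CombineFn<I, A, O> {
        A createAccumulator();
        A addInput(A acc, I input);
        O extractOutput(A acc);
    }

    /** Running maximum; an empty input yields Integer.MIN_VALUE. */
    static final class MaxIntegerFn implements CombineFn<Integer, Integer, Integer> {
        public Integer createAccumulator() { return Integer.MIN_VALUE; }
        public Integer addInput(Integer acc, Integer in) { return Math.max(acc, in); }
        public Integer extractOutput(Integer acc) { return acc; }
    }

    /** Arithmetic mean; the accumulator is {sum, count}. */
    static final class MeanIntegerFn implements CombineFn<Integer, long[], Double> {
        public long[] createAccumulator() { return new long[] {0, 0}; }
        public long[] addInput(long[] acc, Integer in) { acc[0] += in; acc[1]++; return acc; }
        public Double extractOutput(long[] acc) {
            return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
        }
    }

    /** One component of the composition together with its live accumulator. */
    private static final class Slot<I, X, A, O> {
        final Function<I, X> extract;
        final CombineFn<X, A, O> fn;
        final Tag<O> tag;
        A acc;
        Slot(Function<I, X> extract, CombineFn<X, A, O> fn, Tag<O> tag) {
            this.extract = extract;
            this.fn = fn;
            this.tag = tag;
            this.acc = fn.createAccumulator();
        }
        void add(I input) { acc = fn.addInput(acc, extract.apply(input)); }
        void emit(Map<Tag<?>, Object> out) { out.put(tag, fn.extractOutput(acc)); }
    }

    /** Tag-indexed outputs of a composed combine. */
    static final class Result {
        private final Map<Tag<?>, Object> values;
        Result(Map<Tag<?>, Object> values) { this.values = values; }
        @SuppressWarnings("unchecked") // safe: each value is stored under a Tag of its own type
        <T> T get(Tag<T> tag) { return (T) values.get(tag); }
    }

    /** Builder that registers components and runs them in a single pass. */
    static final class Composed<I> {
        private final List<Supplier<Slot<I, ?, ?, ?>>> factories = new ArrayList<>();

        <X, A, O> Composed<I> with(Function<I, X> extract, CombineFn<X, A, O> fn, Tag<O> tag) {
            factories.add(() -> new Slot<I, X, A, O>(extract, fn, tag));
            return this;
        }

        Result combine(Iterable<I> inputs) {
            // Fresh accumulators per call, so the builder itself stays stateless.
            List<Slot<I, ?, ?, ?>> slots = new ArrayList<>();
            for (Supplier<Slot<I, ?, ?, ?>> f : factories) slots.add(f.get());
            // Single pass: every input is fed to every component.
            for (I in : inputs) for (Slot<I, ?, ?, ?> s : slots) s.add(in);
            Map<Tag<?>, Object> out = new HashMap<>();
            for (Slot<I, ?, ?, ?> s : slots) s.emit(out);
            return new Result(out);
        }
    }

    public static void main(String[] args) {
        Tag<Integer> maxLatencyTag = new Tag<>();
        Tag<Double> meanLatencyTag = new Tag<>();
        Result r = new Composed<Integer>()
            .with(Function.identity(), new MaxIntegerFn(), maxLatencyTag)
            .with(Function.identity(), new MeanIntegerFn(), meanLatencyTag)
            .combine(Arrays.asList(10, 40, 25));
        System.out.println(r.get(maxLatencyTag) + " " + r.get(meanLatencyTag)); // prints "40 25.0"
    }
}
```

The typed tags are what let `get` return an `Integer` for one component and a `Double` for the other without casts at the call site, which mirrors the role `maxLatencyTag` and `meanLatencyTag` play in the quoted example.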