[ https://issues.apache.org/jira/browse/BEAM-96?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15180626#comment-15180626 ]
ASF GitHub Bot commented on BEAM-96:
------------------------------------
GitHub user peihe opened a pull request:
https://github.com/apache/incubator-beam/pull/23
[BEAM-96] Add composed combine functions builders in CombineFns
* compose() or composeKeyed() is used to start a composition
* with() is used to add an input transformation, a combine fn, and an output TupleTag
* A non-CombineFn builder is used to ensure that every composition includes at least one item
* Duplicate output tags are not allowed in the same composition
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/peihe/incubator-beam composed-combine
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/incubator-beam/pull/23.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #23
----
commit 5ceefe0a78add3742f01ec9fdb6d17cf59192cf3
Author: Pei He
Date: 2016-03-04T21:54:34Z
[BEAM-96] Add composed combine functions builders in CombineFns
* compose() or composeKeyed() are used to start composition
* with() is used to add an input-transformation, a combine fn and an
  output TupleTag
* A non-CombineFn builder is used to ensure that every composition
  includes at least one item
* Duplicate output tags are not allowed in the same composition
----
> Support composing combine functions
> -----------------------------------
>
> Key: BEAM-96
> URL: https://issues.apache.org/jira/browse/BEAM-96
> Project: Beam
> Issue Type: New Feature
> Components: sdk-java-core
> Reporter: Pei He
> Assignee: Davor Bonaci
>
> The proposed API for composing combine functions is as follows:
>   pc.apply(
>       Combine.perKey(
>           CombineFns.composeKeyed()
>               .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>               .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
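One plausible way for a composed combine fn to run several combine fns in a single pass is to keep one sub-accumulator per component: each input goes through that component's input transformation and then into its combine fn, accumulators are merged slot by slot, and the final output collects one result per output tag. The sketch below models that idea for the max-and-mean proposal. It is an illustration under assumptions, not Beam code: SimpleCombine, ComposedCombine, MaxInt and MeanInt are hypothetical names, the sketch is unkeyed (a keyed variant would also hand the key to each component), and results come back in a Map keyed by tag name rather than in a CoCombineResult.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical combine-fn shape with a create/add/merge/extract lifecycle.
interface SimpleCombine<InT, AccT, OutT> {
  AccT createAccumulator();
  AccT addInput(AccT acc, InT input);
  AccT mergeAccumulators(AccT a, AccT b);
  OutT extractOutput(AccT acc);
}

// Maximum of the integer inputs.
final class MaxInt implements SimpleCombine<Integer, Integer, Integer> {
  public Integer createAccumulator() { return Integer.MIN_VALUE; }
  public Integer addInput(Integer acc, Integer in) { return Math.max(acc, in); }
  public Integer mergeAccumulators(Integer a, Integer b) { return Math.max(a, b); }
  public Integer extractOutput(Integer acc) { return acc; }
}

// Arithmetic mean; the accumulator is {sum, count}.
final class MeanInt implements SimpleCombine<Integer, long[], Double> {
  public long[] createAccumulator() { return new long[] {0L, 0L}; }
  public long[] addInput(long[] acc, Integer in) { acc[0] += in; acc[1]++; return acc; }
  public long[] mergeAccumulators(long[] a, long[] b) {
    return new long[] {a[0] + b[0], a[1] + b[1]};
  }
  public Double extractOutput(long[] acc) {
    return acc[1] == 0 ? Double.NaN : (double) acc[0] / acc[1];
  }
}

// Runs every component over the same input. The composed accumulator is an
// Object[] holding one sub-accumulator per component, in with() order.
final class ComposedCombine<InT> {
  private static final class Component<T, V, AccT, OutT> {
    private final Function<T, V> extract;
    private final SimpleCombine<V, AccT, OutT> fn;
    private final String tag;

    Component(Function<T, V> extract, SimpleCombine<V, AccT, OutT> fn, String tag) {
      this.extract = extract;
      this.fn = fn;
      this.tag = tag;
    }

    Object create() { return fn.createAccumulator(); }

    // The casts are safe because slot i only ever holds component i's accumulator.
    @SuppressWarnings("unchecked")
    Object add(Object acc, T input) { return fn.addInput((AccT) acc, extract.apply(input)); }

    @SuppressWarnings("unchecked")
    Object merge(Object a, Object b) { return fn.mergeAccumulators((AccT) a, (AccT) b); }

    @SuppressWarnings("unchecked")
    Object output(Object acc) { return fn.extractOutput((AccT) acc); }
  }

  private final List<Component<InT, ?, ?, ?>> components = new ArrayList<>();

  // Mutable for brevity; registers (input transformation, combine fn, output tag).
  <V, AccT, OutT> ComposedCombine<InT> with(
      Function<InT, V> extract, SimpleCombine<V, AccT, OutT> fn, String tag) {
    components.add(new Component<InT, V, AccT, OutT>(extract, fn, tag));
    return this;
  }

  Object[] createAccumulator() {
    Object[] accs = new Object[components.size()];
    for (int i = 0; i < accs.length; i++) accs[i] = components.get(i).create();
    return accs;
  }

  Object[] addInput(Object[] accs, InT input) {
    for (int i = 0; i < accs.length; i++) accs[i] = components.get(i).add(accs[i], input);
    return accs;
  }

  Object[] mergeAccumulators(Object[] a, Object[] b) {
    Object[] merged = new Object[a.length];
    for (int i = 0; i < a.length; i++) merged[i] = components.get(i).merge(a[i], b[i]);
    return merged;
  }

  Map<String, Object> extractOutput(Object[] accs) {
    Map<String, Object> result = new LinkedHashMap<>();
    for (int i = 0; i < accs.length; i++) {
      result.put(components.get(i).tag, components.get(i).output(accs[i]));
    }
    return result;
  }
}
```

Keeping one slot per component means partial accumulators built separately (for example, over different bundles of input) can still be merged component by component, and each sub-combine only ever sees its own accumulator type.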
> Example code:
>
>   PCollection<KV<K, Integer>> latencies = ...;
>
>   final TupleTag<Integer> maxLatencyTag = new TupleTag<Integer>();
>   final TupleTag<Double> meanLatencyTag = new TupleTag<Double>();
>
>   SimpleFunction<Integer, Integer> identityFn =
>       new SimpleFunction<Integer, Integer>() {
>         @Override
>         public Integer apply(Integer input) {
>           return input;
>         }
>       };
>
>   PCollection<KV<K, CoCombineResult>> maxAndMean = latencies.apply(
>       Combine.perKey(
>           CombineFns.composeKeyed()
>               .with(identityFn, new MaxIntegerFn(), maxLatencyTag)
>               .with(identityFn, new MeanFn<Integer>(), meanLatencyTag)));
>
>   PCollection<T> finalResultCollection = maxAndMean
>       .apply(ParDo.of(
>           new DoFn<KV<K, CoCombineResult>, T>() {
>             @Override
>             public void processElement(ProcessContext c) throws Exception {
>               KV<K, CoCombineResult> e = c.element();
>               // Each tag retrieves its own component's result with its own type.
>               Integer maxLatency = e.getValue().get(maxLatencyTag);
>               Double meanLatency = e.getValue().get(meanLatencyTag);
>               // ... do something with maxLatency and meanLatency ...
>               // c.output(...);  // emit some value of type T
>             }
>           }));
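In the example, get(maxLatencyTag) yields an Integer and get(meanLatencyTag) yields a Double with no casts at the call site. A common way to get that behavior is a typesafe heterogeneous container keyed by typed tags. The sketch below shows that pattern only; Tag and TaggedResult are hypothetical stand-ins for TupleTag and CoCombineResult, this is not a description of how CoCombineResult is actually implemented, and the tags here compare by identity (an assumption of the sketch).

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical typed tag (cf. TupleTag<V>): carries the value type V at compile
// time. It does not override equals/hashCode, so lookups are identity-based.
final class Tag<V> {
  private final String id;

  Tag(String id) {
    this.id = id;
  }

  @Override
  public String toString() {
    return id;
  }
}

// Hypothetical composed-combine result: maps each Tag<V> to a value of type V.
final class TaggedResult {
  private final Map<Tag<?>, Object> values = new HashMap<>();

  // put() ties the value's type to the tag's type parameter.
  <V> TaggedResult put(Tag<V> tag, V value) {
    values.put(tag, value);
    return this;
  }

  // The only unchecked cast; safe because values only enter through put().
  @SuppressWarnings("unchecked")
  <V> V get(Tag<V> tag) {
    return (V) values.get(tag);
  }
}
```

For example, after new TaggedResult().put(maxLatencyTag, 30).put(meanLatencyTag, 20.0), the call get(maxLatencyTag) is typed as Integer and get(meanLatencyTag) as Double. In this sketch, a lookup with a tag that was never added simply returns null.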
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)