Repository: apex-malhar Updated Branches: refs/heads/master a202cdc7c -> fc8b674e3
APEXMALHAR-2428 CompositeAccumulation for windowed operator Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/e035d61f Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/e035d61f Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/e035d61f Branch: refs/heads/master Commit: e035d61fe651afed51c67613374de455ae5febe0 Parents: eaa3bf3 Author: brightchen <[email protected]> Authored: Fri Jan 27 12:37:07 2017 -0800 Committer: brightchen <[email protected]> Committed: Tue Feb 28 11:16:11 2017 -0800 ---------------------------------------------------------------------- .../accumulation/CompositeAccumulation.java | 119 +++++++++++++++++++ .../accumulation/CompositeAccumulationTest.java | 67 +++++++++++ 2 files changed, 186 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e035d61f/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java new file mode 100644 index 0000000..2184388 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulation.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.hadoop.classification.InterfaceStability.Evolving;
+
+import com.google.common.collect.Lists;
+
+/**
+ * An Accumulation that keeps one accumulated value per registered sub-accumulation in a List and
+ * delegates default-value creation, accumulation and merging to those sub-accumulations.
+ */
+@SuppressWarnings("rawtypes")
+@Evolving
+public class CompositeAccumulation<InputT> implements Accumulation<InputT, List, List>
+{
+  /**
+   * Opaque handle for one sub-accumulation. It hides the position in the composite, and only
+   * addAccumulation() can create one because the constructor is private.
+   */
+  public static class AccumulationTag
+  {
+    private final int index;
+    private AccumulationTag(int index)
+    {
+      this.index = index;
+    }
+  }
+
+  private List<Accumulation<InputT, Object, ?>> accumulations = Lists.newArrayList();
+
+  /**
+   * @param accumulation The sub-accumulation to add to the composite.
+   * @return The tag of the added sub-accumulation; pass it to getSubOutput() to read its output.
+   */
+  public AccumulationTag addAccumulation(Accumulation<InputT, Object, ?> accumulation)
+  {
+    accumulations.add(accumulation);
+    return new AccumulationTag(accumulations.size() - 1);
+  }
+
+  /**
+   * Computes the output of a single sub-accumulation from the output of the composite.
+   * @param tag The tag of the sub-accumulation, as returned by addAccumulation()
+   * @param output The output of the composite accumulation, as returned by getOutput()
+   * @return The output of the sub-accumulation.
+   */
+  public Object getSubOutput(AccumulationTag tag, List output)
+  {
+    int index = tag.index;
+    return accumulations.get(index).getOutput(output.get(index));
+  }
+
+  @Override
+  public List defaultAccumulatedValue()
+  {
+    List defaultValues = Lists.newArrayList(); // one default value per sub-accumulation, in order
+    for (Accumulation accumulation : accumulations) {
+      defaultValues.add(accumulation.defaultAccumulatedValue());
+    }
+    return defaultValues;
+  }
+
+  @SuppressWarnings("unchecked")
+  @Override
+  public List accumulate(List accumulatedValues, InputT input)
+  {
+    for (int index = 0; index < accumulations.size(); ++index) {
+      Accumulation accumulation = accumulations.get(index);
+      Object oldValue = accumulatedValues.get(index);
+      Object newValue = accumulation.accumulate(oldValue, input);
+      if (newValue != oldValue) { // replace the slot only when a different instance came back
+        accumulatedValues.set(index, newValue);
+      }
+    }
+    return accumulatedValues;
+  }
+
+  @Override
+  public List merge(List accumulatedValues1, List accumulatedValues2)
+  {
+    for (int index = 0; index < accumulations.size(); ++index) {
+      accumulatedValues1.set(index,
+          accumulations.get(index).merge(accumulatedValues1.get(index), accumulatedValues2.get(index)));
+    }
+    return accumulatedValues1; // the first list is updated in place and returned
+  }
+
+  @Override
+  public List getOutput(List accumulatedValues)
+  {
+    return accumulatedValues; // same list instance; use getSubOutput() for per-sub-accumulation output
+  }
+
+  @Override
+  public List getRetraction(List values)
+  {
+    return values; // returned unchanged; the sub-accumulations are not consulted here
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/e035d61f/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
new file mode 100644
index 0000000..fed1acb
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/CompositeAccumulationTest.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.apex.malhar.lib.window.Accumulation;
+import org.apache.apex.malhar.lib.window.SumAccumulation;
+import org.apache.apex.malhar.lib.window.accumulation.CompositeAccumulation.AccumulationTag;
+
+public class CompositeAccumulationTest
+{
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  @Test
+  public void testIncremental()
+  {
+    CompositeAccumulation<Long> accumulations = new CompositeAccumulation<>();
+    AccumulationTag sumTag = accumulations.addAccumulation((Accumulation)new SumAccumulation());
+    AccumulationTag countTag = accumulations.addAccumulation((Accumulation)new Count());
+    AccumulationTag maxTag = accumulations.addAccumulation(new Max());
+    AccumulationTag minTag = accumulations.addAccumulation(new Min());
+    List values = accumulations.defaultAccumulatedValue();
+    for (long i = 1; i <= 10; i++) { // feed 1..10 through the composite
+      values = accumulations.accumulate(values, i);
+    }
+
+    List outputValues = accumulations.getOutput(values);
+    Assert.assertEquals(55L, accumulations.getSubOutput(sumTag, outputValues)); // 1 + 2 + ... + 10
+    Assert.assertEquals(10L, accumulations.getSubOutput(countTag, outputValues)); // ten inputs
+    Assert.assertEquals(10L, accumulations.getSubOutput(maxTag, outputValues)); // largest input
+    Assert.assertEquals(1L, accumulations.getSubOutput(minTag, outputValues)); // smallest input
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testAverage()
+  {
+    CompositeAccumulation<Double> accumulations = new CompositeAccumulation<>();
+    AccumulationTag averageTag = accumulations.addAccumulation((Accumulation)new Average());
+    List values = accumulations.defaultAccumulatedValue();
+    for (int i = 1; i <= 10; i++) { // feed 1.0..10.0 through the composite
+      values = accumulations.accumulate(values, i * 1.0);
+    }
+
+    List outputValues = accumulations.getOutput(values);
+    Assert.assertEquals(5.5, (Double)accumulations.getSubOutput(averageTag, outputValues), 1e-9); // 55 / 10
+  }
+}
