APEXMALHAR-2202 Moved accumulations to org.apache.apex.malhar.lib.window.accumulation.
Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/2b2d5bca Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/2b2d5bca Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/2b2d5bca Branch: refs/heads/master Commit: 2b2d5bca91b23569e04cbc51a541126588eaab44 Parents: dcca775 Author: Shunxin <[email protected]> Authored: Thu Aug 25 13:37:46 2016 -0700 Committer: Shunxin <[email protected]> Committed: Thu Aug 25 13:37:46 2016 -0700 ---------------------------------------------------------------------- .../sample/complete/TopWikipediaSessions.java | 2 +- .../stream/sample/complete/TrafficRoutes.java | 2 +- .../sample/cookbook/CombinePerKeyExamples.java | 2 +- .../stream/sample/cookbook/DeDupExample.java | 2 +- .../sample/cookbook/MaxPerKeyExamples.java | 2 +- .../malhar/lib/window/accumulation/Average.java | 64 +++++++++ .../malhar/lib/window/accumulation/Count.java | 61 +++++++++ .../malhar/lib/window/accumulation/FoldFn.java | 65 ++++++++++ .../malhar/lib/window/accumulation/Group.java | 63 +++++++++ .../malhar/lib/window/accumulation/Max.java | 75 +++++++++++ .../malhar/lib/window/accumulation/Min.java | 76 +++++++++++ .../lib/window/accumulation/ReduceFn.java | 65 ++++++++++ .../window/accumulation/RemoveDuplicates.java | 72 +++++++++++ .../lib/window/accumulation/SumDouble.java | 60 +++++++++ .../lib/window/accumulation/SumFloat.java | 60 +++++++++ .../malhar/lib/window/accumulation/SumInt.java | 60 +++++++++ .../malhar/lib/window/accumulation/SumLong.java | 60 +++++++++ .../malhar/lib/window/accumulation/TopN.java | 106 +++++++++++++++ .../lib/window/accumulation/TopNByKey.java | 114 ++++++++++++++++ .../lib/window/impl/accumulation/Average.java | 64 --------- .../lib/window/impl/accumulation/Count.java | 61 --------- .../lib/window/impl/accumulation/FoldFn.java | 65 ---------- .../lib/window/impl/accumulation/Group.java | 63 --------- .../lib/window/impl/accumulation/Max.java | 75 ----------- .../lib/window/impl/accumulation/Min.java | 76 ----------- .../lib/window/impl/accumulation/ReduceFn.java | 65 ---------- .../impl/accumulation/RemoveDuplicates.java | 72 ----------- .../lib/window/impl/accumulation/SumDouble.java | 60 --------- .../lib/window/impl/accumulation/SumFloat.java | 60 --------- .../lib/window/impl/accumulation/SumInt.java | 60 --------- .../lib/window/impl/accumulation/SumLong.java | 60 --------- .../lib/window/impl/accumulation/TopN.java | 106 --------------- .../lib/window/impl/accumulation/TopNByKey.java | 114 ---------------- .../lib/window/accumulation/AverageTest.java | 41 ++++++ .../lib/window/accumulation/FoldFnTest.java | 129 +++++++++++++++++++ .../lib/window/accumulation/GroupTest.java | 41 ++++++ .../malhar/lib/window/accumulation/MaxTest.java | 53 ++++++++ .../malhar/lib/window/accumulation/MinTest.java | 53 ++++++++ .../lib/window/accumulation/ReduceFnTest.java | 50 +++++++ .../accumulation/RemoveDuplicatesTest.java | 41 ++++++ .../malhar/lib/window/accumulation/SumTest.java | 57 ++++++++ .../lib/window/accumulation/TopNByKeyTest.java | 74 +++++++++++ .../window/impl/accumulation/AverageTest.java | 41 ------ .../window/impl/accumulation/FoldFnTest.java | 129 ------------------- .../lib/window/impl/accumulation/GroupTest.java | 42 ------ .../lib/window/impl/accumulation/MaxTest.java | 53 -------- .../lib/window/impl/accumulation/MinTest.java | 53 -------- .../window/impl/accumulation/ReduceFnTest.java | 50 ------- .../impl/accumulation/RemoveDuplicatesTest.java | 42 ------ .../lib/window/impl/accumulation/SumTest.java | 57 -------- .../window/impl/accumulation/TopNByKeyTest.java | 75 ----------- .../apex/malhar/stream/api/WindowedStream.java | 4 +- .../stream/api/impl/ApexWindowedStreamImpl.java | 8 +- 53 files changed, 1551 insertions(+), 1554 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java index de4e590..f2e70b1 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TopWikipediaSessions.java @@ -29,7 +29,7 @@ import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.impl.accumulation.TopN; +import org.apache.apex.malhar.lib.window.accumulation.TopN; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java index 2cc04d1..26a2823 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/complete/TrafficRoutes.java @@ -32,7 +32,7 @@ import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.impl.accumulation.Group; +import org.apache.apex.malhar.lib.window.accumulation.Group; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java index ecd71ae..d88a8dc 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/CombinePerKeyExamples.java @@ -24,7 +24,7 @@ import java.util.List; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn; +import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java index 53426f3..2930010 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/DeDupExample.java @@ -26,7 +26,7 @@ import org.joda.time.Duration; import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.impl.accumulation.RemoveDuplicates; +import org.apache.apex.malhar.lib.window.accumulation.RemoveDuplicates; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.function.Function; import org.apache.apex.malhar.stream.api.impl.StreamFactory; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java ---------------------------------------------------------------------- diff --git a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java index 97b2696..02980e4 100644 --- a/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java +++ b/demos/highlevelapi/src/main/java/org/apache/apex/malhar/stream/sample/cookbook/MaxPerKeyExamples.java @@ -24,7 +24,7 @@ import org.apache.apex.malhar.lib.window.TriggerOption; import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.Window; import org.apache.apex.malhar.lib.window.WindowOption; -import org.apache.apex.malhar.lib.window.impl.accumulation.Max; +import org.apache.apex.malhar.lib.window.accumulation.Max; import org.apache.apex.malhar.stream.api.ApexStream; import org.apache.apex.malhar.stream.api.CompositeStreamTransform; import org.apache.apex.malhar.stream.api.WindowedStream; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java new file mode 100644 index 0000000..c669439 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Average.java @@ -0,0 +1,64 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang3.tuple.MutablePair; + +/** + * Average Accumulation + */ +public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double> +{ + @Override + public MutablePair<Double, Long> defaultAccumulatedValue() + { + return new MutablePair<>(0.0, 0L); + } + + @Override + public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input) + { + accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1)); + accu.setRight(accu.getRight() + 1); + return accu; + } + + @Override + public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2) + { + accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) + + accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight())); + accu1.setRight(accu1.getRight() + accu2.getRight()); + return accu1; + } + + @Override + public Double getOutput(MutablePair<Double, Long> accumulatedValue) + { + return accumulatedValue.getLeft(); + } + + @Override + public Double getRetraction(Double value) + { + // TODO: Need to add implementation for retraction. + return 0.0; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java new file mode 100644 index 0000000..180152b --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java @@ -0,0 +1,61 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang3.mutable.MutableLong; + +/** + * Count Accumulation + */ +public class Count implements Accumulation<Long, MutableLong, Long> +{ + + @Override + public MutableLong defaultAccumulatedValue() + { + return new MutableLong(0); + } + + @Override + public MutableLong accumulate(MutableLong accumulatedValue, Long input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Long getOutput(MutableLong accumulatedValue) + { + return accumulatedValue.getValue(); + } + + @Override + public Long getRetraction(Long value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java new file mode 100644 index 0000000..dc344b3 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/FoldFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Fold Accumulation Adaptor class + */ +public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT> +{ + + public FoldFn() + { + } + + public FoldFn(OUTPUT initialVal) + { + this.initialVal = initialVal; + } + + private OUTPUT initialVal; + + @Override + public OUTPUT defaultAccumulatedValue() + { + return initialVal; + } + + @Override + public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input) + { + return fold(accumulatedValue, input); + } + + @Override + public OUTPUT getOutput(OUTPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public OUTPUT getRetraction(OUTPUT value) + { + return null; + } + + abstract OUTPUT fold(OUTPUT result, INPUT input); +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java new file mode 100644 index 0000000..c34a8ac --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Group.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Group accumulation. + */ +public class Group<T> implements Accumulation<T, List<T>, List<T>> +{ + @Override + public List<T> defaultAccumulatedValue() + { + return new ArrayList<>(); + } + + @Override + public List<T> accumulate(List<T> accumulatedValue, T input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) + { + accumulatedValue1.addAll(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public List<T> getOutput(List<T> accumulatedValue) + { + return accumulatedValue; + } + + @Override + public List<T> getRetraction(List<T> value) + { + // TODO: Need to add implementation for retraction. + return new ArrayList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java new file mode 100644 index 0000000..4164bb2 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Max.java @@ -0,0 +1,75 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Comparator; +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Max accumulation. + */ +public class Max<T> implements Accumulation<T, T, T> +{ + + Comparator<T> comparator; + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public T defaultAccumulatedValue() + { + return null; + } + + @Override + public T accumulate(T accumulatedValue, T input) + { + if (accumulatedValue == null) { + return input; + } else if (comparator != null) { + return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue; + } else if (input instanceof Comparable) { + return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue; + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + } + + @Override + public T merge(T accumulatedValue1, T accumulatedValue2) + { + return accumulate(accumulatedValue1, accumulatedValue2); + } + + @Override + public T getOutput(T accumulatedValue) + { + return accumulatedValue; + } + + @Override + public T getRetraction(T value) + { + // TODO: Need to add implementation for retraction. + return null; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java new file mode 100644 index 0000000..0460052 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Min.java @@ -0,0 +1,76 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Comparator; +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * Min accumulation + */ +public class Min<T> implements Accumulation<T, T, T> +{ + + Comparator<T> comparator; + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public T defaultAccumulatedValue() + { + return null; + } + + @Override + public T accumulate(T accumulatedValue, T input) + { + if (accumulatedValue == null) { + return input; + } else if (comparator != null) { + return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue; + } else if (input instanceof Comparable) { + return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue; + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + } + + @Override + public T merge(T accumulatedValue1, T accumulatedValue2) + { + return accumulate(accumulatedValue1, accumulatedValue2); + } + + @Override + public T getOutput(T accumulatedValue) + { + return accumulatedValue; + } + + @Override + public T getRetraction(T value) + { + // TODO: Need to add implementation for retraction. + return null; + } + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java new file mode 100644 index 0000000..0c34c75 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFn.java @@ -0,0 +1,65 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * An easy to use reduce Accumulation + * @param <INPUT> + */ +public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT> +{ + @Override + public INPUT defaultAccumulatedValue() + { + return null; + } + + @Override + public INPUT accumulate(INPUT accumulatedValue, INPUT input) + { + if (accumulatedValue == null) { + return input; + } + return reduce(accumulatedValue, input); + } + + @Override + public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2) + { + return reduce(accumulatedValue1, accumulatedValue2); + } + + @Override + public INPUT getOutput(INPUT accumulatedValue) + { + return accumulatedValue; + } + + @Override + public INPUT getRetraction(INPUT value) + { + return null; + } + + public abstract INPUT reduce(INPUT input1, INPUT input2); + + +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java new file mode 100644 index 0000000..8eee868 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicates.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Set; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * RemoveDuplicates Accumulation. + * @param <T> + */ +public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> +{ + @Override + public Set<T> defaultAccumulatedValue() + { + return new HashSet<>(); + } + + @Override + public Set<T> accumulate(Set<T> accumulatedValue, T input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2) + { + for (T item : accumulatedValue2) { + accumulatedValue1.add(item); + } + return accumulatedValue1; + } + + @Override + public List<T> getOutput(Set<T> accumulatedValue) + { + if (accumulatedValue == null) { + return new ArrayList<>(); + } else { + return new ArrayList<>(accumulatedValue); + } + } + + @Override + public List<T> getRetraction(List<T> value) + { + // TODO: Need to add implementation for retraction. + return new ArrayList<>(value); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java new file mode 100644 index 0000000..fafff43 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableDouble; + +/** + * Sum Accumulation for doubles. + */ +public class SumDouble implements Accumulation<Double, MutableDouble, Double> +{ + @Override + public MutableDouble defaultAccumulatedValue() + { + return new MutableDouble(0.0); + } + + @Override + public MutableDouble accumulate(MutableDouble accumulatedValue, Double input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Double getOutput(MutableDouble accumulatedValue) + { + return accumulatedValue.doubleValue(); + } + + @Override + public Double getRetraction(Double value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java new file mode 100644 index 0000000..2e2d3ef --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableFloat; + +/** + * Sum Accumulation for floats. + */ +public class SumFloat implements Accumulation<Float, MutableFloat, Float> +{ + @Override + public MutableFloat defaultAccumulatedValue() + { + return new MutableFloat(0.); + } + + @Override + public MutableFloat accumulate(MutableFloat accumulatedValue, Float input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Float getOutput(MutableFloat accumulatedValue) + { + return accumulatedValue.floatValue(); + } + + @Override + public Float getRetraction(Float value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java new file mode 100644 index 0000000..c9aa4bd --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableInt; + +/** + * Sum accumulation for integers. + */ +public class SumInt implements Accumulation<Integer, MutableInt, Integer> +{ + @Override + public MutableInt defaultAccumulatedValue() + { + return new MutableInt(0); + } + + @Override + public MutableInt accumulate(MutableInt accumulatedValue, Integer input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Integer getOutput(MutableInt accumulatedValue) + { + return accumulatedValue.intValue(); + } + + @Override + public Integer getRetraction(Integer value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java new file mode 100644 index 0000000..1268141 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java @@ -0,0 +1,60 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.apache.apex.malhar.lib.window.Accumulation; +import org.apache.commons.lang.mutable.MutableLong; + +/** + * Sum accumulation for longs. + */ +public class SumLong implements Accumulation<Long, MutableLong, Long> +{ + @Override + public MutableLong defaultAccumulatedValue() + { + return new MutableLong(0L); + } + + @Override + public MutableLong accumulate(MutableLong accumulatedValue, Long input) + { + accumulatedValue.add(input); + return accumulatedValue; + } + + @Override + public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) + { + accumulatedValue1.add(accumulatedValue2); + return accumulatedValue1; + } + + @Override + public Long getOutput(MutableLong accumulatedValue) + { + return accumulatedValue.longValue(); + } + + @Override + public Long getRetraction(Long value) + { + return -value; + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java new file mode 100644 index 0000000..c95cbb4 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopN.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Collections; +import java.util.Comparator; +import java.util.LinkedList; +import java.util.List; + +import org.apache.apex.malhar.lib.window.Accumulation; + +/** + * TopN accumulation + */ +public class TopN<T> implements Accumulation<T, List<T>, List<T>> +{ + int n; + + Comparator<T> comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator<T> comparator) + { + this.comparator = comparator; + } + + @Override + public List<T> defaultAccumulatedValue() + { + return new LinkedList<>(); + } + + @Override + public List<T> accumulate(List<T> accumulatedValue, T input) + { + int k = 0; + for (T inMemory : accumulatedValue) { + if (comparator != null) { + if (comparator.compare(inMemory, input) < 0) { + break; + } + } else if (input instanceof Comparable) { + if (((Comparable<T>)input).compareTo(inMemory) > 0) { + break; + } + } else { + throw new RuntimeException("Tuple cannot be compared"); + } + k++; + } + accumulatedValue.add(k, input); + if (accumulatedValue.size() > n) { + accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1)); + } + return accumulatedValue; + } + + @Override + public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) + { + accumulatedValue1.addAll(accumulatedValue2); + if (comparator != null) { + Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator)); + } else { + Collections.sort(accumulatedValue1, Collections.reverseOrder()); + } + if (accumulatedValue1.size() > n) { + return accumulatedValue1.subList(0, n); + } else { + return accumulatedValue1; + } + } + + @Override + public List<T> getOutput(List<T> accumulatedValue) + { + return accumulatedValue; + } + + @Override + public List<T> getRetraction(List<T> accumulatedValue) + { + return new LinkedList<>(); + } +} + http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java new file mode 100644 index 0000000..92a6eb6 --- /dev/null +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKey.java @@ -0,0 +1,114 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import java.util.Comparator; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import org.apache.apex.malhar.lib.window.Accumulation; +import com.datatorrent.lib.util.KeyValPair; + +/** + * Generalized TopNByKey accumulation + */ +public class TopNByKey<K, V> implements + Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>> +{ + int n = 10; + + Comparator<V> comparator; + + public void setN(int n) + { + this.n = n; + } + + public void setComparator(Comparator<V> comparator) + { + this.comparator = comparator; + } + + @Override + public Map<K, V> defaultAccumulatedValue() + { + return new HashMap<>(); + } + + @Override + public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input) + { + accumulatedValue.put(input.getKey(), input.getValue()); + return accumulatedValue; + } + + @Override + public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2) + { + for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) { + if (!accumulatedValue1.containsKey(entry.getKey())) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } else if (comparator != null) { + if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } + } else if (entry.getValue() instanceof Comparable) { + if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) { + accumulatedValue1.put(entry.getKey(), entry.getValue()); + } + } + } + return accumulatedValue1; + } + + @Override + public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue) + { + LinkedList<KeyValPair<K, V>> result = new LinkedList<>(); + for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) { + int k = 0; + for (KeyValPair<K, V> inMemory : result) { + if (comparator != null) { + if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) { + break; + } + } else if (entry.getValue() instanceof Comparable) { + if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) { + break; + } + } + k++; + } + result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue())); + if (result.size() > n) { + result.remove(result.get(result.size() - 1)); + } + } + return result; + } + + @Override + public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value) + { + // TODO: Need to add implementation for retraction. + return new LinkedList<>(); + } +} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java deleted file mode 100644 index 57db6d7..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Average.java +++ /dev/null @@ -1,64 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang3.tuple.MutablePair; - -/** - * Average Accumulation - */ -public class Average implements Accumulation<Double, MutablePair<Double, Long>, Double> -{ - @Override - public MutablePair<Double, Long> defaultAccumulatedValue() - { - return new MutablePair<>(0.0, 0L); - } - - @Override - public MutablePair<Double, Long> accumulate(MutablePair<Double, Long> accu, Double input) - { - accu.setLeft(accu.getLeft() * ((double)accu.getRight() / (accu.getRight() + 1)) + input / (accu.getRight() + 1)); - accu.setRight(accu.getRight() + 1); - return accu; - } - - @Override - public MutablePair<Double, Long> merge(MutablePair<Double, Long> accu1, MutablePair<Double, Long> accu2) - { - accu1.setLeft(accu1.getLeft() * ((double)accu1.getRight() / accu1.getRight() + accu2.getRight()) + - accu2.getLeft() * ((double)accu2.getRight() / accu1.getRight() + accu2.getRight())); - accu1.setRight(accu1.getRight() + accu2.getRight()); - return accu1; - } - - @Override - public Double getOutput(MutablePair<Double, Long> accumulatedValue) - { - return accumulatedValue.getLeft(); - } - - @Override - public Double getRetraction(Double value) - { - // TODO: Need to add implementation for retraction. - return 0.0; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java deleted file mode 100644 index 2c01a0b..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Count.java +++ /dev/null @@ -1,61 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang3.mutable.MutableLong; - -/** - * Count Accumulation - */ -public class Count implements Accumulation<Long, MutableLong, Long> -{ - - @Override - public MutableLong defaultAccumulatedValue() - { - return new MutableLong(0); - } - - @Override - public MutableLong accumulate(MutableLong accumulatedValue, Long input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Long getOutput(MutableLong accumulatedValue) - { - return accumulatedValue.getValue(); - } - - @Override - public Long getRetraction(Long value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java deleted file mode 100644 index 5716cad..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Fold Accumulation Adaptor class - */ -public abstract class FoldFn<INPUT, OUTPUT> implements Accumulation<INPUT, OUTPUT, OUTPUT> -{ - - public FoldFn() - { - } - - public FoldFn(OUTPUT initialVal) - { - this.initialVal = initialVal; - } - - private OUTPUT initialVal; - - @Override - public OUTPUT defaultAccumulatedValue() - { - return initialVal; - } - - @Override - public OUTPUT accumulate(OUTPUT accumulatedValue, INPUT input) - { - return fold(accumulatedValue, input); - } - - @Override - public OUTPUT getOutput(OUTPUT accumulatedValue) - { - return accumulatedValue; - } - - @Override - public OUTPUT getRetraction(OUTPUT value) - { - return null; - } - - abstract OUTPUT fold(OUTPUT result, INPUT input); -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java deleted file mode 100644 index 632cad5..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Group.java +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.ArrayList; -import java.util.List; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Group accumulation. - */ -public class Group<T> implements Accumulation<T, List<T>, List<T>> -{ - @Override - public List<T> defaultAccumulatedValue() - { - return new ArrayList<>(); - } - - @Override - public List<T> accumulate(List<T> accumulatedValue, T input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) - { - accumulatedValue1.addAll(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public List<T> getOutput(List<T> accumulatedValue) - { - return accumulatedValue; - } - - @Override - public List<T> getRetraction(List<T> value) - { - // TODO: Need to add implementation for retraction. - return new ArrayList<>(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java deleted file mode 100644 index 1002b49..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Max.java +++ /dev/null @@ -1,75 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Comparator; -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Max accumulation. - */ -public class Max<T> implements Accumulation<T, T, T> -{ - - Comparator<T> comparator; - - public void setComparator(Comparator<T> comparator) - { - this.comparator = comparator; - } - - @Override - public T defaultAccumulatedValue() - { - return null; - } - - @Override - public T accumulate(T accumulatedValue, T input) - { - if (accumulatedValue == null) { - return input; - } else if (comparator != null) { - return (comparator.compare(input, accumulatedValue) > 0) ? input : accumulatedValue; - } else if (input instanceof Comparable) { - return (((Comparable)input).compareTo(accumulatedValue) > 0) ? input : accumulatedValue; - } else { - throw new RuntimeException("Tuple cannot be compared"); - } - } - - @Override - public T merge(T accumulatedValue1, T accumulatedValue2) - { - return accumulate(accumulatedValue1, accumulatedValue2); - } - - @Override - public T getOutput(T accumulatedValue) - { - return accumulatedValue; - } - - @Override - public T getRetraction(T value) - { - // TODO: Need to add implementation for retraction. - return null; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java deleted file mode 100644 index 66248f4..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/Min.java +++ /dev/null @@ -1,76 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Comparator; -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * Min accumulation - */ -public class Min<T> implements Accumulation<T, T, T> -{ - - Comparator<T> comparator; - - public void setComparator(Comparator<T> comparator) - { - this.comparator = comparator; - } - - @Override - public T defaultAccumulatedValue() - { - return null; - } - - @Override - public T accumulate(T accumulatedValue, T input) - { - if (accumulatedValue == null) { - return input; - } else if (comparator != null) { - return (comparator.compare(input, accumulatedValue) < 0) ? input : accumulatedValue; - } else if (input instanceof Comparable) { - return (((Comparable)input).compareTo(accumulatedValue) < 0) ? input : accumulatedValue; - } else { - throw new RuntimeException("Tuple cannot be compared"); - } - } - - @Override - public T merge(T accumulatedValue1, T accumulatedValue2) - { - return accumulate(accumulatedValue1, accumulatedValue2); - } - - @Override - public T getOutput(T accumulatedValue) - { - return accumulatedValue; - } - - @Override - public T getRetraction(T value) - { - // TODO: Need to add implementation for retraction. - return null; - } - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java deleted file mode 100644 index c21ab32..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFn.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * An easy to use reduce Accumulation - * @param <INPUT> - */ -public abstract class ReduceFn<INPUT> implements Accumulation<INPUT, INPUT, INPUT> -{ - @Override - public INPUT defaultAccumulatedValue() - { - return null; - } - - @Override - public INPUT accumulate(INPUT accumulatedValue, INPUT input) - { - if (accumulatedValue == null) { - return input; - } - return reduce(accumulatedValue, input); - } - - @Override - public INPUT merge(INPUT accumulatedValue1, INPUT accumulatedValue2) - { - return reduce(accumulatedValue1, accumulatedValue2); - } - - @Override - public INPUT getOutput(INPUT accumulatedValue) - { - return accumulatedValue; - } - - @Override - public INPUT getRetraction(INPUT value) - { - return null; - } - - public abstract INPUT reduce(INPUT input1, INPUT input2); - - -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java deleted file mode 100644 index b7cd770..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicates.java +++ /dev/null @@ -1,72 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.ArrayList; -import java.util.HashSet; -import java.util.List; -import java.util.Set; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * RemoveDuplicates Accumulation. - * @param <T> - */ -public class RemoveDuplicates<T> implements Accumulation<T, Set<T>, List<T>> -{ - @Override - public Set<T> defaultAccumulatedValue() - { - return new HashSet<>(); - } - - @Override - public Set<T> accumulate(Set<T> accumulatedValue, T input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public Set<T> merge(Set<T> accumulatedValue1, Set<T> accumulatedValue2) - { - for (T item : accumulatedValue2) { - accumulatedValue1.add(item); - } - return accumulatedValue1; - } - - @Override - public List<T> getOutput(Set<T> accumulatedValue) - { - if (accumulatedValue == null) { - return new ArrayList<>(); - } else { - return new ArrayList<>(accumulatedValue); - } - } - - @Override - public List<T> getRetraction(List<T> value) - { - // TODO: Need to add implementation for retraction. - return new ArrayList<>(value); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java deleted file mode 100644 index 60b195b..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumDouble.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableDouble; - -/** - * Sum Accumulation for doubles. - */ -public class SumDouble implements Accumulation<Double, MutableDouble, Double> -{ - @Override - public MutableDouble defaultAccumulatedValue() - { - return new MutableDouble(0.0); - } - - @Override - public MutableDouble accumulate(MutableDouble accumulatedValue, Double input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableDouble merge(MutableDouble accumulatedValue1, MutableDouble accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Double getOutput(MutableDouble accumulatedValue) - { - return accumulatedValue.doubleValue(); - } - - @Override - public Double getRetraction(Double value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java deleted file mode 100644 index 14e69e2..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumFloat.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableFloat; - -/** - * Sum Accumulation for floats. - */ -public class SumFloat implements Accumulation<Float, MutableFloat, Float> -{ - @Override - public MutableFloat defaultAccumulatedValue() - { - return new MutableFloat(0.); - } - - @Override - public MutableFloat accumulate(MutableFloat accumulatedValue, Float input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableFloat merge(MutableFloat accumulatedValue1, MutableFloat accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Float getOutput(MutableFloat accumulatedValue) - { - return accumulatedValue.floatValue(); - } - - @Override - public Float getRetraction(Float value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java deleted file mode 100644 index 886a7d0..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumInt.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableInt; - -/** - * Sum accumulation for integers. - */ -public class SumInt implements Accumulation<Integer, MutableInt, Integer> -{ - @Override - public MutableInt defaultAccumulatedValue() - { - return new MutableInt(0); - } - - @Override - public MutableInt accumulate(MutableInt accumulatedValue, Integer input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableInt merge(MutableInt accumulatedValue1, MutableInt accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Integer getOutput(MutableInt accumulatedValue) - { - return accumulatedValue.intValue(); - } - - @Override - public Integer getRetraction(Integer value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java deleted file mode 100644 index 469eef9..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumLong.java +++ /dev/null @@ -1,60 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableLong; - -/** - * Sum accumulation for longs. - */ -public class SumLong implements Accumulation<Long, MutableLong, Long> -{ - @Override - public MutableLong defaultAccumulatedValue() - { - return new MutableLong(0L); - } - - @Override - public MutableLong accumulate(MutableLong accumulatedValue, Long input) - { - accumulatedValue.add(input); - return accumulatedValue; - } - - @Override - public MutableLong merge(MutableLong accumulatedValue1, MutableLong accumulatedValue2) - { - accumulatedValue1.add(accumulatedValue2); - return accumulatedValue1; - } - - @Override - public Long getOutput(MutableLong accumulatedValue) - { - return accumulatedValue.longValue(); - } - - @Override - public Long getRetraction(Long value) - { - return -value; - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java deleted file mode 100644 index 7dad8cc..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopN.java +++ /dev/null @@ -1,106 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Collections; -import java.util.Comparator; -import java.util.LinkedList; -import java.util.List; - -import org.apache.apex.malhar.lib.window.Accumulation; - -/** - * TopN accumulation - */ -public class TopN<T> implements Accumulation<T, List<T>, List<T>> -{ - int n; - - Comparator<T> comparator; - - public void setN(int n) - { - this.n = n; - } - - public void setComparator(Comparator<T> comparator) - { - this.comparator = comparator; - } - - @Override - public List<T> defaultAccumulatedValue() - { - return new LinkedList<>(); - } - - @Override - public List<T> accumulate(List<T> accumulatedValue, T input) - { - int k = 0; - for (T inMemory : accumulatedValue) { - if (comparator != null) { - if (comparator.compare(inMemory, input) < 0) { - break; - } - } else if (input instanceof Comparable) { - if (((Comparable<T>)input).compareTo(inMemory) > 0) { - break; - } - } else { - throw new RuntimeException("Tuple cannot be compared"); - } - k++; - } - accumulatedValue.add(k, input); - if (accumulatedValue.size() > n) { - accumulatedValue.remove(accumulatedValue.get(accumulatedValue.size() - 1)); - } - return accumulatedValue; - } - - @Override - public List<T> merge(List<T> accumulatedValue1, List<T> accumulatedValue2) - { - accumulatedValue1.addAll(accumulatedValue2); - if (comparator != null) { - Collections.sort(accumulatedValue1, Collections.reverseOrder(comparator)); - } else { - Collections.sort(accumulatedValue1, Collections.reverseOrder()); - } - if (accumulatedValue1.size() > n) { - return accumulatedValue1.subList(0, n); - } else { - return accumulatedValue1; - } - } - - @Override - public List<T> getOutput(List<T> accumulatedValue) - { - return accumulatedValue; - } - - @Override - public List<T> getRetraction(List<T> accumulatedValue) - { - return new LinkedList<>(); - } -} - http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java deleted file mode 100644 index d9f9cfd..0000000 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKey.java +++ /dev/null @@ -1,114 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.apex.malhar.lib.window.impl.accumulation; - -import java.util.Comparator; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; - -import org.apache.apex.malhar.lib.window.Accumulation; -import com.datatorrent.lib.util.KeyValPair; - -/** - * Generalized TopNByKey accumulation - */ -public class TopNByKey<K, V> implements - Accumulation<KeyValPair<K, V>, Map<K, V>, List<KeyValPair<K, V>>> -{ - int n = 10; - - Comparator<V> comparator; - - public void setN(int n) - { - this.n = n; - } - - public void setComparator(Comparator<V> comparator) - { - this.comparator = comparator; - } - - @Override - public Map<K, V> defaultAccumulatedValue() - { - return new HashMap<>(); - } - - @Override - public Map<K, V> accumulate(Map<K, V> accumulatedValue, KeyValPair<K, V> input) - { - accumulatedValue.put(input.getKey(), input.getValue()); - return accumulatedValue; - } - - @Override - public Map<K, V> merge(Map<K, V> accumulatedValue1, Map<K, V> accumulatedValue2) - { - for (Map.Entry<K, V> entry : accumulatedValue2.entrySet()) { - if (!accumulatedValue1.containsKey(entry.getKey())) { - accumulatedValue1.put(entry.getKey(), entry.getValue()); - } else if (comparator != null) { - if (comparator.compare(entry.getValue(), accumulatedValue1.get(entry.getKey())) > 0) { - accumulatedValue1.put(entry.getKey(), entry.getValue()); - } - } else if (entry.getValue() instanceof Comparable) { - if (((Comparable<V>)entry.getValue()).compareTo(accumulatedValue1.get(entry.getKey())) > 0) { - accumulatedValue1.put(entry.getKey(), entry.getValue()); - } - } - } - return accumulatedValue1; - } - - @Override - public List<KeyValPair<K, V>> getOutput(Map<K, V> accumulatedValue) - { - LinkedList<KeyValPair<K, V>> result = new LinkedList<>(); - for (Map.Entry<K, V> entry : accumulatedValue.entrySet()) { - int k = 0; - for (KeyValPair<K, V> inMemory : result) { - if (comparator != null) { - if (comparator.compare(entry.getValue(), inMemory.getValue()) > 0) { - break; - } - } else if (entry.getValue() instanceof Comparable) { - if (((Comparable<V>)entry.getValue()).compareTo(inMemory.getValue()) > 0) { - break; - } - } - k++; - } - result.add(k, new KeyValPair<K, V>(entry.getKey(), entry.getValue())); - if (result.size() > n) { - result.remove(result.get(result.size() - 1)); - } - } - return result; - } - - @Override - public List<KeyValPair<K, V>> getRetraction(List<KeyValPair<K, V>> value) - { - // TODO: Need to add implementation for retraction. - return new LinkedList<>(); - } -} http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java new file mode 100644 index 0000000..e5fd541 --- /dev/null +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/AverageTest.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.apex.malhar.lib.window.accumulation; + +import org.junit.Assert; +import org.junit.Test; +import org.apache.commons.lang3.tuple.MutablePair; + +/** + * Test for {@link Average}. + */ +public class AverageTest +{ + @Test + public void AverageTest() + { + Average ave = new Average(); + MutablePair<Double, Long> accu = ave.defaultAccumulatedValue(); + + for (int i = 1; i <= 10; i++) { + accu = ave.accumulate(accu, (double)i); + } + Assert.assertTrue(5.5 == accu.getLeft()); + } +}
