Repository: apex-malhar Updated Branches: refs/heads/master 70caa8909 -> bb3dca1b4
APEXMALHAR-2343 The input type of the accumulation can be a super type of the input tuple type Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bb3dca1b Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bb3dca1b Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bb3dca1b Branch: refs/heads/master Commit: bb3dca1b4f4bc770d422f8683919bbe70cdc41d9 Parents: e476334 Author: David Yan <[email protected]> Authored: Tue Nov 29 17:15:55 2016 -0800 Committer: Siyuan Hua <[email protected]> Committed: Wed Jan 4 22:59:05 2017 -0800 ---------------------------------------------------------------------- .../apache/apex/malhar/lib/window/accumulation/Count.java | 4 ++-- .../apex/malhar/lib/window/accumulation/SumDouble.java | 2 +- .../apex/malhar/lib/window/accumulation/SumFloat.java | 2 +- .../apex/malhar/lib/window/accumulation/SumInt.java | 2 +- .../apex/malhar/lib/window/accumulation/SumLong.java | 2 +- .../malhar/lib/window/impl/KeyedWindowedOperatorImpl.java | 2 +- .../apex/malhar/lib/window/impl/WindowedOperatorImpl.java | 2 +- .../apex/malhar/lib/window/accumulation/SumTest.java | 8 ++++---- .../malhar/stream/api/impl/ApexWindowedStreamImpl.java | 10 +++++----- 9 files changed, 17 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java index 7a46e22..dbc9f0f 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java @@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong; * * @since 3.5.0 */ -public class Count<T> implements Accumulation<T, MutableLong, Long> +public class Count implements Accumulation<Object, MutableLong, Long> { @Override @@ -36,7 +36,7 @@ public class Count<T> implements Accumulation<T, MutableLong, Long> } @Override - public MutableLong accumulate(MutableLong accumulatedValue, T input) + public MutableLong accumulate(MutableLong accumulatedValue, Object input) { accumulatedValue.increment(); return accumulatedValue; http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java index 475d653..cfca1f3 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java @@ -19,7 +19,7 @@ package org.apache.apex.malhar.lib.window.accumulation; import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableDouble; +import org.apache.commons.lang3.mutable.MutableDouble; /** * Sum Accumulation for doubles. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java index dff3be6..dec3308 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java @@ -19,7 +19,7 @@ package org.apache.apex.malhar.lib.window.accumulation; import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableFloat; +import org.apache.commons.lang3.mutable.MutableFloat; /** * Sum Accumulation for floats. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java index dca67a4..e4e4d26 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java @@ -19,7 +19,7 @@ package org.apache.apex.malhar.lib.window.accumulation; import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableInt; /** * Sum accumulation for integers. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java index 027e4f8..74df427 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java @@ -19,7 +19,7 @@ package org.apache.apex.malhar.lib.window.accumulation; import org.apache.apex.malhar.lib.window.Accumulation; -import org.apache.commons.lang.mutable.MutableLong; +import org.apache.commons.lang3.mutable.MutableLong; /** * Sum accumulation for longs. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java index b01fe61..deb718b 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java @@ -48,7 +48,7 @@ import com.datatorrent.lib.util.KeyValPair; */ @InterfaceStability.Evolving public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT> - extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>> + extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>> { @Override http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java ---------------------------------------------------------------------- diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java index 26e011a..867d1c1 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java @@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability; */ @InterfaceStability.Evolving public class WindowedOperatorImpl<InputT, AccumT, OutputT> - extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>> + extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<? super InputT, AccumT, OutputT>> { @Override public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple) http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java ---------------------------------------------------------------------- diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java index 4587a91..cdc48a1 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java @@ -20,10 +20,10 @@ package org.apache.apex.malhar.lib.window.accumulation; import org.junit.Assert; import org.junit.Test; -import org.apache.commons.lang.mutable.MutableDouble; -import org.apache.commons.lang.mutable.MutableFloat; -import org.apache.commons.lang.mutable.MutableInt; -import org.apache.commons.lang.mutable.MutableLong; +import org.apache.commons.lang3.mutable.MutableDouble; +import org.apache.commons.lang3.mutable.MutableFloat; +import org.apache.commons.lang3.mutable.MutableInt; +import org.apache.commons.lang3.mutable.MutableLong; /** * Test for different Sum Accumulations. http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java ---------------------------------------------------------------------- diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java index 5866a4c..9087f35 100644 --- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java +++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java @@ -28,9 +28,9 @@ import org.apache.apex.malhar.lib.window.Tuple; import org.apache.apex.malhar.lib.window.WindowOption; import org.apache.apex.malhar.lib.window.WindowState; -import org.apache.apex.malhar.lib.window.accumulation.Count; import org.apache.apex.malhar.lib.window.accumulation.FoldFn; import org.apache.apex.malhar.lib.window.accumulation.ReduceFn; +import org.apache.apex.malhar.lib.window.accumulation.SumLong; import org.apache.apex.malhar.lib.window.accumulation.TopN; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage; import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage; @@ -99,7 +99,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind }; WindowedStream<Tuple<Long>> innerstream = map(kVMap); - WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count()); + WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong()); return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts); } @@ -107,7 +107,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts) { WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue); - KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count()); + KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong()); return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts); } @@ -231,7 +231,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind * @param <OUT> * @return */ - private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn) + private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulationFn) { WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>(); //TODO use other default setting in the future @@ -251,7 +251,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind return windowedOperator; } - private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn) + private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulationFn) { KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
