Repository: apex-malhar
Updated Branches:
  refs/heads/master 70caa8909 -> bb3dca1b4


APEXMALHAR-2343 The input type of the accumulation can be a supertype of the input tuple type


Project: http://git-wip-us.apache.org/repos/asf/apex-malhar/repo
Commit: http://git-wip-us.apache.org/repos/asf/apex-malhar/commit/bb3dca1b
Tree: http://git-wip-us.apache.org/repos/asf/apex-malhar/tree/bb3dca1b
Diff: http://git-wip-us.apache.org/repos/asf/apex-malhar/diff/bb3dca1b

Branch: refs/heads/master
Commit: bb3dca1b4f4bc770d422f8683919bbe70cdc41d9
Parents: e476334
Author: David Yan <[email protected]>
Authored: Tue Nov 29 17:15:55 2016 -0800
Committer: Siyuan Hua <[email protected]>
Committed: Wed Jan 4 22:59:05 2017 -0800

----------------------------------------------------------------------
 .../apache/apex/malhar/lib/window/accumulation/Count.java |  4 ++--
 .../apex/malhar/lib/window/accumulation/SumDouble.java    |  2 +-
 .../apex/malhar/lib/window/accumulation/SumFloat.java     |  2 +-
 .../apex/malhar/lib/window/accumulation/SumInt.java       |  2 +-
 .../apex/malhar/lib/window/accumulation/SumLong.java      |  2 +-
 .../malhar/lib/window/impl/KeyedWindowedOperatorImpl.java |  2 +-
 .../apex/malhar/lib/window/impl/WindowedOperatorImpl.java |  2 +-
 .../apex/malhar/lib/window/accumulation/SumTest.java      |  8 ++++----
 .../malhar/stream/api/impl/ApexWindowedStreamImpl.java    | 10 +++++-----
 9 files changed, 17 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
index 7a46e22..dbc9f0f 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/Count.java
@@ -26,7 +26,7 @@ import org.apache.commons.lang3.mutable.MutableLong;
  *
  * @since 3.5.0
  */
-public class Count<T> implements Accumulation<T, MutableLong, Long>
+public class Count implements Accumulation<Object, MutableLong, Long>
 {
 
   @Override
@@ -36,7 +36,7 @@ public class Count<T> implements Accumulation<T, MutableLong, Long>
   }
 
   @Override
-  public MutableLong accumulate(MutableLong accumulatedValue, T input)
+  public MutableLong accumulate(MutableLong accumulatedValue, Object input)
   {
     accumulatedValue.increment();
     return accumulatedValue;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
index 475d653..cfca1f3 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumDouble.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableDouble;
 
 /**
  * Sum Accumulation for doubles.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
index dff3be6..dec3308 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumFloat.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableFloat;
 
 /**
  * Sum Accumulation for floats.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
index dca67a4..e4e4d26 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumInt.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableInt;
 
 /**
  * Sum accumulation for integers.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
index 027e4f8..74df427 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/accumulation/SumLong.java
@@ -19,7 +19,7 @@
 package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.apache.apex.malhar.lib.window.Accumulation;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableLong;
 
 /**
  * Sum accumulation for longs.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
index b01fe61..deb718b 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/KeyedWindowedOperatorImpl.java
@@ -48,7 +48,7 @@ import com.datatorrent.lib.util.KeyValPair;
  */
 @InterfaceStability.Evolving
 public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
-    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<InputValT, AccumT, OutputValT>>
+    extends AbstractWindowedOperator<KeyValPair<KeyT, InputValT>, KeyValPair<KeyT, OutputValT>, WindowedStorage.WindowedKeyedStorage<KeyT, AccumT>, WindowedStorage.WindowedKeyedStorage<KeyT, OutputValT>, Accumulation<? super InputValT, AccumT, OutputValT>>
 {
 
   @Override

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
----------------------------------------------------------------------
diff --git a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
index 26e011a..867d1c1 100644
--- a/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
+++ b/library/src/main/java/org/apache/apex/malhar/lib/window/impl/WindowedOperatorImpl.java
@@ -36,7 +36,7 @@ import org.apache.hadoop.classification.InterfaceStability;
  */
 @InterfaceStability.Evolving
 public class WindowedOperatorImpl<InputT, AccumT, OutputT>
-    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<InputT, AccumT, OutputT>>
+    extends AbstractWindowedOperator<InputT, OutputT, WindowedStorage.WindowedPlainStorage<AccumT>, WindowedStorage.WindowedPlainStorage<OutputT>, Accumulation<? super InputT, AccumT, OutputT>>
 {
   @Override
   public void accumulateTuple(Tuple.WindowedTuple<InputT> tuple)

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
index 4587a91..cdc48a1 100644
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
@@ -20,10 +20,10 @@ package org.apache.apex.malhar.lib.window.accumulation;
 
 import org.junit.Assert;
 import org.junit.Test;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableFloat;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
+import org.apache.commons.lang3.mutable.MutableDouble;
+import org.apache.commons.lang3.mutable.MutableFloat;
+import org.apache.commons.lang3.mutable.MutableInt;
+import org.apache.commons.lang3.mutable.MutableLong;
 
 /**
  * Test for different Sum Accumulations.

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/bb3dca1b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index 5866a4c..9087f35 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -28,9 +28,9 @@ import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.WindowState;
 
-import org.apache.apex.malhar.lib.window.accumulation.Count;
 import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
 import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.SumLong;
 import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
@@ -99,7 +99,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
     };
 
     WindowedStream<Tuple<Long>> innerstream = map(kVMap);
-    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new Count());
+    WindowedOperatorImpl<Long, MutableLong, Long> windowedOperator = createWindowedOperator(new SumLong());
     return innerstream.addOperator(windowedOperator, windowedOperator.input, windowedOperator.output, opts);
   }
 
@@ -107,7 +107,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
   public <K, STREAM extends WindowedStream<Tuple.WindowedTuple<KeyValPair<K, Long>>>> STREAM countByKey(Function.ToKeyValue<T, K, Long> convertToKeyValue, Option... opts)
   {
     WindowedStream<Tuple<KeyValPair<K, Long>>> kvstream = map(convertToKeyValue);
-    KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new Count());
+    KeyedWindowedOperatorImpl<K, Long, MutableLong, Long> keyedWindowedOperator = createKeyedWindowedOperator(new SumLong());
     return kvstream.addOperator(keyedWindowedOperator, keyedWindowedOperator.input, keyedWindowedOperator.output, opts);
   }
 
@@ -231,7 +231,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
    * @param <OUT>
    * @return
    */
-  private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<IN, ACCU, OUT> accumulationFn)
+  private <IN, ACCU, OUT> WindowedOperatorImpl<IN, ACCU, OUT> createWindowedOperator(Accumulation<? super IN, ACCU, OUT> accumulationFn)
   {
     WindowedOperatorImpl<IN, ACCU, OUT> windowedOperator = new WindowedOperatorImpl<>();
     //TODO use other default setting in the future
@@ -251,7 +251,7 @@ public class ApexWindowedStreamImpl<T> extends ApexStreamImpl<T> implements Wind
     return windowedOperator;
   }
 
-  private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<V, ACCU, OUT> accumulationFn)
+  private <K, V, ACCU, OUT> KeyedWindowedOperatorImpl<K, V, ACCU, OUT> createKeyedWindowedOperator(Accumulation<? super V, ACCU, OUT> accumulationFn)
   {
     KeyedWindowedOperatorImpl<K, V, ACCU, OUT> keyedWindowedOperator = new KeyedWindowedOperatorImpl<>();
 

Reply via email to