Repository: apex-malhar
Updated Branches:
  refs/heads/master dcca7752a -> 2b2d5bca9


http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
new file mode 100644
index 0000000..cda6bf8
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/FoldFnTest.java
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.apex.malhar.lib.window.Tuple;
+import com.datatorrent.api.DefaultInputPort;
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.api.InputOperator;
+import com.datatorrent.common.util.BaseOperator;
+
+/**
+ * Test for {@link FoldFn}.
+ */
+public class FoldFnTest
+{
+  public static class NumGen extends BaseOperator implements InputOperator
+  {
+    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
+  
+    public static int count = 0;
+    private int i = 0;
+  
+    public NumGen()
+    {
+      count = 0;
+      i = 0;
+    }
+  
+    @Override
+    public void emitTuples()
+    {
+      while (i <= 7) {
+        try {
+          Thread.sleep(50);
+        } catch (InterruptedException e) {
+          // Ignore it.
+        }
+        count++;
+        if (i >= 0) {
+          output.emit(i++);
+        }
+      }
+      i = -1;
+    }
+  }
+  
+  public static class Collector extends BaseOperator
+  {
+    private static int result;
+    
+    public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>()
+    {
+      @Override
+      public void process(Tuple.WindowedTuple<Integer> tuple)
+      {
+        result = tuple.getValue();
+      }
+    };
+    
+    public int getResult()
+    {
+      return result;
+    }
+  }
+  
+  public static class Plus extends FoldFn<Integer, Integer>
+  {
+    @Override
+    public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2)
+    {
+      return fold(accumulatedValue1, accumulatedValue2);
+    }
+    
+    @Override
+    public Integer fold(Integer input1, Integer input2)
+    {
+      if (input1 == null) {
+        return input2;
+      }
+      return input1 + input2;
+    }
+  }
+  
+  @Test
+  public void FoldFnTest()
+  {
+    
+    FoldFn<String, String> concat = new FoldFn<String, String>()
+    {
+      @Override
+      public String merge(String accumulatedValue1, String accumulatedValue2)
+      {
+        return fold(accumulatedValue1, accumulatedValue2);
+      }
+  
+      @Override
+      public String fold(String input1, String input2)
+      {
+        return input1 + ", " + input2;
+      }
+    };
+    
+    String[] ss = new String[]{"b", "c", "d", "e"};
+    String base = "a";
+    
+    for (String s : ss) {
+      base = concat.accumulate(base, s);
+    }
+    Assert.assertEquals("a, b, c, d, e", base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
new file mode 100644
index 0000000..891a824
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/GroupTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.List;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Group}.
+ */
+public class GroupTest
+{
+  @Test
+  public void GroupTest()
+  {
+    Group<Integer> group = new Group<>();
+    
+    List<Integer> accu = group.defaultAccumulatedValue();
+    Assert.assertEquals(0, accu.size());
+    Assert.assertEquals(1, group.accumulate(accu, 10).size());
+    Assert.assertEquals(2, group.accumulate(accu, 11).size());
+    Assert.assertEquals(3, group.accumulate(accu, 11).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
new file mode 100644
index 0000000..fe87d9e
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MaxTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for Max accumulation
+ */
+public class MaxTest
+{
+  @Test
+  public void MaxTest()
+  {
+    Max<Integer> max = new Max<>();
+    
+    Assert.assertEquals((Integer)5, max.accumulate(5, 3));
+    Assert.assertEquals((Integer)6, max.accumulate(4, 6));
+    Assert.assertEquals((Integer)5, max.merge(5, 2));
+  
+    Comparator<Integer> com = new Comparator<Integer>()
+    {
+      @Override
+      public int compare(Integer o1, Integer o2)
+      {
+        return -(o1.compareTo(o2));
+      }
+    };
+    
+    max.setComparator(com);
+    Assert.assertEquals((Integer)3, max.accumulate(5, 3));
+    Assert.assertEquals((Integer)4, max.accumulate(4, 6));
+    Assert.assertEquals((Integer)2, max.merge(5, 2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
new file mode 100644
index 0000000..3589735
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/MinTest.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Comparator;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link Min}.
+ */
+public class MinTest
+{
+  @Test
+  public void MinTest()
+  {
+    Min<Integer> min = new Min<>();
+    
+    Assert.assertEquals((Integer)3, min.accumulate(5, 3));
+    Assert.assertEquals((Integer)4, min.accumulate(4, 6));
+    Assert.assertEquals((Integer)2, min.merge(5, 2));
+    
+    Comparator<Integer> com = new Comparator<Integer>()
+    {
+      @Override
+      public int compare(Integer o1, Integer o2)
+      {
+        return -(o1.compareTo(o2));
+      }
+    };
+    
+    min.setComparator(com);
+    Assert.assertEquals((Integer)5, min.accumulate(5, 3));
+    Assert.assertEquals((Integer)6, min.accumulate(4, 6));
+    Assert.assertEquals((Integer)5, min.merge(5, 2));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
new file mode 100644
index 0000000..26d73a7
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/ReduceFnTest.java
@@ -0,0 +1,50 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link ReduceFn}.
+ */
+public class ReduceFnTest
+{
+  
+  @Test
+  public void ReduceFnTest()
+  {
+    ReduceFn<String> concat = new ReduceFn<String>()
+    {
+      @Override
+      public String reduce(String input1, String input2)
+      {
+        return input1 + ", " + input2;
+      }
+    };
+    
+    String[] ss = new String[]{"b", "c", "d", "e"};
+    String base = "a";
+    
+    for (String s : ss) {
+      base = concat.accumulate(base, s);
+    }
+    Assert.assertEquals("a, b, c, d, e", base);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
new file mode 100644
index 0000000..674f871
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/RemoveDuplicatesTest.java
@@ -0,0 +1,41 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.Set;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Test for {@link RemoveDuplicates}.
+ */
+public class RemoveDuplicatesTest
+{
+  @Test
+  public void RemoveDuplicatesTest()
+  {
+    RemoveDuplicates<Integer> rd = new RemoveDuplicates<>();
+    
+    Set<Integer> accu = rd.defaultAccumulatedValue();
+    Assert.assertEquals(0, accu.size());
+    Assert.assertEquals(1, rd.accumulate(accu, 10).size());
+    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
new file mode 100644
index 0000000..4c55612
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/SumTest.java
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.apache.commons.lang.mutable.MutableDouble;
+import org.apache.commons.lang.mutable.MutableFloat;
+import org.apache.commons.lang.mutable.MutableInt;
+import org.apache.commons.lang.mutable.MutableLong;
+
+/**
+ * Test for different Sum Accumulations.
+ */
+public class SumTest
+{
+  @Test
+  public void SumTest()
+  {
+    SumInt si = new SumInt();
+    SumLong sl = new SumLong();
+    SumFloat sf = new SumFloat();
+    SumDouble sd = new SumDouble();
+    
+    Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
+    Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
+    Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
+    
+    Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
+    Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
+    Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
+    
+    Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
+    Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
+    Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
+    
+    Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
+    Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
+    Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
new file mode 100644
index 0000000..5bf2207
--- /dev/null
+++ b/library/src/test/java/org/apache/apex/malhar/lib/window/accumulation/TopNByKeyTest.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.lib.window.accumulation;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Unit test for TopNByKey accumulation
+ */
+public class TopNByKeyTest
+{
+  @Test
+  public void TopNByKeyTest() throws Exception
+  {
+    TopNByKey<String, Integer> topNByKey = new TopNByKey<>();
+    topNByKey.setN(3);
+    Map<String, Integer> accu = topNByKey.defaultAccumulatedValue();
+  
+    Assert.assertEquals(0, accu.size());
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1));
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3));
+    
+    List<KeyValPair<String, Integer>> result1 = new ArrayList<>();
+  
+    result1.add(new KeyValPair<String, Integer>("3", 3));
+    result1.add(new KeyValPair<String, Integer>("1", 1));
+    
+    Assert.assertEquals(result1, topNByKey.getOutput(accu));
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2));
+  
+    List<KeyValPair<String, Integer>> result2 = new ArrayList<>();
+  
+    result2.add(new KeyValPair<String, Integer>("3", 3));
+    result2.add(new KeyValPair<String, Integer>("2", 2));
+    result2.add(new KeyValPair<String, Integer>("1", 1));
+    
+    Assert.assertEquals(result2, topNByKey.getOutput(accu));
+    
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5));
+    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4));
+  
+    List<KeyValPair<String, Integer>> result3 = new ArrayList<>();
+    
+    result3.add(new KeyValPair<String, Integer>("5", 5));
+    result3.add(new KeyValPair<String, Integer>("4", 4));
+    result3.add(new KeyValPair<String, Integer>("3", 3));
+    
+    Assert.assertEquals(result3, topNByKey.getOutput(accu));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
deleted file mode 100644
index fb4de3c..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/AverageTest.java
+++ /dev/null
@@ -1,41 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.commons.lang3.tuple.MutablePair;
-
-/**
- * Test for {@link Average}.
- */
-public class AverageTest
-{
-  @Test
-  public void AverageTest()
-  {
-    Average ave = new Average();
-    MutablePair<Double, Long> accu = ave.defaultAccumulatedValue();
-    
-    for (int i = 1; i <= 10; i++) {
-      accu = ave.accumulate(accu, (double)i);
-    }
-    Assert.assertTrue(5.5 == accu.getLeft());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
deleted file mode 100644
index 4e6f8f1..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/FoldFnTest.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.apex.malhar.lib.window.Tuple;
-import com.datatorrent.api.DefaultInputPort;
-import com.datatorrent.api.DefaultOutputPort;
-import com.datatorrent.api.InputOperator;
-import com.datatorrent.common.util.BaseOperator;
-
-/**
- * Test for {@link ReduceFn}.
- */
-public class FoldFnTest
-{
-  public static class NumGen extends BaseOperator implements InputOperator
-  {
-    public transient DefaultOutputPort<Integer> output = new DefaultOutputPort<>();
-  
-    public static int count = 0;
-    private int i = 0;
-  
-    public NumGen()
-    {
-      count = 0;
-      i = 0;
-    }
-  
-    @Override
-    public void emitTuples()
-    {
-      while (i <= 7) {
-        try {
-          Thread.sleep(50);
-        } catch (InterruptedException e) {
-          // Ignore it.
-        }
-        count++;
-        if (i >= 0) {
-          output.emit(i++);
-        }
-      }
-      i = -1;
-    }
-  }
-  
-  public static class Collector extends BaseOperator
-  {
-    private static int result;
-    
-    public transient DefaultInputPort<Tuple.WindowedTuple<Integer>> input = new DefaultInputPort<Tuple.WindowedTuple<Integer>>()
-    {
-      @Override
-      public void process(Tuple.WindowedTuple<Integer> tuple)
-      {
-        result = tuple.getValue();
-      }
-    };
-    
-    public int getResult()
-    {
-      return result;
-    }
-  }
-  
-  public static class Plus extends FoldFn<Integer, Integer>
-  {
-    @Override
-    public Integer merge(Integer accumulatedValue1, Integer accumulatedValue2)
-    {
-      return fold(accumulatedValue1, accumulatedValue2);
-    }
-    
-    @Override
-    public Integer fold(Integer input1, Integer input2)
-    {
-      if (input1 == null) {
-        return input2;
-      }
-      return input1 + input2;
-    }
-  }
-  
-  @Test
-  public void FoldFnTest()
-  {
-    
-    FoldFn<String, String> concat = new FoldFn<String, String>()
-    {
-      @Override
-      public String merge(String accumulatedValue1, String accumulatedValue2)
-      {
-        return fold(accumulatedValue1, accumulatedValue2);
-      }
-  
-      @Override
-      public String fold(String input1, String input2)
-      {
-        return input1 + ", " + input2;
-      }
-    };
-    
-    String[] ss = new String[]{"b", "c", "d", "e"};
-    String base = "a";
-    
-    for (String s : ss) {
-      base = concat.accumulate(base, s);
-    }
-    Assert.assertEquals("a, b, c, d, e", base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
deleted file mode 100644
index a9aac77..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/GroupTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.List;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link Group}.
- */
-public class GroupTest
-{
-  @Test
-  public void GroupTest()
-  {
-    Group<Integer> group = new Group<>();
-    
-    List<Integer> accu = group.defaultAccumulatedValue();
-    Assert.assertEquals(0, accu.size());
-    Assert.assertEquals(1, group.accumulate(accu, 10).size());
-    Assert.assertEquals(2, group.accumulate(accu, 11).size());
-    Assert.assertEquals(3, group.accumulate(accu, 11).size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
deleted file mode 100644
index c873125..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MaxTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for Max accumulation
- */
-public class MaxTest
-{
-  @Test
-  public void MaxTest()
-  {
-    Max<Integer> max = new Max<>();
-    
-    Assert.assertEquals((Integer)5, max.accumulate(5, 3));
-    Assert.assertEquals((Integer)6, max.accumulate(4, 6));
-    Assert.assertEquals((Integer)5, max.merge(5, 2));
-  
-    Comparator<Integer> com = new Comparator<Integer>()
-    {
-      @Override
-      public int compare(Integer o1, Integer o2)
-      {
-        return -(o1.compareTo(o2));
-      }
-    };
-    
-    max.setComparator(com);
-    Assert.assertEquals((Integer)3, max.accumulate(5, 3));
-    Assert.assertEquals((Integer)4, max.accumulate(4, 6));
-    Assert.assertEquals((Integer)2, max.merge(5, 2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
deleted file mode 100644
index 74816b0..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/MinTest.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Comparator;
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link Min}.
- */
-public class MinTest
-{
-  @Test
-  public void MinTest()
-  {
-    Min<Integer> min = new Min<>();
-    
-    Assert.assertEquals((Integer)3, min.accumulate(5, 3));
-    Assert.assertEquals((Integer)4, min.accumulate(4, 6));
-    Assert.assertEquals((Integer)2, min.merge(5, 2));
-    
-    Comparator<Integer> com = new Comparator<Integer>()
-    {
-      @Override
-      public int compare(Integer o1, Integer o2)
-      {
-        return -(o1.compareTo(o2));
-      }
-    };
-    
-    min.setComparator(com);
-    Assert.assertEquals((Integer)5, min.accumulate(5, 3));
-    Assert.assertEquals((Integer)6, min.accumulate(4, 6));
-    Assert.assertEquals((Integer)5, min.merge(5, 2));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
deleted file mode 100644
index 6b5bbad..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/ReduceFnTest.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link ReduceFn}.
- */
-public class ReduceFnTest
-{
-  
-  @Test
-  public void ReduceFnTest()
-  {
-    ReduceFn<String> concat = new ReduceFn<String>()
-    {
-      @Override
-      public String reduce(String input1, String input2)
-      {
-        return input1 + ", " + input2;
-      }
-    };
-    
-    String[] ss = new String[]{"b", "c", "d", "e"};
-    String base = "a";
-    
-    for (String s : ss) {
-      base = concat.accumulate(base, s);
-    }
-    Assert.assertEquals("a, b, c, d, e", base);
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
deleted file mode 100644
index f0196d2..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/RemoveDuplicatesTest.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.Set;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-/**
- * Test for {@link RemoveDuplicates}.
- */
-public class RemoveDuplicatesTest
-{
-  @Test
-  public void RemoveDuplicatesTest()
-  {
-    RemoveDuplicates<Integer> rd = new RemoveDuplicates<>();
-    
-    Set<Integer> accu = rd.defaultAccumulatedValue();
-    Assert.assertEquals(0, accu.size());
-    Assert.assertEquals(1, rd.accumulate(accu, 10).size());
-    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
-    Assert.assertEquals(2, rd.accumulate(accu, 11).size());
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
deleted file mode 100644
index 65b6480..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/SumTest.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.apache.commons.lang.mutable.MutableDouble;
-import org.apache.commons.lang.mutable.MutableFloat;
-import org.apache.commons.lang.mutable.MutableInt;
-import org.apache.commons.lang.mutable.MutableLong;
-
-/**
- * Test for different Sum Accumulations.
- */
-public class SumTest
-{
-  @Test
-  public void SumTest()
-  {
-    SumInt si = new SumInt();
-    SumLong sl = new SumLong();
-    SumFloat sf = new SumFloat();
-    SumDouble sd = new SumDouble();
-    
-    Assert.assertEquals(new MutableInt(10), si.accumulate(si.defaultAccumulatedValue(), 10));
-    Assert.assertEquals(new MutableInt(11), si.accumulate(new MutableInt(1), 10));
-    Assert.assertEquals(new MutableInt(22), si.merge(new MutableInt(1), new MutableInt(21)));
-    
-    Assert.assertEquals(new MutableLong(10L), sl.accumulate(sl.defaultAccumulatedValue(), 10L));
-    Assert.assertEquals(new MutableLong(22L), sl.accumulate(new MutableLong(2L), 20L));
-    Assert.assertEquals(new MutableLong(41L), sl.merge(new MutableLong(32L), new MutableLong(9L)));
-    
-    Assert.assertEquals(new MutableFloat(9.0F), sf.accumulate(sf.defaultAccumulatedValue(), 9.0F));
-    Assert.assertEquals(new MutableFloat(22.5F), sf.accumulate(new MutableFloat(2.5F), 20F));
-    Assert.assertEquals(new MutableFloat(41.0F), sf.merge(new MutableFloat(33.1F), new MutableFloat(7.9F)));
-    
-    Assert.assertEquals(new MutableDouble(9.0), sd.accumulate(sd.defaultAccumulatedValue(), 9.0));
-    Assert.assertEquals(new MutableDouble(22.5), sd.accumulate(new MutableDouble(2.5), 20.0));
-    Assert.assertEquals(new MutableDouble(41.0), sd.merge(new MutableDouble(33.1), new MutableDouble(7.9)));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
----------------------------------------------------------------------
diff --git a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java b/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
deleted file mode 100644
index 3f6ac09..0000000
--- a/library/src/test/java/org/apache/apex/malhar/lib/window/impl/accumulation/TopNByKeyTest.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.apex.malhar.lib.window.impl.accumulation;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import com.datatorrent.lib.util.KeyValPair;
-
-/**
- * Unit test for TopNByKey accumulation
- */
-public class TopNByKeyTest
-{
-  @Test
-  public void TopNByKeyTest() throws Exception
-  {
-    TopNByKey<String, Integer> topNByKey = new TopNByKey<>();
-    topNByKey.setN(3);
-    Map<String, Integer> accu = topNByKey.defaultAccumulatedValue();
-  
-    Assert.assertEquals(0, accu.size());
-    
-    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("1", 1));
-    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("3", 3));
-    
-    List<KeyValPair<String, Integer>> result1 = new ArrayList<>();
-  
-    result1.add(new KeyValPair<String, Integer>("3", 3));
-    result1.add(new KeyValPair<String, Integer>("1", 1));
-    
-    Assert.assertEquals(result1, topNByKey.getOutput(accu));
-    
-    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("2", 2));
-  
-    List<KeyValPair<String, Integer>> result2 = new ArrayList<>();
-  
-    result2.add(new KeyValPair<String, Integer>("3", 3));
-    result2.add(new KeyValPair<String, Integer>("2", 2));
-    result2.add(new KeyValPair<String, Integer>("1", 1));
-    
-    Assert.assertEquals(result2, topNByKey.getOutput(accu));
-    
-    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("5", 5));
-    accu = topNByKey.accumulate(accu, new KeyValPair<String, Integer>("4", 4));
-  
-    List<KeyValPair<String, Integer>> result3 = new ArrayList<>();
-    
-    result3.add(new KeyValPair<String, Integer>("5", 5));
-    result3.add(new KeyValPair<String, Integer>("4", 4));
-    result3.add(new KeyValPair<String, Integer>("3", 3));
-    
-    Assert.assertEquals(result3, topNByKey.getOutput(accu));
-  }
-}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
index 84f05fc..0f5ce1e 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/WindowedStream.java
@@ -25,10 +25,10 @@ import org.joda.time.Duration;
 import org.apache.apex.malhar.lib.window.Accumulation;
 import org.apache.apex.malhar.lib.window.TriggerOption;
 import org.apache.apex.malhar.lib.window.Tuple;
+import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
-import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
 import org.apache.apex.malhar.stream.api.function.Function;
 import org.apache.hadoop.classification.InterfaceStability;
 

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/2b2d5bca/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
----------------------------------------------------------------------
diff --git a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
index ebd5eea..5866a4c 100644
--- a/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
+++ b/stream/src/main/java/org/apache/apex/malhar/stream/api/impl/ApexWindowedStreamImpl.java
@@ -28,14 +28,14 @@ import org.apache.apex.malhar.lib.window.Tuple;
 import org.apache.apex.malhar.lib.window.WindowOption;
 import org.apache.apex.malhar.lib.window.WindowState;
 
+import org.apache.apex.malhar.lib.window.accumulation.Count;
+import org.apache.apex.malhar.lib.window.accumulation.FoldFn;
+import org.apache.apex.malhar.lib.window.accumulation.ReduceFn;
+import org.apache.apex.malhar.lib.window.accumulation.TopN;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedKeyedStorage;
 import org.apache.apex.malhar.lib.window.impl.InMemoryWindowedStorage;
 import org.apache.apex.malhar.lib.window.impl.KeyedWindowedOperatorImpl;
 import org.apache.apex.malhar.lib.window.impl.WindowedOperatorImpl;
-import org.apache.apex.malhar.lib.window.impl.accumulation.Count;
-import org.apache.apex.malhar.lib.window.impl.accumulation.FoldFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.ReduceFn;
-import org.apache.apex.malhar.lib.window.impl.accumulation.TopN;
 
 import org.apache.apex.malhar.stream.api.ApexStream;
 import org.apache.apex.malhar.stream.api.Option;

Reply via email to