http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java
new file mode 100644
index 0000000..337cefb
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Sink;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link InvertIndex}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class InvertIndexTest
+{
+  private static final Logger log = LoggerFactory.getLogger(InvertIndexTest.class);
+
+  /**
+   * Sends four maps through the operator in one window and checks the emitted value-to-keys index.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    InvertIndex<String,String> oper = new InvertIndex<String,String>();
+    CollectorTestSink indexSink = new CollectorTestSink();
+
+    Sink inSink = oper.data.getSink();
+    oper.index.setSink(indexSink);
+
+    oper.beginWindow(0);
+
+    HashMap<String, String> input = new HashMap<String, String>();
+
+    input.put("a", "str");
+    input.put("b", "str");
+    inSink.put(input);
+
+    input.clear();
+    input.put("a", "str1");
+    input.put("b", "str1");
+    inSink.put(input);
+
+    input.clear();
+    input.put("c", "blah");
+    inSink.put(input);
+
+    input.clear();
+    input.put("c", "str1");
+    inSink.put(input);
+    oper.endWindow();
+    // Three distinct values were sent ("str", "str1", "blah"); one tuple per value is expected.
+    Assert.assertEquals("number emitted tuples", 3, 
indexSink.collectedTuples.size());
+    for (Object o: indexSink.collectedTuples) {
+      log.debug(o.toString());
+      HashMap<String, ArrayList<String>> output = (HashMap<String, 
ArrayList<String>>)o;
+      for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) {
+        String key = e.getKey();
+        ArrayList<String> alist = e.getValue();
+        if (key.equals("str1")) {
+          Assert.assertTrue("Index for \"str1\" contains \"a\"", 
alist.contains("a"));
+          Assert.assertTrue("Index for \"str1\" contains \"b\"", 
alist.contains("b"));
+          Assert.assertTrue("Index for \"str1\" contains \"c\"", 
alist.contains("c"));
+
+        } else if (key.equals("str")) {
+          Assert.assertTrue("Index for \"str\" contains \"a\"", 
alist.contains("a"));
+          Assert.assertTrue("Index for \"str\" contains \"b\"", 
alist.contains("b"));
+          Assert.assertFalse("Index for \"str\" should not contain \"c\"", 
alist.contains("c"));
+
+        } else if (key.equals("blah")) {
+          Assert.assertFalse("Index for \"blah\" should not contain \"a\"", 
alist.contains("a"));
+          Assert.assertFalse("Index for \"blah\" should not contain \"b\"", 
alist.contains("b"));
+          Assert.assertTrue("Index for \"blah\" contains \"c\"", 
alist.contains("c"));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java
new file mode 100644
index 0000000..c930932
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LastMatchMapTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link LastMatchMap}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class LastMatchMapTest
+{
+  /**
+   * Runs the shared scenario against operators declared with each numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new LastMatchMap<String, Integer>());
+    testNodeProcessingSchema(new LastMatchMap<String, Double>());
+    testNodeProcessingSchema(new LastMatchMap<String, Float>());
+    testNodeProcessingSchema(new LastMatchMap<String, Short>());
+    testNodeProcessingSchema(new LastMatchMap<String, Long>());
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(LastMatchMap oper)
+  {
+    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
+    oper.last.setSink(matchSink);
+    oper.setKey("a");
+    oper.setValue(3);
+    oper.setTypeEQ();
+    // Window 1: two tuples have a == 3; the test expects only the last of them to be emitted.
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 4);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.put("a", 3);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 2);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 4);
+    input.put("b", 21);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    input.put("b", 52);
+    input.put("c", 5);
+    oper.data.process(input);
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    HashMap<String, Number> tuple = (HashMap<String, Number>)matchSink.tuple;
+    Number aval = tuple.get("a");
+    Number bval = tuple.get("b");
+    Assert.assertEquals("Value of a was ", 3, aval.intValue());
+    Assert.assertEquals("Value of b was ", 52, bval.intValue());
+    matchSink.clear();
+    // Window 2: neither tuple has a == 3.
+    oper.beginWindow(0);
+    input.clear();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 5);
+    oper.data.process(input);
+    oper.endWindow();
+    // Nothing should be emitted because no tuple matched
+    Assert.assertEquals("number emitted tuples", 0, matchSink.count);
+    matchSink.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java
new file mode 100644
index 0000000..2c0fcad
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyMapTest.java
@@ -0,0 +1,117 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link LeastFrequentKeyMap}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class LeastFrequentKeyMapTest
+{
+  /**
+   * Checks the "least" and "list" outputs over two windows: a single least-frequent key, then a tie.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    LeastFrequentKeyMap<String, Integer> oper = new 
LeastFrequentKeyMap<String, Integer>();
+    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
+    CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink();
+    oper.least.setSink(matchSink);
+    oper.list.setSink(listSink);
+    // Window 1: "b" is sent 3 times, "a" 5 times and "c" 6 times, so "b" is least frequent.
+    oper.beginWindow(0);
+    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
+    int atot = 5;
+    int btot = 3;
+    int ctot = 6;
+    amap.put("a", null);
+    bmap.put("b", null);
+    cmap.put("c", null);
+    for (int i = 0; i < atot; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot; i++) {
+      oper.data.process(cmap);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple;
+    Integer val = tuple.get("b");
+    Assert.assertEquals("Count of b was ", btot, val.intValue());
+    Assert.assertEquals("number emitted tuples", 1, listSink.count);
+    ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, 
Integer>>)listSink.tuple;
+    val = list.get(0).get("b");
+    Assert.assertEquals("Count of b was ", btot, val.intValue());
+    // Window 2: "a" and "c" tie at 5 occurrences while "b" has 10.
+    matchSink.clear();
+    listSink.clear();
+    oper.beginWindow(0);
+    atot = 5;
+    btot = 10;
+    ctot = 5;
+    for (int i = 0; i < atot; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot; i++) {
+      oper.data.process(cmap);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    Assert.assertEquals("number emitted tuples", 1, listSink.count);
+    list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
+    int acount = 0;
+    int ccount = 0;
+    for (HashMap<String, Integer> h : list) {
+      val = h.get("a");
+      if (val == null) {
+        ccount = h.get("c");
+      } else {
+        acount = val;
+      }
+    }
+    Assert.assertEquals("Count of a was ", atot, acount);
+    Assert.assertEquals("Count of c was ", ctot, ccount);
+    HashMap<String, Integer> mtuple = (HashMap<String, 
Integer>)matchSink.tuple;
+    val = mtuple.get("a");
+    if (val == null) {
+      val = mtuple.get("c");
+    }
+    Assert.assertEquals("Count of least frequent key was ", ctot, 
val.intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java
new file mode 100644
index 0000000..287312c
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/LeastFrequentKeyValueMapTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link LeastFrequentKeyValueMap}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class LeastFrequentKeyValueMapTest
+{
+  /**
+   * Checks the per-key count of the least frequent value(s) after two batches in one window.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    LeastFrequentKeyValueMap<String, Integer> oper = new 
LeastFrequentKeyValueMap<String, Integer>();
+    CollectorTestSink matchSink = new CollectorTestSink();
+    oper.least.setSink(matchSink);
+
+    oper.beginWindow(0);
+    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
+    int atot1 = 5;
+    int btot1 = 3;
+    int ctot1 = 6;
+    amap.put("a", 1);
+    bmap.put("b", 2);
+    cmap.put("c", 4);
+    for (int i = 0; i < atot1; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot1; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot1; i++) {
+      oper.data.process(cmap);
+    }
+    // Second batch, same window: each key is now sent with a different value.
+    atot1 = 4;
+    btot1 = 3;
+    ctot1 = 10;
+    amap.put("a", 5);
+    bmap.put("b", 4);
+    cmap.put("c", 3);
+    for (int i = 0; i < atot1; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot1; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot1; i++) {
+      oper.data.process(cmap);
+    }
+
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 3, 
matchSink.collectedTuples.size());
+    int vcount;
+    for (Object o: matchSink.collectedTuples) {
+      HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, 
HashMap<Integer, Integer>>)o;
+      for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) {
+        String key = e.getKey();
+        if (key.equals("a")) {
+          vcount = e.getValue().get(5);
+          Assert.assertEquals("Count of value 5 for key \"a\" ", 4, vcount);
+
+        } else if (key.equals("b")) {
+          vcount = e.getValue().get(2);
+          Assert.assertEquals("Count of value 2 for key \"b\" ", 3, vcount);
+          vcount = e.getValue().get(4);
+          Assert.assertEquals("Count of value 4 for key \"b\" ", 3, vcount);
+
+        } else if (key.equals("c")) {
+          vcount = e.getValue().get(4);
+          Assert.assertEquals("Count of value 4 for key \"c\" ", 6, vcount);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java
new file mode 100644
index 0000000..1d2b5ff
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MatchMapTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.algo.MatchMap;
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link com.datatorrent.lib.algo.MatchMap}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class MatchMapTest
+{
+  /**
+   * Runs the shared scenario against operators declared with each numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new MatchMap<String, Integer>());
+    testNodeProcessingSchema(new MatchMap<String, Double>());
+    testNodeProcessingSchema(new MatchMap<String, Float>());
+    testNodeProcessingSchema(new MatchMap<String, Short>());
+    testNodeProcessingSchema(new MatchMap<String, Long>());
+  }
+
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(MatchMap oper)
+  {
+    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
+    oper.match.setSink(matchSink);
+    oper.setKey("a");
+    oper.setValue(3.0);
+    oper.setTypeNEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    oper.data.process(input);
+    oper.endWindow();
+
+    // With NEQ on a == 3 only the first tuple (a == 2) is expected to match, so one tuple is emitted
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    for (Map.Entry<String, Number> e : ((HashMap<String, 
Number>)matchSink.tuple).entrySet()) {
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 2.0, 
e.getValue().doubleValue(), 0.0);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 20.0, 
e.getValue().doubleValue(), 0.0);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 1000.0, 
e.getValue().doubleValue(), 0.0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java
new file mode 100644
index 0000000..58a1059
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyMapTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * @deprecated
+ * Functional tests for {@link MostFrequentKeyMap}<p>
+ *
+ */
+@Deprecated
+public class MostFrequentKeyMapTest
+{
+  /**
+   * Test node logic emits correct results
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    MostFrequentKeyMap<String, Integer> oper = new MostFrequentKeyMap<String, 
Integer>();
+    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
+    CountAndLastTupleTestSink listSink = new CountAndLastTupleTestSink();
+    oper.most.setSink(matchSink);
+    oper.list.setSink(listSink);
+
+    oper.beginWindow(0);
+    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
+    int atot = 5;
+    int btot = 7;
+    int ctot = 6;
+    amap.put("a", null);
+    bmap.put("b", null);
+    cmap.put("c", null);
+    for (int i = 0; i < atot; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot; i++) {
+      oper.data.process(cmap);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    HashMap<String, Integer> tuple = (HashMap<String, Integer>)matchSink.tuple;
+    Integer val = tuple.get("b");
+    Assert.assertEquals("Count of b was ", btot, val.intValue());
+    Assert.assertEquals("number emitted tuples", 1, listSink.count);
+    ArrayList<HashMap<String, Integer>> list = (ArrayList<HashMap<String, 
Integer>>)listSink.tuple;
+    val = list.get(0).get("b");
+    Assert.assertEquals("Count of b was ", btot, val.intValue());
+
+    matchSink.clear();
+    listSink.clear();
+    oper.beginWindow(0);
+    atot = 5;
+    btot = 4;
+    ctot = 5;
+    for (int i = 0; i < atot; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot; i++) {
+      oper.data.process(cmap);
+    }
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    Assert.assertEquals("number emitted tuples", 1, listSink.count);
+    list = (ArrayList<HashMap<String, Integer>>)listSink.tuple;
+    int acount = 0;
+    int ccount = 0;
+    for (HashMap<String, Integer> h : list) {
+      val = h.get("a");
+      if (val == null) {
+        ccount = h.get("c");
+      } else {
+        acount = val;
+      }
+    }
+    Assert.assertEquals("Count of a was ", atot, acount);
+    Assert.assertEquals("Count of c was ", ctot, ccount);
+    HashMap<String, Integer> mtuple = (HashMap<String, 
Integer>)matchSink.tuple;
+    val = mtuple.get("a");
+    if (val == null) {
+      val = mtuple.get("c");
+    }
+    Assert.assertEquals("Count of least frequent key was ", ctot, 
val.intValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java
new file mode 100644
index 0000000..c356079
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/MostFrequentKeyValueMapTest.java
@@ -0,0 +1,109 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link MostFrequentKeyValueMap}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class MostFrequentKeyValueMapTest
+{
+  /**
+   * Checks the per-key count of the most frequent value(s) after two batches in one window.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    MostFrequentKeyValueMap<String, Integer> oper = new 
MostFrequentKeyValueMap<String, Integer>();
+    CollectorTestSink matchSink = new CollectorTestSink();
+    oper.most.setSink(matchSink);
+
+    oper.beginWindow(0);
+    HashMap<String, Integer> amap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> bmap = new HashMap<String, Integer>(1);
+    HashMap<String, Integer> cmap = new HashMap<String, Integer>(1);
+    int atot1 = 5;
+    int btot1 = 3;
+    int ctot1 = 6;
+    amap.put("a", 1);
+    bmap.put("b", 2);
+    cmap.put("c", 4);
+    for (int i = 0; i < atot1; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot1; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot1; i++) {
+      oper.data.process(cmap);
+    }
+    // Second batch, same window: each key is now sent with a different value.
+    atot1 = 4;
+    btot1 = 3;
+    ctot1 = 10;
+    amap.put("a", 5);
+    bmap.put("b", 4);
+    cmap.put("c", 3);
+    for (int i = 0; i < atot1; i++) {
+      oper.data.process(amap);
+    }
+    for (int i = 0; i < btot1; i++) {
+      oper.data.process(bmap);
+    }
+    for (int i = 0; i < ctot1; i++) {
+      oper.data.process(cmap);
+    }
+
+    oper.endWindow();
+    Assert.assertEquals("number emitted tuples", 3, 
matchSink.collectedTuples.size());
+    int vcount;
+    for (Object o: matchSink.collectedTuples) {
+      HashMap<String, HashMap<Integer, Integer>> omap = (HashMap<String, 
HashMap<Integer, Integer>>)o;
+      for (Map.Entry<String, HashMap<Integer, Integer>> e: omap.entrySet()) {
+        String key = e.getKey();
+        if (key.equals("a")) {
+          vcount = e.getValue().get(1);
+          Assert.assertEquals("Count of value 1 for key \"a\" ", 5, vcount);
+
+        } else if (key.equals("b")) {
+          vcount = e.getValue().get(2);
+          Assert.assertEquals("Count of value 2 for key \"b\" ", 3, vcount);
+          vcount = e.getValue().get(4);
+          Assert.assertEquals("Count of value 4 for key \"b\" ", 3, vcount);
+
+        } else if (key.equals("c")) {
+          vcount = e.getValue().get(3);
+          Assert.assertEquals("Count of value 3 for key \"c\" ", 10, vcount);
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java
new file mode 100644
index 0000000..e91b0f9
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/SamplerTest.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountTestSink;
+
+/**
+ * Functional tests for {@link Sampler}.
+ *
+ * @deprecated This test class is marked deprecated; no replacement is named in this file.
+ */
+@Deprecated
+public class SamplerTest
+{
+  /**
+   * Pushes 10000 identical tuples through the sampler and checks that 5% to 15% of them are emitted.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    Sampler<String> oper = new Sampler<String>();
+    CountTestSink sink = new CountTestSink<String>();
+    oper.sample.setSink(sink);
+    oper.setSamplingPercentage(.1);
+
+    String tuple = "a";
+
+    // NOTE(review): presumably .1 means a 10% sampling rate, matching the 5-15% bounds below - confirm.
+    int numTuples = 10000;
+    oper.beginWindow(0);
+    for (int i = 0; i < numTuples; i++) {
+      oper.data.process(tuple);
+    }
+
+    oper.endWindow();
+    int lowerlimit = 5;
+    int upperlimit = 15;
+    int actual = (100 * sink.count) / numTuples;
+    // Report the measured percentage and the violated bound so a failure can be diagnosed.
+    Assert.assertTrue("emitted percentage " + actual + " should exceed " + lowerlimit, lowerlimit < actual);
+    Assert.assertTrue("emitted percentage " + actual + " should be below " + upperlimit, upperlimit > actual);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java
new file mode 100644
index 0000000..a5bd664
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertKeyValTest.java
@@ -0,0 +1,108 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Functional tests for {@link ChangeAlertKeyVal}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlertKeyValTest
+{
+  private static final Logger log = LoggerFactory.getLogger(ChangeAlertKeyValTest.class);
+
+  /**
+   * Runs the alert scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Integer>());
+    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Double>());
+    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Float>());
+    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Short>());
+    testNodeProcessingSchema(new ChangeAlertKeyVal<String, Long>());
+  }
+
+  /**
+   * Feeds four rounds of values for keys "a", "b" and "c" through the
+   * operator with a percent threshold of 5, then checks the number of alerts
+   * and the summed percent change reported for "a" and for the other keys.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public <V extends Number> void testNodeProcessingSchema(
+      ChangeAlertKeyVal<String, V> oper)
+  {
+    CollectorTestSink alertSink = new CollectorTestSink();
+
+    oper.alert.setSink(alertSink);
+    oper.setPercentThreshold(5);
+
+    oper.beginWindow(0);
+    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(200)));
+    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(10)));
+    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(100)));
+
+    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(203)));
+    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
+    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(101)));
+
+    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(210)));
+    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(12)));
+    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(102)));
+
+    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(231)));
+    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(18)));
+    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(103)));
+    oper.endWindow();
+
+    // One for a, two for b
+    Assert.assertEquals("number emitted tuples", 3,
+        alertSink.collectedTuples.size());
+
+    double aval = 0;
+    double bval = 0;
+    log.debug("\nLogging tuples");
+    for (Object o : alertSink.collectedTuples) {
+      KeyValPair<String, KeyValPair<Number, Double>> alert =
+          (KeyValPair<String, KeyValPair<Number, Double>>)o;
+
+      log.debug(o.toString());
+      boolean forA = alert.getKey().equals("a");
+      KeyValPair<Number, Double> vmap = alert.getValue();
+      if (vmap != null) {
+        // Every alert that is not keyed "a" is summed into bval.
+        if (forA) {
+          aval += vmap.getValue().doubleValue();
+        } else {
+          bval += vmap.getValue().doubleValue();
+        }
+      }
+    }
+    Assert.assertEquals("change in a", 10.0, aval, 0);
+    Assert.assertEquals("change in b", 70.0, bval, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java
new file mode 100644
index 0000000..aa757af
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertMapTest.java
@@ -0,0 +1,116 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link ChangeAlertMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlertMapTest
+{
+  private static final Logger log = LoggerFactory.getLogger(ChangeAlertMapTest.class);
+
+  /**
+   * Runs the alert scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ChangeAlertMap<String, Integer>());
+    testNodeProcessingSchema(new ChangeAlertMap<String, Double>());
+    testNodeProcessingSchema(new ChangeAlertMap<String, Float>());
+    testNodeProcessingSchema(new ChangeAlertMap<String, Short>());
+    testNodeProcessingSchema(new ChangeAlertMap<String, Long>());
+  }
+
+  /**
+   * Sends four maps holding values for keys "a", "b" and "c" through the
+   * operator with a percent threshold of 5, then checks the number of alerts
+   * and the summed percent change reported for "a" and for "b".
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public <V extends Number> void testNodeProcessingSchema(
+      ChangeAlertMap<String, V> oper)
+  {
+    CollectorTestSink alertSink = new CollectorTestSink();
+
+    oper.alert.setSink(alertSink);
+    oper.setPercentThreshold(5);
+
+    oper.beginWindow(0);
+    HashMap<String, V> input = new HashMap<String, V>();
+    input.put("a", oper.getValue(200));
+    input.put("b", oper.getValue(10));
+    input.put("c", oper.getValue(100));
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", oper.getValue(203));
+    input.put("b", oper.getValue(12));
+    input.put("c", oper.getValue(101));
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", oper.getValue(210));
+    input.put("b", oper.getValue(12));
+    input.put("c", oper.getValue(102));
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", oper.getValue(231));
+    input.put("b", oper.getValue(18));
+    input.put("c", oper.getValue(103));
+    oper.data.process(input);
+    oper.endWindow();
+
+    // One for a, two for b
+    Assert.assertEquals("number emitted tuples", 3,
+        alertSink.collectedTuples.size());
+
+    double aval = 0;
+    double bval = 0;
+    log.debug("\nLogging tuples");
+    for (Object o : alertSink.collectedTuples) {
+      HashMap<String, HashMap<Number, Double>> map =
+          (HashMap<String, HashMap<Number, Double>>)o;
+      Assert.assertEquals("map size", 1, map.size());
+      log.debug(o.toString());
+      HashMap<Number, Double> vmap = map.get("a");
+      if (vmap != null) {
+        aval += vmap.get(231.0).doubleValue();
+      }
+      vmap = map.get("b");
+      if (vmap != null) {
+        // The "b" alerts are looked up under the input values 12 and 18
+        // (presumably the value that triggered the alert).
+        Double change = vmap.get(12.0);
+        if (change == null) {
+          change = vmap.get(18.0);
+        }
+        bval += change.doubleValue();
+      }
+    }
+    Assert.assertEquals("change in a", 10.0, aval, 0);
+    Assert.assertEquals("change in b", 70.0, bval, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java
new file mode 100644
index 0000000..745b7e5
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeAlertTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Functional tests for {@link ChangeAlert}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ChangeAlertTest
+{
+  private static final Logger log = LoggerFactory.getLogger(ChangeAlertTest.class);
+
+  /**
+   * Runs the alert scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ChangeAlert<Integer>());
+    testNodeProcessingSchema(new ChangeAlert<Double>());
+    testNodeProcessingSchema(new ChangeAlert<Float>());
+    testNodeProcessingSchema(new ChangeAlert<Short>());
+    testNodeProcessingSchema(new ChangeAlert<Long>());
+  }
+
+  /**
+   * Sends a sequence of seven values through the operator with a percent
+   * threshold of 5, then checks the number of alerts and the sum of the
+   * percent changes they carry.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public <V extends Number> void testNodeProcessingSchema(ChangeAlert<V> oper)
+  {
+    CollectorTestSink alertSink = new CollectorTestSink();
+
+    oper.alert.setSink(alertSink);
+    oper.setPercentThreshold(5);
+
+    oper.beginWindow(0);
+    oper.data.process(oper.getValue(10));
+    oper.data.process(oper.getValue(12)); // alert
+    oper.data.process(oper.getValue(12));
+    oper.data.process(oper.getValue(18)); // alert
+    oper.data.process(oper.getValue(0));  // alert
+    oper.data.process(oper.getValue(20)); // this will not alert
+    oper.data.process(oper.getValue(30)); // alert
+
+    oper.endWindow();
+
+    // Four of the seven inputs above are marked as alerting
+    Assert.assertEquals("number emitted tuples", 4,
+        alertSink.collectedTuples.size());
+
+    double total = 0;
+    log.debug("\nLogging tuples");
+    for (Object o : alertSink.collectedTuples) {
+      KeyValPair<Number, Double> alert = (KeyValPair<Number, Double>)o;
+      log.debug(o.toString());
+      total += alert.getValue().doubleValue();
+    }
+    Assert.assertEquals("sum of alerted percent changes", 220.0, total, 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java
new file mode 100644
index 0000000..0b3318c
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeKeyValTest.java
@@ -0,0 +1,112 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Functional tests for {@link ChangeKeyVal}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ChangeKeyValTest
+{
+  private static final Logger log = LoggerFactory.getLogger(ChangeKeyValTest.class);
+
+  /**
+   * Runs the change scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ChangeKeyVal<String, Integer>());
+    testNodeProcessingSchema(new ChangeKeyVal<String, Double>());
+    testNodeProcessingSchema(new ChangeKeyVal<String, Float>());
+    testNodeProcessingSchema(new ChangeKeyVal<String, Short>());
+    testNodeProcessingSchema(new ChangeKeyVal<String, Long>());
+  }
+
+  /**
+   * Sends base values (a=2, b=10, c=100) followed by data values (a=3, b=2,
+   * c=4) and checks the tuples emitted on the change and percent ports.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public <V extends Number> void testNodeProcessingSchema(
+      ChangeKeyVal<String, V> oper)
+  {
+    CollectorTestSink changeSink = new CollectorTestSink();
+    CollectorTestSink percentSink = new CollectorTestSink();
+
+    oper.change.setSink(changeSink);
+    oper.percent.setSink(percentSink);
+
+    oper.beginWindow(0);
+    oper.base.process(new KeyValPair<String, V>("a", oper.getValue(2)));
+    oper.base.process(new KeyValPair<String, V>("b", oper.getValue(10)));
+    oper.base.process(new KeyValPair<String, V>("c", oper.getValue(100)));
+
+    oper.data.process(new KeyValPair<String, V>("a", oper.getValue(3)));
+    oper.data.process(new KeyValPair<String, V>("b", oper.getValue(2)));
+    oper.data.process(new KeyValPair<String, V>("c", oper.getValue(4)));
+
+    oper.endWindow();
+
+    // One for each key
+    Assert.assertEquals("number emitted tuples", 3,
+        changeSink.collectedTuples.size());
+    Assert.assertEquals("number emitted tuples", 3,
+        percentSink.collectedTuples.size());
+
+    log.debug("\nLogging tuples");
+    for (Object o : changeSink.collectedTuples) {
+      KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
+      log.debug("change {}", kv);
+      if (kv.getKey().equals("a")) {
+        Assert.assertEquals("change in a ", 1.0, kv.getValue());
+      } else if (kv.getKey().equals("b")) {
+        Assert.assertEquals("change in b ", -8.0, kv.getValue());
+      } else if (kv.getKey().equals("c")) {
+        Assert.assertEquals("change in c ", -96.0, kv.getValue());
+      }
+    }
+
+    // Percent port: messages name the percent change so a failure here is
+    // not mistaken for a failure on the change port.
+    for (Object o : percentSink.collectedTuples) {
+      KeyValPair<String, Number> kv = (KeyValPair<String, Number>)o;
+      log.debug("percent change {}", kv);
+      if (kv.getKey().equals("a")) {
+        Assert.assertEquals("percent change in a ", 50.0, kv.getValue());
+      } else if (kv.getKey().equals("b")) {
+        Assert.assertEquals("percent change in b ", -80.0, kv.getValue());
+      } else if (kv.getKey().equals("c")) {
+        Assert.assertEquals("percent change in c ", -96.0, kv.getValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java
new file mode 100644
index 0000000..9ce0a73
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ChangeTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link Change}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ChangeTest
+{
+  private static final Logger log = LoggerFactory.getLogger(ChangeTest.class);
+
+  /**
+   * Runs the change scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new Change<Integer>());
+    testNodeProcessingSchema(new Change<Double>());
+    testNodeProcessingSchema(new Change<Float>());
+    testNodeProcessingSchema(new Change<Short>());
+    testNodeProcessingSchema(new Change<Long>());
+  }
+
+  /**
+   * Sends base value 10 followed by data values 5, 15 and 20 and checks that
+   * three tuples arrive on each of the change and percent ports. The emitted
+   * values themselves are only logged, not asserted.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public <V extends Number> void testNodeProcessingSchema(Change<V> oper)
+  {
+    CollectorTestSink changeSink = new CollectorTestSink();
+    CollectorTestSink percentSink = new CollectorTestSink();
+
+    oper.change.setSink(changeSink);
+    oper.percent.setSink(percentSink);
+
+    oper.beginWindow(0);
+    oper.base.process(oper.getValue(10));
+    oper.data.process(oper.getValue(5));
+    oper.data.process(oper.getValue(15));
+    oper.data.process(oper.getValue(20));
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 3,
+        changeSink.collectedTuples.size());
+    Assert.assertEquals("number emitted tuples", 3,
+        percentSink.collectedTuples.size());
+
+    log.debug("\nLogging tuples");
+    // Parameterized logging: the message is only built when debug is enabled.
+    for (Object o : changeSink.collectedTuples) {
+      log.debug("change {}", o);
+    }
+    for (Object o : percentSink.collectedTuples) {
+      log.debug("percent change {}", o);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java
new file mode 100644
index 0000000..9785a4a
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareExceptMapTest.java
@@ -0,0 +1,98 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link CompareExceptMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class CompareExceptMapTest
+{
+  /**
+   * Runs the compare/except scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new CompareExceptMap<String, Integer>());
+    testNodeProcessingSchema(new CompareExceptMap<String, Double>());
+    testNodeProcessingSchema(new CompareExceptMap<String, Float>());
+    testNodeProcessingSchema(new CompareExceptMap<String, Short>());
+    testNodeProcessingSchema(new CompareExceptMap<String, Long>());
+  }
+
+  /**
+   * Configures the operator with key "a", value 3.0 and the EQ comparison,
+   * sends one map with a=2 and one with a=3, and checks that exactly one
+   * tuple arrives on each of the except and compare ports with the expected
+   * contents.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(CompareExceptMap oper)
+  {
+    CountAndLastTupleTestSink compareSink = new CountAndLastTupleTestSink();
+    CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
+    oper.compare.setSink(compareSink);
+    oper.except.setSink(exceptSink);
+
+    oper.setKey("a");
+    oper.setValue(3.0);
+    oper.setTypeEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    input.put("b", 21);
+    input.put("c", 30);
+    oper.data.process(input);
+    oper.endWindow();
+
+    // One tuple per port: the a=2 map on except, the a=3 map on compare
+    Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
+    assertTupleValues(exceptSink.tuple, 2, 20, 1000);
+
+    Assert.assertEquals("number emitted tuples", 1, compareSink.count);
+    assertTupleValues(compareSink.tuple, 3, 21, 30);
+  }
+
+  /**
+   * Checks the values stored under "a", "b" and "c" in an emitted map tuple.
+   * Entries under other keys are ignored, and a key that is absent from the
+   * map is not reported.
+   *
+   * @param tuple last tuple captured by a sink; cast to a map of numbers
+   * @param expectedA expected value for key "a"
+   * @param expectedB expected value for key "b"
+   * @param expectedC expected value for key "c"
+   */
+  @SuppressWarnings("unchecked")
+  private static void assertTupleValues(Object tuple, double expectedA,
+      double expectedB, double expectedC)
+  {
+    for (Map.Entry<String, Number> e : ((HashMap<String, Number>)tuple).entrySet()) {
+      double actual = e.getValue().doubleValue();
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", expectedA, actual, 0);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", expectedB, actual, 0);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", expectedC, actual, 0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java
new file mode 100644
index 0000000..c91e4c9
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CompareMapTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link CompareMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class CompareMapTest
+{
+  /**
+   * Runs the compare scenario once per supported numeric value type.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new CompareMap<String, Integer>());
+    testNodeProcessingSchema(new CompareMap<String, Double>());
+    testNodeProcessingSchema(new CompareMap<String, Float>());
+    testNodeProcessingSchema(new CompareMap<String, Short>());
+    testNodeProcessingSchema(new CompareMap<String, Long>());
+  }
+
+  /**
+   * Configures the operator with key "a", value 3.0 and the NEQ comparison,
+   * sends one map with a=2 and one with a=3, and checks that exactly one
+   * tuple arrives on the compare port and that it holds the first map's
+   * values.
+   *
+   * @param oper operator instance under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(CompareMap oper)
+  {
+    CountAndLastTupleTestSink matchSink = new CountAndLastTupleTestSink();
+    oper.compare.setSink(matchSink);
+    oper.setKey("a");
+    oper.setValue(3.0);
+    oper.setTypeNEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    oper.data.process(input);
+    oper.endWindow();
+
+    // Only the a=2 map is expected on the compare port
+    Assert.assertEquals("number emitted tuples", 1, matchSink.count);
+    HashMap<String, Number> emitted = (HashMap<String, Number>)matchSink.tuple;
+    for (Map.Entry<String, Number> e : emitted.entrySet()) {
+      double actual = e.getValue().doubleValue();
+      // Plain double literals instead of the deprecated new Double(..) boxing.
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 2.0, actual, 0);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 20.0, actual, 0);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 1000.0, actual, 0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java
new file mode 100644
index 0000000..317790a
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/CountKeyValTest.java
@@ -0,0 +1,83 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+import com.datatorrent.lib.util.KeyValPair;
+
+/**
+ * Functional tests for {@link CountKeyVal}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class CountKeyValTest
+{
+  /**
+   * Sends fourteen key/value pairs spread over the keys "a" to "e" within
+   * one window and checks that the count port emits one tuple per key
+   * carrying the number of pairs seen for that key.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing()
+  {
+    CountKeyVal<String, Double> oper = new CountKeyVal<String, Double>();
+    CollectorTestSink countSink = new CollectorTestSink();
+    oper.count.setSink(countSink);
+
+    oper.beginWindow(0);
+
+    oper.data.process(new KeyValPair("a", 2.0));
+    oper.data.process(new KeyValPair("b", 20.0));
+    oper.data.process(new KeyValPair("c", 1000.0));
+    oper.data.process(new KeyValPair("a", 1.0));
+    oper.data.process(new KeyValPair("a", 10.0));
+    oper.data.process(new KeyValPair("b", 5.0));
+    oper.data.process(new KeyValPair("d", 55.0));
+    oper.data.process(new KeyValPair("b", 12.0));
+    oper.data.process(new KeyValPair("d", 22.0));
+    oper.data.process(new KeyValPair("d", 14.2));
+    oper.data.process(new KeyValPair("d", 46.0));
+    oper.data.process(new KeyValPair("e", 2.0));
+    oper.data.process(new KeyValPair("a", 23.0));
+    oper.data.process(new KeyValPair("d", 4.0));
+
+    oper.endWindow();
+
+    // One tuple per distinct key: "a", "b", "c", "d", "e"
+    Assert.assertEquals("number emitted tuples", 5,
+        countSink.collectedTuples.size());
+    for (Object o : countSink.collectedTuples) {
+      KeyValPair<String, Integer> e = (KeyValPair<String, Integer>)o;
+      Integer val = e.getValue();
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue());
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
+      } else if (e.getKey().equals("d")) {
+        Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
+      } else if (e.getKey().equals("e")) {
+        Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
+      } else {
+        // Only "a".."e" were sent, so any other key is an operator error.
+        Assert.fail("unexpected key " + e.getKey());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java
new file mode 100644
index 0000000..8c8b267
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/ExceptMapTest.java
@@ -0,0 +1,85 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link ExceptMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class ExceptMapTest
+{
+  /**
+   * Runs the same except-port scenario against operators declared with several numeric value types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new ExceptMap<String, Integer>());
+    testNodeProcessingSchema(new ExceptMap<String, Double>());
+    testNodeProcessingSchema(new ExceptMap<String, Float>());
+    testNodeProcessingSchema(new ExceptMap<String, Short>());
+    testNodeProcessingSchema(new ExceptMap<String, Long>());
+  }
+
+  /**
+   * Configures {@code oper} with key "a", value 3.0 and the EQ comparison, feeds it two tuples in one
+   * window and checks the count and content of what reaches the {@code except} port.
+   *
+   * @param oper operator under test (raw type so that one body serves every value type)
+   */
+  @SuppressWarnings({"rawtypes", "unchecked"})
+  public void testNodeProcessingSchema(ExceptMap oper)
+  {
+    CountAndLastTupleTestSink exceptSink = new CountAndLastTupleTestSink();
+    oper.except.setSink(exceptSink);
+    oper.setKey("a");
+    oper.setValue(3.0);
+    oper.setTypeEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    oper.data.process(input);
+    oper.endWindow();
+
+    // Exactly one of the two input tuples is expected on the except port.
+    // NOTE(review): presumably the {a=3} tuple satisfies the EQ condition and is withheld - confirm.
+    Assert.assertEquals("number emitted tuples", 1, exceptSink.count);
+    HashMap<String, Number> emitted = (HashMap<String, Number>)exceptSink.tuple;
+    for (Map.Entry<String, Number> e : emitted.entrySet()) {
+      // Primitive expected values: no need to box through the deprecated Double constructor.
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 2.0, e.getValue().doubleValue(), 0);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 20.0, e.getValue().doubleValue(), 0);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 1000.0, e.getValue().doubleValue(), 0);
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java
new file mode 100644
index 0000000..7682ae3
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientMapTest.java
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link QuotientMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class QuotientMapTest
+{
+  // Logger is bound to the test class itself so its output is not attributed to the operator.
+  private static final Logger LOG = LoggerFactory.getLogger(QuotientMapTest.class);
+
+  /**
+   * Runs the quotient scenario against operators declared with Integer and Double value types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new QuotientMap<String, Integer>());
+    testNodeProcessingSchema(new QuotientMap<String, Double>());
+  }
+
+  /**
+   * Feeds 100 numerator maps and 100 denominator maps into {@code oper} within one window, with
+   * {@code mult_by} set to 2, then checks the single map emitted on the {@code quotient} port.
+   *
+   * @param oper operator under test (raw type so that one body serves every value type)
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeProcessingSchema(QuotientMap oper) throws Exception
+  {
+    CountAndLastTupleTestSink quotientSink = new CountAndLastTupleTestSink();
+
+    oper.quotient.setSink(quotientSink);
+    oper.setMult_by(2);
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    int numtuples = 100;
+    for (int i = 0; i < numtuples; i++) {
+      input.clear();
+      input.put("a", 2);
+      input.put("b", 20);
+      input.put("c", 1000);
+      oper.numerator.process(input);
+      input.clear();
+      input.put("a", 2);
+      input.put("b", 40);
+      input.put("c", 500);
+      oper.denominator.process(input);
+    }
+
+    oper.endWindow();
+
+    // A single map tuple is expected on the quotient port.
+    // NOTE(review): the expected values match mult_by * numerator / denominator per key
+    // (e.g. 2 * 20 / 40 = 1 for "b") - confirm against the operator.
+    Assert.assertEquals("number emitted tuples", 1, quotientSink.count);
+    HashMap<String, Number> output = (HashMap<String, Number>)quotientSink.tuple;
+    for (Map.Entry<String, Number> e : output.entrySet()) {
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 2d, e.getValue());
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 1d, e.getValue());
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 4d, e.getValue());
+      } else {
+        LOG.debug("key was {}", e.getKey());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java
new file mode 100644
index 0000000..4d1c7ff
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/QuotientTest.java
@@ -0,0 +1,104 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.api.Sink;
+
+/**
+ * @deprecated
+ * Functional tests for {@link Quotient}
+ */
+@Deprecated
+public class QuotientTest
+{
+
+  class TestSink implements Sink<Object>
+  {
+    List<Object> collectedTuples = new ArrayList<Object>();
+
+    @Override
+    public void put(Object payload)
+    {
+      collectedTuples.add(payload);
+    }
+
+    @Override
+    public int getCount(boolean reset)
+    {
+      throw new UnsupportedOperationException("Not supported yet.");
+    }
+  }
+
+  /**
+   * Test oper logic emits correct results.
+   */
+  @Test
+  public void testNodeSchemaProcessing()
+  {
+    Quotient<Double> oper = new Quotient<Double>();
+    TestSink quotientSink = new TestSink();
+    oper.quotient.setSink(quotientSink);
+
+    oper.setMult_by(2);
+
+    oper.beginWindow(0); //
+    Double a = 30.0;
+    Double b = 20.0;
+    Double c = 100.0;
+    oper.denominator.process(a);
+    oper.denominator.process(b);
+    oper.denominator.process(c);
+
+    a = 5.0;
+    oper.numerator.process(a);
+    a = 1.0;
+    oper.numerator.process(a);
+    b = 44.0;
+    oper.numerator.process(b);
+
+    b = 10.0;
+    oper.numerator.process(b);
+    c = 22.0;
+    oper.numerator.process(c);
+    c = 18.0;
+    oper.numerator.process(c);
+
+    a = 0.5;
+    oper.numerator.process(a);
+    b = 41.5;
+    oper.numerator.process(b);
+    a = 8.0;
+    oper.numerator.process(a);
+    oper.endWindow(); //
+
+    // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+    Assert.assertEquals("number emitted tuples", 1,
+        quotientSink.collectedTuples.size());
+    for (Object o : quotientSink.collectedTuples) { // sum is 1157
+      Double val = (Double)o;
+      Assert.assertEquals("emitted quotient value was ", new Double(2.0), val);
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java
new file mode 100644
index 0000000..a50a3bd
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/math/SumCountMapTest.java
@@ -0,0 +1,156 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.math;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link SumCountMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class SumCountMapTest
+{
+  /**
+   * Test operator logic emits correct results for every combination of connected output ports.
+   */
+  @Test
+  public void testNodeProcessing()
+  {
+    testNodeSchemaProcessing(true, true);
+    testNodeSchemaProcessing(true, false);
+    testNodeSchemaProcessing(false, true);
+    testNodeSchemaProcessing(false, false);
+  }
+
+  /**
+   * Feeds eight maps into a fresh operator within one window and verifies whichever ports are connected.
+   *
+   * @param sum   whether a sink is attached to the {@code sum} port and its output verified
+   * @param count whether a sink is attached to the {@code count} port and its output verified
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  public void testNodeSchemaProcessing(boolean sum, boolean count)
+  {
+    SumCountMap<String, Double> oper = new SumCountMap<String, Double>();
+    oper.setType(Double.class);
+    CollectorTestSink sumSink = new CollectorTestSink();
+    CollectorTestSink countSink = new CollectorTestSink();
+    if (sum) {
+      oper.sum.setSink(sumSink);
+    }
+    if (count) {
+      oper.count.setSink(countSink);
+    }
+
+    oper.beginWindow(0);
+
+    HashMap<String, Double> input = new HashMap<String, Double>();
+
+    input.put("a", 2.0);
+    input.put("b", 20.0);
+    input.put("c", 1000.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 1.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 10.0);
+    input.put("b", 5.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 55.0);
+    input.put("b", 12.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 22.0);
+    oper.data.process(input);
+    input.clear();
+    input.put("d", 14.2);
+    oper.data.process(input);
+    input.clear();
+
+    // The remaining tuples go through a second map instance; it also holds only Double values.
+    HashMap<String, Double> inputi = new HashMap<String, Double>();
+    inputi.put("d", 46.0);
+    inputi.put("e", 2.0);
+    oper.data.process(inputi);
+    inputi.clear();
+    inputi.put("a", 23.0);
+    inputi.put("d", 4.0);
+    oper.data.process(inputi);
+    inputi.clear();
+
+    oper.endWindow();
+
+    if (sum) {
+      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+      Assert.assertEquals("number emitted tuples", 1, sumSink.collectedTuples.size());
+      for (Object o : sumSink.collectedTuples) {
+        verifySums((HashMap<String, Object>)o);
+      }
+    }
+    if (count) {
+      // payload should be 1 bag of tuples with keys "a", "b", "c", "d", "e"
+      Assert.assertEquals("number emitted tuples", 1, countSink.collectedTuples.size());
+      for (Object o : countSink.collectedTuples) {
+        verifyCounts((HashMap<String, Object>)o);
+      }
+    }
+  }
+
+  /**
+   * Checks the per-key sums in one emitted map; keys other than "a".."e" are not checked.
+   */
+  private static void verifySums(HashMap<String, Object> output)
+  {
+    for (Map.Entry<String, Object> e : output.entrySet()) {
+      Double val = (Double)e.getValue();
+      // Double.valueOf keeps the (String, Object, Object) overload without the deprecated constructor.
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", Double.valueOf(36.0), val);
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", Double.valueOf(37.0), val);
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", Double.valueOf(1000.0), val);
+      } else if (e.getKey().equals("d")) {
+        Assert.assertEquals("emitted tuple for 'd' was ", Double.valueOf(141.2), val);
+      } else if (e.getKey().equals("e")) {
+        Assert.assertEquals("emitted tuple for 'e' was ", Double.valueOf(2.0), val);
+      }
+    }
+  }
+
+  /**
+   * Checks the per-key occurrence counts in one emitted map; keys other than "a".."e" are not checked.
+   */
+  private static void verifyCounts(HashMap<String, Object> output)
+  {
+    for (Map.Entry<String, Object> e : output.entrySet()) {
+      Integer val = (Integer)e.getValue();
+      if (e.getKey().equals("a")) {
+        Assert.assertEquals("emitted value for 'a' was ", 4, val.intValue());
+      } else if (e.getKey().equals("b")) {
+        Assert.assertEquals("emitted tuple for 'b' was ", 3, val.intValue());
+      } else if (e.getKey().equals("c")) {
+        Assert.assertEquals("emitted tuple for 'c' was ", 1, val.intValue());
+      } else if (e.getKey().equals("d")) {
+        Assert.assertEquals("emitted tuple for 'd' was ", 5, val.intValue());
+      } else if (e.getKey().equals("e")) {
+        Assert.assertEquals("emitted tuple for 'e' was ", 1, val.intValue());
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java
new file mode 100644
index 0000000..a535053
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/DeleteOperatorTest.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import 
org.apache.apex.malhar.contrib.misc.streamquery.condition.EqualValueCondition;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional test for {@link org.apache.apex.malhar.contrib.misc.streamquery.DeleteOperator}.
+ * @deprecated
+ */
+@Deprecated
+public class DeleteOperatorTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(DeleteOperatorTest.class);
+
+  /**
+   * Pushes three rows through a {@link DeleteOperator} configured with the condition a == 1 and logs
+   * whatever the output port collected. The test makes no assertions on the collected tuples.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // operator under test, with an equality condition on column "a"
+    DeleteOperator oper = new DeleteOperator();
+
+    EqualValueCondition condition = new EqualValueCondition();
+    condition.addEqualValue("a", 1);
+    oper.setCondition(condition);
+
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    oper.inport.process(row(0, 1, 2));
+    oper.inport.process(row(1, 3, 4));
+    oper.inport.process(row(1, 5, 6));
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  /**
+   * Builds a fresh row map holding the given values under the columns "a", "b" and "c".
+   */
+  private static HashMap<String, Object> row(int a, int b, int c)
+  {
+    HashMap<String, Object> columns = new HashMap<String, Object>();
+    columns.put("a", a);
+    columns.put("b", b);
+    columns.put("c", c);
+    return columns;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
new file mode 100644
index 0000000..762d322
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/streamquery/FullOuterJoinOperatorTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery;
+
+import java.util.HashMap;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.streamquery.condition.Condition;
+import com.datatorrent.lib.streamquery.condition.JoinColumnEqualCondition;
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+/**
+ * Functional test for {@link OuterJoinOperator} with the full-join flag enabled.
+ * @deprecated
+ */
+@Deprecated
+public class FullOuterJoinOperatorTest
+{
+  private static final Logger LOG = LoggerFactory.getLogger(FullOuterJoinOperatorTest.class);
+
+  /**
+   * Sends three rows to the first input port and two rows to the second, joined on column "a", and
+   * logs whatever the output port collected. The test makes no assertions on the collected tuples.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testSqlSelect()
+  {
+    // operator under test, switched to full join
+    OuterJoinOperator oper = new OuterJoinOperator();
+    oper.setFullJoin(true);
+    CollectorTestSink sink = new CollectorTestSink();
+    oper.outport.setSink(sink);
+
+    // join the two inputs on equality of column "a"
+    Condition cond = new JoinColumnEqualCondition("a", "a");
+    oper.setJoinCondition(cond);
+
+    // select column "b" from the first table and column "c" from the second
+    oper.selectTable1Column(new ColumnIndex("b", null));
+    oper.selectTable2Column(new ColumnIndex("c", null));
+
+    oper.setup(null);
+    oper.beginWindow(1);
+
+    oper.inport1.process(row(0, 1, 2));
+    oper.inport1.process(row(1, 3, 4));
+    oper.inport1.process(row(2, 11, 12));
+
+    oper.inport2.process(row(0, 7, 8));
+    oper.inport2.process(row(1, 5, 6));
+
+    oper.endWindow();
+    oper.teardown();
+
+    LOG.debug("{}", sink.collectedTuples);
+  }
+
+  /**
+   * Builds a fresh row map holding the given values under the columns "a", "b" and "c".
+   */
+  private static HashMap<String, Object> row(int a, int b, int c)
+  {
+    HashMap<String, Object> columns = new HashMap<String, Object>();
+    columns.put("a", a);
+    columns.put("b", b);
+    columns.put("c", c);
+    return columns;
+  }
+
+}

Reply via email to