http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java
new file mode 100644
index 0000000..ac83885
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/CountFunction.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * Function index that provides the sql count function semantic. <br>
+ * <p>
+ * Counts the rows whose value for the configured column is non null; when the column is "*"
+ * every row is counted.
+ *   e.g : sql => SELECT COUNT(column_name) FROM table_name. <br>
+ *   <br>
+ *   <b> Properties : </b> <br>
+ *   <b> column : </b> column name for values count.   <br>
+ *   <b> alias  : </b> Alias name for aggregate output. <br>
+ * @displayName Count Function
+ * @category Stream Manipulators
+ * @tags sql count
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class CountFunction extends FunctionIndex
+{
+  /**
+   * Creates a count aggregate.
+   *
+   * @param column column whose non null values are counted ("*" counts rows), must be non null.
+   * @param alias  Alias name for aggregate output, may be null.
+   */
+  public CountFunction(@NotNull String column, String alias)
+  {
+    super(column, alias);
+  }
+
+  /**
+   * Counts the values of the configured column.
+   *
+   * @param rows rows to inspect.
+   * @return the row count as an {@code Integer} when the column is "*", otherwise the number of
+   *         rows holding a non null value for the column as a {@code Long}.
+   */
+  @Override
+  public Object compute(ArrayList<Map<String, Object>> rows) throws Exception
+  {
+    boolean countEveryRow = column.equals("*");
+    if (countEveryRow) {
+      return rows.size();
+    }
+
+    long nonNullValues = 0;
+    for (Map<String, Object> current : rows) {
+      // A missing key and an explicit null value both yield null from get().
+      Object cell = current.get(column);
+      if (cell != null) {
+        nonNullValues++;
+      }
+    }
+    return nonNullValues;
+  }
+
+  /**
+   * Name under which the aggregate is reported.
+   *
+   * @return the alias when it is non empty, otherwise {@code COUNT(<column>)}.
+   */
+  @Override
+  protected String aggregateName()
+  {
+    return StringUtils.isEmpty(alias) ? "COUNT(" + column + ")" : alias;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java
new file mode 100644
index 0000000..296e449
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FirstLastFunction.java
@@ -0,0 +1,113 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * An implementation of function index that implements sql first,last function 
semantic. <br>
+ * <p>
+ *   e.g : sql => SELECT FIRST/LAST(column_name) FROM table_name. <br>
+ *   <br>
+ *   <b> Properties : </b> <br>
+ *   <b> column : </b> column name for first/last value.   <br>
+ *   <b> alias  : </b> Alias name for output. <br>
+ *   <b> isFirst : </b> return first value if true.
+ * @displayName First Last Function
+ * @category Stream Manipulators
+ * @tags sql first, sql last
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class FirstLastFunction extends FunctionIndex
+{
+  /**
+   * return first value if true.
+   */
+  private boolean isFirst;
+
+  /**
+   * @param column  column name for first/last value.
+   * @param  alias   Alias name for output.
+   * @param  isFirst return first value if true.
+   */
+  public FirstLastFunction(@NotNull String column, String alias, boolean 
isLast)
+  {
+    super(column, alias);
+    isFirst = !isLast;
+  }
+
+  /**
+   * Get first/last non null value for column.
+   */
+  @Override
+  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws 
Exception
+  {
+    if (rows.size() == 0) {
+      return null;
+    }
+    if (isFirst) {
+      for (int i = 0; i < rows.size(); i++) {
+        if (rows.get(i).get(column) != null) {
+          return rows.get(i).get(column);
+        }
+      }
+    } else {
+      for (int i = (rows.size() - 1); i >= 0; i--) {
+        if (rows.get(i).get(column) != null) {
+          return rows.get(i).get(column);
+        }
+      }
+    }
+    return null;
+  }
+
+  /**
+   * Aggregate output name.
+   * @return name string.
+   */
+  @Override
+  protected String aggregateName()
+  {
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
+    if (isFirst) {
+      return "FIRST(" + column + ")";
+    }
+    return "LAST(" + column + ")";
+  }
+
+  public boolean isFirst()
+  {
+    return isFirst;
+  }
+
+  public void setFirst(boolean isFirst)
+  {
+    this.isFirst = isFirst;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java
new file mode 100644
index 0000000..d307ad2
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/FunctionIndex.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+/**
+ * A base class for select aggregate function implementation.&nbsp; Subclasses should provide the
+ * implementation for aggregate compute functions.
+ * <p>
+ * <br>
+ * <b>Properties : </b> <br>
+ * <b>column : </b> Column name for aggregation.
+ * <b>alias : </b> Output value alias name.
+ * @displayName Function Index
+ * @category Stream Manipulators
+ * @tags sql aggregate
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public abstract class FunctionIndex
+{
+  /**
+   * Name of the column the aggregate is computed over.
+   */
+  @NotNull
+  protected String column;
+
+  /**
+   * Optional name under which the aggregate result is stored.
+   */
+  protected String alias;
+
+  /**
+   * @param column Column name for aggregation.
+   * @param alias Output value alias name.
+   */
+  public FunctionIndex(@NotNull String column, String alias)
+  {
+    this.column = column;
+    this.alias = alias;
+  }
+
+  /**
+   * Aggregate compute function, implemented by each sub class.
+   * @param rows Tuple list over application window.
+   * @return aggregate result object.
+   */
+  public abstract Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws Exception;
+
+  /**
+   * Get aggregate output value name.
+   * @return name string.
+   */
+  protected abstract String aggregateName();
+
+  /**
+   * Applies the compute function to the given rows and stores the result in {@code collect}.
+   * <p>
+   * The result key is the alias when one is set, otherwise the column name; {@link #aggregateName()}
+   * is consulted only when both are null. Nothing is stored when {@code rows} is null.
+   *
+   * @param rows    Tuple list over application window.
+   * @param collect map receiving the aggregate value.
+   */
+  public void filter(ArrayList<Map<String, Object>> rows, Map<String, Object> collect) throws Exception
+  {
+    if (rows == null) {
+      return;
+    }
+
+    String outputKey = (alias != null) ? alias : column;
+    if (outputKey == null) {
+      outputKey = aggregateName();
+    }
+    collect.put(outputKey, compute(rows));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java
new file mode 100644
index 0000000..a602614
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/MaxMinFunction.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import org.apache.commons.lang.StringUtils;
+
+/**
+ * An implementation of function index that implements sql max and sql min function semantic. <br>
+ * <p>
+ *   e.g : sql => SELECT MAX/MIN(column_name) FROM table_name. <br>
+ *   <br>
+ *   <b> Properties : </b> <br>
+ *   <b> column : </b> column name for values max/min computation.   <br>
+ *   <b> alias  : </b> Alias name for  output value. <br>
+ *   <b> isMax : </b> Flag to indicate max/min compute value. <br>
+ * @displayName Max Min Function
+ * @category Stream Manipulators
+ * @tags sql max, sql min
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class MaxMinFunction extends FunctionIndex
+{
+  /**
+   * Flag to indicate max/min compute value, compute max if true.
+   */
+  private boolean isMax = true;
+
+  /**
+   * Creates a max/min aggregate.
+   *
+   * @param column column name for values max/min computation.
+   * @param alias  Alias name for output.
+   * @param isMin  pass true to compute the minimum, false to compute the maximum.
+   */
+  public MaxMinFunction(@NotNull String column, String alias, boolean isMin)
+  {
+    super(column, alias);
+    isMax = !isMin;
+  }
+
+  /**
+   * Compute max/min for given column.
+   * <p>
+   * The running result is seeded from the first usable value rather than from 0.0, so the maximum
+   * of all-negative values and the minimum of all-positive values are reported correctly. Rows in
+   * which the column is missing or null, and NaN values, do not take part in the comparison.
+   *
+   * @param rows rows to inspect.
+   * @return max/min value as a {@code Double}; 0.0 when no row contributes a value.
+   * @throws ClassCastException if a non null column value is not a {@link Number}.
+   */
+  @Override
+  public Object compute(ArrayList<Map<String, Object>> rows) throws Exception
+  {
+    boolean seeded = false;
+    double minMax = 0.0;
+    for (Map<String, Object> row : rows) {
+      Object raw = row.get(column);
+      if (raw == null) {
+        // Missing or null cell: nothing to compare.
+        continue;
+      }
+      double value = ((Number)raw).doubleValue();
+      if (Double.isNaN(value)) {
+        // NaN never compares greater or smaller, so it can never become the result.
+        continue;
+      }
+      if (!seeded || (isMax ? (minMax < value) : (minMax > value))) {
+        minMax = value;
+        seeded = true;
+      }
+    }
+    return minMax;
+  }
+
+  /**
+   * Aggregate output name.
+   * @return the alias when it is non empty, otherwise {@code MAX(<column>)} or {@code MIN(<column>)}.
+   */
+  @Override
+  protected String aggregateName()
+  {
+    if (!StringUtils.isEmpty(alias)) {
+      return alias;
+    }
+    if (isMax) {
+      return "MAX(" + column + ")";
+    }
+    return "MIN(" + column + ")";
+  }
+
+  public boolean isMax()
+  {
+    return isMax;
+  }
+
+  public void setMax(boolean isMax)
+  {
+    this.isMax = isMax;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java
new file mode 100644
index 0000000..ef4fb4b
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/function/SumFunction.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.function;
+
+import java.util.ArrayList;
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+
+
+/**
+ * <p> An implementation of sql sum function. </p>
+ * <p>
+ * @displayName Sum Function
+ * @category Stream Manipulators
+ * @tags sql sum, aggregate
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class SumFunction extends FunctionIndex
+{
+  public SumFunction(String column, String alias) throws Exception
+  {
+    super(column, alias);
+  }
+
+  @Override
+  public Object compute(@NotNull ArrayList<Map<String, Object>> rows) throws 
Exception
+  {
+    Double result = 0.0;
+    for (Map<String, Object> row : rows) {
+      if (!row.containsKey(column)) {
+        continue;
+      }
+      result += ((Number)row.get(column)).doubleValue();
+    }
+    return result;
+  }
+
+  @Override
+  protected String aggregateName()
+  {
+    return "Sum(" + column;
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java
new file mode 100644
index 0000000..4de58c1
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/BinaryExpression.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * Abstract class to filter row by binary expression index.
+ * <p>
+ * Sub class will implement filter/getExpressionName functions.
+ * @displayName Binary Expression
+ * @category Stream Manipulators
+ * @tags alias
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public abstract class BinaryExpression  implements Index
+{
+  /**
+   * Left column name argument for expression.
+   */
+  @NotNull
+  protected String left;
+
+  /**
+   * Right column name argument for expression.
+   */
+  @NotNull
+  protected String right;
+
+  /**
+   *  Alias name for output field.
+   */
+  protected String alias;
+
+  /**
+   * Stores both operand column names and the output alias.
+   *
+   * @param left column name argument for expression.
+   * @param right column name argument for expression.
+   * @param alias name for output field, may be null.
+   */
+  public BinaryExpression(@NotNull String left, @NotNull String right, String alias)
+  {
+    this.left = left;
+    this.right = right;
+    // Previously the alias argument was dropped, leaving the field null until setAlias() was called.
+    this.alias = alias;
+  }
+
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java
new file mode 100644
index 0000000..c165d89
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/MidIndex.java
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+
+/**
+ * <p>An implementation of Column Index that implements filter method based on mid index. </p>
+ * <p>
+ * Emits the part of a string column that begins at a zero based start offset and spans an
+ * optional length.
+ * @displayName Mid Index
+ * @category Stream Manipulators
+ * @tags index
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class MidIndex extends ColumnIndex
+{
+  /** Zero based offset of the first character to keep. */
+  private int start;
+  /** Number of characters to keep; 0 means "up to the end of the value". */
+  private int length = 0;
+
+  /**
+   * @param column column holding the string value.
+   * @param alias  output name, may be null to reuse the column name.
+   * @param start  zero based start offset, must not be negative.
+   */
+  public MidIndex(@NotNull String column, String alias, int start)
+  {
+    super(column, alias);
+    assert (start >= 0);
+    this.start = start;
+  }
+
+  /**
+   * Copies the selected substring of the column value into {@code collect}.
+   * <p>
+   * The start offset is always honoured: when no length is set, or the requested window runs past
+   * the end of the value, the substring from the start offset to the end of the value is emitted
+   * (previously the whole value was emitted in those cases). A start offset beyond the end of the
+   * value yields an empty string. A null value is passed through as null. Rows without the
+   * column are ignored.
+   *
+   * @param row     input row.
+   * @param collect map receiving the output value under the alias, or the column name when no alias is set.
+   * @throws ClassCastException if the column value is neither null nor a {@link String}.
+   */
+  @Override
+  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
+  {
+    if (!row.containsKey(column)) {
+      return;
+    }
+    Object value = row.get(column);
+    assert (value == null || value instanceof String);
+
+    String name = getColumn();
+    if (alias != null) {
+      name = alias;
+    }
+    if (value == null) {
+      collect.put(name, null);
+      return;
+    }
+
+    String text = (String)value;
+    int textLength = text.length();
+    int beginIndex = Math.min(start, textLength);
+    int endIndex = textLength;
+    if (length > 0) {
+      // Widen to long so that start + length cannot overflow before clamping.
+      endIndex = (int)Math.min((long)start + length, (long)textLength);
+    }
+    collect.put(name, text.substring(beginIndex, endIndex));
+  }
+
+  public int getLength()
+  {
+    return length;
+  }
+
+  public void setLength(int length)
+  {
+    assert (length > 0);
+    this.length = length;
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java
new file mode 100644
index 0000000..0a6f64d
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/NegateExpression.java
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.Null;
+
+
+/**
+ * An implementation of Unary Expression that implements filter method using negate metric sql semantic on column value.
+ * <p>
+ * @displayName Negate Expression
+ * @category Stream Manipulators
+ * @tags expression, alias
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class NegateExpression extends UnaryExpression
+{
+
+  /**
+   * Creates a negate expression; when no alias is available the output name defaults to
+   * {@code NEGATE(<column>)}.
+   *
+   * @param column   Name of column value to be negated, must be non null.
+   * @param alias    Alias name for output field, may be null.
+   */
+  public NegateExpression(@javax.validation.constraints.NotNull String column, String alias)
+  {
+    // The parameter was annotated @Null, which declares that the column must be null -- the
+    // opposite of what this expression requires. The constraint is fully qualified because
+    // only javax.validation.constraints.Null is imported by this file.
+    super(column, alias);
+    if (this.alias == null) {
+      this.alias = "NEGATE(" + column + ")";
+    }
+  }
+
+  /**
+   * Stores the negated numeric value of the column under the alias; rows without the column are ignored.
+   *
+   * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, java.util.Map)
+   */
+  @Override
+  public void filter(Map<String, Object> row, Map<String, Object> collect)
+  {
+    if (!row.containsKey(column)) {
+      return;
+    }
+    collect.put(alias, -((Number)row.get(column)).doubleValue());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java
new file mode 100644
index 0000000..495063f
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/RoundDoubleIndex.java
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+
+/**
+ * <p>An implementation of column index that implements filter method using Round Double Index. </p>
+ * <p>
+ * Rounds a numeric column value to a fixed number of decimal places.
+ *
+ * @displayName Round Double Index
+ * @category Stream Manipulators
+ * @tags alias, maths
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class RoundDoubleIndex  extends ColumnIndex
+{
+  /** Power of ten used to scale the value before rounding; 1 rounds to whole numbers. */
+  private int rounder;
+
+  /**
+   * @param column      column holding the numeric value.
+   * @param alias       output name, may be null to reuse the column name.
+   * @param numDecimals number of decimal places to keep; values of 0 or less round to whole numbers.
+   */
+  public RoundDoubleIndex(@NotNull String column, String alias, int numDecimals)
+  {
+    super(column, alias);
+    rounder = 1;
+    if (numDecimals > 0) {
+      rounder = (int)Math.pow(10, numDecimals);
+    }
+  }
+
+  /**
+   * Stores the rounded column value in {@code collect}; rows without the column are ignored.
+   *
+   * @param row     input row.
+   * @param collect map receiving the rounded value under the alias, or the column name when no alias is set.
+   * @throws ClassCastException if the column value is not a {@link Number}.
+   */
+  @Override
+  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
+  {
+    if (!row.containsKey(column)) {
+      return;
+    }
+    // Accept any Number, not only Double, so integer and float columns can be rounded as well.
+    double value = ((Number)row.get(column)).doubleValue();
+    // Math.round returns a long; dividing it by the int rounder was an integer division that
+    // discarded every decimal place. Divide in floating point instead.
+    value = Math.round(value * rounder) / (double)rounder;
+    String name = getColumn();
+    if (alias != null) {
+      name = alias;
+    }
+    collect.put(name, value);
+  }
+}
+

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java
new file mode 100644
index 0000000..31c9468
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringCaseIndex.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+
+/**
+ * <p>An implementation of Column Index that implements filter method using case of a string index. </p>
+ * <p>
+ * Emits the string value of a column converted to upper case or to lower case.
+ *
+ * @displayName String Case Index
+ * @category Stream Manipulators
+ * @tags alias
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class StringCaseIndex extends ColumnIndex
+{
+  /** True converts to upper case, false to lower case. */
+  private boolean toUpperCase = true;
+
+  /**
+   * @param column      column holding the string value.
+   * @param alias       output name, may be null to reuse the column name.
+   * @param toLowerCase pass true to convert to lower case, false to convert to upper case.
+   */
+  public StringCaseIndex(@NotNull String column, String alias, boolean toLowerCase)
+  {
+    super(column, alias);
+    toUpperCase = !toLowerCase;
+  }
+
+  /**
+   * Stores the case converted column value in {@code collect}; rows without the column are ignored.
+   *
+   * @param row     input row.
+   * @param collect map receiving the converted value under the alias, or the column name when no alias is set.
+   */
+  @Override
+  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, Object> collect)
+  {
+    if (!row.containsKey(column)) {
+      return;
+    }
+    Object cell = row.get(column);
+    assert (cell instanceof String);
+
+    String outputKey = (alias != null) ? alias : getColumn();
+    String text = (String)cell;
+    String converted = toUpperCase ? text.toUpperCase() : text.toLowerCase();
+    collect.put(outputKey, converted);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java
new file mode 100644
index 0000000..f764c9e
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/StringLenIndex.java
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.ColumnIndex;
+
+/**
+ * <p>An implementation of Column Index that implements filter method using 
length of a string Index. </p>
+ * <p>
+ * @displayName String Length Index
+ * @category Stream Manipulators
+ * @tags alias
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class StringLenIndex  extends ColumnIndex
+{
+  public StringLenIndex(@NotNull String column, String alias)
+  {
+    super(column, alias);
+  }
+
+  @Override
+  public void filter(@NotNull  Map<String, Object> row, @NotNull  Map<String, 
Object> collect)
+  {
+    if (!row.containsKey(column)) {
+      return;
+    }
+    if (!(row.get(column) instanceof String)) {
+      assert (false);
+    }
+
+    String name = getColumn();
+    if (alias != null) {
+      name = alias;
+    }
+    collect.put(name, ((String)row.get(column)).length());
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java
new file mode 100644
index 0000000..91d4ec7
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/SumExpression.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import java.util.Map;
+
+import javax.validation.constraints.NotNull;
+
+
+/**
+ * Implements sum on column index.
+ * <p>
+ * Select index class for implementing sum column index.
+ * @displayName Sum Expression
+ * @category Stream Manipulators
+ * @tags sum
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public class SumExpression extends BinaryExpression
+{
+
+  /**
+   * @param left column name argument for expression.
+   * @param right column name argument for expression.
+   * @param alias name for output field.
+   */
+  public SumExpression(@NotNull String left, @NotNull String right, String 
alias)
+  {
+    super(left, right, alias);
+    if (this.alias == null) {
+      this.alias = "SUM(" + left + "," + right + ")";
+    }
+  }
+
+  /* sum column values.
+   * @see com.datatorrent.lib.streamquery.index.Index#filter(java.util.Map, 
java.util.Map)
+   */
+  @Override
+  public void filter(Map<String, Object> row, Map<String, Object> collect)
+  {
+    if (!row.containsKey(left) || !row.containsKey(right)) {
+      return;
+    }
+    collect.put(alias, ((Number)row.get(left)).doubleValue() + 
((Number)row.get(right)).doubleValue());
+  }
+
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java
new file mode 100644
index 0000000..04d5fc6
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/index/UnaryExpression.java
@@ -0,0 +1,78 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.streamquery.index;
+
+import javax.validation.constraints.NotNull;
+
+import com.datatorrent.lib.streamquery.index.Index;
+
+/**
+ * A base implementation of an index that filters row by unary expression.&nbsp;
+ * Subclasses should provide the implementation of filter/getExpressionName functions.
+ * <p>
+ * This class only stores the column the expression operates on and the alias
+ * used for the output field.
+ * @displayName Unary Expression
+ * @category Stream Manipulators
+ * @tags unary, alias
+ * @since 0.3.4
+ * @deprecated
+ */
+@Deprecated
+public abstract class UnaryExpression implements Index
+{
+  /**
+   * Column name argument for unary expression.
+   */
+  @NotNull
+  protected String column;
+
+  /**
+   * Alias name for output field.
+   */
+  protected String alias;
+
+  /**
+   * @param column name argument for unary expression.
+   * @param alias name for output field; may be null.
+   */
+  public UnaryExpression(@NotNull String column, String alias)
+  {
+    this.column = column;
+    // Previously the alias argument was accepted but never stored, so callers
+    // supplying an alias silently lost it.
+    this.alias = alias;
+  }
+
+  /**
+   * @return column name argument for unary expression.
+   */
+  public String getColumn()
+  {
+    return column;
+  }
+
+  /**
+   * @param column name argument for unary expression.
+   */
+  public void setColumn(String column)
+  {
+    this.column = column;
+  }
+
+  /**
+   * @return alias name for output field; null when none has been set.
+   */
+  public String getAlias()
+  {
+    return alias;
+  }
+
+  /**
+   * @param alias name for output field.
+   */
+  public void setAlias(String alias)
+  {
+    this.alias = alias;
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java
 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java
new file mode 100644
index 0000000..358c0bb
--- /dev/null
+++ 
b/contrib/src/main/java/org/apache/apex/malhar/contrib/misc/streamquery/package-info.java
@@ -0,0 +1,23 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+/**
+ * Library of operators for streaming query language.
+ */
+@org.apache.hadoop.classification.InterfaceStability.Evolving
+package org.apache.apex.malhar.contrib.misc.streamquery;

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java
new file mode 100644
index 0000000..1f669bc
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AbstractStreamPatternMatcherTest.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.Random;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+import com.datatorrent.api.DefaultOutputPort;
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link AbstractStreamPatternMatcher}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class AbstractStreamPatternMatcherTest
+{
+
+  /**
+   * Concrete matcher used by the tests: emits the states of the configured
+   * pattern on {@link #outputPort} every time a match is reported.
+   */
+  public static class StreamPatternMatcher<T> extends AbstractStreamPatternMatcher<T>
+  {
+    @Override
+    public void processPatternFound()
+    {
+      outputPort.emit(getPattern().getStates());
+    }
+
+    public transient DefaultOutputPort<T[]> outputPort = new DefaultOutputPort<T[]>();
+  }
+
+  private StreamPatternMatcher<Integer> streamPatternMatcher;
+  private AbstractStreamPatternMatcher.Pattern<Integer> pattern;
+  private Integer[] inputPattern;
+  private CollectorTestSink<Object> sink;
+
+  @Before
+  public void setup()
+  {
+    streamPatternMatcher = new StreamPatternMatcher<Integer>();
+    sink = new CollectorTestSink<Object>();
+    streamPatternMatcher.outputPort.setSink(sink);
+  }
+
+  @After
+  public void cleanup()
+  {
+    streamPatternMatcher.teardown();
+    sink.collectedTuples.clear();
+  }
+
+  /**
+   * Installs {@code states} as the pattern to match, sets up the operator and opens window 0.
+   */
+  private void startWindowWithPattern(Integer[] states)
+  {
+    inputPattern = states;
+    pattern = new AbstractStreamPatternMatcher.Pattern<Integer>(inputPattern);
+    streamPatternMatcher.setPattern(pattern);
+    streamPatternMatcher.setup(null);
+    streamPatternMatcher.beginWindow(0);
+  }
+
+  /**
+   * Sends the given tuples, in order, to the operator's input port.
+   */
+  private void feed(Integer... tuples)
+  {
+    for (Integer tuple : tuples) {
+      streamPatternMatcher.inputPort.process(tuple);
+    }
+  }
+
+  /**
+   * Asserts the number of emitted tuples and that each one equals the input pattern.
+   * Contents are compared element by element; assertEquals on arrays would only
+   * compare references.
+   */
+  private void assertEmittedPatterns(String countMessage, int expectedCount)
+  {
+    Assert.assertEquals(countMessage, expectedCount, sink.collectedTuples.size());
+    for (Object emitted : sink.collectedTuples) {
+      Assert.assertArrayEquals("Matching the output pattern with input pattern", inputPattern, (Object[])emitted);
+    }
+  }
+
+  @Test
+  public void test() throws Exception
+  {
+    startWindowWithPattern(new Integer[]{0, 1, 0, 1, 2});
+    feed(0, 1, 1, 0, 1, 0, 1, 2, 1);
+    streamPatternMatcher.endWindow();
+    assertEmittedPatterns("The number of tuples emitted is one", 1);
+  }
+
+  @Test
+  public void testSimplePattern() throws Exception
+  {
+    startWindowWithPattern(new Integer[]{0, 0});
+    feed(0, 0, 0, 1, 0, 0);
+    streamPatternMatcher.endWindow();
+    assertEmittedPatterns("The number of tuples emitted are three", 3);
+  }
+
+  @Test
+  public void testPatternWithSingleState() throws Exception
+  {
+    startWindowWithPattern(new Integer[]{0});
+    feed(0, 0, 0, 1, 0, 0);
+    streamPatternMatcher.endWindow();
+    // One match per 0 in the stream; the message used to claim "three".
+    assertEmittedPatterns("The number of tuples emitted are five", 5);
+  }
+
+  @Test
+  public void testAutoGeneratedPattern() throws Exception
+  {
+    Random random = new Random();
+    int patternSize = 15;
+    int max = 10;
+    int min = 1;
+    int primeNumber = 5;
+    Integer[] generated = new Integer[patternSize];
+    for (int i = 0; i < patternSize; i++) {
+      generated[i] = (min + random.nextInt(max));
+    }
+    startWindowWithPattern(generated);
+    int numberOfIterations = 20;
+    for (int i = 0; i < patternSize; i++) {
+      // A prefix of the pattern of length i + 1 ...
+      for (int j = 0; j <= i; j++) {
+        streamPatternMatcher.inputPort.process(inputPattern[j]);
+      }
+      // ... followed by noise drawn from a range that does not overlap the pattern's values ...
+      for (int k = 0; k < numberOfIterations; k++) {
+        streamPatternMatcher.inputPort.process(max + min + random.nextInt(max));
+      }
+      // ... and, on every primeNumber-th iteration, the complete pattern.
+      if (i % primeNumber == 0) {
+        for (int j = 0; j < patternSize; j++) {
+          streamPatternMatcher.inputPort.process(inputPattern[j]);
+        }
+      }
+    }
+    streamPatternMatcher.endWindow();
+    assertEmittedPatterns("The number of tuples emitted ", 1 + patternSize / primeNumber);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java
new file mode 100644
index 0000000..1356004
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/AllAfterMatchMapTest.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * @deprecated
+ * Functional tests for {@link AllAfterMatchMapTest}
+ * <p>
+ */
+@Deprecated
+public class AllAfterMatchMapTest
+{
+  /**
+   * Test node logic emits correct results
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new AllAfterMatchMap<String, Integer>());
+    testNodeProcessingSchema(new AllAfterMatchMap<String, Double>());
+    testNodeProcessingSchema(new AllAfterMatchMap<String, Float>());
+    testNodeProcessingSchema(new AllAfterMatchMap<String, Short>());
+    testNodeProcessingSchema(new AllAfterMatchMap<String, Long>());
+  }
+
+  @SuppressWarnings({ "unchecked", "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(AllAfterMatchMap oper)
+  {
+    CollectorTestSink allSink = new CollectorTestSink();
+    oper.allafter.setSink(allSink);
+    oper.setKey("a");
+    oper.setValue(3.0);
+    oper.setTypeEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 3);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("b", 6);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("c", 9);
+    oper.data.process(input);
+
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 3,
+        allSink.collectedTuples.size());
+    for (Object o : allSink.collectedTuples) {
+      for (Map.Entry<String, Number> e : ((HashMap<String, 
Number>)o).entrySet()) {
+        if (e.getKey().equals("a")) {
+          Assert.assertEquals("emitted value for 'a' was ", new Double(3), new 
Double(e.getValue().doubleValue()));
+        } else if (e.getKey().equals("b")) {
+          Assert.assertEquals("emitted tuple for 'b' was ", new Double(6), new 
Double(e.getValue().doubleValue()));
+        } else if (e.getKey().equals("c")) {
+          Assert.assertEquals("emitted tuple for 'c' was ", new Double(9), new 
Double(e.getValue().doubleValue()));
+        }
+      }
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java
new file mode 100644
index 0000000..c082f83
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/DistinctMapTest.java
@@ -0,0 +1,111 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link DistinctMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class DistinctMapTest
+{
+  /**
+   * Test node logic emits correct results: nine tuples containing repeated
+   * key/value pairs are processed in one window, and the emitted tuples are
+   * checked by count and by the per-key sum of their values.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    DistinctMap<String, Number> oper = new DistinctMap<String, Number>();
+
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.distinct.setSink(sortSink);
+
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+
+    input.put("a", 2);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 2);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 1000);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 5);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 2);
+    input.put("b", 33);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 33);
+    input.put("b", 34);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("b", 34);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("b", 6);
+    input.put("a", 2);
+    oper.data.process(input);
+    input.clear();
+    input.put("c", 9);
+    oper.data.process(input);
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 8, sortSink.collectedTuples.size());
+    int aval = 0;
+    int bval = 0;
+    int cval = 0;
+    for (Object o: sortSink.collectedTuples) {
+      // The operator is declared with Number values, so read them as Number
+      // rather than assuming Integer.
+      for (Map.Entry<String, Number> e: ((HashMap<String, Number>)o).entrySet()) {
+        String key = e.getKey();
+        int value = e.getValue().intValue();
+        if (key.equals("a")) {
+          aval += value;
+        } else if (key.equals("b")) {
+          bval += value;
+        } else if (key.equals("c")) {
+          cval += value;
+        }
+      }
+    }
+    // Each message names the key it checks (all three previously said "a").
+    Assert.assertEquals("Total for key \"a\" ", 1040, aval);
+    Assert.assertEquals("Total for key \"b\" ", 73, bval);
+    Assert.assertEquals("Total for key \"c\" ", 9, cval);
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java
new file mode 100644
index 0000000..be2f6ee
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeyValsTest.java
@@ -0,0 +1,127 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link FilterKeyVals}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FilterKeyValsTest
+{
+  /**
+   * Sums, as ints, every value of every map in {@code list}.
+   */
+  @SuppressWarnings("unchecked")
+  int getTotal(List<Object> list)
+  {
+    int sum = 0;
+    for (Object element : list) {
+      Map<String, Number> tuple = (HashMap<String, Number>)element;
+      for (Number value : tuple.values()) {
+        sum += value.intValue();
+      }
+    }
+    return sum;
+  }
+
+  /**
+   * Test node logic emits correct results
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    FilterKeyVals<String,Number> operator = new FilterKeyVals<String,Number>();
+
+    CollectorTestSink filterSink = new CollectorTestSink();
+    operator.filter.setSink(filterSink);
+
+    // Register a key/value pair and drop it again before installing the real filter.
+    HashMap<String,Number> keyVals = new HashMap<String,Number>();
+    keyVals.put("b",2);
+    operator.setKeyVals(keyVals);
+    operator.clearKeys();
+
+    keyVals.clear();
+    keyVals.put("e", 200);
+    keyVals.put("f", 2);
+    keyVals.put("blah", 2);
+    operator.setKeyVals(keyVals);
+    keyVals.clear();
+    keyVals.put("a", 2);
+    operator.setKeyVals(keyVals);
+
+    operator.beginWindow(0);
+    HashMap<String, Number> tuple = new HashMap<String, Number>();
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 7);
+    tuple.put("d", 42);
+    tuple.put("e", 202);
+    tuple.put("e", 200);
+    tuple.put("f", 2);
+    operator.data.process(tuple);
+    Assert.assertEquals("number emitted tuples", 3, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 204, getTotal(filterSink.collectedTuples));
+    filterSink.clear();
+
+    tuple.clear();
+    tuple.put("a", 5);
+    operator.data.process(tuple);
+    Assert.assertEquals("number emitted tuples", 0, filterSink.collectedTuples.size());
+    filterSink.clear();
+
+    tuple.clear();
+    tuple.put("a", 2);
+    tuple.put("b", 33);
+    tuple.put("f", 2);
+    operator.data.process(tuple);
+    Assert.assertEquals("number emitted tuples", 2, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 4, getTotal(filterSink.collectedTuples));
+    filterSink.clear();
+
+    tuple.clear();
+    tuple.put("b", 6);
+    tuple.put("a", 2);
+    tuple.put("j", 6);
+    tuple.put("e", 2);
+    tuple.put("dd", 6);
+    tuple.put("blah", 2);
+    tuple.put("another", 6);
+    tuple.put("notmakingit", 2);
+    operator.data.process(tuple);
+    Assert.assertEquals("number emitted tuples", 2, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 4, getTotal(filterSink.collectedTuples));
+    filterSink.clear();
+
+    // Switch to inverse mode for the last tuple.
+    tuple.clear();
+    tuple.put("c", 9);
+    operator.setInverse(true);
+    operator.data.process(tuple);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 9, getTotal(filterSink.collectedTuples));
+
+    operator.endWindow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java
new file mode 100644
index 0000000..a25c2df
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysHashMapTest.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link FilterKeysHashMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FilterKeysHashMapTest
+{
+  /**
+   * Sums, as ints, every value of every inner map held by the map-of-maps {@code o}.
+   */
+  @SuppressWarnings("unchecked")
+  int getTotal(Object o)
+  {
+    Map<String, HashMap<String, Number>> outer = (HashMap<String, HashMap<String, Number>>)o;
+    int sum = 0;
+    for (HashMap<String, Number> inner : outer.values()) {
+      for (Number value : inner.values()) {
+        sum += value.intValue();
+      }
+    }
+    return sum;
+  }
+
+  /**
+   * Test node logic emits correct results
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    FilterKeysHashMap<String, Number> operator = new FilterKeysHashMap<String, Number>();
+
+    CollectorTestSink filterSink = new CollectorTestSink();
+    operator.filter.setSink(filterSink);
+
+    // Register a key and drop it again before installing the real key set.
+    operator.setKey("b");
+    operator.clearKeys();
+    String[] extraKeys = {"e", "f", "blah"};
+    operator.setKey("a");
+    operator.setKeys(extraKeys);
+
+    operator.beginWindow(0);
+    HashMap<String, HashMap<String, Number>> outer = new HashMap<String, HashMap<String, Number>>();
+    HashMap<String, Number> first = new HashMap<String, Number>();
+    HashMap<String, Number> second = new HashMap<String, Number>();
+
+    first.put("a", 2);
+    first.put("b", 5);
+    first.put("c", 7);
+    first.put("d", 42);
+    first.put("e", 200);
+    first.put("f", 2);
+    outer.put("A", first);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 204, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    first.clear();
+    outer.clear();
+    first.put("a", 5);
+    outer.put("A", first);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 5, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    first.clear();
+    outer.clear();
+    first.put("a", 2);
+    first.put("b", 33);
+    first.put("f", 2);
+    outer.put("A", first);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 4, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    first.clear();
+    outer.clear();
+    first.put("b", 6);
+    first.put("a", 2);
+    first.put("j", 6);
+    first.put("e", 2);
+    first.put("dd", 6);
+    first.put("blah", 2);
+    first.put("another", 6);
+    first.put("notmakingit", 2);
+    outer.put("A", first);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 6, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    // Switch to inverse mode for one tuple.
+    first.clear();
+    outer.clear();
+    first.put("c", 9);
+    operator.setInverse(true);
+    outer.put("A", first);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 9, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    // Back to normal mode with two inner maps in a single tuple.
+    first.clear();
+    second.clear();
+    outer.clear();
+    first.put("e", 2); // pass
+    first.put("c", 9);
+    second.put("a", 5); // pass
+    second.put("p", 8);
+    operator.setInverse(false);
+    outer.put("A", first);
+    outer.put("B", second);
+    operator.data.process(outer);
+    Assert.assertEquals("number emitted tuples", 1, filterSink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", 7, getTotal(filterSink.collectedTuples.get(0)));
+    filterSink.clear();
+
+    operator.endWindow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java
new file mode 100644
index 0000000..74cebdd
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FilterKeysMapTest.java
@@ -0,0 +1,121 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link FilterKeysMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FilterKeysMapTest
+{
+  /**
+   * Adds up the integer values of every entry of an emitted tuple.
+   *
+   * @param o emitted tuple; cast to {@code HashMap<String, Number>}
+   * @return sum of {@code intValue()} over all entries of the tuple
+   */
+  @SuppressWarnings("unchecked")
+  int getTotal(Object o)
+  {
+    Map<String, Number> tuple = (HashMap<String, Number>)o;
+    int total = 0;
+    for (Number value : tuple.values()) {
+      total += value.intValue();
+    }
+    return total;
+  }
+
+  /**
+   * Asserts that the sink holds exactly one tuple whose values sum to {@code expectedTotal}.
+   */
+  @SuppressWarnings("rawtypes")
+  private void assertSingleTuple(CollectorTestSink sink, int expectedTotal)
+  {
+    Assert.assertEquals("number emitted tuples", 1, sink.collectedTuples.size());
+    Assert.assertEquals("Total filtered value is ", expectedTotal, getTotal(sink.collectedTuples.get(0)));
+  }
+
+  /**
+   * Sends several tuples through the operator within one window and checks the total of
+   * the values that come out of the filter port, first in normal and then in inverse mode.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    FilterKeysMap<String,Number> oper = new FilterKeysMap<String,Number>();
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.filter.setSink(sortSink);
+
+    // "b" is registered and wiped again; the keys in effect are "a", "e", "f" and "blah".
+    oper.setKey("b");
+    oper.clearKeys();
+    oper.setKey("a");
+    oper.setKeys(new String[] {"e", "f", "blah"});
+
+    oper.beginWindow(0);
+    // The same map instance is reused (cleared between rounds) for every tuple.
+    HashMap<String, Number> tuple = new HashMap<String, Number>();
+
+    // Expected to keep a, e, f: 2 + 200 + 2
+    tuple.put("a", 2);
+    tuple.put("b", 5);
+    tuple.put("c", 7);
+    tuple.put("d", 42);
+    tuple.put("e", 200);
+    tuple.put("f", 2);
+    oper.data.process(tuple);
+    assertSingleTuple(sortSink, 204);
+    sortSink.clear();
+
+    // Expected to keep a: 5
+    tuple.clear();
+    tuple.put("a", 5);
+    oper.data.process(tuple);
+    assertSingleTuple(sortSink, 5);
+    sortSink.clear();
+
+    // Expected to keep a, f: 2 + 2
+    tuple.clear();
+    tuple.put("a", 2);
+    tuple.put("b", 33);
+    tuple.put("f", 2);
+    oper.data.process(tuple);
+    assertSingleTuple(sortSink, 4);
+    sortSink.clear();
+
+    // Expected to keep a, e, blah: 2 + 2 + 2
+    tuple.clear();
+    tuple.put("b", 6);
+    tuple.put("a", 2);
+    tuple.put("j", 6);
+    tuple.put("e", 2);
+    tuple.put("dd", 6);
+    tuple.put("blah", 2);
+    tuple.put("another", 6);
+    tuple.put("notmakingit", 2);
+    oper.data.process(tuple);
+    assertSingleTuple(sortSink, 6);
+    sortSink.clear();
+
+    // Inverse mode: "c" is not a configured key and is expected to come through.
+    tuple.clear();
+    tuple.put("c", 9);
+    oper.setInverse(true);
+    oper.data.process(tuple);
+    assertSingleTuple(sortSink, 9);
+
+    oper.endWindow();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java
new file mode 100644
index 0000000..d02ee27
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstMatchMapTest.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CountAndLastTupleTestSink;
+
+/**
+ * Functional tests for {@link FirstMatchMap}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FirstMatchMapTest
+{
+  /**
+   * Runs the shared scenario against operators declared with several numeric value types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new FirstMatchMap<String, Integer>());
+    testNodeProcessingSchema(new FirstMatchMap<String, Double>());
+    testNodeProcessingSchema(new FirstMatchMap<String, Float>());
+    testNodeProcessingSchema(new FirstMatchMap<String, Short>());
+    testNodeProcessingSchema(new FirstMatchMap<String, Long>());
+  }
+
+  /**
+   * Configures the operator to match {@code a == 3}, then drives two windows: one that
+   * contains a matching tuple and one that does not.
+   *
+   * @param oper operator under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(FirstMatchMap oper)
+  {
+    CountAndLastTupleTestSink sink = new CountAndLastTupleTestSink();
+    oper.first.setSink(sink);
+    oper.setKey("a");
+    oper.setValue(3);
+    oper.setTypeEQ();
+
+    // Window with exactly one tuple whose "a" equals 3.
+    oper.beginWindow(0);
+    HashMap<String, Number> data = new HashMap<String, Number>();
+    data.put("a", 4);
+    data.put("b", 20);
+    data.put("c", 1000);
+    oper.data.process(data);
+    // No clear() here: the same three keys are simply overwritten.
+    data.put("a", 3);
+    data.put("b", 20);
+    data.put("c", 1000);
+    oper.data.process(data);
+    data.clear();
+    data.put("a", 2);
+    oper.data.process(data);
+    data.clear();
+    data.put("a", 4);
+    data.put("b", 21);
+    data.put("c", 1000);
+    oper.data.process(data);
+    data.clear();
+    data.put("a", 4);
+    data.put("b", 20);
+    data.put("c", 5);
+    oper.data.process(data);
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, sink.count);
+    HashMap<String, Number> emitted = (HashMap<String, Number>)sink.tuple;
+    Assert.assertEquals("Value of a was ", 3, emitted.get("a").intValue());
+    sink.clear();
+
+    // Window in which no tuple has "a" equal to 3.
+    oper.beginWindow(0);
+    data.clear();
+    data.put("a", 2);
+    data.put("b", 20);
+    data.put("c", 1000);
+    oper.data.process(data);
+    data.clear();
+    data.put("a", 5);
+    oper.data.process(data);
+    oper.endWindow();
+    // Nothing matched, so nothing should have been emitted.
+    Assert.assertEquals("number emitted tuples", 0, sink.count);
+    sink.clear();
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java
new file mode 100644
index 0000000..e6c3e7e
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstNTest.java
@@ -0,0 +1,123 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link FirstN}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FirstNTest
+{
+  private static final Logger log = LoggerFactory.getLogger(FirstNTest.class);
+
+  /**
+   * Runs the shared scenario against operators declared with several numeric value types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new FirstN<String, Integer>());
+    testNodeProcessingSchema(new FirstN<String, Double>());
+    testNodeProcessingSchema(new FirstN<String, Float>());
+    testNodeProcessingSchema(new FirstN<String, Short>());
+    testNodeProcessingSchema(new FirstN<String, Long>());
+  }
+
+  /**
+   * Sets N to 3, feeds tuples carrying keys "a", "b" and "c" within one window, and checks
+   * the number of emitted tuples as well as the per-key sum of the emitted values.
+   *
+   * @param oper operator under test
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(FirstN oper)
+  {
+    CollectorTestSink sortSink = new CollectorTestSink();
+    oper.first.setSink(sortSink);
+    oper.setN(3);
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+
+    input.put("a", 2);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 20);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 1000);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 5);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 20);
+    input.put("b", 33);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("a", 33);
+    input.put("b", 34);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("b", 34);
+    input.put("a", 1001);
+    oper.data.process(input);
+
+    input.clear();
+    input.put("b", 6);
+    input.put("a", 1);
+    oper.data.process(input);
+    input.clear();
+    input.put("c", 9);
+    oper.data.process(input);
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 7, sortSink.collectedTuples.size());
+
+    // Accumulate the emitted values per key across all collected tuples.
+    int aval = 0;
+    int bval = 0;
+    int cval = 0;
+    for (Object o : sortSink.collectedTuples) {
+      for (Map.Entry<String, Number> e : ((HashMap<String, Number>)o).entrySet()) {
+        if ("a".equals(e.getKey())) {
+          aval += e.getValue().intValue();
+        } else if ("b".equals(e.getKey())) {
+          bval += e.getValue().intValue();
+        } else if ("c".equals(e.getKey())) {
+          cval += e.getValue().intValue();
+        }
+      }
+    }
+    // Expected sums: a = 2 + 20 + 1000, b = 33 + 34 + 34, c = 9.
+    // Each message names the key it actually checks so a failure points at the right one.
+    Assert.assertEquals("Value of \"a\" was ", 1022, aval);
+    Assert.assertEquals("Value of \"b\" was ", 101, bval);
+    Assert.assertEquals("Value of \"c\" was ", 9, cval);
+    log.debug("Done testing round\n");
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java
new file mode 100644
index 0000000..51b8415
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/FirstTillMatchTest.java
@@ -0,0 +1,110 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.HashMap;
+
+import org.junit.Assert;
+
+import org.junit.Test;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link FirstTillMatch}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class FirstTillMatchTest
+{
+  /**
+   * Runs the shared scenario against operators declared with several numeric value types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new FirstTillMatch<String, Integer>());
+    testNodeProcessingSchema(new FirstTillMatch<String, Double>());
+    testNodeProcessingSchema(new FirstTillMatch<String, Float>());
+    testNodeProcessingSchema(new FirstTillMatch<String, Short>());
+    testNodeProcessingSchema(new FirstTillMatch<String, Long>());
+  }
+
+  /**
+   * Sums the value stored under key "a" over every tuple collected by the sink.
+   */
+  @SuppressWarnings({ "unchecked", "rawtypes" })
+  private int sumOfA(CollectorTestSink sink)
+  {
+    int total = 0;
+    for (Object o : sink.collectedTuples) {
+      total += ((HashMap<String, Number>)o).get("a").intValue();
+    }
+    return total;
+  }
+
+  /**
+   * Configures the operator to match {@code a == 3}, then drives two windows: one in which
+   * the third tuple matches and one in which no tuple matches.
+   *
+   * @param oper operator under test
+   */
+  @SuppressWarnings( {"unchecked", "rawtypes"})
+  public void testNodeProcessingSchema(FirstTillMatch oper)
+  {
+    CollectorTestSink matchSink = new CollectorTestSink();
+    oper.first.setSink(matchSink);
+    oper.setKey("a");
+    oper.setValue(3);
+    oper.setTypeEQ();
+
+    oper.beginWindow(0);
+    HashMap<String, Number> input = new HashMap<String, Number>();
+    input.put("a", 4);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 2);
+    oper.data.process(input);
+    // No clear() here: "a" is overwritten with the matching value.
+    input.put("a", 3);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 4);
+    input.put("b", 21);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 6);
+    input.put("b", 20);
+    input.put("c", 5);
+    oper.data.process(input);
+    oper.endWindow();
+
+    // Only the two tuples seen before the match (a = 4 and a = 2) are expected.
+    Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size());
+    Assert.assertEquals("Value of a was ", 6, sumOfA(matchSink));
+    matchSink.clear();
+
+    oper.beginWindow(0);
+    input.clear();
+    input.put("a", 2);
+    input.put("b", 20);
+    input.put("c", 1000);
+    oper.data.process(input);
+    input.clear();
+    input.put("a", 5);
+    oper.data.process(input);
+    oper.endWindow();
+    // No tuple matches in this window, so both tuples are expected to be emitted (a = 2 + 5).
+    Assert.assertEquals("number emitted tuples", 2, matchSink.collectedTuples.size());
+    Assert.assertEquals("Value of a was ", 7, sumOfA(matchSink));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java
new file mode 100644
index 0000000..cebd628
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InsertSortDescTest.java
@@ -0,0 +1,97 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link InsertSortDesc}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class InsertSortDescTest
+{
+  private static Logger log = LoggerFactory.getLogger(InsertSortDescTest.class);
+
+  /**
+   * Runs the shared scenario against operators declared with several element types.
+   */
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    testNodeProcessingSchema(new InsertSortDesc<Integer>(), "Integer");
+    testNodeProcessingSchema(new InsertSortDesc<Double>(), "Double");
+    testNodeProcessingSchema(new InsertSortDesc<Float>(), "Float");
+    testNodeProcessingSchema(new InsertSortDesc<String>(), "String");
+  }
+
+  /**
+   * Feeds values through both the list port and the single-value port within one window,
+   * checks that each output port emitted exactly one tuple, and logs the result.
+   *
+   * @param oper operator under test
+   * @param debug label of the element type, used only in the log output
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  public void testNodeProcessingSchema(InsertSortDesc oper, String debug)
+  {
+    CollectorTestSink sortSink = new CollectorTestSink();
+    CollectorTestSink hashSink = new CollectorTestSink();
+    oper.sort.setSink(sortSink);
+    oper.sorthash.setSink(hashSink);
+
+    // One list instance is reused (cleared between rounds) for every batch.
+    ArrayList batch = new ArrayList();
+
+    oper.beginWindow(0);
+
+    batch.add(2);
+    oper.datalist.process(batch);
+    oper.data.process(20);
+
+    batch.clear();
+    batch.add(1000);
+    batch.add(5);
+    batch.add(20);
+    batch.add(33);
+    batch.add(33);
+    batch.add(34);
+    oper.datalist.process(batch);
+
+    batch.clear();
+    batch.add(34);
+    batch.add(1001);
+    batch.add(6);
+    batch.add(1);
+    batch.add(33);
+    batch.add(9);
+    oper.datalist.process(batch);
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 1, sortSink.collectedTuples.size());
+    Assert.assertEquals("number emitted tuples", 1, hashSink.collectedTuples.size());
+
+    HashMap counts = (HashMap)hashSink.collectedTuples.get(0);
+    ArrayList sorted = (ArrayList)sortSink.collectedTuples.get(0);
+    for (Object element : sorted) {
+      // Dereferencing the looked-up value fails the test if an element is missing from the map.
+      log.debug(String.format("%s : %s", element.toString(), counts.get(element).toString()));
+    }
+    log.debug(String.format("Tested %s type with %d tuples and %d uniques\n", debug, sorted.size(), counts.size()));
+  }
+}

http://git-wip-us.apache.org/repos/asf/apex-malhar/blob/8f00cefa/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java
----------------------------------------------------------------------
diff --git 
a/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java
 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java
new file mode 100644
index 0000000..d37789d
--- /dev/null
+++ 
b/contrib/src/test/java/org/apache/apex/malhar/contrib/misc/algo/InvertIndexArrayTest.java
@@ -0,0 +1,101 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.apex.malhar.contrib.misc.algo;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.datatorrent.api.Sink;
+
+import com.datatorrent.lib.testbench.CollectorTestSink;
+
+/**
+ * Functional tests for {@link InvertIndexArray}.
+ *
+ * @deprecated
+ */
+@Deprecated
+public class InvertIndexArrayTest
+{
+  private static final Logger log = LoggerFactory.getLogger(InvertIndexArrayTest.class);
+
+  /**
+   * Feeds two key-to-value-list tuples through the operator within one window and checks,
+   * for each emitted index entry, which of the input keys "a", "b" and "c" it lists.
+   */
+  @SuppressWarnings({ "rawtypes", "unchecked" })
+  @Test
+  public void testNodeProcessing() throws Exception
+  {
+    InvertIndexArray<String,String> oper = new InvertIndexArray<String,String>();
+    CollectorTestSink indexSink = new CollectorTestSink();
+
+    Sink inSink = oper.data.getSink();
+    oper.index.setSink(indexSink);
+
+    oper.beginWindow(0);
+
+    // First tuple: "a" and "b" both map to [str, str1].
+    HashMap<String, ArrayList> input = new HashMap<String, ArrayList>();
+    ArrayList<String> alist = new ArrayList<String>();
+    alist.add("str");
+    alist.add("str1");
+    input.put("a", alist);
+    input.put("b", alist);
+    inSink.put(input);
+
+    // Second tuple: "c" maps to [blah, str1]. Fresh map and list instances are used.
+    alist = new ArrayList<String>();
+    input = new HashMap<String, ArrayList>();
+    alist.add("blah");
+    alist.add("str1");
+    input.put("c", alist);
+    inSink.put(input);
+
+    oper.endWindow();
+
+    Assert.assertEquals("number emitted tuples", 3, indexSink.collectedTuples.size());
+    for (Object o: indexSink.collectedTuples) {
+      log.debug(o.toString());
+      HashMap<String, ArrayList<String>> output = (HashMap<String, ArrayList<String>>)o;
+      for (Map.Entry<String, ArrayList<String>> e: output.entrySet()) {
+        String key = e.getKey();
+        ArrayList<String> keys = e.getValue();
+        // Each message names the index entry being checked so a failure is attributable.
+        if (key.equals("str1")) {
+          Assert.assertEquals("Index for \"str1\" contains \"a\"", true, keys.contains("a"));
+          Assert.assertEquals("Index for \"str1\" contains \"b\"", true, keys.contains("b"));
+          Assert.assertEquals("Index for \"str1\" contains \"c\"", true, keys.contains("c"));
+
+        } else if (key.equals("str")) {
+          Assert.assertEquals("Index for \"str\" contains \"a\"", true, keys.contains("a"));
+          Assert.assertEquals("Index for \"str\" contains \"b\"", true, keys.contains("b"));
+          Assert.assertEquals("Index for \"str\" contains \"c\"", false, keys.contains("c"));
+
+        } else if (key.equals("blah")) {
+          Assert.assertEquals("Index for \"blah\" contains \"a\"", false, keys.contains("a"));
+          Assert.assertEquals("Index for \"blah\" contains \"b\"", false, keys.contains("b"));
+          Assert.assertEquals("Index for \"blah\" contains \"c\"", true, keys.contains("c"));
+        }
+      }
+    }
+  }
+}

Reply via email to